JODA  0.13.1 (59b41972)
JSON On-Demand Analysis
blockingconcurrentqueue.h
Go to the documentation of this file.
1 // Provides an efficient blocking version of moodycamel::ConcurrentQueue.
2 // ©2015-2016 Cameron Desrochers. Distributed under the terms of the simplified
3 // BSD license, available at the top of concurrentqueue.h.
4 // Uses Jeff Preshing's semaphore implementation (under the terms of its
5 // separate zlib license, embedded below).
6 
7 #pragma once
8 
9 #include <cerrno>
10 #include <chrono>
11 #include <ctime>
12 #include <memory>
13 #include <type_traits>
14 #include "concurrentqueue.h"
15 
16 #if defined(_WIN32)
17 // Avoid including windows.h in a header; we only need a handful of
18 // items, so we'll redeclare them here (this is relatively safe since
19 // the API generally has to remain stable between Windows versions).
20 // I know this is an ugly hack but it still beats polluting the global
21 // namespace with thousands of generic names or adding a .cpp for nothing.
22 extern "C" {
23 struct _SECURITY_ATTRIBUTES;
24 __declspec(dllimport) void *__stdcall CreateSemaphoreW(
25  _SECURITY_ATTRIBUTES *lpSemaphoreAttributes, long lInitialCount,
26  long lMaximumCount, const wchar_t *lpName);
27 __declspec(dllimport) int __stdcall CloseHandle(void *hObject);
28 __declspec(dllimport) unsigned long __stdcall WaitForSingleObject(
29  void *hHandle, unsigned long dwMilliseconds);
30 __declspec(dllimport) int __stdcall ReleaseSemaphore(void *hSemaphore,
31  long lReleaseCount,
32  long *lpPreviousCount);
33 }
34 #elif defined(__MACH__)
35 #include <mach/mach.h>
36 #elif defined(__unix__)
37 #include <semaphore.h>
38 #endif
39 
40 namespace moodycamel {
41 namespace details {
42 // Code in the mpmc_sema namespace below is an adaptation of Jeff Preshing's
43 // portable + lightweight semaphore implementations, originally from
44 // https://github.com/preshing/cpp11-on-multicore/blob/master/common/sema.h
45 // LICENSE:
46 // Copyright (c) 2015 Jeff Preshing
47 //
48 // This software is provided 'as-is', without any express or implied
49 // warranty. In no event will the authors be held liable for any damages
50 // arising from the use of this software.
51 //
52 // Permission is granted to anyone to use this software for any purpose,
53 // including commercial applications, and to alter it and redistribute it
54 // freely, subject to the following restrictions:
55 //
56 // 1. The origin of this software must not be misrepresented; you must not
57 // claim that you wrote the original software. If you use this software
58 // in a product, an acknowledgement in the product documentation would be
59 // appreciated but is not required.
60 // 2. Altered source versions must be plainly marked as such, and must not be
61 // misrepresented as being the original software.
62 // 3. This notice may not be removed or altered from any source distribution.
63 namespace mpmc_sema {
64 #if defined(_WIN32)
65 class Semaphore {
66  private:
67  void *m_hSema;
68 
69  Semaphore(const Semaphore &other) MOODYCAMEL_DELETE_FUNCTION;
70  Semaphore &operator=(const Semaphore &other) MOODYCAMEL_DELETE_FUNCTION;
71 
72  public:
73  Semaphore(int initialCount = 0) {
74  assert(initialCount >= 0);
75  const long maxLong = 0x7fffffff;
76  m_hSema = CreateSemaphoreW(nullptr, initialCount, maxLong, nullptr);
77  }
78 
79  ~Semaphore() { CloseHandle(m_hSema); }
80 
81  void wait() {
82  const unsigned long infinite = 0xffffffff;
83  WaitForSingleObject(m_hSema, infinite);
84  }
85 
86  bool try_wait() {
87  const unsigned long RC_WAIT_TIMEOUT = 0x00000102;
88  return WaitForSingleObject(m_hSema, 0) != RC_WAIT_TIMEOUT;
89  }
90 
91  bool timed_wait(std::uint64_t usecs) {
92  const unsigned long RC_WAIT_TIMEOUT = 0x00000102;
93  return WaitForSingleObject(m_hSema, (unsigned long)(usecs / 1000)) !=
94  RC_WAIT_TIMEOUT;
95  }
96 
97  void signal(int count = 1) { ReleaseSemaphore(m_hSema, count, nullptr); }
98 };
99 #elif defined(__MACH__)
100 //---------------------------------------------------------
101 // Semaphore (Apple iOS and OSX)
102 // Can't use POSIX semaphores due to
103 // http://lists.apple.com/archives/darwin-kernel/2009/Apr/msg00010.html
104 //---------------------------------------------------------
105 class Semaphore {
106  private:
107  semaphore_t m_sema;
108 
109  Semaphore(const Semaphore &other) MOODYCAMEL_DELETE_FUNCTION;
110  Semaphore &operator=(const Semaphore &other) MOODYCAMEL_DELETE_FUNCTION;
111 
112  public:
113  Semaphore(int initialCount = 0) {
114  assert(initialCount >= 0);
115  semaphore_create(mach_task_self(), &m_sema, SYNC_POLICY_FIFO, initialCount);
116  }
117 
118  ~Semaphore() { semaphore_destroy(mach_task_self(), m_sema); }
119 
120  void wait() { semaphore_wait(m_sema); }
121 
122  bool try_wait() { return timed_wait(0); }
123 
124  bool timed_wait(std::uint64_t timeout_usecs) {
125  mach_timespec_t ts;
126  ts.tv_sec = static_cast<unsigned int>(timeout_usecs / 1000000);
127  ts.tv_nsec = (timeout_usecs % 1000000) * 1000;
128 
129  // added in OSX 10.10:
130  // https://developer.apple.com/library/prerelease/mac/documentation/General/Reference/APIDiffsMacOSX10_10SeedDiff/modules/Darwin.html
131  kern_return_t rc = semaphore_timedwait(m_sema, ts);
132 
133  return rc != KERN_OPERATION_TIMED_OUT && rc != KERN_ABORTED;
134  }
135 
136  void signal() { semaphore_signal(m_sema); }
137 
138  void signal(int count) {
139  while (count-- > 0) {
140  semaphore_signal(m_sema);
141  }
142  }
143 };
144 #elif defined(__unix__)
145 
146 //---------------------------------------------------------
147 // Semaphore (POSIX, Linux)
148 //---------------------------------------------------------
149 class Semaphore {
150  private:
151  sem_t m_sema;
152 
153  Semaphore(const Semaphore &other) MOODYCAMEL_DELETE_FUNCTION;
154  Semaphore &operator=(const Semaphore &other) MOODYCAMEL_DELETE_FUNCTION;
155 
156  public:
157  Semaphore(int initialCount = 0) {
158  assert(initialCount >= 0);
159  sem_init(&m_sema, 0, initialCount);
160  }
161 
162  ~Semaphore() { sem_destroy(&m_sema); }
163 
164  void wait() {
165  // http://stackoverflow.com/questions/2013181/gdb-causes-sem-wait-to-fail-with-eintr-error
166  int rc;
167  do {
168  rc = sem_wait(&m_sema);
169  } while (rc == -1 && errno == EINTR);
170  }
171 
172  bool try_wait() {
173  int rc;
174  do {
175  rc = sem_trywait(&m_sema);
176  } while (rc == -1 && errno == EINTR);
177  return !(rc == -1 && errno == EAGAIN);
178  }
179 
180  bool timed_wait(std::uint64_t usecs) {
181  struct timespec ts;
182  const int usecs_in_1_sec = 1000000;
183  const int nsecs_in_1_sec = 1000000000;
184  clock_gettime(CLOCK_REALTIME, &ts);
185  ts.tv_sec += usecs / usecs_in_1_sec;
186  ts.tv_nsec += (usecs % usecs_in_1_sec) * 1000;
187  // sem_timedwait bombs if you have more than 1e9 in tv_nsec
188  // so we have to clean things up before passing it in
189  if (ts.tv_nsec >= nsecs_in_1_sec) {
190  ts.tv_nsec -= nsecs_in_1_sec;
191  ++ts.tv_sec;
192  }
193 
194  int rc;
195  do {
196  rc = sem_timedwait(&m_sema, &ts);
197  } while (rc == -1 && errno == EINTR);
198  return !(rc == -1 && errno == ETIMEDOUT);
199  }
200 
201  void signal() { sem_post(&m_sema); }
202 
203  void signal(int count) {
204  while (count-- > 0) {
205  sem_post(&m_sema);
206  }
207  }
208 };
209 #else
210 #error Unsupported platform! (No semaphore wrapper available)
211 #endif
212 
213 //---------------------------------------------------------
214 // LightweightSemaphore
215 //---------------------------------------------------------
217  public:
218  typedef std::make_signed<std::size_t>::type ssize_t;
219 
220  private:
221  std::atomic<ssize_t> m_count;
222  Semaphore m_sema;
223 
224  bool waitWithPartialSpinning(std::int64_t timeout_usecs = -1) {
225  ssize_t oldCount;
226  // Is there a better way to set the initial spin count?
227  // If we lower it to 1000, testBenaphore becomes 15x slower on my Core
228  // i7-5930K Windows PC, as threads start hitting the kernel semaphore.
229  int spin = 10000;
230  while (--spin >= 0) {
231  oldCount = m_count.load(std::memory_order_relaxed);
232  if ((oldCount > 0) &&
233  m_count.compare_exchange_strong(oldCount, oldCount - 1,
236  return true;
237  std::atomic_signal_fence(
238  std::memory_order_acquire); // Prevent the compiler from collapsing
239  // the loop.
240  }
241  oldCount = m_count.fetch_sub(1, std::memory_order_acquire);
242  if (oldCount > 0) return true;
243  if (timeout_usecs < 0) {
244  m_sema.wait();
245  return true;
246  }
247  if (m_sema.timed_wait((std::uint64_t)timeout_usecs)) return true;
248  // At this point, we've timed out waiting for the semaphore, but the
249  // count is still decremented indicating we may still be waiting on
250  // it. So we have to re-adjust the count, but only if the semaphore
251  // wasn't signaled enough times for us too since then. If it was, we
252  // need to release the semaphore too.
253  while (true) {
254  oldCount = m_count.load(std::memory_order_acquire);
255  if (oldCount >= 0 && m_sema.try_wait()) return true;
256  if (oldCount < 0 && m_count.compare_exchange_strong(
257  oldCount, oldCount + 1, std::memory_order_relaxed,
259  return false;
260  }
261  }
262 
263  ssize_t waitManyWithPartialSpinning(ssize_t max,
264  std::int64_t timeout_usecs = -1) {
265  assert(max > 0);
266  ssize_t oldCount;
267  int spin = 10000;
268  while (--spin >= 0) {
269  oldCount = m_count.load(std::memory_order_relaxed);
270  if (oldCount > 0) {
271  ssize_t newCount = oldCount > max ? oldCount - max : 0;
272  if (m_count.compare_exchange_strong(oldCount, newCount,
275  return oldCount - newCount;
276  }
277  std::atomic_signal_fence(std::memory_order_acquire);
278  }
279  oldCount = m_count.fetch_sub(1, std::memory_order_acquire);
280  if (oldCount <= 0) {
281  if (timeout_usecs < 0)
282  m_sema.wait();
283  else if (!m_sema.timed_wait((std::uint64_t)timeout_usecs)) {
284  while (true) {
285  oldCount = m_count.load(std::memory_order_acquire);
286  if (oldCount >= 0 && m_sema.try_wait()) break;
287  if (oldCount < 0 &&
288  m_count.compare_exchange_strong(oldCount, oldCount + 1,
291  return 0;
292  }
293  }
294  }
295  if (max > 1) return 1 + tryWaitMany(max - 1);
296  return 1;
297  }
298 
299  public:
300  LightweightSemaphore(ssize_t initialCount = 0) : m_count(initialCount) {
301  assert(initialCount >= 0);
302  }
303 
304  bool tryWait() {
305  ssize_t oldCount = m_count.load(std::memory_order_relaxed);
306  while (oldCount > 0) {
307  if (m_count.compare_exchange_weak(oldCount, oldCount - 1,
310  return true;
311  }
312  return false;
313  }
314 
315  void wait() {
316  if (!tryWait()) waitWithPartialSpinning();
317  }
318 
319  bool wait(std::int64_t timeout_usecs) {
320  return tryWait() || waitWithPartialSpinning(timeout_usecs);
321  }
322 
323  // Acquires between 0 and (greedily) max, inclusive
325  assert(max >= 0);
326  ssize_t oldCount = m_count.load(std::memory_order_relaxed);
327  while (oldCount > 0) {
328  ssize_t newCount = oldCount > max ? oldCount - max : 0;
329  if (m_count.compare_exchange_weak(oldCount, newCount,
332  return oldCount - newCount;
333  }
334  return 0;
335  }
336 
337  // Acquires at least one, and (greedily) at most max
338  ssize_t waitMany(ssize_t max, std::int64_t timeout_usecs) {
339  assert(max >= 0);
340  ssize_t result = tryWaitMany(max);
341  if (result == 0 && max > 0)
342  result = waitManyWithPartialSpinning(max, timeout_usecs);
343  return result;
344  }
345 
347  ssize_t result = waitMany(max, -1);
348  assert(result > 0);
349  return result;
350  }
351 
352  void signal(ssize_t count = 1) {
353  assert(count >= 0);
354  ssize_t oldCount = m_count.fetch_add(count, std::memory_order_release);
355  ssize_t toRelease = -oldCount < count ? -oldCount : count;
356  if (toRelease > 0) {
357  m_sema.signal((int)toRelease);
358  }
359  }
360 
362  ssize_t count = m_count.load(std::memory_order_relaxed);
363  return count > 0 ? count : 0;
364  }
365 };
366 } // end namespace mpmc_sema
367 } // end namespace details
368 
369 // This is a blocking version of the queue. It has an almost identical interface
370 // to the normal non-blocking version, with the addition of various
371 // wait_dequeue() methods and the removal of producer-specific dequeue methods.
372 template <typename T, typename Traits = ConcurrentQueueDefaultTraits>
374  private:
375  typedef ::moodycamel::ConcurrentQueue<T, Traits> ConcurrentQueue;
377 
378  public:
381 
383  typedef typename ConcurrentQueue::size_t size_t;
384  typedef typename std::make_signed<size_t>::type ssize_t;
385 
389  static const size_t EXPLICIT_INITIAL_INDEX_SIZE =
391  static const size_t IMPLICIT_INITIAL_INDEX_SIZE =
398 
399  public:
400  // Creates a queue with at least `capacity` element slots; note that the
401  // actual number of elements that can be inserted without additional memory
402  // allocation depends on the number of producers and the block size (e.g. if
403  // the block size is equal to `capacity`, only a single block will be
404  // allocated up-front, which means only a single producer will be able to
405  // enqueue elements without an extra allocation -- blocks aren't shared
406  // between producers). This method is not thread safe -- it is up to the user
407  // to ensure that the queue is fully constructed before it starts being used
408  // by other threads (this includes making the memory effects of construction
409  // visible, possibly with a memory barrier).
410  explicit BlockingConcurrentQueue(size_t capacity = 6 * BLOCK_SIZE)
411  : inner(capacity),
412  sema(create<LightweightSemaphore>(),
413  &BlockingConcurrentQueue::template destroy<LightweightSemaphore>) {
414  assert(reinterpret_cast<ConcurrentQueue *>((BlockingConcurrentQueue *)1) ==
415  &((BlockingConcurrentQueue *)1)->inner &&
416  "BlockingConcurrentQueue must have ConcurrentQueue as its first "
417  "member");
418  if (!sema) {
419  MOODYCAMEL_THROW(std::bad_alloc());
420  }
421  }
422 
423  BlockingConcurrentQueue(size_t minCapacity, size_t maxExplicitProducers,
424  size_t maxImplicitProducers)
425  : inner(minCapacity, maxExplicitProducers, maxImplicitProducers),
426  sema(create<LightweightSemaphore>(),
427  &BlockingConcurrentQueue::template destroy<LightweightSemaphore>) {
428  assert(reinterpret_cast<ConcurrentQueue *>((BlockingConcurrentQueue *)1) ==
429  &((BlockingConcurrentQueue *)1)->inner &&
430  "BlockingConcurrentQueue must have ConcurrentQueue as its first "
431  "member");
432  if (!sema) {
433  MOODYCAMEL_THROW(std::bad_alloc());
434  }
435  }
436 
437  // Disable copying and copy assignment
442 
443  // Moving is supported, but note that it is *not* a thread-safe operation.
444  // Nobody can use the queue while it's being moved, and the memory effects
445  // of that move must be propagated to other threads before they can use it.
446  // Note: When a queue is moved, its tokens are still valid but can only be
447  // used with the destination queue (i.e. semantically they are moved along
448  // with the queue itself).
450  : inner(std::move(other.inner)),
451  sema(std::move(other.sema)) {}
452 
455  return swap_internal(other);
456  }
457 
458  // Swaps this queue's state with the other's. Not thread-safe.
459  // Swapping two queues does not invalidate their tokens, however
460  // the tokens that were created for one queue must be used with
461  // only the swapped queue (i.e. the tokens are tied to the
462  // queue's movable state, not the object itself).
464  swap_internal(other);
465  }
466 
467  private:
468  BlockingConcurrentQueue &swap_internal(BlockingConcurrentQueue &other) {
469  if (this == &other) {
470  return *this;
471  }
472 
473  inner.swap(other.inner);
474  sema.swap(other.sema);
475  return *this;
476  }
477 
478  public:
479  // Enqueues a single item (by copying it).
480  // Allocates memory if required. Only fails if memory allocation fails (or
481  // implicit production is disabled because
482  // Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0, or
483  // Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
484  // Thread-safe.
485  inline bool enqueue(T const &item) {
486  if ((details::likely)(inner.enqueue(item))) {
487  sema->signal();
488  return true;
489  }
490  return false;
491  }
492 
493  // Enqueues a single item (by moving it, if possible).
494  // Allocates memory if required. Only fails if memory allocation fails (or
495  // implicit production is disabled because
496  // Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0, or
497  // Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
498  // Thread-safe.
499  inline bool enqueue(T &&item) {
500  if ((details::likely)(inner.enqueue(std::move(item)))) {
501  sema->signal();
502  return true;
503  }
504  return false;
505  }
506 
507  // Enqueues a single item (by copying it) using an explicit producer token.
508  // Allocates memory if required. Only fails if memory allocation fails (or
509  // Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
510  // Thread-safe.
511  inline bool enqueue(producer_token_t const &token, T const &item) {
512  if ((details::likely)(inner.enqueue(token, item))) {
513  sema->signal();
514  return true;
515  }
516  return false;
517  }
518 
519  // Enqueues a single item (by moving it, if possible) using an explicit
520  // producer token. Allocates memory if required. Only fails if memory
521  // allocation fails (or Traits::MAX_SUBQUEUE_SIZE has been defined and would
522  // be surpassed). Thread-safe.
523  inline bool enqueue(producer_token_t const &token, T &&item) {
524  if ((details::likely)(inner.enqueue(token, std::move(item)))) {
525  sema->signal();
526  return true;
527  }
528  return false;
529  }
530 
531  // Enqueues several items.
532  // Allocates memory if required. Only fails if memory allocation fails (or
533  // implicit production is disabled because
534  // Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0, or
535  // Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed). Note:
536  // Use std::make_move_iterator if the elements should be moved instead of
537  // copied. Thread-safe.
538  template <typename It>
539  inline bool enqueue_bulk(It itemFirst, size_t count) {
540  if ((details::likely)(
541  inner.enqueue_bulk(std::forward<It>(itemFirst), count))) {
542  sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
543  return true;
544  }
545  return false;
546  }
547 
548  // Enqueues several items using an explicit producer token.
549  // Allocates memory if required. Only fails if memory allocation fails
550  // (or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
551  // Note: Use std::make_move_iterator if the elements should be moved
552  // instead of copied.
553  // Thread-safe.
554  template <typename It>
555  inline bool enqueue_bulk(producer_token_t const &token, It itemFirst,
556  size_t count) {
557  if ((details::likely)(
558  inner.enqueue_bulk(token, std::forward<It>(itemFirst), count))) {
559  sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
560  return true;
561  }
562  return false;
563  }
564 
565  // Enqueues a single item (by copying it).
566  // Does not allocate memory. Fails if not enough room to enqueue (or implicit
567  // production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
568  // is 0).
569  // Thread-safe.
570  inline bool try_enqueue(T const &item) {
571  if (inner.try_enqueue(item)) {
572  sema->signal();
573  return true;
574  }
575  return false;
576  }
577 
578  // Enqueues a single item (by moving it, if possible).
579  // Does not allocate memory (except for one-time implicit producer).
580  // Fails if not enough room to enqueue (or implicit production is
581  // disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0).
582  // Thread-safe.
583  inline bool try_enqueue(T &&item) {
584  if (inner.try_enqueue(std::move(item))) {
585  sema->signal();
586  return true;
587  }
588  return false;
589  }
590 
591  // Enqueues a single item (by copying it) using an explicit producer token.
592  // Does not allocate memory. Fails if not enough room to enqueue.
593  // Thread-safe.
594  inline bool try_enqueue(producer_token_t const &token, T const &item) {
595  if (inner.try_enqueue(token, item)) {
596  sema->signal();
597  return true;
598  }
599  return false;
600  }
601 
602  // Enqueues a single item (by moving it, if possible) using an explicit
603  // producer token. Does not allocate memory. Fails if not enough room to
604  // enqueue. Thread-safe.
605  inline bool try_enqueue(producer_token_t const &token, T &&item) {
606  if (inner.try_enqueue(token, std::move(item))) {
607  sema->signal();
608  return true;
609  }
610  return false;
611  }
612 
613  // Enqueues several items.
614  // Does not allocate memory (except for one-time implicit producer).
615  // Fails if not enough room to enqueue (or implicit production is
616  // disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0).
617  // Note: Use std::make_move_iterator if the elements should be moved
618  // instead of copied.
619  // Thread-safe.
620  template <typename It>
621  inline bool try_enqueue_bulk(It itemFirst, size_t count) {
622  if (inner.try_enqueue_bulk(std::forward<It>(itemFirst), count)) {
623  sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
624  return true;
625  }
626  return false;
627  }
628 
629  // Enqueues several items using an explicit producer token.
630  // Does not allocate memory. Fails if not enough room to enqueue.
631  // Note: Use std::make_move_iterator if the elements should be moved
632  // instead of copied.
633  // Thread-safe.
634  template <typename It>
635  inline bool try_enqueue_bulk(producer_token_t const &token, It itemFirst,
636  size_t count) {
637  if (inner.try_enqueue_bulk(token, std::forward<It>(itemFirst), count)) {
638  sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
639  return true;
640  }
641  return false;
642  }
643 
644  // Attempts to dequeue from the queue.
645  // Returns false if all producer streams appeared empty at the time they
646  // were checked (so, the queue is likely but not guaranteed to be empty).
647  // Never allocates. Thread-safe.
648  template <typename U>
649  inline bool try_dequeue(U &item) {
650  if (sema->tryWait()) {
651  while (!inner.try_dequeue(item)) {
652  continue;
653  }
654  return true;
655  }
656  return false;
657  }
658 
659  // Attempts to dequeue from the queue using an explicit consumer token.
660  // Returns false if all producer streams appeared empty at the time they
661  // were checked (so, the queue is likely but not guaranteed to be empty).
662  // Never allocates. Thread-safe.
663  template <typename U>
664  inline bool try_dequeue(consumer_token_t &token, U &item) {
665  if (sema->tryWait()) {
666  while (!inner.try_dequeue(token, item)) {
667  continue;
668  }
669  return true;
670  }
671  return false;
672  }
673 
674  // Attempts to dequeue several elements from the queue.
675  // Returns the number of items actually dequeued.
676  // Returns 0 if all producer streams appeared empty at the time they
677  // were checked (so, the queue is likely but not guaranteed to be empty).
678  // Never allocates. Thread-safe.
679  template <typename It>
680  inline size_t try_dequeue_bulk(It itemFirst, size_t max) {
681  size_t count = 0;
682  max =
683  (size_t)sema->tryWaitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
684  while (count != max) {
685  count += inner.template try_dequeue_bulk<It &>(itemFirst, max - count);
686  }
687  return count;
688  }
689 
690  // Attempts to dequeue several elements from the queue using an explicit
691  // consumer token. Returns the number of items actually dequeued. Returns 0 if
692  // all producer streams appeared empty at the time they were checked (so, the
693  // queue is likely but not guaranteed to be empty). Never allocates.
694  // Thread-safe.
695  template <typename It>
696  inline size_t try_dequeue_bulk(consumer_token_t &token, It itemFirst,
697  size_t max) {
698  size_t count = 0;
699  max =
700  (size_t)sema->tryWaitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
701  while (count != max) {
702  count +=
703  inner.template try_dequeue_bulk<It &>(token, itemFirst, max - count);
704  }
705  return count;
706  }
707 
708  // Blocks the current thread until there's something to dequeue, then
709  // dequeues it.
710  // Never allocates. Thread-safe.
711  template <typename U>
712  inline void wait_dequeue(U &item) {
713  sema->wait();
714  while (!inner.try_dequeue(item)) {
715  continue;
716  }
717  }
718 
719  // Blocks the current thread until either there's something to dequeue
720  // or the timeout (specified in microseconds) expires. Returns false
721  // without setting `item` if the timeout expires, otherwise assigns
722  // to `item` and returns true.
723  // Using a negative timeout indicates an indefinite timeout,
724  // and is thus functionally equivalent to calling wait_dequeue.
725  // Never allocates. Thread-safe.
726  template <typename U>
727  inline bool wait_dequeue_timed(U &item, std::int64_t timeout_usecs) {
728  if (!sema->wait(timeout_usecs)) {
729  return false;
730  }
731  while (!inner.try_dequeue(item)) {
732  continue;
733  }
734  return true;
735  }
736 
737  // Blocks the current thread until either there's something to dequeue
738  // or the timeout expires. Returns false without setting `item` if the
739  // timeout expires, otherwise assigns to `item` and returns true.
740  // Never allocates. Thread-safe.
741  template <typename U, typename Rep, typename Period>
742  inline bool wait_dequeue_timed(
743  U &item, std::chrono::duration<Rep, Period> const &timeout) {
744  return wait_dequeue_timed(
745  item,
746  std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
747  }
748 
749  // Blocks the current thread until there's something to dequeue, then
750  // dequeues it using an explicit consumer token.
751  // Never allocates. Thread-safe.
752  template <typename U>
753  inline void wait_dequeue(consumer_token_t &token, U &item) {
754  sema->wait();
755  while (!inner.try_dequeue(token, item)) {
756  continue;
757  }
758  }
759 
760  // Blocks the current thread until either there's something to dequeue
761  // or the timeout (specified in microseconds) expires. Returns false
762  // without setting `item` if the timeout expires, otherwise assigns
763  // to `item` and returns true.
764  // Using a negative timeout indicates an indefinite timeout,
765  // and is thus functionally equivalent to calling wait_dequeue.
766  // Never allocates. Thread-safe.
767  template <typename U>
768  inline bool wait_dequeue_timed(consumer_token_t &token, U &item,
769  std::int64_t timeout_usecs) {
770  if (!sema->wait(timeout_usecs)) {
771  return false;
772  }
773  while (!inner.try_dequeue(token, item)) {
774  continue;
775  }
776  return true;
777  }
778 
779  // Blocks the current thread until either there's something to dequeue
780  // or the timeout expires. Returns false without setting `item` if the
781  // timeout expires, otherwise assigns to `item` and returns true.
782  // Never allocates. Thread-safe.
783  template <typename U, typename Rep, typename Period>
784  inline bool wait_dequeue_timed(
785  consumer_token_t &token, U &item,
786  std::chrono::duration<Rep, Period> const &timeout) {
787  return wait_dequeue_timed(
788  token, item,
789  std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
790  }
791 
792  // Attempts to dequeue several elements from the queue.
793  // Returns the number of items actually dequeued, which will
794  // always be at least one (this method blocks until the queue
795  // is non-empty) and at most max.
796  // Never allocates. Thread-safe.
797  template <typename It>
798  inline size_t wait_dequeue_bulk(It itemFirst, size_t max) {
799  size_t count = 0;
800  max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
801  while (count != max) {
802  count += inner.template try_dequeue_bulk<It &>(itemFirst, max - count);
803  }
804  return count;
805  }
806 
807  // Attempts to dequeue several elements from the queue.
808  // Returns the number of items actually dequeued, which can
809  // be 0 if the timeout expires while waiting for elements,
810  // and at most max.
811  // Using a negative timeout indicates an indefinite timeout,
812  // and is thus functionally equivalent to calling wait_dequeue_bulk.
813  // Never allocates. Thread-safe.
814  template <typename It>
815  inline size_t wait_dequeue_bulk_timed(It itemFirst, size_t max,
816  std::int64_t timeout_usecs) {
817  size_t count = 0;
818  max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max,
819  timeout_usecs);
820  while (count != max) {
821  count += inner.template try_dequeue_bulk<It &>(itemFirst, max - count);
822  }
823  return count;
824  }
825 
826  // Attempts to dequeue several elements from the queue.
827  // Returns the number of items actually dequeued, which can
828  // be 0 if the timeout expires while waiting for elements,
829  // and at most max.
830  // Never allocates. Thread-safe.
831  template <typename It, typename Rep, typename Period>
832  inline size_t wait_dequeue_bulk_timed(
833  It itemFirst, size_t max,
834  std::chrono::duration<Rep, Period> const &timeout) {
835  return wait_dequeue_bulk_timed<It &>(
836  itemFirst, max,
837  std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
838  }
839 
840  // Attempts to dequeue several elements from the queue using an explicit
841  // consumer token. Returns the number of items actually dequeued, which will
842  // always be at least one (this method blocks until the queue
843  // is non-empty) and at most max.
844  // Never allocates. Thread-safe.
845  template <typename It>
846  inline size_t wait_dequeue_bulk(consumer_token_t &token, It itemFirst,
847  size_t max) {
848  size_t count = 0;
849  max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
850  while (count != max) {
851  count +=
852  inner.template try_dequeue_bulk<It &>(token, itemFirst, max - count);
853  }
854  return count;
855  }
856 
857  // Attempts to dequeue several elements from the queue using an explicit
858  // consumer token. Returns the number of items actually dequeued, which can be
859  // 0 if the timeout expires while waiting for elements, and at most max. Using
860  // a negative timeout indicates an indefinite timeout, and is thus
861  // functionally equivalent to calling wait_dequeue_bulk. Never allocates.
862  // Thread-safe.
863  template <typename It>
864  inline size_t wait_dequeue_bulk_timed(consumer_token_t &token, It itemFirst,
865  size_t max,
866  std::int64_t timeout_usecs) {
867  size_t count = 0;
868  max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max,
869  timeout_usecs);
870  while (count != max) {
871  count +=
872  inner.template try_dequeue_bulk<It &>(token, itemFirst, max - count);
873  }
874  return count;
875  }
876 
877  // Attempts to dequeue several elements from the queue using an explicit
878  // consumer token. Returns the number of items actually dequeued, which can be
879  // 0 if the timeout expires while waiting for elements, and at most max. Never
880  // allocates. Thread-safe.
881  template <typename It, typename Rep, typename Period>
882  inline size_t wait_dequeue_bulk_timed(
883  consumer_token_t &token, It itemFirst, size_t max,
884  std::chrono::duration<Rep, Period> const &timeout) {
885  return wait_dequeue_bulk_timed<It &>(
886  token, itemFirst, max,
887  std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
888  }
889 
890  // Returns an estimate of the total number of elements currently in the queue.
891  // This estimate is only accurate if the queue has completely stabilized
892  // before it is called (i.e. all enqueue and dequeue operations have completed
893  // and their memory effects are visible on the calling thread, and no further
894  // operations start while this method is being called). Thread-safe.
895  inline size_t size_approx() const { return (size_t)sema->availableApprox(); }
896 
897  // Returns true if the underlying atomic variables used by
898  // the queue are lock-free (they should be on most platforms).
899  // Thread-safe.
900  static bool is_lock_free() { return ConcurrentQueue::is_lock_free(); }
901 
902  private:
903  template <typename U>
904  static inline U *create() {
905  auto p = (Traits::malloc)(sizeof(U));
906  return p != nullptr ? new (p) U : nullptr;
907  }
908 
909  template <typename U, typename A1>
910  static inline U *create(A1 &&a1) {
911  auto p = (Traits::malloc)(sizeof(U));
912  return p != nullptr ? new (p) U(std::forward<A1>(a1)) : nullptr;
913  }
914 
915  template <typename U>
916  static inline void destroy(U *p) {
917  if (p != nullptr) {
918  p->~U();
919  }
920  (Traits::free)(p);
921  }
922 
923  private:
924  ConcurrentQueue inner;
925  std::unique_ptr<LightweightSemaphore, void (*)(LightweightSemaphore *)> sema;
926 };
927 
928 template <typename T, typename Traits>
931  a.swap(b);
932 }
933 
934 } // end namespace moodycamel
Definition: blockingconcurrentqueue.h:373
size_t wait_dequeue_bulk_timed(It itemFirst, size_t max, std::chrono::duration< Rep, Period > const &timeout)
Definition: blockingconcurrentqueue.h:832
bool enqueue_bulk(producer_token_t const &token, It itemFirst, size_t count)
Definition: blockingconcurrentqueue.h:555
void wait_dequeue(U &item)
Definition: blockingconcurrentqueue.h:712
bool wait_dequeue_timed(consumer_token_t &token, U &item, std::int64_t timeout_usecs)
Definition: blockingconcurrentqueue.h:768
ConcurrentQueue::consumer_token_t consumer_token_t
Definition: blockingconcurrentqueue.h:380
void swap(BlockingConcurrentQueue &other) MOODYCAMEL_NOEXCEPT
Definition: blockingconcurrentqueue.h:463
static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE
Definition: blockingconcurrentqueue.h:395
bool try_enqueue(T &&item)
Definition: blockingconcurrentqueue.h:583
bool try_enqueue_bulk(producer_token_t const &token, It itemFirst, size_t count)
Definition: blockingconcurrentqueue.h:635
size_t try_dequeue_bulk(It itemFirst, size_t max)
Definition: blockingconcurrentqueue.h:680
BlockingConcurrentQueue(BlockingConcurrentQueue &&other) MOODYCAMEL_NOEXCEPT
Definition: blockingconcurrentqueue.h:449
BlockingConcurrentQueue(size_t minCapacity, size_t maxExplicitProducers, size_t maxImplicitProducers)
Definition: blockingconcurrentqueue.h:423
static const size_t IMPLICIT_INITIAL_INDEX_SIZE
Definition: blockingconcurrentqueue.h:391
bool try_enqueue_bulk(It itemFirst, size_t count)
Definition: blockingconcurrentqueue.h:621
static const size_t EXPLICIT_INITIAL_INDEX_SIZE
Definition: blockingconcurrentqueue.h:389
bool wait_dequeue_timed(U &item, std::chrono::duration< Rep, Period > const &timeout)
Definition: blockingconcurrentqueue.h:742
size_t wait_dequeue_bulk_timed(It itemFirst, size_t max, std::int64_t timeout_usecs)
Definition: blockingconcurrentqueue.h:815
BlockingConcurrentQueue & operator=(BlockingConcurrentQueue const &) MOODYCAMEL_DELETE_FUNCTION
size_t size_approx() const
Definition: blockingconcurrentqueue.h:895
BlockingConcurrentQueue & operator=(BlockingConcurrentQueue &&other) MOODYCAMEL_NOEXCEPT
Definition: blockingconcurrentqueue.h:453
BlockingConcurrentQueue(size_t capacity=6 *BLOCK_SIZE)
Definition: blockingconcurrentqueue.h:410
std::make_signed< size_t >::type ssize_t
Definition: blockingconcurrentqueue.h:384
static const size_t BLOCK_SIZE
Definition: blockingconcurrentqueue.h:386
static const size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
Definition: blockingconcurrentqueue.h:393
bool enqueue(producer_token_t const &token, T &&item)
Definition: blockingconcurrentqueue.h:523
bool try_dequeue(U &item)
Definition: blockingconcurrentqueue.h:649
static const size_t MAX_SUBQUEUE_SIZE
Definition: blockingconcurrentqueue.h:397
size_t wait_dequeue_bulk(It itemFirst, size_t max)
Definition: blockingconcurrentqueue.h:798
bool enqueue(T const &item)
Definition: blockingconcurrentqueue.h:485
bool enqueue_bulk(It itemFirst, size_t count)
Definition: blockingconcurrentqueue.h:539
bool enqueue(producer_token_t const &token, T const &item)
Definition: blockingconcurrentqueue.h:511
void wait_dequeue(consumer_token_t &token, U &item)
Definition: blockingconcurrentqueue.h:753
bool wait_dequeue_timed(U &item, std::int64_t timeout_usecs)
Definition: blockingconcurrentqueue.h:727
static bool is_lock_free()
Definition: blockingconcurrentqueue.h:900
size_t wait_dequeue_bulk_timed(consumer_token_t &token, It itemFirst, size_t max, std::chrono::duration< Rep, Period > const &timeout)
Definition: blockingconcurrentqueue.h:882
bool wait_dequeue_timed(consumer_token_t &token, U &item, std::chrono::duration< Rep, Period > const &timeout)
Definition: blockingconcurrentqueue.h:784
bool enqueue(T &&item)
Definition: blockingconcurrentqueue.h:499
bool try_enqueue(producer_token_t const &token, T &&item)
Definition: blockingconcurrentqueue.h:605
bool try_enqueue(T const &item)
Definition: blockingconcurrentqueue.h:570
bool try_enqueue(producer_token_t const &token, T const &item)
Definition: blockingconcurrentqueue.h:594
ConcurrentQueue::producer_token_t producer_token_t
Definition: blockingconcurrentqueue.h:379
ConcurrentQueue::size_t size_t
Definition: blockingconcurrentqueue.h:383
size_t wait_dequeue_bulk_timed(consumer_token_t &token, It itemFirst, size_t max, std::int64_t timeout_usecs)
Definition: blockingconcurrentqueue.h:864
ConcurrentQueue::index_t index_t
Definition: blockingconcurrentqueue.h:382
size_t wait_dequeue_bulk(consumer_token_t &token, It itemFirst, size_t max)
Definition: blockingconcurrentqueue.h:846
bool try_dequeue(consumer_token_t &token, U &item)
Definition: blockingconcurrentqueue.h:664
static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD
Definition: blockingconcurrentqueue.h:387
BlockingConcurrentQueue(BlockingConcurrentQueue const &) MOODYCAMEL_DELETE_FUNCTION
size_t try_dequeue_bulk(consumer_token_t &token, It itemFirst, size_t max)
Definition: blockingconcurrentqueue.h:696
Definition: concurrentqueue.h:802
bool try_dequeue(U &item)
Definition: concurrentqueue.h:1197
static const size_t MAX_SUBQUEUE_SIZE
Definition: concurrentqueue.h:828
static const size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
Definition: concurrentqueue.h:817
bool enqueue_bulk(It itemFirst, size_t count)
Definition: concurrentqueue.h:1118
void swap(ConcurrentQueue &other) MOODYCAMEL_NOEXCEPT
Definition: concurrentqueue.h:1038
static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE
Definition: concurrentqueue.h:819
static const size_t IMPLICIT_INITIAL_INDEX_SIZE
Definition: concurrentqueue.h:815
bool try_enqueue(T const &item)
Definition: concurrentqueue.h:1139
static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD
Definition: concurrentqueue.h:811
Traits::size_t size_t
Definition: concurrentqueue.h:808
bool enqueue(T const &item)
Definition: concurrentqueue.h:1078
static bool is_lock_free()
Definition: concurrentqueue.h:1418
Traits::index_t index_t
Definition: concurrentqueue.h:807
static const size_t EXPLICIT_INITIAL_INDEX_SIZE
Definition: concurrentqueue.h:813
static const size_t BLOCK_SIZE
Definition: concurrentqueue.h:810
bool try_enqueue_bulk(It itemFirst, size_t count)
Definition: concurrentqueue.h:1176
Definition: blockingconcurrentqueue.h:216
ssize_t availableApprox() const
Definition: blockingconcurrentqueue.h:361
ssize_t tryWaitMany(ssize_t max)
Definition: blockingconcurrentqueue.h:324
bool wait(std::int64_t timeout_usecs)
Definition: blockingconcurrentqueue.h:319
void wait()
Definition: blockingconcurrentqueue.h:315
std::make_signed< std::size_t >::type ssize_t
Definition: blockingconcurrentqueue.h:218
ssize_t waitMany(ssize_t max, std::int64_t timeout_usecs)
Definition: blockingconcurrentqueue.h:338
ssize_t waitMany(ssize_t max)
Definition: blockingconcurrentqueue.h:346
LightweightSemaphore(ssize_t initialCount=0)
Definition: blockingconcurrentqueue.h:300
bool tryWait()
Definition: blockingconcurrentqueue.h:304
void signal(ssize_t count=1)
Definition: blockingconcurrentqueue.h:352
#define MOODYCAMEL_NOEXCEPT
Definition: concurrentqueue.h:252
#define MOODYCAMEL_THROW(expr)
Definition: concurrentqueue.h:203
#define MOODYCAMEL_DELETE_FUNCTION
Definition: concurrentqueue.h:289
static bool (likely)(bool x)
Definition: concurrentqueue.h:302
Definition: atomicops.h:69
void swap(BlockingConcurrentQueue< T, Traits > &a, BlockingConcurrentQueue< T, Traits > &b) MOODYCAMEL_NOEXCEPT
Definition: blockingconcurrentqueue.h:929
@ memory_order_acquire
Definition: atomicops.h:73
@ memory_order_relaxed
Definition: atomicops.h:72
@ memory_order_release
Definition: atomicops.h:74
Definition: concurrentqueue.h:748
Definition: concurrentqueue.h:687