concurrentqueue.h
1 // Provides a C++11 implementation of a multi-producer, multi-consumer lock-free
2 // queue. An overview, including benchmark results, is provided here:
3 // http://moodycamel.com/blog/2014/a-fast-general-purpose-lock-free-queue-for-c++
4 // The full design is also described in excruciating detail at:
5 // http://moodycamel.com/blog/2014/detailed-design-of-a-lock-free-queue
6 
7 // Simplified BSD license:
8 // Copyright (c) 2013-2016, Cameron Desrochers.
9 // All rights reserved.
10 //
11 // Redistribution and use in source and binary forms, with or without
12 // modification, are permitted provided that the following conditions are met:
13 //
14 // - Redistributions of source code must retain the above copyright notice, this
15 // list of conditions and the following disclaimer.
16 // - Redistributions in binary form must reproduce the above copyright notice,
17 // this list of conditions and the following disclaimer in the documentation
18 // and/or other materials provided with the distribution.
19 //
20 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21 // AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
23 // ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
24 // LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
25 // CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
26 // SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
27 // INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
28 // CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
29 // ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30 // POSSIBILITY OF SUCH DAMAGE.
31 
32 #pragma once
33 
34 #if defined(__GNUC__)
35 // Disable -Wconversion warnings (spuriously triggered when Traits::size_t and
36 // Traits::index_t are set to < 32 bits, causing integer promotion, causing
37 // warnings upon assigning any computed values)
38 #pragma GCC diagnostic push
39 #pragma GCC diagnostic ignored "-Wconversion"
40 
41 #ifdef MCDBGQ_USE_RELACY
42 #pragma GCC diagnostic ignored "-Wint-to-pointer-cast"
43 #endif
44 #endif
45 
46 #if defined(__APPLE__)
47 #include "TargetConditionals.h"
48 #endif
49 
50 #ifdef MCDBGQ_USE_RELACY
51 #include "relacy/relacy_std.hpp"
52 #include "relacy_shims.h"
53 // We only use malloc/free anyway, and the delete macro messes up `= delete`
54 // method declarations. We'll override the default trait malloc ourselves
55 // without a macro.
56 #undef new
57 #undef delete
58 #undef malloc
59 #undef free
60 #else
61 #include <atomic> // Requires C++11. Sorry VS2010.
62 #include <cassert>
63 #endif
64 #include <algorithm>
65 #include <array>
66 #include <climits> // for CHAR_BIT
67 #include <cstddef> // for max_align_t
68 #include <cstdint>
69 #include <cstdlib>
70 #include <limits>
71 #include <thread> // partly for __WINPTHREADS_VERSION if on MinGW-w64 w/ POSIX threading
72 #include <type_traits>
73 #include <utility>
74 
75 // Platform-specific definitions of a numeric thread ID type and an invalid
76 // value
77 namespace moodycamel {
78 namespace details {
79 template <typename thread_id_t>
80 struct thread_id_converter {
81  typedef thread_id_t thread_id_numeric_size_t;
82  typedef thread_id_t thread_id_hash_t;
83 
84  static thread_id_hash_t prehash(thread_id_t const &x) { return x; }
85 };
86 } // namespace details
87 } // namespace moodycamel
88 #if defined(MCDBGQ_USE_RELACY)
89 namespace moodycamel {
90 namespace details {
91 typedef std::uint32_t thread_id_t;
92 static const thread_id_t invalid_thread_id = 0xFFFFFFFFU;
93 static const thread_id_t invalid_thread_id2 = 0xFFFFFFFEU;
94 static inline thread_id_t thread_id() { return rl::thread_index(); }
95 } // namespace details
96 } // namespace moodycamel
97 #elif defined(_WIN32) || defined(__WINDOWS__) || defined(__WIN32__)
98 // No sense pulling in windows.h in a header, we'll manually declare the
99 // function we use and rely on backwards-compatibility for this not to break
100 extern "C" __declspec(dllimport) unsigned long __stdcall GetCurrentThreadId(
101  void);
102 namespace moodycamel {
103 namespace details {
104 static_assert(sizeof(unsigned long) == sizeof(std::uint32_t),
105  "Expected size of unsigned long to be 32 bits on Windows");
106 typedef std::uint32_t thread_id_t;
107 static const thread_id_t invalid_thread_id =
108  0; // See http://blogs.msdn.com/b/oldnewthing/archive/2004/02/23/78395.aspx
109 static const thread_id_t invalid_thread_id2 =
110  0xFFFFFFFFU; // Not technically guaranteed to be invalid, but is never used
111  // in practice. Note that all Win32 thread IDs are presently
112  // multiples of 4.
113 static inline thread_id_t thread_id() {
114  return static_cast<thread_id_t>(::GetCurrentThreadId());
115 }
116 } // namespace details
117 } // namespace moodycamel
118 #elif defined(__arm__) || defined(_M_ARM) || defined(__aarch64__) || \
119  (defined(__APPLE__) && TARGET_OS_IPHONE)
120 namespace moodycamel {
121 namespace details {
122 static_assert(sizeof(std::thread::id) == 4 || sizeof(std::thread::id) == 8,
123  "std::thread::id is expected to be either 4 or 8 bytes");
124 
125 typedef std::thread::id thread_id_t;
126 static const thread_id_t invalid_thread_id; // Default ctor creates invalid ID
127 
128 // Note we don't define an invalid_thread_id2 since std::thread::id doesn't have
129 // one; it's only used if MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED is defined
130 // anyway, which it won't be.
131 static inline thread_id_t thread_id() { return std::this_thread::get_id(); }
132 
133 template <std::size_t>
134 struct thread_id_size {};
135 template <>
136 struct thread_id_size<4> {
137  typedef std::uint32_t numeric_t;
138 };
139 template <>
140 struct thread_id_size<8> {
141  typedef std::uint64_t numeric_t;
142 };
143 
144 template <>
145 struct thread_id_converter<thread_id_t> {
146  typedef thread_id_size<sizeof(thread_id_t)>::numeric_t
147  thread_id_numeric_size_t;
148 #ifndef __APPLE__
149  typedef std::size_t thread_id_hash_t;
150 #else
151  typedef thread_id_numeric_size_t thread_id_hash_t;
152 #endif
153 
154  static thread_id_hash_t prehash(thread_id_t const &x) {
155 #ifndef __APPLE__
156  return std::hash<std::thread::id>()(x);
157 #else
158  return *reinterpret_cast<thread_id_hash_t const *>(&x);
159 #endif
160  }
161 };
162 } // namespace details
163 } // namespace moodycamel
164 #else
165 // Use a nice trick from this answer: http://stackoverflow.com/a/8438730/21475
166 // In order to get a numeric thread ID in a platform-independent way, we use a
167 // thread-local static variable's address as a thread identifier :-)
168 #if defined(__GNUC__) || defined(__INTEL_COMPILER)
169 #define MOODYCAMEL_THREADLOCAL __thread
170 #elif defined(_MSC_VER)
171 #define MOODYCAMEL_THREADLOCAL __declspec(thread)
172 #else
173 // Assume C++11 compliant compiler
174 #define MOODYCAMEL_THREADLOCAL thread_local
175 #endif
176 namespace moodycamel {
177 namespace details {
178 typedef std::uintptr_t thread_id_t;
179 static const thread_id_t invalid_thread_id = 0; // Address can't be nullptr
180 static const thread_id_t invalid_thread_id2 =
181  1; // Member accesses off a null pointer are also generally invalid. Plus
182  // it's not aligned.
183 static inline thread_id_t thread_id() {
184  static MOODYCAMEL_THREADLOCAL int x;
185  return reinterpret_cast<thread_id_t>(&x);
186 }
187 } // namespace details
188 } // namespace moodycamel
189 #endif
190 
191 // Exceptions
192 #ifndef MOODYCAMEL_EXCEPTIONS_ENABLED
193 #if (defined(_MSC_VER) && defined(_CPPUNWIND)) || \
194  (defined(__GNUC__) && defined(__EXCEPTIONS)) || \
195  (!defined(_MSC_VER) && !defined(__GNUC__))
196 #define MOODYCAMEL_EXCEPTIONS_ENABLED
197 #endif
198 #endif
199 #ifdef MOODYCAMEL_EXCEPTIONS_ENABLED
200 #define MOODYCAMEL_TRY try
201 #define MOODYCAMEL_CATCH(...) catch (__VA_ARGS__)
202 #define MOODYCAMEL_RETHROW throw
203 #define MOODYCAMEL_THROW(expr) throw(expr)
204 #else
205 #define MOODYCAMEL_TRY if (true)
206 #define MOODYCAMEL_CATCH(...) else if (false)
207 #define MOODYCAMEL_RETHROW
208 #define MOODYCAMEL_THROW(expr)
209 #endif
210 
211 #ifndef MOODYCAMEL_NOEXCEPT
212 #if !defined(MOODYCAMEL_EXCEPTIONS_ENABLED)
213 #define MOODYCAMEL_NOEXCEPT
214 #define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) true
215 #define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) true
216 #elif defined(_MSC_VER) && defined(_NOEXCEPT) && _MSC_VER < 1800
217 // VS2012's std::is_nothrow_[move_]constructible is broken and returns true when
218 // it shouldn't :-( We have to assume *all* non-trivial constructors may throw
219 // on VS2012!
220 #define MOODYCAMEL_NOEXCEPT _NOEXCEPT
221 #define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) \
222  (std::is_rvalue_reference<valueType>::value && \
223  std::is_move_constructible<type>::value \
224  ? std::is_trivially_move_constructible<type>::value \
225  : std::is_trivially_copy_constructible<type>::value)
226 #define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) \
227  ((std::is_rvalue_reference<valueType>::value && \
228  std::is_move_assignable<type>::value \
229  ? std::is_trivially_move_assignable<type>::value || \
230  std::is_nothrow_move_assignable<type>::value \
231  : std::is_trivially_copy_assignable<type>::value || \
232  std::is_nothrow_copy_assignable<type>::value) && \
233  MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr))
234 #elif defined(_MSC_VER) && defined(_NOEXCEPT) && _MSC_VER < 1900
235 #define MOODYCAMEL_NOEXCEPT _NOEXCEPT
236 #define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) \
237  (std::is_rvalue_reference<valueType>::value && \
238  std::is_move_constructible<type>::value \
239  ? std::is_trivially_move_constructible<type>::value || \
240  std::is_nothrow_move_constructible<type>::value \
241  : std::is_trivially_copy_constructible<type>::value || \
242  std::is_nothrow_copy_constructible<type>::value)
243 #define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) \
244  ((std::is_rvalue_reference<valueType>::value && \
245  std::is_move_assignable<type>::value \
246  ? std::is_trivially_move_assignable<type>::value || \
247  std::is_nothrow_move_assignable<type>::value \
248  : std::is_trivially_copy_assignable<type>::value || \
249  std::is_nothrow_copy_assignable<type>::value) && \
250  MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr))
251 #else
252 #define MOODYCAMEL_NOEXCEPT noexcept
253 #define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) noexcept(expr)
254 #define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) noexcept(expr)
255 #endif
256 #endif
257 
258 #ifndef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
259 #ifdef MCDBGQ_USE_RELACY
260 #define MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
261 #else
262 // VS2013 doesn't support `thread_local`, and MinGW-w64 w/ POSIX threading has a
263 // crippling bug: http://sourceforge.net/p/mingw-w64/bugs/445; g++ <=4.7 doesn't
264 // support thread_local either. Finally, iOS/ARM doesn't have support for it
265 // either, and g++/ARM allows it to compile but it's unconfirmed to actually
266 // work
267 #if (!defined(_MSC_VER) || _MSC_VER >= 1900) && \
268  (!defined(__MINGW32__) && !defined(__MINGW64__) || \
269  !defined(__WINPTHREADS_VERSION)) && \
270  (!defined(__GNUC__) || __GNUC__ > 4 || \
271  (__GNUC__ == 4 && __GNUC_MINOR__ >= 8)) && \
272  (!defined(__APPLE__) || !TARGET_OS_IPHONE) && !defined(__arm__) && \
273  !defined(_M_ARM) && !defined(__aarch64__)
274 // Assume `thread_local` is fully supported in all other C++11
275 // compilers/platforms
276 //#define MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED // always disabled for now
277 // since several users report having problems with it on
278 #endif
279 #endif
280 #endif
281 
282 // VS2012 doesn't support deleted functions.
283 // In this case, we declare the function normally but don't define it. A link
284 // error will be generated if the function is called.
285 #ifndef MOODYCAMEL_DELETE_FUNCTION
286 #if defined(_MSC_VER) && _MSC_VER < 1800
287 #define MOODYCAMEL_DELETE_FUNCTION
288 #else
289 #define MOODYCAMEL_DELETE_FUNCTION = delete
290 #endif
291 #endif
292 
293 // Compiler-specific likely/unlikely hints
294 namespace moodycamel {
295 namespace details {
296 #if defined(__GNUC__)
297 
298 static inline bool(likely)(bool x) { return __builtin_expect((x), true); }
299 
300 static inline bool(unlikely)(bool x) { return __builtin_expect((x), false); }
301 #else
302 static inline bool(likely)(bool x) { return x; }
303 static inline bool(unlikely)(bool x) { return x; }
304 #endif
305 } // namespace details
306 } // namespace moodycamel
307 
308 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
309 #include "internal/concurrentqueue_internal_debug.h"
310 #endif
311 
312 namespace moodycamel {
313 namespace details {
314 template <typename T>
315 struct const_numeric_max {
316  static_assert(std::is_integral<T>::value,
317  "const_numeric_max can only be used with integers");
318  static const T value =
319  std::numeric_limits<T>::is_signed
320  ? (static_cast<T>(1) << (sizeof(T) * CHAR_BIT - 1)) -
321  static_cast<T>(1)
322  : static_cast<T>(-1);
323 };
324 
325 #if defined(__GLIBCXX__)
326 typedef ::max_align_t
327  std_max_align_t; // libstdc++ forgot to add it to std:: for a while
328 #else
329 typedef std::max_align_t std_max_align_t; // Others (e.g. MSVC) insist it can
330  // *only* be accessed via std::
331 #endif
332 
333 // Some platforms have incorrectly set max_align_t to a type with <8 bytes
334 // alignment even while supporting 8-byte aligned scalar values (*cough* 32-bit
335 // iOS). Work around this with our own union. See issue #64.
336 typedef union {
337  std_max_align_t x;
338  long long y;
339  void *z;
340 } max_align_t;
341 } // namespace details
342 
343 // Default traits for the ConcurrentQueue. To change some of the
344 // traits without re-implementing all of them, inherit from this
345 // struct and shadow the declarations you wish to be different;
346 // since the traits are used as a template type parameter, the
347 // shadowed declarations will be used where defined, and the defaults
348 // otherwise.
349 struct ConcurrentQueueDefaultTraits {
350  // General-purpose size type. std::size_t is strongly recommended.
351  typedef std::size_t size_t;
352 
353  // The type used for the enqueue and dequeue indices. Must be at least as
354  // large as size_t. Should be significantly larger than the number of elements
355  // you expect to hold at once, especially if you have a high turnover rate;
356  // for example, on 32-bit x86, if you expect to have over a hundred million
357  // elements or pump several million elements through your queue in a very
358  // short space of time, using a 32-bit type *may* trigger a race condition.
359  // A 64-bit int type is recommended in that case, and in practice will
360  // prevent a race condition no matter the usage of the queue. Note that
361  // whether the queue is lock-free with a 64-bit integer type depends on whether
362  // std::atomic<std::uint64_t> is lock-free, which is platform-specific.
363  typedef std::size_t index_t;
364 
365  // Internally, all elements are enqueued and dequeued from multi-element
366  // blocks; this is the smallest controllable unit. If you expect few elements
367  // but many producers, a smaller block size should be favoured. For few
368  // producers and/or many elements, a larger block size is preferred. A sane
369  // default is provided. Must be a power of 2.
370  static const size_t BLOCK_SIZE = 32;
371 
372  // For explicit producers (i.e. when using a producer token), the block is
373  // checked for being empty by iterating through a list of flags, one per
374  // element. For large block sizes, this is too inefficient, and switching to
375  // an atomic counter-based approach is faster. The switch is made for block
376  // sizes strictly larger than this threshold.
377  static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD = 32;
378 
379  // How many full blocks can be expected for a single explicit producer? This
380  // should reflect that number's maximum for optimal performance. Must be a
381  // power of 2.
382  static const size_t EXPLICIT_INITIAL_INDEX_SIZE = 32;
383 
384  // How many full blocks can be expected for a single implicit producer? This
385  // should reflect that number's maximum for optimal performance. Must be a
386  // power of 2.
387  static const size_t IMPLICIT_INITIAL_INDEX_SIZE = 32;
388 
389  // The initial size of the hash table mapping thread IDs to implicit
390  // producers. Note that the hash is resized every time it becomes half full.
391  // Must be a power of two, and either 0 or at least 1. If 0, implicit
392  // production (using the enqueue methods without an explicit producer token)
393  // is disabled.
394  static const size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE = 32;
395 
396  // Controls the number of items that an explicit consumer (i.e. one with a
397  // token) must consume before it causes all consumers to rotate and move on to
398  // the next internal queue.
399  static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE =
400  256;
401 
402  // The maximum number of elements (inclusive) that can be enqueued to a
403  // sub-queue. Enqueue operations that would cause this limit to be surpassed
404  // will fail. Note that this limit is enforced at the block level (for
405  // performance reasons), i.e. it's rounded up to the nearest block size.
406  static const size_t MAX_SUBQUEUE_SIZE =
407  details::const_numeric_max<size_t>::value;
408 
409 #ifndef MCDBGQ_USE_RELACY
410  // Memory allocation can be customized if needed.
411  // malloc should return nullptr on failure, and handle alignment like
412  // std::malloc.
413 #if defined(malloc) || defined(free)
414  // Gah, this is 2015, stop defining macros that break standard code already!
415  // Work around malloc/free being special macros:
416  static inline void *WORKAROUND_malloc(size_t size) { return malloc(size); }
417  static inline void WORKAROUND_free(void *ptr) { return free(ptr); }
418  static inline void *(malloc)(size_t size) { return WORKAROUND_malloc(size); }
419  static inline void(free)(void *ptr) { return WORKAROUND_free(ptr); }
420 #else
421 
422  static inline void *malloc(size_t size) { return std::malloc(size); }
423 
424  static inline void free(void *ptr) { return std::free(ptr); }
425 #endif
426 #else
427  // Debug versions when running under the Relacy race detector (ignore
428  // these in user code)
429  static inline void *malloc(size_t size) { return rl::rl_malloc(size, $); }
430  static inline void free(void *ptr) { return rl::rl_free(ptr, $); }
431 #endif
432 };
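// A minimal usage sketch (illustrative only, not part of the original header):
// traits are customized by inheriting from ConcurrentQueueDefaultTraits and
// shadowing only the members that should differ, e.g. a larger block size:
//
//   struct MyTraits : public moodycamel::ConcurrentQueueDefaultTraits {
//     static const size_t BLOCK_SIZE = 256;  // must be a power of 2
//   };
//
//   moodycamel::ConcurrentQueue<int, MyTraits> q;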
433 
434 // When producing or consuming many elements, the most efficient way is to:
435 // 1) Use one of the bulk-operation methods of the queue with a token
436 // 2) Failing that, use the bulk-operation methods without a token
437 // 3) Failing that, create a token and use that with the single-item methods
438 // 4) Failing that, use the single-parameter methods of the queue
439 // Having said that, don't create tokens willy-nilly -- ideally there should be
440 // a maximum of one token per thread (of each kind).
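// A minimal usage sketch (illustrative only, not part of the original header):
// one producer token and one consumer token per thread, combined with the bulk
// methods, follows the preference order described above. Sizes here are
// arbitrary example values.
//
//   moodycamel::ConcurrentQueue<int> q;
//   moodycamel::ProducerToken ptok(q);
//   int in[64] = {};
//   q.enqueue_bulk(ptok, in, 64);
//
//   moodycamel::ConsumerToken ctok(q);
//   int out[64];
//   size_t n = q.try_dequeue_bulk(ctok, out, 64);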
441 struct ProducerToken;
442 struct ConsumerToken;
443 
444 template <typename T, typename Traits>
445 class ConcurrentQueue;
446 template <typename T, typename Traits>
447 class BlockingConcurrentQueue;
448 class ConcurrentQueueTests;
449 
450 namespace details {
451 struct ConcurrentQueueProducerTypelessBase {
452  ConcurrentQueueProducerTypelessBase *next;
453  std::atomic<bool> inactive;
454  ProducerToken *token;
455 
456  ConcurrentQueueProducerTypelessBase()
457  : next(nullptr), inactive(false), token(nullptr) {}
458 };
459 
460 template <bool use32>
461 struct _hash_32_or_64 {
462  static inline std::uint32_t hash(std::uint32_t h) {
463  // MurmurHash3 finalizer -- see
464  // https://code.google.com/p/smhasher/source/browse/trunk/MurmurHash3.cpp
465  // Since the thread ID is already unique, all we really want to do is
466  // propagate that uniqueness evenly across all the bits, so that we can use
467  // a subset of the bits while reducing collisions significantly
468  h ^= h >> 16;
469  h *= 0x85ebca6b;
470  h ^= h >> 13;
471  h *= 0xc2b2ae35;
472  return h ^ (h >> 16);
473  }
474 };
475 
476 template <>
477 struct _hash_32_or_64<1> {
478  static inline std::uint64_t hash(std::uint64_t h) {
479  h ^= h >> 33;
480  h *= 0xff51afd7ed558ccd;
481  h ^= h >> 33;
482  h *= 0xc4ceb9fe1a85ec53;
483  return h ^ (h >> 33);
484  }
485 };
486 
487 template <std::size_t size>
488 struct hash_32_or_64 : public _hash_32_or_64<(size > 4)> {};
489 
490 static inline size_t hash_thread_id(thread_id_t id) {
491  static_assert(
492  sizeof(thread_id_t) <= 8,
493  "Expected a platform where thread IDs are at most 64-bit values");
494  return static_cast<size_t>(
495  hash_32_or_64<sizeof(
496  thread_id_converter<thread_id_t>::thread_id_hash_t)>::
497  hash(thread_id_converter<thread_id_t>::prehash(id)));
498 }
499 
500 template <typename T>
501 static inline bool circular_less_than(T a, T b) {
502 #ifdef _MSC_VER
503 #pragma warning(push)
504 #pragma warning(disable : 4554)
505 #endif
506  static_assert(
507  std::is_integral<T>::value && !std::numeric_limits<T>::is_signed,
508  "circular_less_than is intended to be used only with unsigned integer "
509  "types");
510  return static_cast<T>(a - b) >
511  static_cast<T>(static_cast<T>(1)
512  << static_cast<T>(sizeof(T) * CHAR_BIT - 1));
513 #ifdef _MSC_VER
514 #pragma warning(pop)
515 #endif
516 }
517 
518 template <typename U>
519 static inline char *align_for(char *ptr) {
520  const std::size_t alignment = std::alignment_of<U>::value;
521  return ptr +
522  (alignment - (reinterpret_cast<std::uintptr_t>(ptr) % alignment)) %
523  alignment;
524 }
525 
526 template <typename T>
527 static inline T ceil_to_pow_2(T x) {
528  static_assert(
529  std::is_integral<T>::value && !std::numeric_limits<T>::is_signed,
530  "ceil_to_pow_2 is intended to be used only with unsigned integer types");
531 
532  // Adapted from
533  // http://graphics.stanford.edu/~seander/bithacks.html#RoundUpPowerOf2
534  --x;
535  x |= x >> 1;
536  x |= x >> 2;
537  x |= x >> 4;
538  for (std::size_t i = 1; i < sizeof(T); i <<= 1) {
539  x |= x >> (i << 3);
540  }
541  ++x;
542  return x;
543 }
544 
545 template <typename T>
546 static inline void swap_relaxed(std::atomic<T> &left, std::atomic<T> &right) {
547  T temp = std::move(left.load(std::memory_order_relaxed));
548  left.store(std::move(right.load(std::memory_order_relaxed)),
549  std::memory_order_relaxed);
550  right.store(std::move(temp), std::memory_order_relaxed);
551 }
552 
553 template <typename T>
554 static inline T const &nomove(T const &x) {
555  return x;
556 }
557 
558 template <bool Enable>
559 struct nomove_if {
560  template <typename T>
561  static inline T const &eval(T const &x) {
562  return x;
563  }
564 };
565 
566 template <>
567 struct nomove_if<false> {
568  template <typename U>
569  static inline auto eval(U &&x) -> decltype(std::forward<U>(x)) {
570  return std::forward<U>(x);
571  }
572 };
573 
574 template <typename It>
575 static inline auto deref_noexcept(It &it) MOODYCAMEL_NOEXCEPT -> decltype(*it) {
576  return *it;
577 }
578 
579 #if defined(__clang__) || !defined(__GNUC__) || __GNUC__ > 4 || \
580  (__GNUC__ == 4 && __GNUC_MINOR__ >= 8)
581 template <typename T>
582 struct is_trivially_destructible : std::is_trivially_destructible<T> {};
583 #else
584 template <typename T>
585 struct is_trivially_destructible : std::has_trivial_destructor<T> {};
586 #endif
587 
588 #ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
589 #ifdef MCDBGQ_USE_RELACY
590 typedef RelacyThreadExitListener ThreadExitListener;
591 typedef RelacyThreadExitNotifier ThreadExitNotifier;
592 #else
593 struct ThreadExitListener {
594  typedef void (*callback_t)(void *);
595  callback_t callback;
596  void *userData;
597 
598  ThreadExitListener *next; // reserved for use by the ThreadExitNotifier
599 };
600 
601 class ThreadExitNotifier {
602  public:
603  static void subscribe(ThreadExitListener *listener) {
604  auto &tlsInst = instance();
605  listener->next = tlsInst.tail;
606  tlsInst.tail = listener;
607  }
608 
609  static void unsubscribe(ThreadExitListener *listener) {
610  auto &tlsInst = instance();
611  ThreadExitListener **prev = &tlsInst.tail;
612  for (auto ptr = tlsInst.tail; ptr != nullptr; ptr = ptr->next) {
613  if (ptr == listener) {
614  *prev = ptr->next;
615  break;
616  }
617  prev = &ptr->next;
618  }
619  }
620 
621  private:
622  ThreadExitNotifier() : tail(nullptr) {}
623  ThreadExitNotifier(ThreadExitNotifier const &) MOODYCAMEL_DELETE_FUNCTION;
624  ThreadExitNotifier &operator=(ThreadExitNotifier const &)
625  MOODYCAMEL_DELETE_FUNCTION;
626 
627  ~ThreadExitNotifier() {
628  // This thread is about to exit, let everyone know!
629  assert(this == &instance() &&
630  "If this assert fails, you likely have a buggy compiler! Change the "
631  "preprocessor conditions such that "
632  "MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED is no longer defined.");
633  for (auto ptr = tail; ptr != nullptr; ptr = ptr->next) {
634  ptr->callback(ptr->userData);
635  }
636  }
637 
638  // Thread-local
639  static inline ThreadExitNotifier &instance() {
640  static thread_local ThreadExitNotifier notifier;
641  return notifier;
642  }
643 
644  private:
645  ThreadExitListener *tail;
646 };
647 #endif
648 #endif
649 
650 template <typename T>
651 struct static_is_lock_free_num {
652  enum { value = 0 };
653 };
654 template <>
655 struct static_is_lock_free_num<signed char> {
656  enum { value = ATOMIC_CHAR_LOCK_FREE };
657 };
658 template <>
659 struct static_is_lock_free_num<short> {
660  enum { value = ATOMIC_SHORT_LOCK_FREE };
661 };
662 template <>
663 struct static_is_lock_free_num<int> {
664  enum { value = ATOMIC_INT_LOCK_FREE };
665 };
666 template <>
667 struct static_is_lock_free_num<long> {
668  enum { value = ATOMIC_LONG_LOCK_FREE };
669 };
670 template <>
671 struct static_is_lock_free_num<long long> {
672  enum { value = ATOMIC_LLONG_LOCK_FREE };
673 };
674 template <typename T>
675 struct static_is_lock_free
676  : static_is_lock_free_num<typename std::make_signed<T>::type> {};
677 template <>
678 struct static_is_lock_free<bool> {
679  enum { value = ATOMIC_BOOL_LOCK_FREE };
680 };
681 template <typename U>
682 struct static_is_lock_free<U *> {
683  enum { value = ATOMIC_POINTER_LOCK_FREE };
684 };
685 } // namespace details
686 
687 struct ProducerToken {
688  template <typename T, typename Traits>
689  explicit ProducerToken(ConcurrentQueue<T, Traits> &queue);
690 
691  template <typename T, typename Traits>
692  explicit ProducerToken(BlockingConcurrentQueue<T, Traits> &queue);
693 
694  ProducerToken(ProducerToken &&other) MOODYCAMEL_NOEXCEPT
695  : producer(other.producer) {
696  other.producer = nullptr;
697  if (producer != nullptr) {
698  producer->token = this;
699  }
700  }
701 
702  inline ProducerToken &operator=(ProducerToken &&other) MOODYCAMEL_NOEXCEPT {
703  swap(other);
704  return *this;
705  }
706 
707  void swap(ProducerToken &other) MOODYCAMEL_NOEXCEPT {
708  std::swap(producer, other.producer);
709  if (producer != nullptr) {
710  producer->token = this;
711  }
712  if (other.producer != nullptr) {
713  other.producer->token = &other;
714  }
715  }
716 
717  // A token is always valid unless:
718  // 1) Memory allocation failed during construction
719  // 2) It was moved via the move constructor
720  // (Note: assignment does a swap, leaving both potentially valid)
721  // 3) The associated queue was destroyed
722  // Note that if valid() returns true, that only indicates
723  // that the token is valid for use with a specific queue,
724  // but not which one; that's up to the user to track.
725  inline bool valid() const { return producer != nullptr; }
726 
727  ~ProducerToken() {
728  if (producer != nullptr) {
729  producer->token = nullptr;
730  producer->inactive.store(true, std::memory_order_release);
731  }
732  }
733 
734  // Disable copying and assignment
735  ProducerToken(ProducerToken const &) MOODYCAMEL_DELETE_FUNCTION;
736  ProducerToken &operator=(ProducerToken const &) MOODYCAMEL_DELETE_FUNCTION;
737 
738  private:
739  template <typename T, typename Traits>
740  friend class ConcurrentQueue;
741 
742  friend class ConcurrentQueueTests;
743 
744  protected:
745  details::ConcurrentQueueProducerTypelessBase *producer;
746 };
747 
748 struct ConsumerToken {
749  template <typename T, typename Traits>
750  explicit ConsumerToken(ConcurrentQueue<T, Traits> &q);
751 
752  template <typename T, typename Traits>
753  explicit ConsumerToken(BlockingConcurrentQueue<T, Traits> &q);
754 
755  ConsumerToken(ConsumerToken &&other) MOODYCAMEL_NOEXCEPT
756  : initialOffset(other.initialOffset),
757  lastKnownGlobalOffset(other.lastKnownGlobalOffset),
758  itemsConsumedFromCurrent(other.itemsConsumedFromCurrent),
759  currentProducer(other.currentProducer),
760  desiredProducer(other.desiredProducer) {}
761 
762  inline ConsumerToken &operator=(ConsumerToken &&other) MOODYCAMEL_NOEXCEPT {
763  swap(other);
764  return *this;
765  }
766 
767  void swap(ConsumerToken &other) MOODYCAMEL_NOEXCEPT {
768  std::swap(initialOffset, other.initialOffset);
769  std::swap(lastKnownGlobalOffset, other.lastKnownGlobalOffset);
770  std::swap(itemsConsumedFromCurrent, other.itemsConsumedFromCurrent);
771  std::swap(currentProducer, other.currentProducer);
772  std::swap(desiredProducer, other.desiredProducer);
773  }
774 
775  // Disable copying and assignment
776  ConsumerToken(ConsumerToken const &) MOODYCAMEL_DELETE_FUNCTION;
777  ConsumerToken &operator=(ConsumerToken const &) MOODYCAMEL_DELETE_FUNCTION;
778 
779  private:
780  template <typename T, typename Traits>
781  friend class ConcurrentQueue;
782 
783  friend class ConcurrentQueueTests;
784 
785  private: // but shared with ConcurrentQueue
786  std::uint32_t initialOffset;
787  std::uint32_t lastKnownGlobalOffset;
788  std::uint32_t itemsConsumedFromCurrent;
789  details::ConcurrentQueueProducerTypelessBase *currentProducer;
790  details::ConcurrentQueueProducerTypelessBase *desiredProducer;
791 };
792 
793 // Need to forward-declare this swap because it's in a namespace.
794 // See
795 // http://stackoverflow.com/questions/4492062/why-does-a-c-friend-class-need-a-forward-declaration-only-in-other-namespaces
796 template <typename T, typename Traits>
797 inline void swap(typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP &a,
798  typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP &b)
799  MOODYCAMEL_NOEXCEPT;
800 
801 template <typename T, typename Traits = ConcurrentQueueDefaultTraits>
802 class ConcurrentQueue {
803  public:
804  typedef ::moodycamel::ProducerToken producer_token_t;
805  typedef ::moodycamel::ConsumerToken consumer_token_t;
806 
807  typedef typename Traits::index_t index_t;
808  typedef typename Traits::size_t size_t;
809 
810  static const size_t BLOCK_SIZE = static_cast<size_t>(Traits::BLOCK_SIZE);
811  static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD =
812  static_cast<size_t>(Traits::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD);
813  static const size_t EXPLICIT_INITIAL_INDEX_SIZE =
814  static_cast<size_t>(Traits::EXPLICIT_INITIAL_INDEX_SIZE);
815  static const size_t IMPLICIT_INITIAL_INDEX_SIZE =
816  static_cast<size_t>(Traits::IMPLICIT_INITIAL_INDEX_SIZE);
817  static const size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE =
818  static_cast<size_t>(Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE);
819  static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE =
820  static_cast<std::uint32_t>(
821  Traits::EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE);
822 #ifdef _MSC_VER
823 #pragma warning(push)
824 #pragma warning(disable : 4307) // + integral constant overflow (that's what
825  // the ternary expression is for!)
826 #pragma warning(disable : 4309) // static_cast: Truncation of constant value
827 #endif
828  static const size_t MAX_SUBQUEUE_SIZE =
829  (details::const_numeric_max<size_t>::value -
830  static_cast<size_t>(Traits::MAX_SUBQUEUE_SIZE) <
831  BLOCK_SIZE)
832  ? details::const_numeric_max<size_t>::value
833  : ((static_cast<size_t>(Traits::MAX_SUBQUEUE_SIZE) +
834  (BLOCK_SIZE - 1)) /
835  BLOCK_SIZE * BLOCK_SIZE);
836 #ifdef _MSC_VER
837 #pragma warning(pop)
838 #endif
839 
840  static_assert(!std::numeric_limits<size_t>::is_signed &&
841  std::is_integral<size_t>::value,
842  "Traits::size_t must be an unsigned integral type");
843  static_assert(!std::numeric_limits<index_t>::is_signed &&
844  std::is_integral<index_t>::value,
845  "Traits::index_t must be an unsigned integral type");
846  static_assert(sizeof(index_t) >= sizeof(size_t),
847  "Traits::index_t must be at least as wide as Traits::size_t");
848  static_assert((BLOCK_SIZE > 1) && !(BLOCK_SIZE & (BLOCK_SIZE - 1)),
849  "Traits::BLOCK_SIZE must be a power of 2 (and at least 2)");
850  static_assert((EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD > 1) &&
851  !(EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD &
852  (EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD - 1)),
853  "Traits::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD must be a "
854  "power of 2 (and greater than 1)");
855  static_assert((EXPLICIT_INITIAL_INDEX_SIZE > 1) &&
856  !(EXPLICIT_INITIAL_INDEX_SIZE &
857  (EXPLICIT_INITIAL_INDEX_SIZE - 1)),
858  "Traits::EXPLICIT_INITIAL_INDEX_SIZE must be a power of 2 (and "
859  "greater than 1)");
860  static_assert((IMPLICIT_INITIAL_INDEX_SIZE > 1) &&
861  !(IMPLICIT_INITIAL_INDEX_SIZE &
862  (IMPLICIT_INITIAL_INDEX_SIZE - 1)),
863  "Traits::IMPLICIT_INITIAL_INDEX_SIZE must be a power of 2 (and "
864  "greater than 1)");
865  static_assert(
866  (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) ||
867  !(INITIAL_IMPLICIT_PRODUCER_HASH_SIZE &
868  (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE - 1)),
869  "Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE must be a power of 2");
870  static_assert(INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0 ||
871  INITIAL_IMPLICIT_PRODUCER_HASH_SIZE >= 1,
872  "Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE must be at least "
873  "1 (or 0 to disable implicit enqueueing)");
874 
875  public:
876  // Creates a queue with at least `capacity` element slots; note that the
877  // actual number of elements that can be inserted without additional memory
878  // allocation depends on the number of producers and the block size (e.g. if
879  // the block size is equal to `capacity`, only a single block will be
880  // allocated up-front, which means only a single producer will be able to
881  // enqueue elements without an extra allocation -- blocks aren't shared
882  // between producers). This method is not thread safe -- it is up to the user
883  // to ensure that the queue is fully constructed before it starts being used
884  // by other threads (this includes making the memory effects of construction
885  // visible, possibly with a memory barrier).
886  explicit ConcurrentQueue(size_t capacity = 6 * BLOCK_SIZE)
887  : producerListTail(nullptr),
888  producerCount(0),
889  initialBlockPoolIndex(0),
890  nextExplicitConsumerId(0),
891  globalExplicitConsumerOffset(0) {
892  implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
893  populate_initial_implicit_producer_hash();
894  populate_initial_block_list(capacity / BLOCK_SIZE +
895  ((capacity & (BLOCK_SIZE - 1)) == 0 ? 0 : 1));
896 
897 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
898  // Track all the producers using a fully-resolved typed list for
899  // each kind; this makes it possible to debug them starting from
900  // the root queue object (otherwise wacky casts are needed that
901  // don't compile in the debugger's expression evaluator).
902  explicitProducers.store(nullptr, std::memory_order_relaxed);
903  implicitProducers.store(nullptr, std::memory_order_relaxed);
904 #endif
905  }
906 
907  // Computes the correct amount of pre-allocated blocks for you based
908  // on the minimum number of elements you want available at any given
909  // time, and the maximum concurrent number of each type of producer.
910  ConcurrentQueue(size_t minCapacity, size_t maxExplicitProducers,
911  size_t maxImplicitProducers)
912  : producerListTail(nullptr),
913  producerCount(0),
914  initialBlockPoolIndex(0),
915  nextExplicitConsumerId(0),
916  globalExplicitConsumerOffset(0) {
917  implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
918  populate_initial_implicit_producer_hash();
919  size_t blocks = (((minCapacity + BLOCK_SIZE - 1) / BLOCK_SIZE) - 1) *
920  (maxExplicitProducers + 1) +
921  2 * (maxExplicitProducers + maxImplicitProducers);
922  populate_initial_block_list(blocks);
923 
924 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
925  explicitProducers.store(nullptr, std::memory_order_relaxed);
926  implicitProducers.store(nullptr, std::memory_order_relaxed);
927 #endif
928  }
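// A minimal usage sketch (illustrative only, not part of the original header):
// the two constructors above size the initial block pool differently. The
// numbers below are arbitrary example parameters.
//
//   // At least 1024 element slots, subject to the block-sharing caveat above:
//   moodycamel::ConcurrentQueue<int> q1(1024);
//
//   // At least 1024 slots available at any time, with up to 4 explicit and
//   // 8 implicit producers enqueueing concurrently:
//   moodycamel::ConcurrentQueue<int> q2(1024, 4, 8);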
929 
930  // Note: The queue should not be accessed concurrently while it's
931  // being deleted. It's up to the user to synchronize this.
932  // This method is not thread safe.
933  ~ConcurrentQueue() {
934  // Destroy producers
935  auto ptr = producerListTail.load(std::memory_order_relaxed);
936  while (ptr != nullptr) {
937  auto next = ptr->next_prod();
938  if (ptr->token != nullptr) {
939  ptr->token->producer = nullptr;
940  }
941  destroy(ptr);
942  ptr = next;
943  }
944 
945  // Destroy implicit producer hash tables
946  if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE != 0) {
947  auto hash = implicitProducerHash.load(std::memory_order_relaxed);
948  while (hash != nullptr) {
949  auto prev = hash->prev;
950  if (prev != nullptr) { // The last hash is part of this object and was
951  // not allocated dynamically
952  for (size_t i = 0; i != hash->capacity; ++i) {
953  hash->entries[i].~ImplicitProducerKVP();
954  }
955  hash->~ImplicitProducerHash();
956  (Traits::free)(hash);
957  }
958  hash = prev;
959  }
960  }
961 
962  // Destroy global free list
963  auto block = freeList.head_unsafe();
964  while (block != nullptr) {
965  auto next = block->freeListNext.load(std::memory_order_relaxed);
966  if (block->dynamicallyAllocated) {
967  destroy(block);
968  }
969  block = next;
970  }
971 
972  // Destroy initial free list
973  destroy_array(initialBlockPool, initialBlockPoolSize);
974  }
975 
976  // Disable copying and copy assignment
977  ConcurrentQueue(ConcurrentQueue const &) MOODYCAMEL_DELETE_FUNCTION;
978  ConcurrentQueue &operator=(ConcurrentQueue const &)
979  MOODYCAMEL_DELETE_FUNCTION;
980 
981  // Moving is supported, but note that it is *not* a thread-safe operation.
982  // Nobody can use the queue while it's being moved, and the memory effects
983  // of that move must be propagated to other threads before they can use it.
984  // Note: When a queue is moved, its tokens are still valid but can only be
985  // used with the destination queue (i.e. semantically they are moved along
986  // with the queue itself).
987  ConcurrentQueue(ConcurrentQueue &&other) MOODYCAMEL_NOEXCEPT
988  : producerListTail(
989  other.producerListTail.load(std::memory_order_relaxed)),
990  producerCount(other.producerCount.load(std::memory_order_relaxed)),
991  initialBlockPoolIndex(
992  other.initialBlockPoolIndex.load(std::memory_order_relaxed)),
993  initialBlockPool(other.initialBlockPool),
994  initialBlockPoolSize(other.initialBlockPoolSize),
995  freeList(std::move(other.freeList)),
996  nextExplicitConsumerId(
997  other.nextExplicitConsumerId.load(std::memory_order_relaxed)),
998  globalExplicitConsumerOffset(other.globalExplicitConsumerOffset.load(
999  std::memory_order_relaxed)) {
1000  // Move the other one into this, and leave the other one as an empty queue
1001  implicitProducerHashResizeInProgress.clear(std::memory_order_relaxed);
1002  populate_initial_implicit_producer_hash();
1003  swap_implicit_producer_hashes(other);
1004 
1005  other.producerListTail.store(nullptr, std::memory_order_relaxed);
1006  other.producerCount.store(0, std::memory_order_relaxed);
1007  other.nextExplicitConsumerId.store(0, std::memory_order_relaxed);
1008  other.globalExplicitConsumerOffset.store(0, std::memory_order_relaxed);
1009 
1010 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
1011  explicitProducers.store(
1012  other.explicitProducers.load(std::memory_order_relaxed),
1013  std::memory_order_relaxed);
1014  other.explicitProducers.store(nullptr, std::memory_order_relaxed);
1015  implicitProducers.store(
1016  other.implicitProducers.load(std::memory_order_relaxed),
1017  std::memory_order_relaxed);
1018  other.implicitProducers.store(nullptr, std::memory_order_relaxed);
1019 #endif
1020 
1021  other.initialBlockPoolIndex.store(0, std::memory_order_relaxed);
1022  other.initialBlockPoolSize = 0;
1023  other.initialBlockPool = nullptr;
1024 
1025  reown_producers();
1026  }
1027 
1028  inline ConcurrentQueue &operator=(ConcurrentQueue &&other)
1029  MOODYCAMEL_NOEXCEPT {
1030  return swap_internal(other);
1031  }
1032 
1033  // Swaps this queue's state with the other's. Not thread-safe.
1034  // Swapping two queues does not invalidate their tokens, however
1035  // the tokens that were created for one queue must be used with
1036  // only the swapped queue (i.e. the tokens are tied to the
1037  // queue's movable state, not the object itself).
1038  inline void swap(ConcurrentQueue &other) MOODYCAMEL_NOEXCEPT {
1039  swap_internal(other);
1040  }
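// A minimal usage sketch (illustrative only, not part of the original header):
// moving or swapping queues is single-threaded, and existing tokens follow the
// queue state they were created against, as described above.
//
//   moodycamel::ConcurrentQueue<int> a, b;
//   moodycamel::ProducerToken tok(a);
//   a.enqueue(tok, 1);
//
//   moodycamel::ConcurrentQueue<int> c(std::move(a));  // no other thread may touch a or c here
//   c.enqueue(tok, 2);   // tok is now tied to c
//   b.swap(c);           // after the swap, tok must be used with b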
1041 
1042  private:
1043  ConcurrentQueue &swap_internal(ConcurrentQueue &other) {
1044  if (this == &other) {
1045  return *this;
1046  }
1047 
1048  details::swap_relaxed(producerListTail, other.producerListTail);
1049  details::swap_relaxed(producerCount, other.producerCount);
1050  details::swap_relaxed(initialBlockPoolIndex, other.initialBlockPoolIndex);
1051  std::swap(initialBlockPool, other.initialBlockPool);
1052  std::swap(initialBlockPoolSize, other.initialBlockPoolSize);
1053  freeList.swap(other.freeList);
1054  details::swap_relaxed(nextExplicitConsumerId, other.nextExplicitConsumerId);
1055  details::swap_relaxed(globalExplicitConsumerOffset,
1056  other.globalExplicitConsumerOffset);
1057 
1058  swap_implicit_producer_hashes(other);
1059 
1060  reown_producers();
1061  other.reown_producers();
1062 
1063 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
1064  details::swap_relaxed(explicitProducers, other.explicitProducers);
1065  details::swap_relaxed(implicitProducers, other.implicitProducers);
1066 #endif
1067 
1068  return *this;
1069  }
1070 
1071  public:
1072  // Enqueues a single item (by copying it).
1073  // Allocates memory if required. Only fails if memory allocation fails (or
1074  // implicit production is disabled because
1075  // Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0, or
1076  // Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
1077  // Thread-safe.
1078  inline bool enqueue(T const &item) {
1079  if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
1080  return inner_enqueue<CanAlloc>(item);
1081  }
1082 
1083  // Enqueues a single item (by moving it, if possible).
1084  // Allocates memory if required. Only fails if memory allocation fails (or
1085  // implicit production is disabled because
1086  // Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0, or
1087  // Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
1088  // Thread-safe.
1089  inline bool enqueue(T &&item) {
1090  if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
1091  return inner_enqueue<CanAlloc>(std::move(item));
1092  }
1093 
1094  // Enqueues a single item (by copying it) using an explicit producer token.
1095  // Allocates memory if required. Only fails if memory allocation fails (or
1096  // Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
1097  // Thread-safe.
1098  inline bool enqueue(producer_token_t const &token, T const &item) {
1099  return inner_enqueue<CanAlloc>(token, item);
1100  }
1101 
1102  // Enqueues a single item (by moving it, if possible) using an explicit
1103  // producer token. Allocates memory if required. Only fails if memory
1104  // allocation fails (or Traits::MAX_SUBQUEUE_SIZE has been defined and would
1105  // be surpassed). Thread-safe.
1106  inline bool enqueue(producer_token_t const &token, T &&item) {
1107  return inner_enqueue<CanAlloc>(token, std::move(item));
1108  }
1109 
1110  // Enqueues several items.
1111  // Allocates memory if required. Only fails if memory allocation fails (or
1112  // implicit production is disabled because
1113  // Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0, or
1114  // Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed). Note:
1115  // Use std::make_move_iterator if the elements should be moved instead of
1116  // copied. Thread-safe.
1117  template <typename It>
1118  bool enqueue_bulk(It itemFirst, size_t count) {
1119  if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
1120  return inner_enqueue_bulk<CanAlloc>(itemFirst, count);
1121  }
1122 
1123  // Enqueues several items using an explicit producer token.
1124  // Allocates memory if required. Only fails if memory allocation fails
1125  // (or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
1126  // Note: Use std::make_move_iterator if the elements should be moved
1127  // instead of copied.
1128  // Thread-safe.
1129  template <typename It>
1130  bool enqueue_bulk(producer_token_t const &token, It itemFirst, size_t count) {
1131  return inner_enqueue_bulk<CanAlloc>(token, itemFirst, count);
1132  }
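// A minimal usage sketch (illustrative only, not part of the original header):
// the allocating enqueue family, with and without a producer token.
//
//   moodycamel::ConcurrentQueue<std::string> q;
//   q.enqueue("hello");                       // implicit producer, may allocate
//
//   moodycamel::ProducerToken ptok(q);
//   q.enqueue(ptok, std::string("world"));    // explicit producer
//
//   std::vector<std::string> batch(10, "x");
//   q.enqueue_bulk(ptok, std::make_move_iterator(batch.begin()), batch.size());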
1133 
1134  // Enqueues a single item (by copying it).
1135  // Does not allocate memory. Fails if not enough room to enqueue (or implicit
1136  // production is disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
1137  // is 0).
1138  // Thread-safe.
1139  inline bool try_enqueue(T const &item) {
1140  if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
1141  return inner_enqueue<CannotAlloc>(item);
1142  }
1143 
1144  // Enqueues a single item (by moving it, if possible).
1145  // Does not allocate memory (except for one-time implicit producer).
1146  // Fails if not enough room to enqueue (or implicit production is
1147  // disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0).
1148  // Thread-safe.
1149  inline bool try_enqueue(T &&item) {
1150  if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
1151  return inner_enqueue<CannotAlloc>(std::move(item));
1152  }
1153 
1154  // Enqueues a single item (by copying it) using an explicit producer token.
1155  // Does not allocate memory. Fails if not enough room to enqueue.
1156  // Thread-safe.
1157  inline bool try_enqueue(producer_token_t const &token, T const &item) {
1158  return inner_enqueue<CannotAlloc>(token, item);
1159  }
1160 
1161  // Enqueues a single item (by moving it, if possible) using an explicit
1162  // producer token. Does not allocate memory. Fails if not enough room to
1163  // enqueue. Thread-safe.
1164  inline bool try_enqueue(producer_token_t const &token, T &&item) {
1165  return inner_enqueue<CannotAlloc>(token, std::move(item));
1166  }
1167 
1168  // Enqueues several items.
1169  // Does not allocate memory (except for one-time implicit producer).
1170  // Fails if not enough room to enqueue (or implicit production is
1171  // disabled because Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE is 0).
1172  // Note: Use std::make_move_iterator if the elements should be moved
1173  // instead of copied.
1174  // Thread-safe.
1175  template <typename It>
1176  bool try_enqueue_bulk(It itemFirst, size_t count) {
1177  if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return false;
1178  return inner_enqueue_bulk<CannotAlloc>(itemFirst, count);
1179  }
1180 
1181  // Enqueues several items using an explicit producer token.
1182  // Does not allocate memory. Fails if not enough room to enqueue.
1183  // Note: Use std::make_move_iterator if the elements should be moved
1184  // instead of copied.
1185  // Thread-safe.
1186  template <typename It>
1187  bool try_enqueue_bulk(producer_token_t const &token, It itemFirst,
1188  size_t count) {
1189  return inner_enqueue_bulk<CannotAlloc>(token, itemFirst, count);
1190  }
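// A minimal usage sketch (illustrative only, not part of the original header):
// the try_enqueue family never allocates new blocks, so it can fail once the
// pre-allocated capacity is exhausted. Sizes are arbitrary example values.
//
//   moodycamel::ConcurrentQueue<int> q(64);   // pre-allocate roughly 64 slots
//   bool ok = q.try_enqueue(42);              // false if no room is left
//   int items[8] = {0, 1, 2, 3, 4, 5, 6, 7};
//   bool okBulk = q.try_enqueue_bulk(items, 8);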
1191 
1192  // Attempts to dequeue from the queue.
1193  // Returns false if all producer streams appeared empty at the time they
1194  // were checked (so, the queue is likely but not guaranteed to be empty).
1195  // Never allocates. Thread-safe.
1196  template <typename U>
1197  bool try_dequeue(U &item) {
1198  // Instead of simply trying each producer in turn (which could cause
1199  // needless contention on the first producer), we score them heuristically.
1200  size_t nonEmptyCount = 0;
1201  ProducerBase *best = nullptr;
1202  size_t bestSize = 0;
1203  for (auto ptr = producerListTail.load(std::memory_order_acquire);
1204  nonEmptyCount < 3 && ptr != nullptr; ptr = ptr->next_prod()) {
1205  auto size = ptr->size_approx();
1206  if (size > 0) {
1207  if (size > bestSize) {
1208  bestSize = size;
1209  best = ptr;
1210  }
1211  ++nonEmptyCount;
1212  }
1213  }
1214 
1215  // If there was at least one non-empty queue but it appears empty at the
1216  // time we try to dequeue from it, we need to make sure every queue's been
1217  // tried
1218  if (nonEmptyCount > 0) {
1219  if ((details::likely)(best->dequeue(item))) {
1220  return true;
1221  }
1222  for (auto ptr = producerListTail.load(std::memory_order_acquire);
1223  ptr != nullptr; ptr = ptr->next_prod()) {
1224  if (ptr != best && ptr->dequeue(item)) {
1225  return true;
1226  }
1227  }
1228  }
1229  return false;
1230  }
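// A minimal usage sketch (illustrative only, not part of the original header):
// a typical tokenless consumer loop. A false return only means the queue
// *appeared* empty at the time of the check.
//
//   moodycamel::ConcurrentQueue<int> q;
//   int item;
//   while (q.try_dequeue(item)) {
//     process(item);   // process() is a placeholder for user code
//   }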
1231 
1232  // Attempts to dequeue from the queue.
1233  // Returns false if all producer streams appeared empty at the time they
1234  // were checked (so, the queue is likely but not guaranteed to be empty).
1235  // This differs from the try_dequeue(item) method in that this one does
1236  // not attempt to reduce contention by interleaving the order that producer
1237  // streams are dequeued from. So, using this method can reduce overall
1238  // throughput under contention, but will give more predictable results in
1239  // single-threaded consumer scenarios. This is mostly only useful for internal
1240  // unit tests. Never allocates. Thread-safe.
1241  template <typename U>
1242  bool try_dequeue_non_interleaved(U &item) {
1243  for (auto ptr = producerListTail.load(std::memory_order_acquire);
1244  ptr != nullptr; ptr = ptr->next_prod()) {
1245  if (ptr->dequeue(item)) {
1246  return true;
1247  }
1248  }
1249  return false;
1250  }
1251 
1252  // Attempts to dequeue from the queue using an explicit consumer token.
1253  // Returns false if all producer streams appeared empty at the time they
1254  // were checked (so, the queue is likely but not guaranteed to be empty).
1255  // Never allocates. Thread-safe.
1256  template <typename U>
1257  bool try_dequeue(consumer_token_t &token, U &item) {
1258  // The idea is roughly as follows:
1259  // Every 256 items from one producer, make everyone rotate (increase the
1260  // global offset) -> this means the highest-efficiency consumer dictates the
1261  // rotation speed of everyone else, more or less. If you see that the global
1262  // offset has changed, you must reset your consumption counter and move to
1263  // your designated place. If there are no items where you're supposed to be,
1264  // keep moving until you find a producer with some items. If the global
1265  // offset has not changed but you've run out of items to consume, move over
1266  // from your current position until you find a producer with something in
1267  // it.
1268 
1269  if (token.desiredProducer == nullptr ||
1270  token.lastKnownGlobalOffset !=
1271  globalExplicitConsumerOffset.load(std::memory_order_relaxed)) {
1272  if (!update_current_producer_after_rotation(token)) {
1273  return false;
1274  }
1275  }
1276 
1277  // If there was at least one non-empty queue but it appears empty at the
1278  // time we try to dequeue from it, we need to make sure every queue's been
1279  // tried
1280  if (static_cast<ProducerBase *>(token.currentProducer)->dequeue(item)) {
1281  if (++token.itemsConsumedFromCurrent ==
1282  EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE) {
1283  globalExplicitConsumerOffset.fetch_add(1, std::memory_order_relaxed);
1284  }
1285  return true;
1286  }
1287 
1288  auto tail = producerListTail.load(std::memory_order_acquire);
1289  auto ptr = static_cast<ProducerBase *>(token.currentProducer)->next_prod();
1290  if (ptr == nullptr) {
1291  ptr = tail;
1292  }
1293  while (ptr != static_cast<ProducerBase *>(token.currentProducer)) {
1294  if (ptr->dequeue(item)) {
1295  token.currentProducer = ptr;
1296  token.itemsConsumedFromCurrent = 1;
1297  return true;
1298  }
1299  ptr = ptr->next_prod();
1300  if (ptr == nullptr) {
1301  ptr = tail;
1302  }
1303  }
1304  return false;
1305  }
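// A minimal usage sketch (illustrative only, not part of the original header):
// dequeueing with a consumer token lets the rotation scheme described above
// spread consumers across the producers' sub-queues.
//
//   moodycamel::ConcurrentQueue<int> q;
//   moodycamel::ConsumerToken ctok(q);
//   int item;
//   while (q.try_dequeue(ctok, item)) {
//     process(item);   // process() is a placeholder for user code
//   }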
1306 
1307  // Attempts to dequeue several elements from the queue.
1308  // Returns the number of items actually dequeued.
1309  // Returns 0 if all producer streams appeared empty at the time they
1310  // were checked (so, the queue is likely but not guaranteed to be empty).
1311  // Never allocates. Thread-safe.
1312  template <typename It>
1313  size_t try_dequeue_bulk(It itemFirst, size_t max) {
1314  size_t count = 0;
1315  for (auto ptr = producerListTail.load(std::memory_order_acquire);
1316  ptr != nullptr; ptr = ptr->next_prod()) {
1317  count += ptr->dequeue_bulk(itemFirst, max - count);
1318  if (count == max) {
1319  break;
1320  }
1321  }
1322  return count;
1323  }
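// A minimal usage sketch (illustrative only, not part of the original header):
// bulk dequeue returns however many items (up to max) could be grabbed across
// the producers' sub-queues.
//
//   moodycamel::ConcurrentQueue<int> q;
//   int buffer[32];
//   size_t got = q.try_dequeue_bulk(buffer, 32);
//   for (size_t i = 0; i != got; ++i) {
//     process(buffer[i]);   // process() is a placeholder for user code
//   }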
1324 
1325  // Attempts to dequeue several elements from the queue using an explicit
1326  // consumer token. Returns the number of items actually dequeued. Returns 0 if
1327  // all producer streams appeared empty at the time they were checked (so, the
1328  // queue is likely but not guaranteed to be empty). Never allocates.
1329  // Thread-safe.
1330  template <typename It>
1331  size_t try_dequeue_bulk(consumer_token_t &token, It itemFirst, size_t max) {
1332  if (token.desiredProducer == nullptr ||
1333  token.lastKnownGlobalOffset !=
1334  globalExplicitConsumerOffset.load(std::memory_order_relaxed)) {
1335  if (!update_current_producer_after_rotation(token)) {
1336  return 0;
1337  }
1338  }
1339 
1340  size_t count = static_cast<ProducerBase *>(token.currentProducer)
1341  ->dequeue_bulk(itemFirst, max);
1342  if (count == max) {
1343  if ((token.itemsConsumedFromCurrent += static_cast<std::uint32_t>(max)) >=
1344  EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE) {
1345  globalExplicitConsumerOffset.fetch_add(1, std::memory_order_relaxed);
1346  }
1347  return max;
1348  }
1349  token.itemsConsumedFromCurrent += static_cast<std::uint32_t>(count);
1350  max -= count;
1351 
1352  auto tail = producerListTail.load(std::memory_order_acquire);
1353  auto ptr = static_cast<ProducerBase *>(token.currentProducer)->next_prod();
1354  if (ptr == nullptr) {
1355  ptr = tail;
1356  }
1357  while (ptr != static_cast<ProducerBase *>(token.currentProducer)) {
1358  auto dequeued = ptr->dequeue_bulk(itemFirst, max);
1359  count += dequeued;
1360  if (dequeued != 0) {
1361  token.currentProducer = ptr;
1362  token.itemsConsumedFromCurrent = static_cast<std::uint32_t>(dequeued);
1363  }
1364  if (dequeued == max) {
1365  break;
1366  }
1367  max -= dequeued;
1368  ptr = ptr->next_prod();
1369  if (ptr == nullptr) {
1370  ptr = tail;
1371  }
1372  }
1373  return count;
1374  }
1375 
1376  // Attempts to dequeue from a specific producer's inner queue.
1377  // If you happen to know which producer you want to dequeue from, this
1378  // is significantly faster than using the general-case try_dequeue methods.
1379  // Returns false if the producer's queue appeared empty at the time it
1380  // was checked (so, the queue is likely but not guaranteed to be empty).
1381  // Never allocates. Thread-safe.
1382  template <typename U>
1383  inline bool try_dequeue_from_producer(producer_token_t const &producer,
1384  U &item) {
1385  return static_cast<ExplicitProducer *>(producer.producer)->dequeue(item);
1386  }
1387 
1388  // Attempts to dequeue several elements from a specific producer's inner
1389  // queue. Returns the number of items actually dequeued. If you happen to know
1390  // which producer you want to dequeue from, this is significantly faster than
1391  // using the general-case try_dequeue methods. Returns 0 if the producer's
1392  // queue appeared empty at the time it was checked (so, the queue is likely
1393  // but not guaranteed to be empty). Never allocates. Thread-safe.
1394  template <typename It>
1395  inline size_t try_dequeue_bulk_from_producer(producer_token_t const &producer,
1396  It itemFirst, size_t max) {
1397  return static_cast<ExplicitProducer *>(producer.producer)
1398  ->dequeue_bulk(itemFirst, max);
1399  }
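// A minimal usage sketch (illustrative only, not part of the original header):
// when the producer is known, its token can be used to dequeue directly from
// that producer's sub-queue, skipping the general search.
//
//   moodycamel::ConcurrentQueue<int> q;
//   moodycamel::ProducerToken ptok(q);
//   q.enqueue(ptok, 7);
//
//   int item;
//   bool ok = q.try_dequeue_from_producer(ptok, item);   // item == 7 on success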
1400 
1401  // Returns an estimate of the total number of elements currently in the queue.
1402  // This estimate is only accurate if the queue has completely stabilized
1403  // before it is called (i.e. all enqueue and dequeue operations have completed
1404  // and their memory effects are visible on the calling thread, and no further
1405  // operations start while this method is being called). Thread-safe.
1406  size_t size_approx() const {
1407  size_t size = 0;
1408  for (auto ptr = producerListTail.load(std::memory_order_acquire);
1409  ptr != nullptr; ptr = ptr->next_prod()) {
1410  size += ptr->size_approx();
1411  }
1412  return size;
1413  }
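// A minimal usage sketch (illustrative only, not part of the original header):
// size_approx() is only exact once the queue has quiesced, e.g. after all
// producer and consumer threads have been joined. q is assumed to be a
// previously constructed queue.
//
//   // ... all producer/consumer threads joined ...
//   size_t remaining = q.size_approx();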
1414 
1415  // Returns true if the underlying atomic variables used by
1416  // the queue are lock-free (they should be on most platforms).
1417  // Thread-safe.
1418  static bool is_lock_free() {
1419  return details::static_is_lock_free<bool>::value == 2 &&
1420  details::static_is_lock_free<size_t>::value == 2 &&
1421  details::static_is_lock_free<std::uint32_t>::value == 2 &&
1422  details::static_is_lock_free<index_t>::value == 2 &&
1423  details::static_is_lock_free<void *>::value == 2 &&
1424  details::static_is_lock_free<typename details::thread_id_converter<
1425  details::thread_id_t>::thread_id_numeric_size_t>::value == 2;
1426  }
1427 
1428  private:
1429  friend struct ProducerToken;
1430  friend struct ConsumerToken;
1431  struct ExplicitProducer;
1432  friend struct ExplicitProducer;
1433  struct ImplicitProducer;
1434  friend struct ImplicitProducer;
1435 
1436  friend class ConcurrentQueueTests;
1437 
1438  enum AllocationMode { CanAlloc, CannotAlloc };
1439 
1440  ///////////////////////////////
1441  // Queue methods
1442  ///////////////////////////////
1443 
1444  template <AllocationMode canAlloc, typename U>
1445  inline bool inner_enqueue(producer_token_t const &token, U &&element) {
1446  return static_cast<ExplicitProducer *>(token.producer)
1447  ->ConcurrentQueue::ExplicitProducer::template enqueue<canAlloc>(
1448  std::forward<U>(element));
1449  }
1450 
1451  template <AllocationMode canAlloc, typename U>
1452  inline bool inner_enqueue(U &&element) {
1453  auto producer = get_or_add_implicit_producer();
1454  return producer == nullptr
1455  ? false
1456  : producer->ConcurrentQueue::ImplicitProducer::template enqueue<
1457  canAlloc>(std::forward<U>(element));
1458  }
1459 
1460  template <AllocationMode canAlloc, typename It>
1461  inline bool inner_enqueue_bulk(producer_token_t const &token, It itemFirst,
1462  size_t count) {
1463  return static_cast<ExplicitProducer *>(token.producer)
1464  ->ConcurrentQueue::ExplicitProducer::template enqueue_bulk<canAlloc>(
1465  itemFirst, count);
1466  }
1467 
1468  template <AllocationMode canAlloc, typename It>
1469  inline bool inner_enqueue_bulk(It itemFirst, size_t count) {
1470  auto producer = get_or_add_implicit_producer();
1471  return producer == nullptr
1472  ? false
1473  : producer->ConcurrentQueue::ImplicitProducer:: template enqueue_bulk<canAlloc>(itemFirst, count);
1474  }
1475 
1476  inline bool update_current_producer_after_rotation(consumer_token_t &token) {
1477  // Ah, there's been a rotation, figure out where we should be!
1478  auto tail = producerListTail.load(std::memory_order_acquire);
1479  if (token.desiredProducer == nullptr && tail == nullptr) {
1480  return false;
1481  }
1482  auto prodCount = producerCount.load(std::memory_order_relaxed);
1483  auto globalOffset =
1484  globalExplicitConsumerOffset.load(std::memory_order_relaxed);
1485  if ((details::unlikely)(token.desiredProducer == nullptr)) {
1486  // Aha, first time we're dequeueing anything.
1487  // Figure out our local position
1488  // Note: offset is from start, not end, but we're traversing from end --
1489  // subtract from count first
1490  std::uint32_t offset = prodCount - 1 - (token.initialOffset % prodCount);
1491  token.desiredProducer = tail;
1492  for (std::uint32_t i = 0; i != offset; ++i) {
1493  token.desiredProducer =
1494  static_cast<ProducerBase *>(token.desiredProducer)->next_prod();
1495  if (token.desiredProducer == nullptr) {
1496  token.desiredProducer = tail;
1497  }
1498  }
1499  }
1500 
1501  std::uint32_t delta = globalOffset - token.lastKnownGlobalOffset;
1502  if (delta >= prodCount) {
1503  delta = delta % prodCount;
1504  }
1505  for (std::uint32_t i = 0; i != delta; ++i) {
1506  token.desiredProducer =
1507  static_cast<ProducerBase *>(token.desiredProducer)->next_prod();
1508  if (token.desiredProducer == nullptr) {
1509  token.desiredProducer = tail;
1510  }
1511  }
1512 
1513  token.lastKnownGlobalOffset = globalOffset;
1514  token.currentProducer = token.desiredProducer;
1515  token.itemsConsumedFromCurrent = 0;
1516  return true;
1517  }
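  // Worked example of the rotation logic above (illustrative numbers): with
  // prodCount == 3, lastKnownGlobalOffset == 1 and globalOffset == 4, delta is
  // 3, which reduces to 0 modulo prodCount, so the token stays on its current
  // producer; a delta of 1 would advance desiredProducer by one link, wrapping
  // back to the list tail when next_prod() returns nullptr.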
1518 
1520  // Free list
1522 
1523  template <typename N>
1524  struct FreeListNode {
1525  FreeListNode() : freeListRefs(0), freeListNext(nullptr) {}
1526 
1527  std::atomic<std::uint32_t> freeListRefs;
1528  std::atomic<N *> freeListNext;
1529  };
1530 
1531  // A simple CAS-based lock-free free list. Not the fastest thing in the world
1532  // under heavy contention, but simple and correct (assuming nodes are never
1533  // freed until after the free list is destroyed), and fairly speedy under low
1534  // contention.
1535  template <typename N> // N must inherit FreeListNode or have the same fields
1536  // (and initialization of them)
1537  struct FreeList {
1538  FreeList() : freeListHead(nullptr) {}
1539 
1540  FreeList(FreeList &&other)
1541  : freeListHead(other.freeListHead.load(std::memory_order_relaxed)) {
1542  other.freeListHead.store(nullptr, std::memory_order_relaxed);
1543  }
1544 
1545  void swap(FreeList &other) {
1546  details::swap_relaxed(freeListHead, other.freeListHead);
1547  }
1548 
1549  FreeList(FreeList const &) MOODYCAMEL_DELETE_FUNCTION;
1550  FreeList &operator=(FreeList const &) MOODYCAMEL_DELETE_FUNCTION;
1551 
1552  inline void add(N *node) {
1553 #if MCDBGQ_NOLOCKFREE_FREELIST
1554  debug::DebugLock lock(mutex);
1555 #endif
1556  // We know that the should-be-on-freelist bit is 0 at this point, so it's
1557  // safe to set it using a fetch_add
1558  if (node->freeListRefs.fetch_add(SHOULD_BE_ON_FREELIST,
1559  std::memory_order_acq_rel) == 0) {
1560  // Oh look! We were the last ones referencing this node, and we know
1561  // we want to add it to the free list, so let's do it!
1562  add_knowing_refcount_is_zero(node);
1563  }
1564  }
1565 
1566  inline N *try_get() {
1567 #if MCDBGQ_NOLOCKFREE_FREELIST
1568  debug::DebugLock lock(mutex);
1569 #endif
1570  auto head = freeListHead.load(std::memory_order_acquire);
1571  while (head != nullptr) {
1572  auto prevHead = head;
1573  auto refs = head->freeListRefs.load(std::memory_order_relaxed);
1574  if ((refs & REFS_MASK) == 0 ||
1575  !head->freeListRefs.compare_exchange_strong(
1576  refs, refs + 1, std::memory_order_acquire,
1577  std::memory_order_relaxed)) {
1578  head = freeListHead.load(std::memory_order_acquire);
1579  continue;
1580  }
1581 
1582  // Good, reference count has been incremented (it wasn't at zero), which
1583  // means we can read the next and not worry about it changing between
1584  // now and the time we do the CAS
1585  auto next = head->freeListNext.load(std::memory_order_relaxed);
1586  if (freeListHead.compare_exchange_strong(head, next,
1587  std::memory_order_acquire,
1588  std::memory_order_relaxed)) {
1589  // Yay, got the node. This means it was on the list, which means
1590  // shouldBeOnFreeList must be false no matter the refcount (because
1591  // nobody else knows it's been taken off yet, it can't have been put
1592  // back on).
1593  assert((head->freeListRefs.load(std::memory_order_relaxed) &
1594  SHOULD_BE_ON_FREELIST) == 0);
1595 
1596  // Decrease refcount twice, once for our ref, and once for the list's
1597  // ref
1598  head->freeListRefs.fetch_sub(2, std::memory_order_release);
1599  return head;
1600  }
1601 
1602  // OK, the head must have changed on us, but we still need to decrease
1603  // the refcount we increased. Note that we don't need to release any
1604  // memory effects, but we do need to ensure that the reference count
1605  // decrement happens-after the CAS on the head.
1606  refs = prevHead->freeListRefs.fetch_sub(1, std::memory_order_acq_rel);
1607  if (refs == SHOULD_BE_ON_FREELIST + 1) {
1608  add_knowing_refcount_is_zero(prevHead);
1609  }
1610  }
1611 
1612  return nullptr;
1613  }
1614 
1615  // Useful for traversing the list when there's no contention (e.g. to
1616  // destroy remaining nodes)
1617  N *head_unsafe() const {
1618  return freeListHead.load(std::memory_order_relaxed);
1619  }
1620 
1621  private:
1622  inline void add_knowing_refcount_is_zero(N *node) {
1623  // Since the refcount is zero, and nobody can increase it once it's zero
1624  // (except us, and we run only one copy of this method per node at a time,
1625  // i.e. the single thread case), then we know we can safely change the
1626  // next pointer of the node; however, once the refcount is back above
1627  // zero, then other threads could increase it (happens under heavy
1628  // contention, when the refcount goes to zero in between a load and a
1629  // refcount increment of a node in try_get, then back up to something
1630  // non-zero, then the refcount increment is done by the other thread) --
1631  // so, if the CAS to add the node to the actual list fails, decrease the
1632  // refcount and leave the add operation to the next thread who puts the
1633  // refcount back at zero (which could be us, hence the loop).
1634  auto head = freeListHead.load(std::memory_order_relaxed);
1635  while (true) {
1636  node->freeListNext.store(head, std::memory_order_relaxed);
1637  node->freeListRefs.store(1, std::memory_order_release);
1638  if (!freeListHead.compare_exchange_strong(head, node,
1639  std::memory_order_release,
1640  std::memory_order_relaxed)) {
1641  // Hmm, the add failed, but we can only try again when the refcount
1642  // goes back to zero
1643  if (node->freeListRefs.fetch_add(SHOULD_BE_ON_FREELIST - 1,
1644  std::memory_order_release) == 1) {
1645  continue;
1646  }
1647  }
1648  return;
1649  }
1650  }
1651 
1652  private:
1653  // Implemented like a stack, but where node order doesn't matter (nodes are
1654  // inserted out of order under contention)
1655  std::atomic<N *> freeListHead;
1656 
1657  static const std::uint32_t REFS_MASK = 0x7FFFFFFF;
1658  static const std::uint32_t SHOULD_BE_ON_FREELIST = 0x80000000;
1659 
1660 #if MCDBGQ_NOLOCKFREE_FREELIST
1661  debug::DebugMutex mutex;
1662 #endif
1663  };
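  // How the freeListRefs word above is used (a sketch, matching the constants
  // REFS_MASK and SHOULD_BE_ON_FREELIST): the low 31 bits count in-flight
  // references taken by try_get(), and the top bit records that the node is
  // meant to be on the list. For example, a value of (SHOULD_BE_ON_FREELIST | 2)
  // means two threads still hold transient references; the last one to drop its
  // reference performs the deferred add_knowing_refcount_is_zero().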
1664 
1666  // Block
1668 
1669  enum InnerQueueContext { implicit_context = 0, explicit_context = 1 };
1670 
1671  struct Block {
1672  Block()
1673  : next(nullptr),
1674  elementsCompletelyDequeued(0),
1675  freeListRefs(0),
1676  freeListNext(nullptr),
1677  shouldBeOnFreeList(false),
1678  dynamicallyAllocated(true) {
1679 #if MCDBGQ_TRACKMEM
1680  owner = nullptr;
1681 #endif
1682  }
1683 
1684  template <InnerQueueContext context>
1685  inline bool is_empty() const {
1686  if (context == explicit_context &&
1687  BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
1688  // Check flags
1689  for (size_t i = 0; i < BLOCK_SIZE; ++i) {
1690  if (!emptyFlags[i].load(std::memory_order_relaxed)) {
1691  return false;
1692  }
1693  }
1694 
1695  // Aha, empty; make sure we have all other memory effects that happened
1696  // before the empty flags were set
1697  std::atomic_thread_fence(std::memory_order_acquire);
1698  return true;
1699  } else {
1700  // Check counter
1701  if (elementsCompletelyDequeued.load(std::memory_order_relaxed) ==
1702  BLOCK_SIZE) {
1703  std::atomic_thread_fence(std::memory_order_acquire);
1704  return true;
1705  }
1706  assert(elementsCompletelyDequeued.load(std::memory_order_relaxed) <=
1707  BLOCK_SIZE);
1708  return false;
1709  }
1710  }
1711 
1712  // Returns true if the block is now empty (does not apply in explicit
1713  // context)
1714  template <InnerQueueContext context>
1715  inline bool set_empty(index_t i) {
1716  if (context == explicit_context &&
1717  BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
1718  // Set flag
1719  assert(!emptyFlags[BLOCK_SIZE - 1 -
1720  static_cast<size_t>(
1721  i & static_cast<index_t>(BLOCK_SIZE - 1))]
1722  .load(std::memory_order_relaxed));
1723  emptyFlags[BLOCK_SIZE - 1 -
1724  static_cast<size_t>(i &
1725  static_cast<index_t>(BLOCK_SIZE - 1))]
1726  .store(true, std::memory_order_release);
1727  return false;
1728  } else {
1729  // Increment counter
1730  auto prevVal =
1731  elementsCompletelyDequeued.fetch_add(1, std::memory_order_release);
1732  assert(prevVal < BLOCK_SIZE);
1733  return prevVal == BLOCK_SIZE - 1;
1734  }
1735  }
1736 
1737  // Sets multiple contiguous item statuses to 'empty' (assumes no wrapping
1738  // and count > 0). Returns true if the block is now empty (does not apply in
1739  // explicit context).
1740  template <InnerQueueContext context>
1741  inline bool set_many_empty(index_t i, size_t count) {
1742  if (context == explicit_context &&
1743  BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD) {
1744  // Set flags
1745  std::atomic_thread_fence(std::memory_order_release);
1746  i = BLOCK_SIZE - 1 -
1747  static_cast<size_t>(i & static_cast<index_t>(BLOCK_SIZE - 1)) -
1748  count + 1;
1749  for (size_t j = 0; j != count; ++j) {
1750  assert(!emptyFlags[i + j].load(std::memory_order_relaxed));
1751  emptyFlags[i + j].store(true, std::memory_order_relaxed);
1752  }
1753  return false;
1754  } else {
1755  // Increment counter
1756  auto prevVal = elementsCompletelyDequeued.fetch_add(
1757  count, std::memory_order_release);
1758  assert(prevVal + count <= BLOCK_SIZE);
1759  return prevVal + count == BLOCK_SIZE;
1760  }
1761  }
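  // Note on the flag indexing used above (explicit context only): an item at
  // queue index i maps to emptyFlags[BLOCK_SIZE - 1 - (i % BLOCK_SIZE)], i.e.
  // the slots run in reverse. For example, with a BLOCK_SIZE of 32 (the usual
  // traits default; not shown in this excerpt), index 0 uses slot 31 and a run
  // of 4 items starting at index 8 occupies slots 20..23.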
1762 
1763  template <InnerQueueContext context>
1764  inline void set_all_empty() {
1765  if (context == explicit_context &&
1767  // Set all flags
1768  for (size_t i = 0; i != BLOCK_SIZE; ++i) {
1769  emptyFlags[i].store(true, std::memory_order_relaxed);
1770  }
1771  } else {
1772  // Reset counter
1773  elementsCompletelyDequeued.store(BLOCK_SIZE, std::memory_order_relaxed);
1774  }
1775  }
1776 
1777  template <InnerQueueContext context>
1778  inline void reset_empty() {
1779  if (context == explicit_context &&
1781  // Reset flags
1782  for (size_t i = 0; i != BLOCK_SIZE; ++i) {
1783  emptyFlags[i].store(false, std::memory_order_relaxed);
1784  }
1785  } else {
1786  // Reset counter
1787  elementsCompletelyDequeued.store(0, std::memory_order_relaxed);
1788  }
1789  }
1790 
1791  inline T *operator[](index_t idx) MOODYCAMEL_NOEXCEPT {
1792  return static_cast<T *>(static_cast<void *>(elements)) +
1793  static_cast<size_t>(idx & static_cast<index_t>(BLOCK_SIZE - 1));
1794  }
1795 
1796  inline T const *operator[](index_t idx) const MOODYCAMEL_NOEXCEPT {
1797  return static_cast<T const *>(static_cast<void const *>(elements)) +
1798  static_cast<size_t>(idx & static_cast<index_t>(BLOCK_SIZE - 1));
1799  }
1800 
1801  private:
1802  // IMPORTANT: This must be the first member in Block, so that if T depends
1803  // on the alignment of addresses returned by malloc, that alignment will be
1804  // preserved. Apparently clang actually generates code that uses this
1805  // assumption for AVX instructions in some cases. Ideally, we should also
1806  // align Block to the alignment of T in case it's higher than malloc's
1807  // 16-byte alignment, but this is hard to do in a cross-platform way. Assert
1808  // for this case:
1809  static_assert(
1810  std::alignment_of<T>::value <=
1811  std::alignment_of<details::max_align_t>::value,
1812  "The queue does not support super-aligned types at this time");
1813  // Additionally, we need the alignment of Block itself to be a multiple of
1814  // max_align_t since otherwise the appropriate padding will not be added at
1815  // the end of Block in order to make arrays of Blocks all be properly
1816  // aligned (not just the first one). We use a union to force this.
1817  union {
1818  char elements[sizeof(T) * BLOCK_SIZE];
1819  details::max_align_t dummy;
1820  };
1821 
1822  public:
1823  Block *next;
1824  std::atomic<size_t> elementsCompletelyDequeued;
1825  std::atomic<bool> emptyFlags
1826  [BLOCK_SIZE <= EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD ? BLOCK_SIZE : 1];
1827 
1828  public:
1829  std::atomic<std::uint32_t> freeListRefs;
1830  std::atomic<Block *> freeListNext;
1831  std::atomic<bool> shouldBeOnFreeList;
1832  bool dynamicallyAllocated; // Perhaps a better name for this would be
1833  // 'isNotPartOfInitialBlockPool'
1834 
1835 #if MCDBGQ_TRACKMEM
1836  void *owner;
1837 #endif
1838  };
1839 
1840  static_assert(std::alignment_of<Block>::value >=
1841  std::alignment_of<details::max_align_t>::value,
1842  "Internal error: Blocks must be at least as aligned as the "
1843  "type they are wrapping");
1844 
1845 #if MCDBGQ_TRACKMEM
1846  public:
1847  struct MemStats;
1848 
1849  private:
1850 #endif
1851 
1853  // Producer base
1855 
1856  struct ProducerBase : public details::ConcurrentQueueProducerTypelessBase {
1857  ProducerBase(ConcurrentQueue *parent_, bool isExplicit_)
1858  : tailIndex(0),
1859  headIndex(0),
1860  dequeueOptimisticCount(0),
1861  dequeueOvercommit(0),
1862  tailBlock(nullptr),
1863  isExplicit(isExplicit_),
1864  parent(parent_) {}
1865 
1866  virtual ~ProducerBase() {}
1867 
1868  template <typename U>
1869  inline bool dequeue(U &element) {
1870  if (isExplicit) {
1871  return static_cast<ExplicitProducer *>(this)->dequeue(element);
1872  } else {
1873  return static_cast<ImplicitProducer *>(this)->dequeue(element);
1874  }
1875  }
1876 
1877  template <typename It>
1878  inline size_t dequeue_bulk(It &itemFirst, size_t max) {
1879  if (isExplicit) {
1880  return static_cast<ExplicitProducer *>(this)->dequeue_bulk(itemFirst,
1881  max);
1882  } else {
1883  return static_cast<ImplicitProducer *>(this)->dequeue_bulk(itemFirst,
1884  max);
1885  }
1886  }
1887 
1888  inline ProducerBase *next_prod() const {
1889  return static_cast<ProducerBase *>(next);
1890  }
1891 
1892  inline size_t size_approx() const {
1893  auto tail = tailIndex.load(std::memory_order_relaxed);
1894  auto head = headIndex.load(std::memory_order_relaxed);
1895  return details::circular_less_than(head, tail)
1896  ? static_cast<size_t>(tail - head)
1897  : 0;
1898  }
1899 
1900  inline index_t getTail() const {
1901  return tailIndex.load(std::memory_order_relaxed);
1902  }
1903 
1904  protected:
1905  std::atomic<index_t> tailIndex; // Where to enqueue to next
1906  std::atomic<index_t> headIndex; // Where to dequeue from next
1907 
1908  std::atomic<index_t> dequeueOptimisticCount;
1909  std::atomic<index_t> dequeueOvercommit;
1910 
1911  Block *tailBlock;
1912 
1913  public:
1914  bool isExplicit;
1915  ConcurrentQueue *parent;
1916 
1917  protected:
1918 #if MCDBGQ_TRACKMEM
1919  friend struct MemStats;
1920 #endif
1921  };
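  // size_approx() above relies on details::circular_less_than, which compares
  // indices modulo the width of index_t. A small illustration (assuming an
  // 8-bit index type purely for readability): with head == 250 and tail == 4
  // after wrap-around, circular_less_than(head, tail) is true and
  // tail - head == 10, so roughly ten items are reported.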
1922 
1924  // Explicit queue
1926 
1927  struct ExplicitProducer : public ProducerBase {
1928  explicit ExplicitProducer(ConcurrentQueue *parent)
1929  : ProducerBase(parent, true),
1930  blockIndex(nullptr),
1931  pr_blockIndexSlotsUsed(0),
1932  pr_blockIndexSize(EXPLICIT_INITIAL_INDEX_SIZE >> 1),
1933  pr_blockIndexFront(0),
1934  pr_blockIndexEntries(nullptr),
1935  pr_blockIndexRaw(nullptr) {
1936  size_t poolBasedIndexSize =
1937  details::ceil_to_pow_2(parent->initialBlockPoolSize) >> 1;
1938  if (poolBasedIndexSize > pr_blockIndexSize) {
1939  pr_blockIndexSize = poolBasedIndexSize;
1940  }
1941 
1942  new_block_index(0); // This creates an index with double the number of
1943  // current entries, i.e. EXPLICIT_INITIAL_INDEX_SIZE
1944  }
1945 
1946  ~ExplicitProducer() {
1947  // Destruct any elements not yet dequeued.
1948  // Since we're in the destructor, we can assume all elements
1949  // are either completely dequeued or completely not (no halfways).
1950  if (this->tailBlock !=
1951  nullptr) { // Note this means there must be a block index too
1952  // First find the block that's partially dequeued, if any
1953  Block *halfDequeuedBlock = nullptr;
1954  if ((this->headIndex.load(std::memory_order_relaxed) &
1955  static_cast<index_t>(BLOCK_SIZE - 1)) != 0) {
1956  // The head's not on a block boundary, meaning a block somewhere is
1957  // partially dequeued (or the head block is the tail block and was
1958  // fully dequeued, but the head/tail are still not on a boundary)
1959  size_t i = (pr_blockIndexFront - pr_blockIndexSlotsUsed) &
1960  (pr_blockIndexSize - 1);
1961  while (details::circular_less_than<index_t>(
1962  pr_blockIndexEntries[i].base + BLOCK_SIZE,
1963  this->headIndex.load(std::memory_order_relaxed))) {
1964  i = (i + 1) & (pr_blockIndexSize - 1);
1965  }
1966  assert(details::circular_less_than<index_t>(
1967  pr_blockIndexEntries[i].base,
1968  this->headIndex.load(std::memory_order_relaxed)));
1969  halfDequeuedBlock = pr_blockIndexEntries[i].block;
1970  }
1971 
1972  // Start at the head block (note the first line in the loop gives us the
1973  // head from the tail on the first iteration)
1974  auto block = this->tailBlock;
1975  do {
1976  block = block->next;
1977  if (block->ConcurrentQueue::Block::template is_empty<
1978  explicit_context>()) {
1979  continue;
1980  }
1981 
1982  size_t i = 0; // Offset into block
1983  if (block == halfDequeuedBlock) {
1984  i = static_cast<size_t>(
1985  this->headIndex.load(std::memory_order_relaxed) &
1986  static_cast<index_t>(BLOCK_SIZE - 1));
1987  }
1988 
1989  // Walk through all the items in the block; if this is the tail block,
1990  // we need to stop when we reach the tail index
1991  auto lastValidIndex =
1992  (this->tailIndex.load(std::memory_order_relaxed) &
1993  static_cast<index_t>(BLOCK_SIZE - 1)) == 0
1994  ? BLOCK_SIZE
1995  : static_cast<size_t>(
1996  this->tailIndex.load(std::memory_order_relaxed) &
1997  static_cast<index_t>(BLOCK_SIZE - 1));
1998  while (i != BLOCK_SIZE &&
1999  (block != this->tailBlock || i != lastValidIndex)) {
2000  (*block)[i++]->~T();
2001  }
2002  } while (block != this->tailBlock);
2003  }
2004 
2005  // Destroy all blocks that we own
2006  if (this->tailBlock != nullptr) {
2007  auto block = this->tailBlock;
2008  do {
2009  auto nextBlock = block->next;
2010  if (block->dynamicallyAllocated) {
2011  destroy(block);
2012  } else {
2013  this->parent->add_block_to_free_list(block);
2014  }
2015  block = nextBlock;
2016  } while (block != this->tailBlock);
2017  }
2018 
2019  // Destroy the block indices
2020  auto header = static_cast<BlockIndexHeader *>(pr_blockIndexRaw);
2021  while (header != nullptr) {
2022  auto prev = static_cast<BlockIndexHeader *>(header->prev);
2023  header->~BlockIndexHeader();
2024  (Traits::free)(header);
2025  header = prev;
2026  }
2027  }
2028 
2029  template <AllocationMode allocMode, typename U>
2030  inline bool enqueue(U &&element) {
2031  index_t currentTailIndex =
2032  this->tailIndex.load(std::memory_order_relaxed);
2033  index_t newTailIndex = 1 + currentTailIndex;
2034  if ((currentTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) {
2035  // We reached the end of a block, start a new one
2036  auto startBlock = this->tailBlock;
2037  auto originalBlockIndexSlotsUsed = pr_blockIndexSlotsUsed;
2038  if (this->tailBlock != nullptr &&
2039  this->tailBlock->next->ConcurrentQueue::Block::template is_empty<
2040  explicit_context>()) {
2041  // We can re-use the block ahead of us, it's empty!
2042  this->tailBlock = this->tailBlock->next;
2043  this->tailBlock->ConcurrentQueue::Block::template reset_empty<
2044  explicit_context>();
2045 
2046  // We'll put the block on the block index (guaranteed to be room since
2047  // we're conceptually removing the last block from it first -- except
2048  // instead of removing then adding, we can just overwrite). Note that
2049  // there must be a valid block index here, since even if allocation
2050  // failed in the ctor, it would have been re-attempted when adding the
2051  // first block to the queue; since there is such a block, a block
2052  // index must have been successfully allocated.
2053  } else {
2054  // Whatever head value we see here is >= the last value we saw here
2055  // (relatively), and <= its current value. Since we have the most
2056  // recent tail, the head must be
2057  // <= to it.
2058  auto head = this->headIndex.load(std::memory_order_relaxed);
2059  assert(!details::circular_less_than<index_t>(currentTailIndex, head));
2060  if (!details::circular_less_than<index_t>(
2061  head, currentTailIndex + BLOCK_SIZE) ||
2063  (MAX_SUBQUEUE_SIZE == 0 ||
2064  MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head))) {
2065  // We can't enqueue in another block because there's not enough
2066  // leeway -- the tail could surpass the head by the time the block
2067  // fills up! (Or we'll exceed the size limit, if the second part of
2068  // the condition was true.)
2069  return false;
2070  }
2071  // We're going to need a new block; check that the block index has
2072  // room
2073  if (pr_blockIndexRaw == nullptr ||
2074  pr_blockIndexSlotsUsed == pr_blockIndexSize) {
2075  // Hmm, the circular block index is already full -- we'll need
2076  // to allocate a new index. Note pr_blockIndexRaw can only be
2077  // nullptr if the initial allocation failed in the constructor.
2078 
2079  if (allocMode == CannotAlloc ||
2080  !new_block_index(pr_blockIndexSlotsUsed)) {
2081  return false;
2082  }
2083  }
2084 
2085  // Insert a new block in the circular linked list
2086  auto newBlock =
2087  this->parent
2088  ->ConcurrentQueue::template requisition_block<allocMode>();
2089  if (newBlock == nullptr) {
2090  return false;
2091  }
2092 #if MCDBGQ_TRACKMEM
2093  newBlock->owner = this;
2094 #endif
2095  newBlock->ConcurrentQueue::Block::template reset_empty<
2096  explicit_context>();
2097  if (this->tailBlock == nullptr) {
2098  newBlock->next = newBlock;
2099  } else {
2100  newBlock->next = this->tailBlock->next;
2101  this->tailBlock->next = newBlock;
2102  }
2103  this->tailBlock = newBlock;
2104  ++pr_blockIndexSlotsUsed;
2105  }
2106 
2107  if (!MOODYCAMEL_NOEXCEPT_CTOR(
2108  T, U, new (nullptr) T(std::forward<U>(element)))) {
2109  // The constructor may throw. We want the element not to appear in the
2110  // queue in that case (without corrupting the queue):
2111  MOODYCAMEL_TRY {
2112  new ((*this->tailBlock)[currentTailIndex])
2113  T(std::forward<U>(element));
2114  }
2115  MOODYCAMEL_CATCH(...) {
2116  // Revert change to the current block, but leave the new block
2117  // available for next time
2118  pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
2119  this->tailBlock =
2120  startBlock == nullptr ? this->tailBlock : startBlock;
2121  MOODYCAMEL_RETHROW;
2122  }
2123  } else {
2124  (void)startBlock;
2125  (void)originalBlockIndexSlotsUsed;
2126  }
2127 
2128  // Add block to block index
2129  auto &entry = blockIndex.load(std::memory_order_relaxed)
2130  ->entries[pr_blockIndexFront];
2131  entry.base = currentTailIndex;
2132  entry.block = this->tailBlock;
2133  blockIndex.load(std::memory_order_relaxed)
2134  ->front.store(pr_blockIndexFront, std::memory_order_release);
2135  pr_blockIndexFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1);
2136 
2137  if (!MOODYCAMEL_NOEXCEPT_CTOR(
2138  T, U, new (nullptr) T(std::forward<U>(element)))) {
2139  this->tailIndex.store(newTailIndex, std::memory_order_release);
2140  return true;
2141  }
2142  }
2143 
2144  // Enqueue
2145  new ((*this->tailBlock)[currentTailIndex]) T(std::forward<U>(element));
2146 
2147  this->tailIndex.store(newTailIndex, std::memory_order_release);
2148  return true;
2149  }
2150 
2151  template <typename U>
2152  bool dequeue(U &element) {
2153  auto tail = this->tailIndex.load(std::memory_order_relaxed);
2154  auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
2155  if (details::circular_less_than<index_t>(
2156  this->dequeueOptimisticCount.load(std::memory_order_relaxed) -
2157  overcommit,
2158  tail)) {
2159  // Might be something to dequeue, let's give it a try
2160 
2161  // Note that this if is purely for performance purposes in the common
2162  // case when the queue is empty and the values are eventually consistent
2163  // -- we may enter here spuriously.
2164 
2165  // Note that whatever the values of overcommit and tail are, they are
2166  // not going to change (unless we change them) and must be the same
2167  // value at this point (inside the if) as when the if condition was
2168  // evaluated.
2169 
2170  // We insert an acquire fence here to synchronize-with the release upon
2171  // incrementing dequeueOvercommit below. This ensures that whatever the
2172  // value we loaded into overcommit, the load of dequeueOptimisticCount
2173  // in the fetch_add below will result in a value at least as recent as
2174  // that (and therefore at least as large). Note that I believe a
2175  // compiler (signal) fence here would be sufficient due to the nature of
2176  // fetch_add (all read-modify-write operations are guaranteed to work on
2177  // the latest value in the modification order), but unfortunately that
2178  // can't be shown to be correct using only the C++11 standard. See
2179  // http://stackoverflow.com/questions/18223161/what-are-the-c11-memory-ordering-guarantees-in-this-corner-case
2180  std::atomic_thread_fence(std::memory_order_acquire);
2181 
2182  // Increment optimistic counter, then check if it went over the boundary
2183  auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(
2184  1, std::memory_order_relaxed);
2185 
2186  // Note that since dequeueOvercommit must be <= dequeueOptimisticCount
2187  // (because dequeueOvercommit is only ever incremented after
2188  // dequeueOptimisticCount -- this is enforced in the `else` block
2189  // below), and since we now have a version of dequeueOptimisticCount
2190  // that is at least as recent as overcommit (due to the release upon
2191  // incrementing dequeueOvercommit and the acquire above that
2192  // synchronizes with it), overcommit <= myDequeueCount. However, we
2193  // can't assert this since both dequeueOptimisticCount and
2194  // dequeueOvercommit may (independently) overflow; in such a case,
2195  // though, the logic still holds since the difference between the two is
2196  // maintained.
2197 
2198  // Note that we reload tail here in case it changed; it will be the same
2199  // value as before or greater, since this load is sequenced after
2200  // (happens after) the earlier load above. This is supported by
2201  // read-read coherency (as defined in the standard), explained here:
2202  // http://en.cppreference.com/w/cpp/atomic/memory_order
2203  tail = this->tailIndex.load(std::memory_order_acquire);
2204  if ((details::likely)(details::circular_less_than<index_t>(
2205  myDequeueCount - overcommit, tail))) {
2206  // Guaranteed to be at least one element to dequeue!
2207 
2208  // Get the index. Note that since there's guaranteed to be at least
2209  // one element, this will never exceed tail. We need to do an
2210  // acquire-release fence here since it's possible that whatever
2211  // condition got us to this point was for an earlier enqueued element
2212  // (that we already see the memory effects for), but that by the time
2213  // we increment somebody else has incremented it, and we need to see
2214  // the memory effects for *that* element, which in such a case is
2215  // necessarily visible on the thread that incremented it in the first
2216  // place with the more current condition (they must have acquired a
2217  // tail that is at least as recent).
2218  auto index = this->headIndex.fetch_add(1, std::memory_order_acq_rel);
2219 
2220  // Determine which block the element is in
2221 
2222  auto localBlockIndex = blockIndex.load(std::memory_order_acquire);
2223  auto localBlockIndexHead =
2224  localBlockIndex->front.load(std::memory_order_acquire);
2225 
2226  // We need to be careful here about subtracting and dividing because
2227  // of index wrap-around. When an index wraps, we need to preserve the
2228  // sign of the offset when dividing it by the block size (in order to
2229  // get a correct signed block count offset in all cases):
2230  auto headBase = localBlockIndex->entries[localBlockIndexHead].base;
2231  auto blockBaseIndex = index & ~static_cast<index_t>(BLOCK_SIZE - 1);
2232  auto offset = static_cast<size_t>(
2233  static_cast<typename std::make_signed<index_t>::type>(
2234  blockBaseIndex - headBase) /
2235  BLOCK_SIZE);
2236  auto block = localBlockIndex
2237  ->entries[(localBlockIndexHead + offset) &
2238  (localBlockIndex->size - 1)]
2239  .block;
2240 
2241  // Dequeue
2242  auto &el = *((*block)[index]);
2243  if (!MOODYCAMEL_NOEXCEPT_ASSIGN(T, T &&, element = std::move(el))) {
2244  // Make sure the element is still fully dequeued and destroyed even
2245  // if the assignment throws
2246  struct Guard {
2247  Block *block;
2248  index_t index;
2249 
2250  ~Guard() {
2251  (*block)[index]->~T();
2252  block->ConcurrentQueue::Block::template set_empty<
2253  explicit_context>(index);
2254  }
2255  } guard = {block, index};
2256 
2257  element = std::move(el);
2258  } else {
2259  element = std::move(el);
2260  el.~T();
2261  block->ConcurrentQueue::Block::template set_empty<explicit_context>(
2262  index);
2263  }
2264 
2265  return true;
2266  } else {
2267  // Wasn't anything to dequeue after all; make the effective dequeue
2268  // count eventually consistent
2269  this->dequeueOvercommit.fetch_add(
2270  1,
2271  std::memory_order_release); // Release so that the fetch_add on
2272  // dequeueOptimisticCount is
2273  // guaranteed to happen before this
2274  // write
2275  }
2276  }
2277 
2278  return false;
2279  }
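  // A worked example of the optimistic-count scheme used above (illustrative
  // values): suppose tail == 5 and dequeueOptimisticCount - dequeueOvercommit
  // == 4, and two consumers race for the single remaining element. Both pass
  // the outer check and both bump dequeueOptimisticCount; the winner sees
  // myDequeueCount - overcommit < tail and claims a slot via headIndex, while
  // the loser fails the inner check and bumps dequeueOvercommit by one,
  // restoring the invariant that (optimistic count - overcommit) tracks the
  // number of real dequeues.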
2280 
2281  template <AllocationMode allocMode, typename It>
2282  bool enqueue_bulk(It itemFirst, size_t count) {
2283  // First, we need to make sure we have enough room to enqueue all of the
2284  // elements; this means pre-allocating blocks and putting them in the
2285  // block index (but only if all the allocations succeeded).
2286  index_t startTailIndex = this->tailIndex.load(std::memory_order_relaxed);
2287  auto startBlock = this->tailBlock;
2288  auto originalBlockIndexFront = pr_blockIndexFront;
2289  auto originalBlockIndexSlotsUsed = pr_blockIndexSlotsUsed;
2290 
2291  Block *firstAllocatedBlock = nullptr;
2292 
2293  // Figure out how many blocks we'll need to allocate, and do so
2294  size_t blockBaseDiff =
2295  ((startTailIndex + count - 1) &
2296  ~static_cast<index_t>(BLOCK_SIZE - 1)) -
2297  ((startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1));
2298  index_t currentTailIndex =
2299  (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
2300  if (blockBaseDiff > 0) {
2301  // Allocate as many blocks as possible from ahead
2302  while (blockBaseDiff > 0 && this->tailBlock != nullptr &&
2303  this->tailBlock->next != firstAllocatedBlock &&
2304  this->tailBlock->next->ConcurrentQueue::Block::template is_empty<
2305  explicit_context>()) {
2306  blockBaseDiff -= static_cast<index_t>(BLOCK_SIZE);
2307  currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
2308 
2309  this->tailBlock = this->tailBlock->next;
2310  firstAllocatedBlock = firstAllocatedBlock == nullptr
2311  ? this->tailBlock
2312  : firstAllocatedBlock;
2313 
2314  auto &entry = blockIndex.load(std::memory_order_relaxed)
2315  ->entries[pr_blockIndexFront];
2316  entry.base = currentTailIndex;
2317  entry.block = this->tailBlock;
2318  pr_blockIndexFront =
2319  (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1);
2320  }
2321 
2322  // Now allocate as many blocks as necessary from the block pool
2323  while (blockBaseDiff > 0) {
2324  blockBaseDiff -= static_cast<index_t>(BLOCK_SIZE);
2325  currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
2326 
2327  auto head = this->headIndex.load(std::memory_order_relaxed);
2328  assert(!details::circular_less_than<index_t>(currentTailIndex, head));
2329  bool full =
2330  !details::circular_less_than<index_t>(
2331  head, currentTailIndex + BLOCK_SIZE) ||
2332  (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value &&
2333  (MAX_SUBQUEUE_SIZE == 0 ||
2334  MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head));
2335  if (pr_blockIndexRaw == nullptr ||
2336  pr_blockIndexSlotsUsed == pr_blockIndexSize || full) {
2337  if (allocMode == CannotAlloc || full ||
2338  !new_block_index(originalBlockIndexSlotsUsed)) {
2339  // Failed to allocate, undo changes (but keep injected blocks)
2340  pr_blockIndexFront = originalBlockIndexFront;
2341  pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
2342  this->tailBlock =
2343  startBlock == nullptr ? firstAllocatedBlock : startBlock;
2344  return false;
2345  }
2346 
2347  // pr_blockIndexFront is updated inside new_block_index, so we need
2348  // to update our fallback value too (since we keep the new index
2349  // even if we later fail)
2350  originalBlockIndexFront = originalBlockIndexSlotsUsed;
2351  }
2352 
2353  // Insert a new block in the circular linked list
2354  auto newBlock =
2355  this->parent
2356  ->ConcurrentQueue::template requisition_block<allocMode>();
2357  if (newBlock == nullptr) {
2358  pr_blockIndexFront = originalBlockIndexFront;
2359  pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
2360  this->tailBlock =
2361  startBlock == nullptr ? firstAllocatedBlock : startBlock;
2362  return false;
2363  }
2364 
2365 #if MCDBGQ_TRACKMEM
2366  newBlock->owner = this;
2367 #endif
2368  newBlock->ConcurrentQueue::Block::template set_all_empty<
2369  explicit_context>();
2370  if (this->tailBlock == nullptr) {
2371  newBlock->next = newBlock;
2372  } else {
2373  newBlock->next = this->tailBlock->next;
2374  this->tailBlock->next = newBlock;
2375  }
2376  this->tailBlock = newBlock;
2377  firstAllocatedBlock = firstAllocatedBlock == nullptr
2378  ? this->tailBlock
2379  : firstAllocatedBlock;
2380 
2381  ++pr_blockIndexSlotsUsed;
2382 
2383  auto &entry = blockIndex.load(std::memory_order_relaxed)
2384  ->entries[pr_blockIndexFront];
2385  entry.base = currentTailIndex;
2386  entry.block = this->tailBlock;
2387  pr_blockIndexFront =
2388  (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1);
2389  }
2390 
2391  // Excellent, all allocations succeeded. Reset each block's emptiness
2392  // before we fill them up, and publish the new block index front
2393  auto block = firstAllocatedBlock;
2394  while (true) {
2395  block->ConcurrentQueue::Block::template reset_empty<
2396  explicit_context>();
2397  if (block == this->tailBlock) {
2398  break;
2399  }
2400  block = block->next;
2401  }
2402 
2403  if (MOODYCAMEL_NOEXCEPT_CTOR(
2404  T, decltype(*itemFirst),
2405  new (nullptr) T(details::deref_noexcept(itemFirst)))) {
2406  blockIndex.load(std::memory_order_relaxed)
2407  ->front.store((pr_blockIndexFront - 1) & (pr_blockIndexSize - 1),
2408  std::memory_order_release);
2409  }
2410  }
2411 
2412  // Enqueue, one block at a time
2413  index_t newTailIndex = startTailIndex + static_cast<index_t>(count);
2414  currentTailIndex = startTailIndex;
2415  auto endBlock = this->tailBlock;
2416  this->tailBlock = startBlock;
2417  assert((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) != 0 ||
2418  firstAllocatedBlock != nullptr || count == 0);
2419  if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 &&
2420  firstAllocatedBlock != nullptr) {
2421  this->tailBlock = firstAllocatedBlock;
2422  }
2423  while (true) {
2424  auto stopIndex =
2425  (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) +
2426  static_cast<index_t>(BLOCK_SIZE);
2427  if (details::circular_less_than<index_t>(newTailIndex, stopIndex)) {
2428  stopIndex = newTailIndex;
2429  }
2430  if (MOODYCAMEL_NOEXCEPT_CTOR(
2431  T, decltype(*itemFirst),
2432  new (nullptr) T(details::deref_noexcept(itemFirst)))) {
2433  while (currentTailIndex != stopIndex) {
2434  new ((*this->tailBlock)[currentTailIndex++]) T(*itemFirst++);
2435  }
2436  } else {
2437  MOODYCAMEL_TRY {
2438  while (currentTailIndex != stopIndex) {
2439  // Must use copy constructor even if move constructor is available
2440  // because we may have to revert if there's an exception.
2441  // Sorry about the horrible templated next line, but it was the
2442  // only way to disable moving *at compile time*, which is
2443  // important because a type may only define a (noexcept) move
2444  // constructor, and so calls to the cctor will not compile, even
2445  // if they are in an if branch that will never be executed
2446  new ((*this->tailBlock)[currentTailIndex])
2447  T(details::nomove_if<(bool)!MOODYCAMEL_NOEXCEPT_CTOR(
2448  T, decltype(*itemFirst),
2449  new (nullptr) T(details::deref_noexcept(
2450  itemFirst)))>::eval(*itemFirst));
2451  ++currentTailIndex;
2452  ++itemFirst;
2453  }
2454  }
2455  MOODYCAMEL_CATCH(...) {
2456  // Oh dear, an exception's been thrown -- destroy the elements that
2457  // were enqueued so far and revert the entire bulk operation (we'll
2458  // keep any allocated blocks in our linked list for later, though).
2459  auto constructedStopIndex = currentTailIndex;
2460  auto lastBlockEnqueued = this->tailBlock;
2461 
2462  pr_blockIndexFront = originalBlockIndexFront;
2463  pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
2464  this->tailBlock =
2465  startBlock == nullptr ? firstAllocatedBlock : startBlock;
2466 
2467  if (!details::is_trivially_destructible<T>::value) {
2468  auto block = startBlock;
2469  if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) ==
2470  0) {
2471  block = firstAllocatedBlock;
2472  }
2473  currentTailIndex = startTailIndex;
2474  while (true) {
2475  stopIndex =
2476  (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) +
2477  static_cast<index_t>(BLOCK_SIZE);
2478  if (details::circular_less_than<index_t>(constructedStopIndex,
2479  stopIndex)) {
2480  stopIndex = constructedStopIndex;
2481  }
2482  while (currentTailIndex != stopIndex) {
2483  (*block)[currentTailIndex++]->~T();
2484  }
2485  if (block == lastBlockEnqueued) {
2486  break;
2487  }
2488  block = block->next;
2489  }
2490  }
2491  MOODYCAMEL_RETHROW;
2492  }
2493  }
2494 
2495  if (this->tailBlock == endBlock) {
2496  assert(currentTailIndex == newTailIndex);
2497  break;
2498  }
2499  this->tailBlock = this->tailBlock->next;
2500  }
2501 
2502  if (!MOODYCAMEL_NOEXCEPT_CTOR(
2503  T, decltype(*itemFirst),
2504  new (nullptr) T(details::deref_noexcept(itemFirst))) &&
2505  firstAllocatedBlock != nullptr) {
2506  blockIndex.load(std::memory_order_relaxed)
2507  ->front.store((pr_blockIndexFront - 1) & (pr_blockIndexSize - 1),
2508  std::memory_order_release);
2509  }
2510 
2511  this->tailIndex.store(newTailIndex, std::memory_order_release);
2512  return true;
2513  }
2514 
2515  template <typename It>
2516  size_t dequeue_bulk(It &itemFirst, size_t max) {
2517  auto tail = this->tailIndex.load(std::memory_order_relaxed);
2518  auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
2519  auto desiredCount = static_cast<size_t>(
2520  tail - (this->dequeueOptimisticCount.load(std::memory_order_relaxed) -
2521  overcommit));
2522  if (details::circular_less_than<size_t>(0, desiredCount)) {
2523  desiredCount = desiredCount < max ? desiredCount : max;
2524  std::atomic_thread_fence(std::memory_order_acquire);
2525 
2526  auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(
2527  desiredCount, std::memory_order_relaxed);
2529 
2530  tail = this->tailIndex.load(std::memory_order_acquire);
2531  auto actualCount =
2532  static_cast<size_t>(tail - (myDequeueCount - overcommit));
2533  if (details::circular_less_than<size_t>(0, actualCount)) {
2534  actualCount = desiredCount < actualCount ? desiredCount : actualCount;
2535  if (actualCount < desiredCount) {
2536  this->dequeueOvercommit.fetch_add(desiredCount - actualCount,
2537  std::memory_order_release);
2538  }
2539 
2540  // Get the first index. Note that since there's guaranteed to be at
2541  // least actualCount elements, this will never exceed tail.
2542  auto firstIndex =
2543  this->headIndex.fetch_add(actualCount, std::memory_order_acq_rel);
2544 
2545  // Determine which block the first element is in
2546  auto localBlockIndex = blockIndex.load(std::memory_order_acquire);
2547  auto localBlockIndexHead =
2548  localBlockIndex->front.load(std::memory_order_acquire);
2549 
2550  auto headBase = localBlockIndex->entries[localBlockIndexHead].base;
2551  auto firstBlockBaseIndex =
2552  firstIndex & ~static_cast<index_t>(BLOCK_SIZE - 1);
2553  auto offset = static_cast<size_t>(
2554  static_cast<typename std::make_signed<index_t>::type>(
2555  firstBlockBaseIndex - headBase) /
2556  BLOCK_SIZE);
2557  auto indexIndex =
2558  (localBlockIndexHead + offset) & (localBlockIndex->size - 1);
2559 
2560  // Iterate the blocks and dequeue
2561  auto index = firstIndex;
2562  do {
2563  auto firstIndexInBlock = index;
2564  auto endIndex = (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) +
2565  static_cast<index_t>(BLOCK_SIZE);
2566  endIndex =
2567  details::circular_less_than<index_t>(
2568  firstIndex + static_cast<index_t>(actualCount), endIndex)
2569  ? firstIndex + static_cast<index_t>(actualCount)
2570  : endIndex;
2571  auto block = localBlockIndex->entries[indexIndex].block;
2572  if (MOODYCAMEL_NOEXCEPT_ASSIGN(T, T &&,
2573  details::deref_noexcept(itemFirst) =
2574  std::move((*(*block)[index])))) {
2575  while (index != endIndex) {
2576  auto &el = *((*block)[index]);
2577  *itemFirst++ = std::move(el);
2578  el.~T();
2579  ++index;
2580  }
2581  } else {
2582  MOODYCAMEL_TRY {
2583  while (index != endIndex) {
2584  auto &el = *((*block)[index]);
2585  *itemFirst = std::move(el);
2586  ++itemFirst;
2587  el.~T();
2588  ++index;
2589  }
2590  }
2591  MOODYCAMEL_CATCH(...) {
2592  // It's too late to revert the dequeue, but we can make sure
2593  // that all the dequeued objects are properly destroyed and the
2594  // block index (and empty count) are properly updated before we
2595  // propagate the exception
2596  do {
2597  block = localBlockIndex->entries[indexIndex].block;
2598  while (index != endIndex) {
2599  (*block)[index++]->~T();
2600  }
2601  block->ConcurrentQueue::Block::template set_many_empty<
2602  explicit_context>(
2603  firstIndexInBlock,
2604  static_cast<size_t>(endIndex - firstIndexInBlock));
2605  indexIndex = (indexIndex + 1) & (localBlockIndex->size - 1);
2606 
2607  firstIndexInBlock = index;
2608  endIndex = (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) +
2609  static_cast<index_t>(BLOCK_SIZE);
2610  endIndex =
2611  details::circular_less_than<index_t>(
2612  firstIndex + static_cast<index_t>(actualCount),
2613  endIndex)
2614  ? firstIndex + static_cast<index_t>(actualCount)
2615  : endIndex;
2616  } while (index != firstIndex + actualCount);
2617 
2618  MOODYCAMEL_RETHROW;
2619  }
2620  }
2621  block->ConcurrentQueue::Block::template set_many_empty<
2622  explicit_context>(
2623  firstIndexInBlock,
2624  static_cast<size_t>(endIndex - firstIndexInBlock));
2625  indexIndex = (indexIndex + 1) & (localBlockIndex->size - 1);
2626  } while (index != firstIndex + actualCount);
2627 
2628  return actualCount;
2629  } else {
2630  // Wasn't anything to dequeue after all; make the effective dequeue
2631  // count eventually consistent
2632  this->dequeueOvercommit.fetch_add(desiredCount,
2633  std::memory_order_release);
2634  }
2635  }
2636 
2637  return 0;
2638  }
2639 
2640  private:
2641  struct BlockIndexEntry {
2642  index_t base;
2643  Block *block;
2644  };
2645 
2646  struct BlockIndexHeader {
2647  size_t size;
2648  std::atomic<size_t>
2649  front; // Current slot (not next, like pr_blockIndexFront)
2650  BlockIndexEntry *entries;
2651  void *prev;
2652  };
2653 
2654  bool new_block_index(size_t numberOfFilledSlotsToExpose) {
2655  auto prevBlockSizeMask = pr_blockIndexSize - 1;
2656 
2657  // Create the new block
2658  pr_blockIndexSize <<= 1;
2659  auto newRawPtr = static_cast<char *>((Traits::malloc)(
2660  sizeof(BlockIndexHeader) + std::alignment_of<BlockIndexEntry>::value -
2661  1 + sizeof(BlockIndexEntry) * pr_blockIndexSize));
2662  if (newRawPtr == nullptr) {
2663  pr_blockIndexSize >>= 1; // Reset to allow graceful retry
2664  return false;
2665  }
2666 
2667  auto newBlockIndexEntries = reinterpret_cast<BlockIndexEntry *>(
2668  details::align_for<BlockIndexEntry>(newRawPtr +
2669  sizeof(BlockIndexHeader)));
2670 
2671  // Copy in all the old indices, if any
2672  size_t j = 0;
2673  if (pr_blockIndexSlotsUsed != 0) {
2674  auto i =
2675  (pr_blockIndexFront - pr_blockIndexSlotsUsed) & prevBlockSizeMask;
2676  do {
2677  newBlockIndexEntries[j++] = pr_blockIndexEntries[i];
2678  i = (i + 1) & prevBlockSizeMask;
2679  } while (i != pr_blockIndexFront);
2680  }
2681 
2682  // Update everything
2683  auto header = new (newRawPtr) BlockIndexHeader;
2684  header->size = pr_blockIndexSize;
2685  header->front.store(numberOfFilledSlotsToExpose - 1,
2686  std::memory_order_relaxed);
2687  header->entries = newBlockIndexEntries;
2688  header->prev = pr_blockIndexRaw; // we link the new block to the old one
2689  // so we can free it later
2690 
2691  pr_blockIndexFront = j;
2692  pr_blockIndexEntries = newBlockIndexEntries;
2693  pr_blockIndexRaw = newRawPtr;
2694  blockIndex.store(header, std::memory_order_release);
2695 
2696  return true;
2697  }
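  // Growth pattern of the index above, as a sketch: pr_blockIndexSize doubles
  // on every call, so starting from the constructor's initial value the entry
  // capacity grows 32, 64, 128, ... (with the usual traits default of
  // EXPLICIT_INITIAL_INDEX_SIZE == 32; other traits change the sequence).
  // Each superseded header stays reachable through its `prev` pointer and is
  // released in ~ExplicitProducer().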
2698 
2699  private:
2700  std::atomic<BlockIndexHeader *> blockIndex;
2701 
2702  // To be used by producer only -- consumer must use the ones referenced
2703  // by blockIndex
2704  size_t pr_blockIndexSlotsUsed;
2705  size_t pr_blockIndexSize;
2706  size_t pr_blockIndexFront; // Next slot (not current)
2707  BlockIndexEntry *pr_blockIndexEntries;
2708  void *pr_blockIndexRaw;
2709 
2710 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
2711  public:
2712  ExplicitProducer *nextExplicitProducer;
2713 
2714  private:
2715 #endif
2716 
2717 #if MCDBGQ_TRACKMEM
2718  friend struct MemStats;
2719 #endif
2720  };
2721 
2723  // Implicit queue
2725 
2726  struct ImplicitProducer : public ProducerBase {
2727  ImplicitProducer(ConcurrentQueue *parent)
2728  : ProducerBase(parent, false),
2729  nextBlockIndexCapacity(IMPLICIT_INITIAL_INDEX_SIZE),
2730  blockIndex(nullptr) {
2731  new_block_index();
2732  }
2733 
2734  ~ImplicitProducer() {
2735  // Note that since we're in the destructor we can assume that all
2736  // enqueue/dequeue operations completed already; this means that all
2737  // undequeued elements are placed contiguously across contiguous blocks,
2738  // and that only the first and last remaining blocks can be partially
2739  // empty (all other remaining blocks must be completely full).
2740 
2741 #ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
2742  // Unregister ourselves for thread termination notification
2743  if (!this->inactive.load(std::memory_order_relaxed)) {
2744  details::ThreadExitNotifier::unsubscribe(&threadExitListener);
2745  }
2746 #endif
2747 
2748  // Destroy all remaining elements!
2749  auto tail = this->tailIndex.load(std::memory_order_relaxed);
2750  auto index = this->headIndex.load(std::memory_order_relaxed);
2751  Block *block = nullptr;
2752  assert(index == tail || details::circular_less_than(index, tail));
2753  bool forceFreeLastBlock =
2754  index != tail; // If we enter the loop, then the last (tail) block
2755  // will not be freed
2756  while (index != tail) {
2757  if ((index & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 ||
2758  block == nullptr) {
2759  if (block != nullptr) {
2760  // Free the old block
2761  this->parent->add_block_to_free_list(block);
2762  }
2763 
2764  block = get_block_index_entry_for_index(index)->value.load(
2765  std::memory_order_relaxed);
2766  }
2767 
2768  ((*block)[index])->~T();
2769  ++index;
2770  }
2771  // Even if the queue is empty, there's still one block that's not on the
2772  // free list (unless the head index reached the end of it, in which case
2773  // the tail will be poised to create a new block).
2774  if (this->tailBlock != nullptr &&
2775  (forceFreeLastBlock ||
2776  (tail & static_cast<index_t>(BLOCK_SIZE - 1)) != 0)) {
2777  this->parent->add_block_to_free_list(this->tailBlock);
2778  }
2779 
2780  // Destroy block index
2781  auto localBlockIndex = blockIndex.load(std::memory_order_relaxed);
2782  if (localBlockIndex != nullptr) {
2783  for (size_t i = 0; i != localBlockIndex->capacity; ++i) {
2784  localBlockIndex->index[i]->~BlockIndexEntry();
2785  }
2786  do {
2787  auto prev = localBlockIndex->prev;
2788  localBlockIndex->~BlockIndexHeader();
2789  (Traits::free)(localBlockIndex);
2790  localBlockIndex = prev;
2791  } while (localBlockIndex != nullptr);
2792  }
2793  }
2794 
2795  template <AllocationMode allocMode, typename U>
2796  inline bool enqueue(U &&element) {
2797  index_t currentTailIndex =
2798  this->tailIndex.load(std::memory_order_relaxed);
2799  index_t newTailIndex = 1 + currentTailIndex;
2800  if ((currentTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0) {
2801  // We reached the end of a block, start a new one
2802  auto head = this->headIndex.load(std::memory_order_relaxed);
2803  assert(!details::circular_less_than<index_t>(currentTailIndex, head));
2804  if (!details::circular_less_than<index_t>(
2805  head, currentTailIndex + BLOCK_SIZE) ||
2806  (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value &&
2807  (MAX_SUBQUEUE_SIZE == 0 ||
2808  MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head))) {
2809  return false;
2810  }
2811 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2812  debug::DebugLock lock(mutex);
2813 #endif
2814  // Find out where we'll be inserting this block in the block index
2815  BlockIndexEntry *idxEntry;
2816  if (!insert_block_index_entry<allocMode>(idxEntry, currentTailIndex)) {
2817  return false;
2818  }
2819 
2820  // Get ahold of a new block
2821  auto newBlock =
2822  this->parent
2823  ->ConcurrentQueue::template requisition_block<allocMode>();
2824  if (newBlock == nullptr) {
2825  rewind_block_index_tail();
2826  idxEntry->value.store(nullptr, std::memory_order_relaxed);
2827  return false;
2828  }
2829 #if MCDBGQ_TRACKMEM
2830  newBlock->owner = this;
2831 #endif
2832  newBlock
2833  ->ConcurrentQueue::Block::template reset_empty<implicit_context>();
2834 
2835  if (!MOODYCAMEL_NOEXCEPT_CTOR(
2836  T, U, new (nullptr) T(std::forward<U>(element)))) {
2837  // May throw, try to insert now before we publish the fact that we
2838  // have this new block
2839  MOODYCAMEL_TRY {
2840  new ((*newBlock)[currentTailIndex]) T(std::forward<U>(element));
2841  }
2842  MOODYCAMEL_CATCH(...) {
2843  rewind_block_index_tail();
2844  idxEntry->value.store(nullptr, std::memory_order_relaxed);
2845  this->parent->add_block_to_free_list(newBlock);
2846  MOODYCAMEL_RETHROW;
2847  }
2848  }
2849 
2850  // Insert the new block into the index
2851  idxEntry->value.store(newBlock, std::memory_order_relaxed);
2852 
2853  this->tailBlock = newBlock;
2854 
2855  if (!MOODYCAMEL_NOEXCEPT_CTOR(
2856  T, U, new (nullptr) T(std::forward<U>(element)))) {
2857  this->tailIndex.store(newTailIndex, std::memory_order_release);
2858  return true;
2859  }
2860  }
2861 
2862  // Enqueue
2863  new ((*this->tailBlock)[currentTailIndex]) T(std::forward<U>(element));
2864 
2865  this->tailIndex.store(newTailIndex, std::memory_order_release);
2866  return true;
2867  }
2868 
2869  template <typename U>
2870  bool dequeue(U &element) {
2871  // See ExplicitProducer::dequeue for rationale and explanation
2872  index_t tail = this->tailIndex.load(std::memory_order_relaxed);
2873  index_t overcommit =
2874  this->dequeueOvercommit.load(std::memory_order_relaxed);
2875  if (details::circular_less_than<index_t>(
2876  this->dequeueOptimisticCount.load(std::memory_order_relaxed) -
2877  overcommit,
2878  tail)) {
2879  std::atomic_thread_fence(std::memory_order_acquire);
2880 
2881  index_t myDequeueCount = this->dequeueOptimisticCount.fetch_add(
2882  1, std::memory_order_relaxed);
2883  tail = this->tailIndex.load(std::memory_order_acquire);
2884  if ((details::likely)(details::circular_less_than<index_t>(
2885  myDequeueCount - overcommit, tail))) {
2886  index_t index =
2887  this->headIndex.fetch_add(1, std::memory_order_acq_rel);
2888 
2889  // Determine which block the element is in
2890  auto entry = get_block_index_entry_for_index(index);
2891 
2892  // Dequeue
2893  auto block = entry->value.load(std::memory_order_relaxed);
2894  auto &el = *((*block)[index]);
2895 
2896  if (!MOODYCAMEL_NOEXCEPT_ASSIGN(T, T &&, element = std::move(el))) {
2897 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2898  // Note: Acquiring the mutex with every dequeue instead of only when
2899  // a block is released is very sub-optimal, but it is, after all,
2900  // purely debug code.
2901  debug::DebugLock lock(producer->mutex);
2902 #endif
2903  struct Guard {
2904  Block *block;
2905  index_t index;
2906  BlockIndexEntry *entry;
2907  ConcurrentQueue *parent;
2908 
2909  ~Guard() {
2910  (*block)[index]->~T();
2911  if (block->ConcurrentQueue::Block::template set_empty<
2912  implicit_context>(index)) {
2913  entry->value.store(nullptr, std::memory_order_relaxed);
2914  parent->add_block_to_free_list(block);
2915  }
2916  }
2917  } guard = {block, index, entry, this->parent};
2918 
2919  element = std::move(el);
2920  } else {
2921  element = std::move(el);
2922  el.~T();
2923 
2924  if (block->ConcurrentQueue::Block::template set_empty<
2925  implicit_context>(index)) {
2926  {
2927 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2928  debug::DebugLock lock(mutex);
2929 #endif
2930  // Add the block back into the global free pool (and remove from
2931  // block index)
2932  entry->value.store(nullptr, std::memory_order_relaxed);
2933  }
2934  this->parent->add_block_to_free_list(
2935  block); // releases the above store
2936  }
2937  }
2938 
2939  return true;
2940  } else {
2941  this->dequeueOvercommit.fetch_add(1, std::memory_order_release);
2942  }
2943  }
2944 
2945  return false;
2946  }
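  // The Guard struct above is a small RAII helper: if the move-assignment into
  // `element` can throw, its destructor still runs the element's destructor,
  // marks the slot empty, and (for the implicit case) hands a fully-emptied
  // block back to the free list, so an exception cannot leak a constructed T
  // or strand a block outside the index.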
2947 
2948  template <AllocationMode allocMode, typename It>
2949  bool enqueue_bulk(It itemFirst, size_t count) {
2950  // First, we need to make sure we have enough room to enqueue all of the
2951  // elements; this means pre-allocating blocks and putting them in the
2952  // block index (but only if all the allocations succeeded).
2953 
2954  // Note that the tailBlock we start off with may not be owned by us any
2955  // more; this happens if it was filled up exactly to the top (setting
2956  // tailIndex to the first index of the next block which is not yet
2957  // allocated), then dequeued completely (putting it on the free list)
2958  // before we enqueue again.
2959 
2960  index_t startTailIndex = this->tailIndex.load(std::memory_order_relaxed);
2961  auto startBlock = this->tailBlock;
2962  Block *firstAllocatedBlock = nullptr;
2963  auto endBlock = this->tailBlock;
2964 
2965  // Figure out how many blocks we'll need to allocate, and do so
2966  size_t blockBaseDiff =
2967  ((startTailIndex + count - 1) &
2968  ~static_cast<index_t>(BLOCK_SIZE - 1)) -
2969  ((startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1));
2970  index_t currentTailIndex =
2971  (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
2972  if (blockBaseDiff > 0) {
2973 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2974  debug::DebugLock lock(mutex);
2975 #endif
2976  do {
2977  blockBaseDiff -= static_cast<index_t>(BLOCK_SIZE);
2978  currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
2979 
2980  // Find out where we'll be inserting this block in the block index
2981  BlockIndexEntry *idxEntry =
2982  nullptr; // initialization here unnecessary but compiler can't
2983  // always tell
2984  Block *newBlock;
2985  bool indexInserted = false;
2986  auto head = this->headIndex.load(std::memory_order_relaxed);
2987  assert(!details::circular_less_than<index_t>(currentTailIndex, head));
2988  bool full =
2989  !details::circular_less_than<index_t>(
2990  head, currentTailIndex + BLOCK_SIZE) ||
2991  (MAX_SUBQUEUE_SIZE != details::const_numeric_max<size_t>::value &&
2992  (MAX_SUBQUEUE_SIZE == 0 ||
2993  MAX_SUBQUEUE_SIZE - BLOCK_SIZE < currentTailIndex - head));
2994  if (full ||
2995  !(indexInserted = insert_block_index_entry<allocMode>(
2996  idxEntry, currentTailIndex)) ||
2997  (newBlock =
2998  this->parent->ConcurrentQueue::template requisition_block<
2999  allocMode>()) == nullptr) {
3000  // Index allocation or block allocation failed; revert any other
3001  // allocations and index insertions done so far for this operation
3002  if (indexInserted) {
3003  rewind_block_index_tail();
3004  idxEntry->value.store(nullptr, std::memory_order_relaxed);
3005  }
3006  currentTailIndex =
3007  (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
3008  for (auto block = firstAllocatedBlock; block != nullptr;
3009  block = block->next) {
3010  currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
3011  idxEntry = get_block_index_entry_for_index(currentTailIndex);
3012  idxEntry->value.store(nullptr, std::memory_order_relaxed);
3013  rewind_block_index_tail();
3014  }
3015  this->parent->add_blocks_to_free_list(firstAllocatedBlock);
3016  this->tailBlock = startBlock;
3017 
3018  return false;
3019  }
3020 
3021 #if MCDBGQ_TRACKMEM
3022  newBlock->owner = this;
3023 #endif
3024  newBlock->ConcurrentQueue::Block::template reset_empty<
3025  implicit_context>();
3026  newBlock->next = nullptr;
3027 
3028  // Insert the new block into the index
3029  idxEntry->value.store(newBlock, std::memory_order_relaxed);
3030 
3031  // Store the chain of blocks so that we can undo if later allocations
3032  // fail, and so that we can find the blocks when we do the actual
3033  // enqueueing
3034  if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) != 0 ||
3035  firstAllocatedBlock != nullptr) {
3036  assert(this->tailBlock != nullptr);
3037  this->tailBlock->next = newBlock;
3038  }
3039  this->tailBlock = newBlock;
3040  endBlock = newBlock;
3041  firstAllocatedBlock =
3042  firstAllocatedBlock == nullptr ? newBlock : firstAllocatedBlock;
3043  } while (blockBaseDiff > 0);
3044  }
3045 
3046  // Enqueue, one block at a time
3047  index_t newTailIndex = startTailIndex + static_cast<index_t>(count);
3048  currentTailIndex = startTailIndex;
3049  this->tailBlock = startBlock;
3050  assert((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) != 0 ||
3051  firstAllocatedBlock != nullptr || count == 0);
3052  if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) == 0 &&
3053  firstAllocatedBlock != nullptr) {
3054  this->tailBlock = firstAllocatedBlock;
3055  }
3056  while (true) {
3057  auto stopIndex =
3058  (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) +
3059  static_cast<index_t>(BLOCK_SIZE);
3060  if (details::circular_less_than<index_t>(newTailIndex, stopIndex)) {
3061  stopIndex = newTailIndex;
3062  }
3063  if (MOODYCAMEL_NOEXCEPT_CTOR(
3064  T, decltype(*itemFirst),
3065  new (nullptr) T(details::deref_noexcept(itemFirst)))) {
3066  while (currentTailIndex != stopIndex) {
3067  new ((*this->tailBlock)[currentTailIndex++]) T(*itemFirst++);
3068  }
3069  } else {
3070  MOODYCAMEL_TRY {
3071  while (currentTailIndex != stopIndex) {
3072  new ((*this->tailBlock)[currentTailIndex])
3073  T(details::nomove_if<(bool)!MOODYCAMEL_NOEXCEPT_CTOR(
3074  T, decltype(*itemFirst),
3075  new (nullptr) T(details::deref_noexcept(
3076  itemFirst)))>::eval(*itemFirst));
3077  ++currentTailIndex;
3078  ++itemFirst;
3079  }
3080  }
3081  MOODYCAMEL_CATCH(...) {
3082  auto constructedStopIndex = currentTailIndex;
3083  auto lastBlockEnqueued = this->tailBlock;
3084 
3085  if (!details::is_trivially_destructible<T>::value) {
3086  auto block = startBlock;
3087  if ((startTailIndex & static_cast<index_t>(BLOCK_SIZE - 1)) ==
3088  0) {
3089  block = firstAllocatedBlock;
3090  }
3091  currentTailIndex = startTailIndex;
3092  while (true) {
3093  stopIndex =
3094  (currentTailIndex & ~static_cast<index_t>(BLOCK_SIZE - 1)) +
3095  static_cast<index_t>(BLOCK_SIZE);
3096  if (details::circular_less_than<index_t>(constructedStopIndex,
3097  stopIndex)) {
3098  stopIndex = constructedStopIndex;
3099  }
3100  while (currentTailIndex != stopIndex) {
3101  (*block)[currentTailIndex++]->~T();
3102  }
3103  if (block == lastBlockEnqueued) {
3104  break;
3105  }
3106  block = block->next;
3107  }
3108  }
3109 
3110  currentTailIndex =
3111  (startTailIndex - 1) & ~static_cast<index_t>(BLOCK_SIZE - 1);
3112  for (auto block = firstAllocatedBlock; block != nullptr;
3113  block = block->next) {
3114  currentTailIndex += static_cast<index_t>(BLOCK_SIZE);
3115  auto idxEntry = get_block_index_entry_for_index(currentTailIndex);
3116  idxEntry->value.store(nullptr, std::memory_order_relaxed);
3117  rewind_block_index_tail();
3118  }
3119  this->parent->add_blocks_to_free_list(firstAllocatedBlock);
3120  this->tailBlock = startBlock;
3121  MOODYCAMEL_RETHROW;
3122  }
3123  }
3124 
3125  if (this->tailBlock == endBlock) {
3126  assert(currentTailIndex == newTailIndex);
3127  break;
3128  }
3129  this->tailBlock = this->tailBlock->next;
3130  }
3131  this->tailIndex.store(newTailIndex, std::memory_order_release);
3132  return true;
3133  }
3134 
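  // A minimal usage sketch of the tokenless bulk API that drives the enqueue
  // path above (assuming concurrentqueue.h and <vector> are included; the
  // names q, items and out are illustrative only, not part of this file):
  //
  //   moodycamel::ConcurrentQueue<int> q;
  //   std::vector<int> items{1, 2, 3, 4};
  //   q.enqueue_bulk(items.begin(), items.size());  // single bulk insert
  //   int out[4];
  //   std::size_t n = q.try_dequeue_bulk(out, 4);   // n <= 4 items removed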
3135  template <typename It>
3136  size_t dequeue_bulk(It &itemFirst, size_t max) {
3137  auto tail = this->tailIndex.load(std::memory_order_relaxed);
3138  auto overcommit = this->dequeueOvercommit.load(std::memory_order_relaxed);
3139  auto desiredCount = static_cast<size_t>(
3140  tail - (this->dequeueOptimisticCount.load(std::memory_order_relaxed) -
3141  overcommit));
3142  if (details::circular_less_than<size_t>(0, desiredCount)) {
3143  desiredCount = desiredCount < max ? desiredCount : max;
3144  std::atomic_thread_fence(std::memory_order_acquire);
3145 
3146  auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(
3147  desiredCount, std::memory_order_relaxed);
3148 
3149  tail = this->tailIndex.load(std::memory_order_acquire);
3150  auto actualCount =
3151  static_cast<size_t>(tail - (myDequeueCount - overcommit));
3152  if (details::circular_less_than<size_t>(0, actualCount)) {
3153  actualCount = desiredCount < actualCount ? desiredCount : actualCount;
3154  if (actualCount < desiredCount) {
3155  this->dequeueOvercommit.fetch_add(desiredCount - actualCount,
3156  std::memory_order_release);
3157  }
3158 
3159  // Get the first index. Note that since there's guaranteed to be at
3160  // least actualCount elements, this will never exceed tail.
3161  auto firstIndex =
3162  this->headIndex.fetch_add(actualCount, std::memory_order_acq_rel);
3163 
3164  // Iterate the blocks and dequeue
3165  auto index = firstIndex;
3166  BlockIndexHeader *localBlockIndex;
3167  auto indexIndex =
3168  get_block_index_index_for_index(index, localBlockIndex);
3169  do {
3170  auto blockStartIndex = index;
3171  auto endIndex = (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) +
3172  static_cast<index_t>(BLOCK_SIZE);
3173  endIndex =
3174  details::circular_less_than<index_t>(
3175  firstIndex + static_cast<index_t>(actualCount), endIndex)
3176  ? firstIndex + static_cast<index_t>(actualCount)
3177  : endIndex;
3178 
3179  auto entry = localBlockIndex->index[indexIndex];
3180  auto block = entry->value.load(std::memory_order_relaxed);
3181  if (MOODYCAMEL_NOEXCEPT_ASSIGN(T, T &&,
3182  details::deref_noexcept(itemFirst) =
3183  std::move((*(*block)[index])))) {
3184  while (index != endIndex) {
3185  auto &el = *((*block)[index]);
3186  *itemFirst++ = std::move(el);
3187  el.~T();
3188  ++index;
3189  }
3190  } else {
3191  MOODYCAMEL_TRY {
3192  while (index != endIndex) {
3193  auto &el = *((*block)[index]);
3194  *itemFirst = std::move(el);
3195  ++itemFirst;
3196  el.~T();
3197  ++index;
3198  }
3199  }
3200  MOODYCAMEL_CATCH(...) {
3201  do {
3202  entry = localBlockIndex->index[indexIndex];
3203  block = entry->value.load(std::memory_order_relaxed);
3204  while (index != endIndex) {
3205  (*block)[index++]->~T();
3206  }
3207 
3208  if (block->ConcurrentQueue::Block::template set_many_empty<
3209  implicit_context>(
3210  blockStartIndex,
3211  static_cast<size_t>(endIndex - blockStartIndex))) {
3212 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
3213  debug::DebugLock lock(mutex);
3214 #endif
3215  entry->value.store(nullptr, std::memory_order_relaxed);
3216  this->parent->add_block_to_free_list(block);
3217  }
3218  indexIndex =
3219  (indexIndex + 1) & (localBlockIndex->capacity - 1);
3220 
3221  blockStartIndex = index;
3222  endIndex = (index & ~static_cast<index_t>(BLOCK_SIZE - 1)) +
3223  static_cast<index_t>(BLOCK_SIZE);
3224  endIndex =
3225  details::circular_less_than<index_t>(
3226  firstIndex + static_cast<index_t>(actualCount),
3227  endIndex)
3228  ? firstIndex + static_cast<index_t>(actualCount)
3229  : endIndex;
3230  } while (index != firstIndex + actualCount);
3231 
3233  }
3234  }
3235  if (block->ConcurrentQueue::Block::template set_many_empty<
3236  implicit_context>(
3237  blockStartIndex,
3238  static_cast<size_t>(endIndex - blockStartIndex))) {
3239  {
3240 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
3241  debug::DebugLock lock(mutex);
3242 #endif
3243  // Note that the set_many_empty above did a release, meaning
3244  // that anybody who acquires the block we're about to free can
3245  // use it safely since our writes (and reads!) will have
3246  // happened-before then.
3247  entry->value.store(nullptr, std::memory_order_relaxed);
3248  }
3249  this->parent->add_block_to_free_list(
3250  block); // releases the above store
3251  }
3252  indexIndex = (indexIndex + 1) & (localBlockIndex->capacity - 1);
3253  } while (index != firstIndex + actualCount);
3254 
3255  return actualCount;
3256  } else {
3257  this->dequeueOvercommit.fetch_add(desiredCount,
3258  std::memory_order_release);
3259  }
3260  }
3261 
3262  return 0;
3263  }
3264 
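  // In dequeue_bulk above, consumers first optimistically reserve elements by
  // bumping dequeueOptimisticCount; if tailIndex turns out to cover fewer
  // elements than were reserved, the shortfall is credited back through
  // dequeueOvercommit so later dequeues compute an accurate element count.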
3265  private:
3266  // The block size must be > 1, so any number with the low bit set is an
3267  // invalid block base index
3268  static const index_t INVALID_BLOCK_BASE = 1;
3269 
3270  struct BlockIndexEntry {
3271  std::atomic<index_t> key;
3272  std::atomic<Block *> value;
3273  };
3274 
3275  struct BlockIndexHeader {
3276  size_t capacity;
3277  std::atomic<size_t> tail;
3278  BlockIndexEntry *entries;
3279  BlockIndexEntry **index;
3280  BlockIndexHeader *prev;
3281  };
3282 
3283  template <AllocationMode allocMode>
3284  inline bool insert_block_index_entry(BlockIndexEntry *&idxEntry,
3285  index_t blockStartIndex) {
3286  auto localBlockIndex =
3287  blockIndex.load(std::memory_order_relaxed); // We're the only writer
3288  // thread, relaxed is OK
3289  if (localBlockIndex == nullptr) {
3290  return false; // this can happen if new_block_index failed in the
3291  // constructor
3292  }
3293  auto newTail =
3294  (localBlockIndex->tail.load(std::memory_order_relaxed) + 1) &
3295  (localBlockIndex->capacity - 1);
3296  idxEntry = localBlockIndex->index[newTail];
3297  if (idxEntry->key.load(std::memory_order_relaxed) == INVALID_BLOCK_BASE ||
3298  idxEntry->value.load(std::memory_order_relaxed) == nullptr) {
3299  idxEntry->key.store(blockStartIndex, std::memory_order_relaxed);
3300  localBlockIndex->tail.store(newTail, std::memory_order_release);
3301  return true;
3302  }
3303 
3304  // No room in the old block index, try to allocate another one!
3305  if (allocMode == CannotAlloc || !new_block_index()) {
3306  return false;
3307  }
3308  localBlockIndex = blockIndex.load(std::memory_order_relaxed);
3309  newTail = (localBlockIndex->tail.load(std::memory_order_relaxed) + 1) &
3310  (localBlockIndex->capacity - 1);
3311  idxEntry = localBlockIndex->index[newTail];
3312  assert(idxEntry->key.load(std::memory_order_relaxed) ==
3313  INVALID_BLOCK_BASE);
3314  idxEntry->key.store(blockStartIndex, std::memory_order_relaxed);
3315  localBlockIndex->tail.store(newTail, std::memory_order_release);
3316  return true;
3317  }
3318 
3319  inline void rewind_block_index_tail() {
3320  auto localBlockIndex = blockIndex.load(std::memory_order_relaxed);
3321  localBlockIndex->tail.store(
3322  (localBlockIndex->tail.load(std::memory_order_relaxed) - 1) &
3323  (localBlockIndex->capacity - 1),
3324  std::memory_order_relaxed);
3325  }
3326 
3327  inline BlockIndexEntry *get_block_index_entry_for_index(
3328  index_t index) const {
3329  BlockIndexHeader *localBlockIndex;
3330  auto idx = get_block_index_index_for_index(index, localBlockIndex);
3331  return localBlockIndex->index[idx];
3332  }
3333 
3334  inline size_t get_block_index_index_for_index(
3335  index_t index, BlockIndexHeader *&localBlockIndex) const {
3336 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
3337  debug::DebugLock lock(mutex);
3338 #endif
3339  index &= ~static_cast<index_t>(BLOCK_SIZE - 1);
3340  localBlockIndex = blockIndex.load(std::memory_order_acquire);
3341  auto tail = localBlockIndex->tail.load(std::memory_order_acquire);
3342  auto tailBase =
3343  localBlockIndex->index[tail]->key.load(std::memory_order_relaxed);
3344  assert(tailBase != INVALID_BLOCK_BASE);
3345  // Note: Must use division instead of shift because the index may wrap
3346  // around, causing a negative offset, whose negativity we want to preserve
3347  auto offset = static_cast<size_t>(
3348  static_cast<typename std::make_signed<index_t>::type>(index -
3349  tailBase) /
3350  BLOCK_SIZE);
3351  size_t idx = (tail + offset) & (localBlockIndex->capacity - 1);
3352  assert(localBlockIndex->index[idx]->key.load(std::memory_order_relaxed) ==
3353  index &&
3354  localBlockIndex->index[idx]->value.load(
3355  std::memory_order_relaxed) != nullptr);
3356  return idx;
3357  }
3358 
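  // Note on the division above: index and tailBase are both block-aligned, so
  // their difference is a multiple of BLOCK_SIZE; casting it to a signed type
  // keeps a base that sits behind the tail negative, and signed division
  // preserves that sign so the masked addition to `tail` walks backwards
  // through the ring.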
3359  bool new_block_index() {
3360  auto prev = blockIndex.load(std::memory_order_relaxed);
3361  size_t prevCapacity = prev == nullptr ? 0 : prev->capacity;
3362  auto entryCount = prev == nullptr ? nextBlockIndexCapacity : prevCapacity;
3363  auto raw = static_cast<char *>((Traits::malloc)(
3364  sizeof(BlockIndexHeader) + std::alignment_of<BlockIndexEntry>::value -
3365  1 + sizeof(BlockIndexEntry) * entryCount +
3366  std::alignment_of<BlockIndexEntry *>::value - 1 +
3367  sizeof(BlockIndexEntry *) * nextBlockIndexCapacity));
3368  if (raw == nullptr) {
3369  return false;
3370  }
3371 
3372  auto header = new (raw) BlockIndexHeader;
3373  auto entries = reinterpret_cast<BlockIndexEntry *>(
3374  details::align_for<BlockIndexEntry>(raw + sizeof(BlockIndexHeader)));
3375  auto index = reinterpret_cast<BlockIndexEntry **>(
3376  details::align_for<BlockIndexEntry *>(
3377  reinterpret_cast<char *>(entries) +
3378  sizeof(BlockIndexEntry) * entryCount));
3379  if (prev != nullptr) {
3380  auto prevTail = prev->tail.load(std::memory_order_relaxed);
3381  auto prevPos = prevTail;
3382  size_t i = 0;
3383  do {
3384  prevPos = (prevPos + 1) & (prev->capacity - 1);
3385  index[i++] = prev->index[prevPos];
3386  } while (prevPos != prevTail);
3387  assert(i == prevCapacity);
3388  }
3389  for (size_t i = 0; i != entryCount; ++i) {
3390  new (entries + i) BlockIndexEntry;
3391  entries[i].key.store(INVALID_BLOCK_BASE, std::memory_order_relaxed);
3392  index[prevCapacity + i] = entries + i;
3393  }
3394  header->prev = prev;
3395  header->entries = entries;
3396  header->index = index;
3397  header->capacity = nextBlockIndexCapacity;
3398  header->tail.store((prevCapacity - 1) & (nextBlockIndexCapacity - 1),
3399  std::memory_order_relaxed);
3400 
3401  blockIndex.store(header, std::memory_order_release);
3402 
3403  nextBlockIndexCapacity <<= 1;
3404 
3405  return true;
3406  }
3407 
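  // new_block_index above grows the block index geometrically: the header, the
  // new entries and the pointer ring all live in a single allocation, the
  // previous ring's entry pointers are copied over so existing blocks stay
  // reachable, and the old header remains linked via header->prev until the
  // producer is destroyed.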
3408  private:
3409  size_t nextBlockIndexCapacity;
3410  std::atomic<BlockIndexHeader *> blockIndex;
3411 
3412 #ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
3413  public:
3414  details::ThreadExitListener threadExitListener;
3415 
3416  private:
3417 #endif
3418 
3419 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
3420  public:
3421  ImplicitProducer *nextImplicitProducer;
3422 
3423  private:
3424 #endif
3425 
3426 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
3427  mutable debug::DebugMutex mutex;
3428 #endif
3429 #if MCDBGQ_TRACKMEM
3430  friend struct MemStats;
3431 #endif
3432  };
3433 
3435  // Block pool manipulation
3437 
3438  void populate_initial_block_list(size_t blockCount) {
3439  initialBlockPoolSize = blockCount;
3440  if (initialBlockPoolSize == 0) {
3441  initialBlockPool = nullptr;
3442  return;
3443  }
3444 
3445  initialBlockPool = create_array<Block>(blockCount);
3446  if (initialBlockPool == nullptr) {
3447  initialBlockPoolSize = 0;
3448  }
3449  for (size_t i = 0; i < initialBlockPoolSize; ++i) {
3450  initialBlockPool[i].dynamicallyAllocated = false;
3451  }
3452  }
3453 
3454  inline Block *try_get_block_from_initial_pool() {
3455  if (initialBlockPoolIndex.load(std::memory_order_relaxed) >=
3456  initialBlockPoolSize) {
3457  return nullptr;
3458  }
3459 
3460  auto index = initialBlockPoolIndex.fetch_add(1, std::memory_order_relaxed);
3461 
3462  return index < initialBlockPoolSize ? (initialBlockPool + index) : nullptr;
3463  }
3464 
3465  inline void add_block_to_free_list(Block *block) {
3466 #if MCDBGQ_TRACKMEM
3467  block->owner = nullptr;
3468 #endif
3469  freeList.add(block);
3470  }
3471 
3472  inline void add_blocks_to_free_list(Block *block) {
3473  while (block != nullptr) {
3474  auto next = block->next;
3475  add_block_to_free_list(block);
3476  block = next;
3477  }
3478  }
3479 
3480  inline Block *try_get_block_from_free_list() { return freeList.try_get(); }
3481 
3482  // Gets a free block from one of the memory pools, or allocates a new one (if
3483  // applicable)
3484  template <AllocationMode canAlloc>
3485  Block *requisition_block() {
3486  auto block = try_get_block_from_initial_pool();
3487  if (block != nullptr) {
3488  return block;
3489  }
3490 
3491  block = try_get_block_from_free_list();
3492  if (block != nullptr) {
3493  return block;
3494  }
3495 
3496  if (canAlloc == CanAlloc) {
3497  return create<Block>();
3498  }
3499 
3500  return nullptr;
3501  }
3502 
3503 #if MCDBGQ_TRACKMEM
3504  public:
3505  struct MemStats {
3506  size_t allocatedBlocks;
3507  size_t usedBlocks;
3508  size_t freeBlocks;
3509  size_t ownedBlocksExplicit;
3510  size_t ownedBlocksImplicit;
3511  size_t implicitProducers;
3512  size_t explicitProducers;
3513  size_t elementsEnqueued;
3514  size_t blockClassBytes;
3515  size_t queueClassBytes;
3516  size_t implicitBlockIndexBytes;
3517  size_t explicitBlockIndexBytes;
3518 
3519  friend class ConcurrentQueue;
3520 
3521  private:
3522  static MemStats getFor(ConcurrentQueue *q) {
3523  MemStats stats = {0};
3524 
3525  stats.elementsEnqueued = q->size_approx();
3526 
3527  auto block = q->freeList.head_unsafe();
3528  while (block != nullptr) {
3529  ++stats.allocatedBlocks;
3530  ++stats.freeBlocks;
3531  block = block->freeListNext.load(std::memory_order_relaxed);
3532  }
3533 
3534  for (auto ptr = q->producerListTail.load(std::memory_order_acquire);
3535  ptr != nullptr; ptr = ptr->next_prod()) {
3536  bool implicit = dynamic_cast<ImplicitProducer *>(ptr) != nullptr;
3537  stats.implicitProducers += implicit ? 1 : 0;
3538  stats.explicitProducers += implicit ? 0 : 1;
3539 
3540  if (implicit) {
3541  auto prod = static_cast<ImplicitProducer *>(ptr);
3542  stats.queueClassBytes += sizeof(ImplicitProducer);
3543  auto head = prod->headIndex.load(std::memory_order_relaxed);
3544  auto tail = prod->tailIndex.load(std::memory_order_relaxed);
3545  auto hash = prod->blockIndex.load(std::memory_order_relaxed);
3546  if (hash != nullptr) {
3547  for (size_t i = 0; i != hash->capacity; ++i) {
3548  if (hash->index[i]->key.load(std::memory_order_relaxed) !=
3549  ImplicitProducer::INVALID_BLOCK_BASE &&
3550  hash->index[i]->value.load(std::memory_order_relaxed) !=
3551  nullptr) {
3552  ++stats.allocatedBlocks;
3553  ++stats.ownedBlocksImplicit;
3554  }
3555  }
3556  stats.implicitBlockIndexBytes +=
3557  hash->capacity *
3558  sizeof(typename ImplicitProducer::BlockIndexEntry);
3559  for (; hash != nullptr; hash = hash->prev) {
3560  stats.implicitBlockIndexBytes +=
3561  sizeof(typename ImplicitProducer::BlockIndexHeader) +
3562  hash->capacity *
3563  sizeof(typename ImplicitProducer::BlockIndexEntry *);
3564  }
3565  }
3566  for (; details::circular_less_than<index_t>(head, tail);
3567  head += BLOCK_SIZE) {
3568  // auto block = prod->get_block_index_entry_for_index(head);
3569  ++stats.usedBlocks;
3570  }
3571  } else {
3572  auto prod = static_cast<ExplicitProducer *>(ptr);
3573  stats.queueClassBytes += sizeof(ExplicitProducer);
3574  auto tailBlock = prod->tailBlock;
3575  bool wasNonEmpty = false;
3576  if (tailBlock != nullptr) {
3577  auto block = tailBlock;
3578  do {
3579  ++stats.allocatedBlocks;
3580  if (!block->ConcurrentQueue::Block::template is_empty<
3581  explicit_context>() ||
3582  wasNonEmpty) {
3583  ++stats.usedBlocks;
3584  wasNonEmpty = wasNonEmpty || block != tailBlock;
3585  }
3586  ++stats.ownedBlocksExplicit;
3587  block = block->next;
3588  } while (block != tailBlock);
3589  }
3590  auto index = prod->blockIndex.load(std::memory_order_relaxed);
3591  while (index != nullptr) {
3592  stats.explicitBlockIndexBytes +=
3593  sizeof(typename ExplicitProducer::BlockIndexHeader) +
3594  index->size *
3595  sizeof(typename ExplicitProducer::BlockIndexEntry);
3596  index = static_cast<typename ExplicitProducer::BlockIndexHeader *>(
3597  index->prev);
3598  }
3599  }
3600  }
3601 
3602  auto freeOnInitialPool =
3603  q->initialBlockPoolIndex.load(std::memory_order_relaxed) >=
3604  q->initialBlockPoolSize
3605  ? 0
3606  : q->initialBlockPoolSize -
3607  q->initialBlockPoolIndex.load(std::memory_order_relaxed);
3608  stats.allocatedBlocks += freeOnInitialPool;
3609  stats.freeBlocks += freeOnInitialPool;
3610 
3611  stats.blockClassBytes = sizeof(Block) * stats.allocatedBlocks;
3612  stats.queueClassBytes += sizeof(ConcurrentQueue);
3613 
3614  return stats;
3615  }
3616  };
3617 
3618  // For debugging only. Not thread-safe.
3619  MemStats getMemStats() { return MemStats::getFor(this); }
3620 
3621  private:
3622  friend struct MemStats;
3623 #endif
3624 
3626  // Producer list manipulation
3628 
3629  ProducerBase *recycle_or_create_producer(bool isExplicit) {
3630  bool recycled;
3631  return recycle_or_create_producer(isExplicit, recycled);
3632  }
3633 
3634  ProducerBase *recycle_or_create_producer(bool isExplicit, bool &recycled) {
3635 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
3636  debug::DebugLock lock(implicitProdMutex);
3637 #endif
3638  // Try to re-use one first
3639  for (auto ptr = producerListTail.load(std::memory_order_acquire);
3640  ptr != nullptr; ptr = ptr->next_prod()) {
3641  if (ptr->inactive.load(std::memory_order_relaxed) &&
3642  ptr->isExplicit == isExplicit) {
3643  bool expected = true;
3644  if (ptr->inactive.compare_exchange_strong(
3645  expected, /* desired */
3646  false, std::memory_order_acquire, std::memory_order_relaxed)) {
3647  // We caught one! It's been marked as activated, the caller can have
3648  // it
3649  recycled = true;
3650  return ptr;
3651  }
3652  }
3653  }
3654 
3655  recycled = false;
3656  return add_producer(
3657  isExplicit ? static_cast<ProducerBase *>(create<ExplicitProducer>(this))
3658  : create<ImplicitProducer>(this));
3659  }
3660 
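  // Recycling above hinges on the producer's `inactive` flag: a producer that
  // has been released (explicit token destroyed, or owning thread exited for
  // implicit producers) is reclaimed with a single compare_exchange_strong
  // instead of allocating a fresh producer for every short-lived user.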
3661  ProducerBase *add_producer(ProducerBase *producer) {
3662  // Handle failed memory allocation
3663  if (producer == nullptr) {
3664  return nullptr;
3665  }
3666 
3667  producerCount.fetch_add(1, std::memory_order_relaxed);
3668 
3669  // Add it to the lock-free list
3670  auto prevTail = producerListTail.load(std::memory_order_relaxed);
3671  do {
3672  producer->next = prevTail;
3673  } while (!producerListTail.compare_exchange_weak(
3674  prevTail, producer, std::memory_order_release,
3675  std::memory_order_relaxed));
3676 
3677 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
3678  if (producer->isExplicit) {
3679  auto prevTailExplicit = explicitProducers.load(std::memory_order_relaxed);
3680  do {
3681  static_cast<ExplicitProducer *>(producer)->nextExplicitProducer =
3682  prevTailExplicit;
3683  } while (!explicitProducers.compare_exchange_weak(
3684  prevTailExplicit, static_cast<ExplicitProducer *>(producer),
3685  std::memory_order_release, std::memory_order_relaxed));
3686  } else {
3687  auto prevTailImplicit = implicitProducers.load(std::memory_order_relaxed);
3688  do {
3689  static_cast<ImplicitProducer *>(producer)->nextImplicitProducer =
3690  prevTailImplicit;
3691  } while (!implicitProducers.compare_exchange_weak(
3692  prevTailImplicit, static_cast<ImplicitProducer *>(producer),
3693  std::memory_order_release, std::memory_order_relaxed));
3694  }
3695 #endif
3696 
3697  return producer;
3698  }
3699 
3700  void reown_producers() {
3701  // After another instance is moved-into/swapped-with this one, all the
3702  // producers we stole still think their parents are the other queue.
3703  // So fix them up!
3704  for (auto ptr = producerListTail.load(std::memory_order_relaxed);
3705  ptr != nullptr; ptr = ptr->next_prod()) {
3706  ptr->parent = this;
3707  }
3708  }
3709 
3711  // Implicit producer hash
3713 
3714  struct ImplicitProducerKVP {
3715  std::atomic<details::thread_id_t> key;
3716  ImplicitProducer *value; // No need for atomicity since it's only read by
3717  // the thread that sets it in the first place
3718 
3719  ImplicitProducerKVP() : value(nullptr) {}
3720 
3721  ImplicitProducerKVP(ImplicitProducerKVP &&other) MOODYCAMEL_NOEXCEPT {
3722  key.store(other.key.load(std::memory_order_relaxed),
3723  std::memory_order_relaxed);
3724  value = other.value;
3725  }
3726 
3727  inline ImplicitProducerKVP &operator=(ImplicitProducerKVP &&other)
3728  MOODYCAMEL_NOEXCEPT {
3729  swap(other);
3730  return *this;
3731  }
3732 
3733  inline void swap(ImplicitProducerKVP &other) MOODYCAMEL_NOEXCEPT {
3734  if (this != &other) {
3735  details::swap_relaxed(key, other.key);
3736  std::swap(value, other.value);
3737  }
3738  }
3739  };
3740 
3741  template <typename XT, typename XTraits>
3742  friend void moodycamel::swap(
3743  typename ConcurrentQueue<XT, XTraits>::ImplicitProducerKVP &,
3744  typename ConcurrentQueue<XT, XTraits>::ImplicitProducerKVP &)
3745  MOODYCAMEL_NOEXCEPT;
3746 
3747  struct ImplicitProducerHash {
3748  size_t capacity;
3749  ImplicitProducerKVP *entries;
3750  ImplicitProducerHash *prev;
3751  };
3752 
3753  inline void populate_initial_implicit_producer_hash() {
3754  if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return;
3755 
3756  implicitProducerHashCount.store(0, std::memory_order_relaxed);
3757  auto hash = &initialImplicitProducerHash;
3758  hash->capacity = INITIAL_IMPLICIT_PRODUCER_HASH_SIZE;
3759  hash->entries = &initialImplicitProducerHashEntries[0];
3760  for (size_t i = 0; i != INITIAL_IMPLICIT_PRODUCER_HASH_SIZE; ++i) {
3761  initialImplicitProducerHashEntries[i].key.store(
3762  details::invalid_thread_id, std::memory_order_relaxed);
3763  }
3764  hash->prev = nullptr;
3765  implicitProducerHash.store(hash, std::memory_order_relaxed);
3766  }
3767 
3768  void swap_implicit_producer_hashes(ConcurrentQueue &other) {
3769  if (INITIAL_IMPLICIT_PRODUCER_HASH_SIZE == 0) return;
3770 
3771  // Swap (assumes our implicit producer hash is initialized)
3772  initialImplicitProducerHashEntries.swap(
3773  other.initialImplicitProducerHashEntries);
3774  initialImplicitProducerHash.entries =
3775  &initialImplicitProducerHashEntries[0];
3776  other.initialImplicitProducerHash.entries =
3777  &other.initialImplicitProducerHashEntries[0];
3778 
3779  details::swap_relaxed(implicitProducerHashCount,
3780  other.implicitProducerHashCount);
3781 
3782  details::swap_relaxed(implicitProducerHash, other.implicitProducerHash);
3783  if (implicitProducerHash.load(std::memory_order_relaxed) ==
3784  &other.initialImplicitProducerHash) {
3785  implicitProducerHash.store(&initialImplicitProducerHash,
3786  std::memory_order_relaxed);
3787  } else {
3788  ImplicitProducerHash *hash;
3789  for (hash = implicitProducerHash.load(std::memory_order_relaxed);
3790  hash->prev != &other.initialImplicitProducerHash;
3791  hash = hash->prev) {
3792  continue;
3793  }
3794  hash->prev = &initialImplicitProducerHash;
3795  }
3796  if (other.implicitProducerHash.load(std::memory_order_relaxed) ==
3797  &initialImplicitProducerHash) {
3798  other.implicitProducerHash.store(&other.initialImplicitProducerHash,
3799  std::memory_order_relaxed);
3800  } else {
3801  ImplicitProducerHash *hash;
3802  for (hash = other.implicitProducerHash.load(std::memory_order_relaxed);
3803  hash->prev != &initialImplicitProducerHash; hash = hash->prev) {
3804  continue;
3805  }
3806  hash->prev = &other.initialImplicitProducerHash;
3807  }
3808  }
3809 
3810  // Only fails (returns nullptr) if memory allocation fails
3811  ImplicitProducer *get_or_add_implicit_producer() {
3812  // Note that since the data is essentially thread-local (key is thread ID),
3813  // there's a reduced need for fences (memory ordering is already consistent
3814  // for any individual thread), except for the current table itself.
3815 
3816  // Start by looking for the thread ID in the current and all previous hash
3817  // tables. If it's not found, it must not be in there yet, since this same
3818  // thread would have added it previously to one of the tables that we
3819  // traversed.
3820 
3821  // Code and algorithm adapted from
3822  // http://preshing.com/20130605/the-worlds-simplest-lock-free-hash-table
3823 
3824 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
3825  debug::DebugLock lock(implicitProdMutex);
3826 #endif
3827 
3828  auto id = details::thread_id();
3829  auto hashedId = details::hash_thread_id(id);
3830 
3831  auto mainHash = implicitProducerHash.load(std::memory_order_acquire);
3832  for (auto hash = mainHash; hash != nullptr; hash = hash->prev) {
3833  // Look for the id in this hash
3834  auto index = hashedId;
3835  while (true) { // Not an infinite loop because at least one slot is free
3836  // in the hash table
3837  index &= hash->capacity - 1;
3838 
3839  auto probedKey =
3840  hash->entries[index].key.load(std::memory_order_relaxed);
3841  if (probedKey == id) {
3842  // Found it! If we had to search several hashes deep, though, we
3843  // should lazily add it to the current main hash table to avoid the
3844  // extended search next time. Note there's guaranteed to be room in
3845  // the current hash table since every subsequent table implicitly
3846  // reserves space for all previous tables (there's only one
3847  // implicitProducerHashCount).
3848  auto value = hash->entries[index].value;
3849  if (hash != mainHash) {
3850  index = hashedId;
3851  while (true) {
3852  index &= mainHash->capacity - 1;
3853  probedKey =
3854  mainHash->entries[index].key.load(std::memory_order_relaxed);
3855  auto empty = details::invalid_thread_id;
3856 #ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
3857  auto reusable = details::invalid_thread_id2;
3858  if ((probedKey == empty &&
3859  mainHash->entries[index].key.compare_exchange_strong(
3860  empty, id, std::memory_order_relaxed,
3861  std::memory_order_relaxed)) ||
3862  (probedKey == reusable &&
3863  mainHash->entries[index].key.compare_exchange_strong(
3864  reusable, id, std::memory_order_acquire,
3865  std::memory_order_acquire))) {
3866 #else
3867  if ((probedKey == empty &&
3868  mainHash->entries[index].key.compare_exchange_strong(
3869  empty, id, std::memory_order_relaxed,
3870  std::memory_order_relaxed))) {
3871 #endif
3872  mainHash->entries[index].value = value;
3873  break;
3874  }
3875  ++index;
3876  }
3877  }
3878 
3879  return value;
3880  }
3881  if (probedKey == details::invalid_thread_id) {
3882  break; // Not in this hash table
3883  }
3884  ++index;
3885  }
3886  }
3887 
3888  // Insert!
3889  auto newCount =
3890  1 + implicitProducerHashCount.fetch_add(1, std::memory_order_relaxed);
3891  while (true) {
3892  if (newCount >= (mainHash->capacity >> 1) &&
3893  !implicitProducerHashResizeInProgress.test_and_set(
3894  std::memory_order_acquire)) {
3895  // We've acquired the resize lock, try to allocate a bigger hash table.
3896  // Note the acquire fence synchronizes with the release fence at the end
3897  // of this block, and hence when we reload implicitProducerHash it must
3898  // be the most recent version (it only gets changed within this locked
3899  // block).
3900  mainHash = implicitProducerHash.load(std::memory_order_acquire);
3901  if (newCount >= (mainHash->capacity >> 1)) {
3902  auto newCapacity = mainHash->capacity << 1;
3903  while (newCount >= (newCapacity >> 1)) {
3904  newCapacity <<= 1;
3905  }
3906  auto raw = static_cast<char *>(
3907  (Traits::malloc)(sizeof(ImplicitProducerHash) +
3908  std::alignment_of<ImplicitProducerKVP>::value -
3909  1 + sizeof(ImplicitProducerKVP) * newCapacity));
3910  if (raw == nullptr) {
3911  // Allocation failed
3912  implicitProducerHashCount.fetch_sub(1, std::memory_order_relaxed);
3913  implicitProducerHashResizeInProgress.clear(
3914  std::memory_order_relaxed);
3915  return nullptr;
3916  }
3917 
3918  auto newHash = new (raw) ImplicitProducerHash;
3919  newHash->capacity = newCapacity;
3920  newHash->entries = reinterpret_cast<ImplicitProducerKVP *>(
3921  details::align_for<ImplicitProducerKVP>(
3922  raw + sizeof(ImplicitProducerHash)));
3923  for (size_t i = 0; i != newCapacity; ++i) {
3924  new (newHash->entries + i) ImplicitProducerKVP;
3925  newHash->entries[i].key.store(details::invalid_thread_id,
3926  std::memory_order_relaxed);
3927  }
3928  newHash->prev = mainHash;
3929  implicitProducerHash.store(newHash, std::memory_order_release);
3930  implicitProducerHashResizeInProgress.clear(std::memory_order_release);
3931  mainHash = newHash;
3932  } else {
3933  implicitProducerHashResizeInProgress.clear(std::memory_order_release);
3934  }
3935  }
3936 
3937  // If it's < three-quarters full, add to the old one anyway so that we
3938  // don't have to wait for the next table to finish being allocated by
3939  // another thread (and if we just finished allocating above, the condition
3940  // will always be true)
3941  if (newCount < (mainHash->capacity >> 1) + (mainHash->capacity >> 2)) {
3942  bool recycled;
3943  auto producer = static_cast<ImplicitProducer *>(
3944  recycle_or_create_producer(false, recycled));
3945  if (producer == nullptr) {
3946  implicitProducerHashCount.fetch_sub(1, std::memory_order_relaxed);
3947  return nullptr;
3948  }
3949  if (recycled) {
3950  implicitProducerHashCount.fetch_sub(1, std::memory_order_relaxed);
3951  }
3952 
3953 #ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
3954  producer->threadExitListener.callback =
3955  &ConcurrentQueue::implicit_producer_thread_exited_callback;
3956  producer->threadExitListener.userData = producer;
3957  details::ThreadExitNotifier::subscribe(&producer->threadExitListener);
3958 #endif
3959 
3960  auto index = hashedId;
3961  while (true) {
3962  index &= mainHash->capacity - 1;
3963  auto probedKey =
3964  mainHash->entries[index].key.load(std::memory_order_relaxed);
3965 
3966  auto empty = details::invalid_thread_id;
3967 #ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
3968  auto reusable = details::invalid_thread_id2;
3969  if ((probedKey == empty &&
3970  mainHash->entries[index].key.compare_exchange_strong(
3971  empty, id, std::memory_order_relaxed,
3972  std::memory_order_relaxed)) ||
3973  (probedKey == reusable &&
3974  mainHash->entries[index].key.compare_exchange_strong(
3975  reusable, id, std::memory_order_acquire,
3976  std::memory_order_acquire))) {
3977 #else
3978  if ((probedKey == empty &&
3979  mainHash->entries[index].key.compare_exchange_strong(
3980  empty, id, std::memory_order_relaxed,
3981  std::memory_order_relaxed))) {
3982 #endif
3983  mainHash->entries[index].value = producer;
3984  break;
3985  }
3986  ++index;
3987  }
3988  return producer;
3989  }
3990 
3991  // Hmm, the old hash is quite full and somebody else is busy allocating a
3992  // new one. We need to wait for the allocating thread to finish (if it
3993  // succeeds, we add, if not, we try to allocate ourselves).
3994  mainHash = implicitProducerHash.load(std::memory_order_acquire);
3995  }
3996  }
3997 
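  // The probe loops above follow the usual open-addressing claim idiom:
  // starting at hash_thread_id(id), linearly probe and take ownership of a
  // slot by compare-exchanging its key from invalid_thread_id (never used) or
  // invalid_thread_id2 (vacated by an exited thread) to our own thread id;
  // only the CAS winner publishes the value pointer.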
3998 #ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
3999  void implicit_producer_thread_exited(ImplicitProducer *producer) {
4000  // Remove from thread exit listeners
4001  details::ThreadExitNotifier::unsubscribe(&producer->threadExitListener);
4002 
4003  // Remove from hash
4004 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
4005  debug::DebugLock lock(implicitProdMutex);
4006 #endif
4007  auto hash = implicitProducerHash.load(std::memory_order_acquire);
4008  assert(hash != nullptr); // The thread exit listener is only registered if
4009  // we were added to a hash in the first place
4010  auto id = details::thread_id();
4011  auto hashedId = details::hash_thread_id(id);
4012  details::thread_id_t probedKey;
4013 
4014  // We need to traverse all the hashes just in case other threads aren't on
4015  // the current one yet and are trying to add an entry thinking there's a
4016  // free slot (because they reused a producer)
4017  for (; hash != nullptr; hash = hash->prev) {
4018  auto index = hashedId;
4019  do {
4020  index &= hash->capacity - 1;
4021  probedKey = hash->entries[index].key.load(std::memory_order_relaxed);
4022  if (probedKey == id) {
4023  hash->entries[index].key.store(details::invalid_thread_id2,
4024  std::memory_order_release);
4025  break;
4026  }
4027  ++index;
4028  } while (
4029  probedKey !=
4030  details::invalid_thread_id); // Can happen if the hash has changed
4031  // but we weren't put back in it yet, or
4032  // if we weren't added to this hash in
4033  // the first place
4034  }
4035 
4036  // Mark the queue as being recyclable
4037  producer->inactive.store(true, std::memory_order_release);
4038  }
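  // Storing `inactive = true` with release ordering is what makes this
  // producer visible to recycle_or_create_producer above; its hash slots have
  // already been tombstoned with invalid_thread_id2, so a new thread can claim
  // both the slot and the producer.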
4039 
4040  static void implicit_producer_thread_exited_callback(void *userData) {
4041  auto producer = static_cast<ImplicitProducer *>(userData);
4042  auto queue = producer->parent;
4043  queue->implicit_producer_thread_exited(producer);
4044  }
4045 #endif
4046 
4048  // Utility functions
4050 
4051  template <typename U>
4052  static inline U *create_array(size_t count) {
4053  assert(count > 0);
4054  auto p = static_cast<U *>((Traits::malloc)(sizeof(U) * count));
4055  if (p == nullptr) {
4056  return nullptr;
4057  }
4058 
4059  for (size_t i = 0; i != count; ++i) {
4060  new (p + i) U();
4061  }
4062  return p;
4063  }
4064 
4065  template <typename U>
4066  static inline void destroy_array(U *p, size_t count) {
4067  if (p != nullptr) {
4068  assert(count > 0);
4069  for (size_t i = count; i != 0;) {
4070  (p + --i)->~U();
4071  }
4072  (Traits::free)(p);
4073  }
4074  }
4075 
4076  template <typename U>
4077  static inline U *create() {
4078  auto p = (Traits::malloc)(sizeof(U));
4079  return p != nullptr ? new (p) U : nullptr;
4080  }
4081 
4082  template <typename U, typename A1>
4083  static inline U *create(A1 &&a1) {
4084  auto p = (Traits::malloc)(sizeof(U));
4085  return p != nullptr ? new (p) U(std::forward<A1>(a1)) : nullptr;
4086  }
4087 
4088  template <typename U>
4089  static inline void destroy(U *p) {
4090  if (p != nullptr) {
4091  p->~U();
4092  }
4093  (Traits::free)(p);
4094  }
4095 
4096  private:
4097  std::atomic<ProducerBase *> producerListTail;
4098  std::atomic<std::uint32_t> producerCount;
4099 
4100  std::atomic<size_t> initialBlockPoolIndex;
4101  Block *initialBlockPool;
4102  size_t initialBlockPoolSize;
4103 
4104 #if !MCDBGQ_USEDEBUGFREELIST
4105  FreeList<Block> freeList;
4106 #else
4107  debug::DebugFreeList<Block> freeList;
4108 #endif
4109 
4110  std::atomic<ImplicitProducerHash *> implicitProducerHash;
4111  std::atomic<size_t>
4112  implicitProducerHashCount; // Number of slots logically used
4113  ImplicitProducerHash initialImplicitProducerHash;
4114  std::array<ImplicitProducerKVP, INITIAL_IMPLICIT_PRODUCER_HASH_SIZE>
4115  initialImplicitProducerHashEntries;
4116  std::atomic_flag implicitProducerHashResizeInProgress;
4117 
4118  std::atomic<std::uint32_t> nextExplicitConsumerId;
4119  std::atomic<std::uint32_t> globalExplicitConsumerOffset;
4120 
4121 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
4122  debug::DebugMutex implicitProdMutex;
4123 #endif
4124 
4125 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
4126  std::atomic<ExplicitProducer *> explicitProducers;
4127  std::atomic<ImplicitProducer *> implicitProducers;
4128 #endif
4129 };
4130 
4131 template <typename T, typename Traits>
4132 ProducerToken::ProducerToken(ConcurrentQueue<T, Traits> &queue)
4133  : producer(queue.recycle_or_create_producer(true)) {
4134  if (producer != nullptr) {
4135  producer->token = this;
4136  }
4137 }
4138 
4139 template <typename T, typename Traits>
4140 ProducerToken::ProducerToken(BlockingConcurrentQueue<T, Traits> &queue)
4141  : producer(reinterpret_cast<ConcurrentQueue<T, Traits> *>(&queue)
4142  ->recycle_or_create_producer(true)) {
4143  if (producer != nullptr) {
4144  producer->token = this;
4145  }
4146 }
4147 
4148 template <typename T, typename Traits>
4149 ConsumerToken::ConsumerToken(ConcurrentQueue<T, Traits> &queue)
4150  : itemsConsumedFromCurrent(0),
4151  currentProducer(nullptr),
4152  desiredProducer(nullptr) {
4153  initialOffset =
4154  queue.nextExplicitConsumerId.fetch_add(1, std::memory_order_release);
4155  lastKnownGlobalOffset = -1;
4156 }
4157 
4158 template <typename T, typename Traits>
4159 ConsumerToken::ConsumerToken(BlockingConcurrentQueue<T, Traits> &queue)
4160  : itemsConsumedFromCurrent(0),
4161  currentProducer(nullptr),
4162  desiredProducer(nullptr) {
4163  initialOffset =
4164  reinterpret_cast<ConcurrentQueue<T, Traits> *>(&queue)
4165  ->nextExplicitConsumerId.fetch_add(1, std::memory_order_release);
4166  lastKnownGlobalOffset = -1;
4167 }
4168 
4169 template <typename T, typename Traits>
4170 inline void swap(ConcurrentQueue<T, Traits> &a,
4171  ConcurrentQueue<T, Traits> &b) MOODYCAMEL_NOEXCEPT {
4172  a.swap(b);
4173 }
4174 
4175 inline void swap(ProducerToken &a, ProducerToken &b) MOODYCAMEL_NOEXCEPT {
4176  a.swap(b);
4177 }
4178 
4179 inline void swap(ConsumerToken &a, ConsumerToken &b) MOODYCAMEL_NOEXCEPT {
4180  a.swap(b);
4181 }
4182 
4183 template <typename T, typename Traits>
4184 inline void swap(typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP &a,
4185  typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP &b)
4186  MOODYCAMEL_NOEXCEPT {
4187  a.swap(b);
4188 }
4189 
4190 } // namespace moodycamel
4191 
4192 #if defined(__GNUC__)
4193 #pragma GCC diagnostic pop
4194 #endif