38 #pragma GCC diagnostic push
39 #pragma GCC diagnostic ignored "-Wconversion"
41 #ifdef MCDBGQ_USE_RELACY
42 #pragma GCC diagnostic ignored "-Wint-to-pointer-cast"
46 #if defined(__APPLE__)
47 #include "TargetConditionals.h"
50 #ifdef MCDBGQ_USE_RELACY
51 #include "relacy/relacy_std.hpp"
52 #include "relacy_shims.h"
72 #include <type_traits>
79 template <
typename thread_
id_t>
88 #if defined(MCDBGQ_USE_RELACY)
97 #elif defined(_WIN32) || defined(__WINDOWS__) || defined(__WIN32__)
100 extern "C" __declspec(dllimport)
unsigned long __stdcall GetCurrentThreadId(
104 static_assert(
sizeof(
unsigned long) ==
sizeof(std::uint32_t),
105 "Expected size of unsigned long to be 32 bits on Windows");
114 return static_cast<thread_id_t>(::GetCurrentThreadId());
118 #elif defined(__arm__) || defined(_M_ARM) || defined(__aarch64__) || \
119 (defined(__APPLE__) && TARGET_OS_IPHONE)
122 static_assert(
sizeof(std::thread::id) == 4 ||
sizeof(std::thread::id) == 8,
123 "std::thread::id is expected to be either 4 or 8 bytes");
133 template <std::
size_t>
134 struct thread_id_size {};
136 struct thread_id_size<4> {
137 typedef std::uint32_t numeric_t;
140 struct thread_id_size<8> {
141 typedef std::uint64_t numeric_t;
146 typedef thread_id_size<
sizeof(
thread_id_t)>::numeric_t
156 return std::hash<std::thread::id>()(x);
168 #if defined(__GNUC__) || defined(__INTEL_COMPILER)
169 #define MOODYCAMEL_THREADLOCAL __thread
170 #elif defined(_MSC_VER)
171 #define MOODYCAMEL_THREADLOCAL __declspec(thread)
174 #define MOODYCAMEL_THREADLOCAL thread_local
192 #ifndef MOODYCAMEL_EXCEPTIONS_ENABLED
193 #if (defined(_MSC_VER) && defined(_CPPUNWIND)) || \
194 (defined(__GNUC__) && defined(__EXCEPTIONS)) || \
195 (!defined(_MSC_VER) && !defined(__GNUC__))
196 #define MOODYCAMEL_EXCEPTIONS_ENABLED
199 #ifdef MOODYCAMEL_EXCEPTIONS_ENABLED
200 #define MOODYCAMEL_TRY try
201 #define MOODYCAMEL_CATCH(...) catch (__VA_ARGS__)
202 #define MOODYCAMEL_RETHROW throw
203 #define MOODYCAMEL_THROW(expr) throw(expr)
205 #define MOODYCAMEL_TRY if (true)
206 #define MOODYCAMEL_CATCH(...) else if (false)
207 #define MOODYCAMEL_RETHROW
208 #define MOODYCAMEL_THROW(expr)
211 #ifndef MOODYCAMEL_NOEXCEPT
212 #if !defined(MOODYCAMEL_EXCEPTIONS_ENABLED)
213 #define MOODYCAMEL_NOEXCEPT
214 #define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) true
215 #define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) true
216 #elif defined(_MSC_VER) && defined(_NOEXCEPT) && _MSC_VER < 1800
220 #define MOODYCAMEL_NOEXCEPT _NOEXCEPT
221 #define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) \
222 (std::is_rvalue_reference<valueType>::value && \
223 std::is_move_constructible<type>::value \
224 ? std::is_trivially_move_constructible<type>::value \
225 : std::is_trivially_copy_constructible<type>::value)
226 #define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) \
227 ((std::is_rvalue_reference<valueType>::value && \
228 std::is_move_assignable<type>::value \
229 ? std::is_trivially_move_assignable<type>::value || \
230 std::is_nothrow_move_assignable<type>::value \
231 : std::is_trivially_copy_assignable<type>::value || \
232 std::is_nothrow_copy_assignable<type>::value) && \
233 MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr))
234 #elif defined(_MSC_VER) && defined(_NOEXCEPT) && _MSC_VER < 1900
235 #define MOODYCAMEL_NOEXCEPT _NOEXCEPT
236 #define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) \
237 (std::is_rvalue_reference<valueType>::value && \
238 std::is_move_constructible<type>::value \
239 ? std::is_trivially_move_constructible<type>::value || \
240 std::is_nothrow_move_constructible<type>::value \
241 : std::is_trivially_copy_constructible<type>::value || \
242 std::is_nothrow_copy_constructible<type>::value)
243 #define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) \
244 ((std::is_rvalue_reference<valueType>::value && \
245 std::is_move_assignable<type>::value \
246 ? std::is_trivially_move_assignable<type>::value || \
247 std::is_nothrow_move_assignable<type>::value \
248 : std::is_trivially_copy_assignable<type>::value || \
249 std::is_nothrow_copy_assignable<type>::value) && \
250 MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr))
252 #define MOODYCAMEL_NOEXCEPT noexcept
253 #define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr) noexcept(expr)
254 #define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr) noexcept(expr)
258 #ifndef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
259 #ifdef MCDBGQ_USE_RELACY
260 #define MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
267 #if (!defined(_MSC_VER) || _MSC_VER >= 1900) && \
268 (!defined(__MINGW32__) && !defined(__MINGW64__) || \
269 !defined(__WINPTHREADS_VERSION)) && \
270 (!defined(__GNUC__) || __GNUC__ > 4 || \
271 (__GNUC__ == 4 && __GNUC_MINOR__ >= 8)) && \
272 (!defined(__APPLE__) || !TARGET_OS_IPHONE) && !defined(__arm__) && \
273 !defined(_M_ARM) && !defined(__aarch64__)
285 #ifndef MOODYCAMEL_DELETE_FUNCTION
286 #if defined(_MSC_VER) && _MSC_VER < 1800
287 #define MOODYCAMEL_DELETE_FUNCTION
289 #define MOODYCAMEL_DELETE_FUNCTION = delete
296 #if defined(__GNUC__)
// Branch-prediction hint for GCC-compatible compilers: tells the optimizer the
// condition is expected to be true. The parenthesized name "(likely)" prevents
// accidental expansion if a function-like "likely" macro is in scope.
static inline bool (likely)(bool x) { return __builtin_expect((x), true); }
// Branch-prediction hint: the condition is expected to be false.
// Parenthesized name guards against a same-named macro, as with (likely).
static inline bool (unlikely)(bool x) { return __builtin_expect((x), false); }
// Fallback for compilers without __builtin_expect: no hint, just pass x through.
static inline bool (likely)(bool x) { return x; }
308 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
309 #include "internal/concurrentqueue_internal_debug.h"
314 template <
typename T>
316 static_assert(std::is_integral<T>::value,
317 "const_numeric_max can only be used with integers");
319 std::numeric_limits<T>::is_signed
320 ? (
static_cast<T
>(1) << (
sizeof(T) * CHAR_BIT - 1)) -
322 :
static_cast<T
>(-1);
325 #if defined(__GLIBCXX__)
326 typedef ::max_align_t
409 #ifndef MCDBGQ_USE_RELACY
413 #if defined(malloc) || defined(free)
// Some platforms #define malloc/free as macros (see the surrounding
// `#if defined(malloc) || defined(free)` guard). This helper captures the
// macro-expanded malloc call so a real function named (malloc) can forward
// to it without re-triggering macro expansion.
static inline void *WORKAROUND_malloc(size_t size) { return malloc(size); }
// Counterpart to WORKAROUND_malloc: captures the macro-expanded free(ptr).
static inline void WORKAROUND_free(void *ptr) { return free(ptr); }
// Parenthesizing the name suppresses function-like macro expansion, letting a
// genuine function named "malloc" be defined even while a malloc macro exists;
// it forwards to the already-expanded WORKAROUND_malloc.
static inline void *(malloc)(size_t size) { return WORKAROUND_malloc(size); }
// Same parenthesized-name trick as (malloc): defines a real "free" function
// despite a possible free macro, forwarding to WORKAROUND_free.
static inline void (free)(void *ptr) { return WORKAROUND_free(ptr); }
// Default allocator hook: plain std::malloc.
static inline void *malloc(size_t size) { return std::malloc(size); }
// Default deallocator hook: plain std::free.
static inline void free(void *ptr) { return std::free(ptr); }
// Relacy race-detector build: route allocations through rl::rl_malloc.
// ($ is Relacy's debug-context macro carrying call-site information.)
static inline void *malloc(size_t size) { return rl::rl_malloc(size, $); }
// Relacy race-detector build: route deallocations through rl::rl_free.
static inline void free(void *ptr) { return rl::rl_free(ptr, $); }
441 struct ProducerToken;
442 struct ConsumerToken;
444 template <
typename T,
typename Traits>
445 class ConcurrentQueue;
446 template <
typename T,
typename Traits>
447 class BlockingConcurrentQueue;
448 class ConcurrentQueueTests;
460 template <
bool use32>
462 static inline std::uint32_t
hash(std::uint32_t h) {
472 return h ^ (h >> 16);
478 static inline std::uint64_t
hash(std::uint64_t h) {
480 h *= 0xff51afd7ed558ccd;
482 h *= 0xc4ceb9fe1a85ec53;
483 return h ^ (h >> 33);
487 template <std::
size_t size>
493 "Expected a platform where thread IDs are at most 64-bit values");
494 return static_cast<size_t>(
500 template <
typename T>
503 #pragma warning(push)
504 #pragma warning(disable : 4554)
507 std::is_integral<T>::value && !std::numeric_limits<T>::is_signed,
508 "circular_less_than is intended to be used only with unsigned integer "
510 return static_cast<T
>(a - b) >
511 static_cast<T
>(
static_cast<T
>(1)
512 <<
static_cast<T
>(
sizeof(T) * CHAR_BIT - 1));
518 template <
typename U>
520 const std::size_t alignment = std::alignment_of<U>::value;
522 (alignment - (
reinterpret_cast<std::uintptr_t
>(ptr) % alignment)) %
526 template <
typename T>
529 std::is_integral<T>::value && !std::numeric_limits<T>::is_signed,
530 "ceil_to_pow_2 is intended to be used only with unsigned integer types");
538 for (std::size_t i = 1; i <
sizeof(T); i <<= 1) {
545 template <
typename T>
546 static inline void swap_relaxed(std::atomic<T> &left, std::atomic<T> &right) {
553 template <
typename T>
554 static inline T
const &
nomove(T
const &x) {
558 template <
bool Enable>
560 template <
typename T>
561 static inline T
const &
eval(T
const &x) {
568 template <
typename U>
569 static inline auto eval(U &&x) -> decltype(std::forward<U>(x)) {
570 return std::forward<U>(x);
574 template <
typename It>
579 #if defined(__clang__) || !defined(__GNUC__) || __GNUC__ > 4 || \
580 (__GNUC__ == 4 && __GNUC_MINOR__ >= 8)
581 template <
typename T>
584 template <
typename T>
588 #ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
589 #ifdef MCDBGQ_USE_RELACY
590 typedef RelacyThreadExitListener ThreadExitListener;
591 typedef RelacyThreadExitNotifier ThreadExitNotifier;
593 struct ThreadExitListener {
594 typedef void (*callback_t)(
void *);
598 ThreadExitListener *next;
601 class ThreadExitNotifier {
603 static void subscribe(ThreadExitListener *listener) {
604 auto &tlsInst = instance();
605 listener->next = tlsInst.tail;
606 tlsInst.tail = listener;
609 static void unsubscribe(ThreadExitListener *listener) {
610 auto &tlsInst = instance();
611 ThreadExitListener **prev = &tlsInst.tail;
612 for (
auto ptr = tlsInst.tail; ptr !=
nullptr; ptr = ptr->next) {
613 if (ptr == listener) {
// Start with an empty listener list; subscribe() pushes listeners onto tail.
ThreadExitNotifier() : tail(nullptr) {}
624 ThreadExitNotifier &operator=(ThreadExitNotifier
const &)
627 ~ThreadExitNotifier() {
629 assert(
this == &instance() &&
630 "If this assert fails, you likely have a buggy compiler! Change the "
631 "preprocessor conditions such that "
632 "MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED is no longer defined.");
633 for (
auto ptr = tail; ptr !=
nullptr; ptr = ptr->next) {
634 ptr->callback(ptr->userData);
639 static inline ThreadExitNotifier &instance() {
640 static thread_local ThreadExitNotifier notifier;
645 ThreadExitListener *tail;
650 template <
typename T>
656 enum {
value = ATOMIC_CHAR_LOCK_FREE };
660 enum {
value = ATOMIC_SHORT_LOCK_FREE };
664 enum {
value = ATOMIC_INT_LOCK_FREE };
668 enum {
value = ATOMIC_LONG_LOCK_FREE };
672 enum {
value = ATOMIC_LLONG_LOCK_FREE };
674 template <
typename T>
679 enum {
value = ATOMIC_BOOL_LOCK_FREE };
681 template <
typename U>
683 enum { value = ATOMIC_POINTER_LOCK_FREE };
688 template <
typename T,
typename Traits>
691 template <
typename T,
typename Traits>
696 other.producer =
nullptr;
712 if (other.producer !=
nullptr) {
739 template <
typename T,
typename Traits>
749 template <
typename T,
typename Traits>
752 template <
typename T,
typename Traits>
756 : initialOffset(other.initialOffset),
757 lastKnownGlobalOffset(other.lastKnownGlobalOffset),
758 itemsConsumedFromCurrent(other.itemsConsumedFromCurrent),
759 currentProducer(other.currentProducer),
760 desiredProducer(other.desiredProducer) {}
768 std::swap(initialOffset, other.initialOffset);
769 std::swap(lastKnownGlobalOffset, other.lastKnownGlobalOffset);
770 std::swap(itemsConsumedFromCurrent, other.itemsConsumedFromCurrent);
771 std::swap(currentProducer, other.currentProducer);
772 std::swap(desiredProducer, other.desiredProducer);
780 template <
typename T,
typename Traits>
786 std::uint32_t initialOffset;
787 std::uint32_t lastKnownGlobalOffset;
788 std::uint32_t itemsConsumedFromCurrent;
796 template <
typename T,
typename Traits>
801 template <
typename T,
typename Traits = ConcurrentQueueDefaultTraits>
810 static const size_t BLOCK_SIZE =
static_cast<size_t>(Traits::BLOCK_SIZE);
812 static_cast<size_t>(Traits::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD);
814 static_cast<size_t>(Traits::EXPLICIT_INITIAL_INDEX_SIZE);
816 static_cast<size_t>(Traits::IMPLICIT_INITIAL_INDEX_SIZE);
818 static_cast<size_t>(Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE);
820 static_cast<std::uint32_t
>(
821 Traits::EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE);
823 #pragma warning(push)
824 #pragma warning(disable : 4307)
826 #pragma warning(disable : 4309)
830 static_cast<size_t>(Traits::MAX_SUBQUEUE_SIZE) <
833 : ((
static_cast<size_t>(Traits::MAX_SUBQUEUE_SIZE) +
840 static_assert(!std::numeric_limits<size_t>::is_signed &&
841 std::is_integral<size_t>::value,
842 "Traits::size_t must be an unsigned integral type");
843 static_assert(!std::numeric_limits<index_t>::is_signed &&
844 std::is_integral<index_t>::value,
845 "Traits::index_t must be an unsigned integral type");
846 static_assert(
sizeof(
index_t) >=
sizeof(
size_t),
847 "Traits::index_t must be at least as wide as Traits::size_t");
849 "Traits::BLOCK_SIZE must be a power of 2 (and at least 2)");
853 "Traits::EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD must be a "
854 "power of 2 (and greater than 1)");
858 "Traits::EXPLICIT_INITIAL_INDEX_SIZE must be a power of 2 (and "
863 "Traits::IMPLICIT_INITIAL_INDEX_SIZE must be a power of 2 (and "
869 "Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE must be a power of 2");
872 "Traits::INITIAL_IMPLICIT_PRODUCER_HASH_SIZE must be at least "
873 "1 (or 0 to disable implicit enqueueing)");
887 : producerListTail(nullptr),
889 initialBlockPoolIndex(0),
890 nextExplicitConsumerId(0),
891 globalExplicitConsumerOffset(0) {
893 populate_initial_implicit_producer_hash();
894 populate_initial_block_list(capacity /
BLOCK_SIZE +
895 ((capacity & (
BLOCK_SIZE - 1)) == 0 ? 0 : 1));
897 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
911 size_t maxImplicitProducers)
912 : producerListTail(nullptr),
914 initialBlockPoolIndex(0),
915 nextExplicitConsumerId(0),
916 globalExplicitConsumerOffset(0) {
918 populate_initial_implicit_producer_hash();
920 (maxExplicitProducers + 1) +
921 2 * (maxExplicitProducers + maxImplicitProducers);
922 populate_initial_block_list(blocks);
924 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
936 while (ptr !=
nullptr) {
937 auto next = ptr->next_prod();
938 if (ptr->token !=
nullptr) {
939 ptr->token->producer =
nullptr;
948 while (hash !=
nullptr) {
949 auto prev = hash->prev;
950 if (prev !=
nullptr) {
952 for (
size_t i = 0; i != hash->capacity; ++i) {
953 hash->entries[i].~ImplicitProducerKVP();
955 hash->~ImplicitProducerHash();
956 (Traits::free)(hash);
963 auto block = freeList.head_unsafe();
964 while (block !=
nullptr) {
966 if (block->dynamicallyAllocated) {
973 destroy_array(initialBlockPool, initialBlockPoolSize);
991 initialBlockPoolIndex(
993 initialBlockPool(other.initialBlockPool),
994 initialBlockPoolSize(other.initialBlockPoolSize),
995 freeList(std::move(other.freeList)),
996 nextExplicitConsumerId(
998 globalExplicitConsumerOffset(other.globalExplicitConsumerOffset.load(
1002 populate_initial_implicit_producer_hash();
1003 swap_implicit_producer_hashes(other);
1010 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
1011 explicitProducers.store(
1015 implicitProducers.store(
1022 other.initialBlockPoolSize = 0;
1023 other.initialBlockPool =
nullptr;
1030 return swap_internal(other);
1039 swap_internal(other);
1044 if (
this == &other) {
1051 std::swap(initialBlockPool, other.initialBlockPool);
1052 std::swap(initialBlockPoolSize, other.initialBlockPoolSize);
1053 freeList.swap(other.freeList);
1056 other.globalExplicitConsumerOffset);
1058 swap_implicit_producer_hashes(other);
1061 other.reown_producers();
1063 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
1080 return inner_enqueue<CanAlloc>(item);
1091 return inner_enqueue<CanAlloc>(std::move(item));
1099 return inner_enqueue<CanAlloc>(token, item);
1107 return inner_enqueue<CanAlloc>(token, std::move(item));
1117 template <
typename It>
1120 return inner_enqueue_bulk<CanAlloc>(itemFirst, count);
1129 template <
typename It>
1131 return inner_enqueue_bulk<CanAlloc>(token, itemFirst, count);
1141 return inner_enqueue<CannotAlloc>(item);
1151 return inner_enqueue<CannotAlloc>(std::move(item));
1158 return inner_enqueue<CannotAlloc>(token, item);
1165 return inner_enqueue<CannotAlloc>(token, std::move(item));
1175 template <
typename It>
1178 return inner_enqueue_bulk<CannotAlloc>(itemFirst, count);
1186 template <
typename It>
1189 return inner_enqueue_bulk<CannotAlloc>(token, itemFirst, count);
1196 template <
typename U>
1200 size_t nonEmptyCount = 0;
1201 ProducerBase *best =
nullptr;
1202 size_t bestSize = 0;
1204 nonEmptyCount < 3 && ptr != nullptr; ptr = ptr->next_prod()) {
1205 auto size = ptr->size_approx();
1207 if (size > bestSize) {
1218 if (nonEmptyCount > 0) {
1223 ptr !=
nullptr; ptr = ptr->next_prod()) {
1224 if (ptr != best && ptr->dequeue(item)) {
1241 template <
typename U>
1244 ptr !=
nullptr; ptr = ptr->next_prod()) {
1245 if (ptr->dequeue(item)) {
1256 template <
typename U>
1269 if (token.desiredProducer ==
nullptr ||
1270 token.lastKnownGlobalOffset !=
1272 if (!update_current_producer_after_rotation(token)) {
1280 if (
static_cast<ProducerBase *
>(token.currentProducer)->dequeue(item)) {
1281 if (++token.itemsConsumedFromCurrent ==
1289 auto ptr =
static_cast<ProducerBase *
>(token.currentProducer)->next_prod();
1290 if (ptr ==
nullptr) {
1293 while (ptr !=
static_cast<ProducerBase *
>(token.currentProducer)) {
1294 if (ptr->dequeue(item)) {
1295 token.currentProducer = ptr;
1296 token.itemsConsumedFromCurrent = 1;
1299 ptr = ptr->next_prod();
1300 if (ptr ==
nullptr) {
1312 template <
typename It>
1316 ptr !=
nullptr; ptr = ptr->next_prod()) {
1317 count += ptr->dequeue_bulk(itemFirst, max - count);
1330 template <
typename It>
1332 if (token.desiredProducer ==
nullptr ||
1333 token.lastKnownGlobalOffset !=
1335 if (!update_current_producer_after_rotation(token)) {
1340 size_t count =
static_cast<ProducerBase *
>(token.currentProducer)
1341 ->dequeue_bulk(itemFirst, max);
1343 if ((token.itemsConsumedFromCurrent +=
static_cast<std::uint32_t
>(max)) >=
1349 token.itemsConsumedFromCurrent +=
static_cast<std::uint32_t
>(count);
1353 auto ptr =
static_cast<ProducerBase *
>(token.currentProducer)->next_prod();
1354 if (ptr ==
nullptr) {
1357 while (ptr !=
static_cast<ProducerBase *
>(token.currentProducer)) {
1358 auto dequeued = ptr->dequeue_bulk(itemFirst, max);
1360 if (dequeued != 0) {
1361 token.currentProducer = ptr;
1362 token.itemsConsumedFromCurrent =
static_cast<std::uint32_t
>(dequeued);
1364 if (dequeued == max) {
1368 ptr = ptr->next_prod();
1369 if (ptr ==
nullptr) {
1382 template <
typename U>
1394 template <
typename It>
1396 It itemFirst,
size_t max) {
1398 ->dequeue_bulk(itemFirst, max);
1409 ptr !=
nullptr; ptr = ptr->next_prod()) {
1410 size += ptr->size_approx();
1438 enum AllocationMode { CanAlloc, CannotAlloc };
1444 template <AllocationMode canAlloc,
typename U>
1447 ->ConcurrentQueue::ExplicitProducer::template enqueue<canAlloc>(
1448 std::forward<U>(element));
1451 template <AllocationMode canAlloc,
typename U>
1452 inline bool inner_enqueue(U &&element) {
1453 auto producer = get_or_add_implicit_producer();
1454 return producer ==
nullptr
1456 : producer->ConcurrentQueue::ImplicitProducer::template
enqueue<
1457 canAlloc>(std::forward<U>(element));
1460 template <AllocationMode canAlloc,
typename It>
1461 inline bool inner_enqueue_bulk(
producer_token_t const &token, It itemFirst,
1464 ->ConcurrentQueue::ExplicitProducer::template enqueue_bulk<canAlloc>(
1468 template <AllocationMode canAlloc,
typename It>
1469 inline bool inner_enqueue_bulk(It itemFirst,
size_t count) {
1470 auto producer = get_or_add_implicit_producer();
1471 return producer ==
nullptr
1473 : producer->ConcurrentQueue::ImplicitProducer::
template enqueue_bulk<canAlloc>(itemFirst, count);
1476 inline bool update_current_producer_after_rotation(
consumer_token_t &token) {
1479 if (token.desiredProducer ==
nullptr && tail ==
nullptr) {
1490 std::uint32_t offset = prodCount - 1 - (token.initialOffset % prodCount);
1491 token.desiredProducer = tail;
1492 for (std::uint32_t i = 0; i != offset; ++i) {
1493 token.desiredProducer =
1494 static_cast<ProducerBase *
>(token.desiredProducer)->next_prod();
1495 if (token.desiredProducer ==
nullptr) {
1496 token.desiredProducer = tail;
1501 std::uint32_t delta = globalOffset - token.lastKnownGlobalOffset;
1502 if (delta >= prodCount) {
1503 delta = delta % prodCount;
1505 for (std::uint32_t i = 0; i != delta; ++i) {
1506 token.desiredProducer =
1507 static_cast<ProducerBase *
>(token.desiredProducer)->next_prod();
1508 if (token.desiredProducer ==
nullptr) {
1509 token.desiredProducer = tail;
1513 token.lastKnownGlobalOffset = globalOffset;
1514 token.currentProducer = token.desiredProducer;
1515 token.itemsConsumedFromCurrent = 0;
1523 template <
typename N>
1524 struct FreeListNode {
// Node starts with a zero reference count and detached from any free list.
FreeListNode() : freeListRefs(0), freeListNext(nullptr) {}
1527 std::atomic<std::uint32_t> freeListRefs;
1528 std::atomic<N *> freeListNext;
1535 template <
typename N>
// Free list starts empty (no head node).
FreeList() : freeListHead(nullptr) {}
1540 FreeList(FreeList &&other)
1545 void swap(FreeList &other) {
1552 inline void add(N *node) {
1553 #if MCDBGQ_NOLOCKFREE_FREELIST
1554 debug::DebugLock lock(mutex);
1558 if (node->freeListRefs.fetch_add(SHOULD_BE_ON_FREELIST,
1562 add_knowing_refcount_is_zero(node);
1566 inline N *try_get() {
1567 #if MCDBGQ_NOLOCKFREE_FREELIST
1568 debug::DebugLock lock(mutex);
1571 while (head !=
nullptr) {
1572 auto prevHead = head;
1574 if ((refs & REFS_MASK) == 0 ||
1575 !head->freeListRefs.compare_exchange_strong(
1586 if (freeListHead.compare_exchange_strong(head, next,
1594 SHOULD_BE_ON_FREELIST) == 0);
1607 if (refs == SHOULD_BE_ON_FREELIST + 1) {
1608 add_knowing_refcount_is_zero(prevHead);
1617 N *head_unsafe()
const {
1622 inline void add_knowing_refcount_is_zero(N *node) {
1638 if (!freeListHead.compare_exchange_strong(head, node,
1643 if (node->freeListRefs.fetch_add(SHOULD_BE_ON_FREELIST - 1,
1655 std::atomic<N *> freeListHead;
1657 static const std::uint32_t REFS_MASK = 0x7FFFFFFF;
1658 static const std::uint32_t SHOULD_BE_ON_FREELIST = 0x80000000;
1660 #if MCDBGQ_NOLOCKFREE_FREELIST
1661 debug::DebugMutex mutex;
1669 enum InnerQueueContext { implicit_context = 0, explicit_context = 1 };
1674 elementsCompletelyDequeued(0),
1676 freeListNext(nullptr),
1677 shouldBeOnFreeList(false),
1678 dynamicallyAllocated(true) {
1684 template <InnerQueueContext context>
1685 inline bool is_empty()
const {
1686 if (context == explicit_context &&
1714 template <InnerQueueContext context>
1715 inline bool set_empty(
index_t i) {
1716 if (context == explicit_context &&
1720 static_cast<size_t>(
1724 static_cast<size_t>(i &
1740 template <InnerQueueContext context>
1741 inline bool set_many_empty(
index_t i,
size_t count) {
1742 if (context == explicit_context &&
1749 for (
size_t j = 0; j != count; ++j) {
1756 auto prevVal = elementsCompletelyDequeued.fetch_add(
1763 template <InnerQueueContext context>
1764 inline void set_all_empty() {
1765 if (context == explicit_context &&
1777 template <InnerQueueContext context>
1778 inline void reset_empty() {
1779 if (context == explicit_context &&
1792 return static_cast<T *
>(
static_cast<void *
>(elements)) +
1797 return static_cast<T
const *
>(
static_cast<void const *
>(elements)) +
1810 std::alignment_of<T>::value <=
1811 std::alignment_of<details::max_align_t>::value,
1812 "The queue does not support super-aligned types at this time");
1819 details::max_align_t dummy;
1824 std::atomic<size_t> elementsCompletelyDequeued;
1825 std::atomic<bool> emptyFlags
1829 std::atomic<std::uint32_t> freeListRefs;
1830 std::atomic<Block *> freeListNext;
1831 std::atomic<bool> shouldBeOnFreeList;
1832 bool dynamicallyAllocated;
1840 static_assert(std::alignment_of<Block>::value >=
1841 std::alignment_of<details::max_align_t>::value,
1842 "Internal error: Blocks must be at least as aligned as the "
1843 "type they are wrapping");
1856 struct ProducerBase :
public details::ConcurrentQueueProducerTypelessBase {
1860 dequeueOptimisticCount(0),
1861 dequeueOvercommit(0),
1863 isExplicit(isExplicit_),
1866 virtual ~ProducerBase(){};
1868 template <
typename U>
1869 inline bool dequeue(U &element) {
1877 template <
typename It>
1878 inline size_t dequeue_bulk(It &itemFirst,
size_t max) {
1888 inline ProducerBase *next_prod()
const {
1889 return static_cast<ProducerBase *
>(
next);
1896 ?
static_cast<size_t>(tail - head)
1900 inline index_t getTail()
const {
1905 std::atomic<index_t> tailIndex;
1906 std::atomic<index_t> headIndex;
1908 std::atomic<index_t> dequeueOptimisticCount;
1909 std::atomic<index_t> dequeueOvercommit;
1919 friend struct MemStats;
1929 : ProducerBase(parent, true),
1930 blockIndex(nullptr),
1931 pr_blockIndexSlotsUsed(0),
1933 pr_blockIndexFront(0),
1934 pr_blockIndexEntries(nullptr),
1935 pr_blockIndexRaw(nullptr) {
1936 size_t poolBasedIndexSize =
1938 if (poolBasedIndexSize > pr_blockIndexSize) {
1939 pr_blockIndexSize = poolBasedIndexSize;
1950 if (this->tailBlock !=
1953 Block *halfDequeuedBlock =
nullptr;
1959 size_t i = (pr_blockIndexFront - pr_blockIndexSlotsUsed) &
1960 (pr_blockIndexSize - 1);
1961 while (details::circular_less_than<index_t>(
1964 i = (i + 1) & (pr_blockIndexSize - 1);
1966 assert(details::circular_less_than<index_t>(
1967 pr_blockIndexEntries[i].base,
1969 halfDequeuedBlock = pr_blockIndexEntries[i].block;
1974 auto block = this->tailBlock;
1976 block = block->next;
1977 if (block->ConcurrentQueue::Block::template is_empty<
1978 explicit_context>()) {
1983 if (block == halfDequeuedBlock) {
1984 i =
static_cast<size_t>(
1991 auto lastValidIndex =
1995 :
static_cast<size_t>(
1999 (block != this->tailBlock || i != lastValidIndex)) {
2000 (*block)[i++]->~T();
2002 }
while (block != this->tailBlock);
2006 if (this->tailBlock !=
nullptr) {
2007 auto block = this->tailBlock;
2009 auto nextBlock = block->next;
2010 if (block->dynamicallyAllocated) {
2013 this->parent->add_block_to_free_list(block);
2016 }
while (block != this->tailBlock);
2020 auto header =
static_cast<BlockIndexHeader *
>(pr_blockIndexRaw);
2021 while (header !=
nullptr) {
2022 auto prev =
static_cast<BlockIndexHeader *
>(header->prev);
2023 header->~BlockIndexHeader();
2024 (Traits::free)(header);
2029 template <AllocationMode allocMode,
typename U>
2030 inline bool enqueue(U &&element) {
2033 index_t newTailIndex = 1 + currentTailIndex;
2036 auto startBlock = this->tailBlock;
2037 auto originalBlockIndexSlotsUsed = pr_blockIndexSlotsUsed;
2038 if (this->tailBlock !=
nullptr &&
2039 this->tailBlock->next->ConcurrentQueue::Block::template is_empty<
2040 explicit_context>()) {
2042 this->tailBlock = this->tailBlock->next;
2043 this->tailBlock->ConcurrentQueue::Block::template reset_empty<
2044 explicit_context>();
2059 assert(!details::circular_less_than<index_t>(currentTailIndex, head));
2060 if (!details::circular_less_than<index_t>(
2073 if (pr_blockIndexRaw ==
nullptr ||
2074 pr_blockIndexSlotsUsed == pr_blockIndexSize) {
2079 if (allocMode == CannotAlloc ||
2080 !new_block_index(pr_blockIndexSlotsUsed)) {
2088 ->ConcurrentQueue::template requisition_block<allocMode>();
2089 if (newBlock ==
nullptr) {
2093 newBlock->owner =
this;
2095 newBlock->ConcurrentQueue::Block::template reset_empty<
2096 explicit_context>();
2097 if (this->tailBlock ==
nullptr) {
2098 newBlock->next = newBlock;
2100 newBlock->next = this->tailBlock->next;
2101 this->tailBlock->next = newBlock;
2103 this->tailBlock = newBlock;
2104 ++pr_blockIndexSlotsUsed;
2108 T, U,
new (
nullptr) T(std::forward<U>(element)))) {
2112 new ((*this->tailBlock)[currentTailIndex])
2113 T(std::forward<U>(element));
2118 pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
2120 startBlock ==
nullptr ? this->tailBlock : startBlock;
2125 (void)originalBlockIndexSlotsUsed;
2130 ->entries[pr_blockIndexFront];
2131 entry.base = currentTailIndex;
2132 entry.block = this->tailBlock;
2135 pr_blockIndexFront = (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1);
2138 T, U,
new (
nullptr) T(std::forward<U>(element)))) {
2145 new ((*this->tailBlock)[currentTailIndex]) T(std::forward<U>(element));
2151 template <
typename U>
2152 bool dequeue(U &element) {
2155 if (details::circular_less_than<index_t>(
2183 auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(
2205 myDequeueCount - overcommit, tail))) {
2223 auto localBlockIndexHead =
2230 auto headBase = localBlockIndex->entries[localBlockIndexHead].base;
2231 auto blockBaseIndex = index & ~static_cast<index_t>(
BLOCK_SIZE - 1);
2232 auto offset =
static_cast<size_t>(
2233 static_cast<typename std::make_signed<index_t>::type
>(
2234 blockBaseIndex - headBase) /
2236 auto block = localBlockIndex
2237 ->entries[(localBlockIndexHead + offset) &
2238 (localBlockIndex->size - 1)]
2242 auto &el = *((*block)[index]);
2251 (*block)[index]->~T();
2252 block->ConcurrentQueue::Block::template set_empty<
2253 explicit_context>(index);
2255 } guard = {block, index};
2257 element = std::move(el);
2259 element = std::move(el);
2261 block->ConcurrentQueue::Block::template set_empty<explicit_context>(
2269 this->dequeueOvercommit.fetch_add(
2281 template <AllocationMode allocMode,
typename It>
2287 auto startBlock = this->tailBlock;
2288 auto originalBlockIndexFront = pr_blockIndexFront;
2289 auto originalBlockIndexSlotsUsed = pr_blockIndexSlotsUsed;
2291 Block *firstAllocatedBlock =
nullptr;
2294 size_t blockBaseDiff =
2295 ((startTailIndex + count - 1) &
2297 ((startTailIndex - 1) & ~static_cast<index_t>(
BLOCK_SIZE - 1));
2300 if (blockBaseDiff > 0) {
2302 while (blockBaseDiff > 0 && this->tailBlock !=
nullptr &&
2303 this->tailBlock->next != firstAllocatedBlock &&
2304 this->tailBlock->next->ConcurrentQueue::Block::template is_empty<
2305 explicit_context>()) {
2309 this->tailBlock = this->tailBlock->next;
2310 firstAllocatedBlock = firstAllocatedBlock ==
nullptr
2312 : firstAllocatedBlock;
2315 ->entries[pr_blockIndexFront];
2316 entry.base = currentTailIndex;
2317 entry.block = this->tailBlock;
2318 pr_blockIndexFront =
2319 (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1);
2323 while (blockBaseDiff > 0) {
2328 assert(!details::circular_less_than<index_t>(currentTailIndex, head));
2330 !details::circular_less_than<index_t>(
2335 if (pr_blockIndexRaw ==
nullptr ||
2336 pr_blockIndexSlotsUsed == pr_blockIndexSize || full) {
2337 if (allocMode == CannotAlloc || full ||
2338 !new_block_index(originalBlockIndexSlotsUsed)) {
2340 pr_blockIndexFront = originalBlockIndexFront;
2341 pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
2343 startBlock ==
nullptr ? firstAllocatedBlock : startBlock;
2350 originalBlockIndexFront = originalBlockIndexSlotsUsed;
2356 ->ConcurrentQueue::template requisition_block<allocMode>();
2357 if (newBlock ==
nullptr) {
2358 pr_blockIndexFront = originalBlockIndexFront;
2359 pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
2361 startBlock ==
nullptr ? firstAllocatedBlock : startBlock;
2366 newBlock->owner =
this;
2368 newBlock->ConcurrentQueue::Block::template set_all_empty<
2369 explicit_context>();
2370 if (this->tailBlock ==
nullptr) {
2371 newBlock->next = newBlock;
2373 newBlock->next = this->tailBlock->next;
2374 this->tailBlock->next = newBlock;
2376 this->tailBlock = newBlock;
2377 firstAllocatedBlock = firstAllocatedBlock ==
nullptr
2379 : firstAllocatedBlock;
2381 ++pr_blockIndexSlotsUsed;
2384 ->entries[pr_blockIndexFront];
2385 entry.base = currentTailIndex;
2386 entry.block = this->tailBlock;
2387 pr_blockIndexFront =
2388 (pr_blockIndexFront + 1) & (pr_blockIndexSize - 1);
2393 auto block = firstAllocatedBlock;
2395 block->ConcurrentQueue::Block::template reset_empty<
2396 explicit_context>();
2397 if (block == this->tailBlock) {
2400 block = block->next;
2404 T, decltype(*itemFirst),
2407 ->front.store((pr_blockIndexFront - 1) & (pr_blockIndexSize - 1),
2413 index_t newTailIndex = startTailIndex +
static_cast<index_t>(count);
2414 currentTailIndex = startTailIndex;
2415 auto endBlock = this->tailBlock;
2416 this->tailBlock = startBlock;
2418 firstAllocatedBlock !=
nullptr || count == 0);
2420 firstAllocatedBlock !=
nullptr) {
2421 this->tailBlock = firstAllocatedBlock;
2425 (currentTailIndex & ~static_cast<index_t>(
BLOCK_SIZE - 1)) +
2427 if (details::circular_less_than<index_t>(newTailIndex, stopIndex)) {
2428 stopIndex = newTailIndex;
2431 T, decltype(*itemFirst),
2433 while (currentTailIndex != stopIndex) {
2434 new ((*this->tailBlock)[currentTailIndex++]) T(*itemFirst++);
2438 while (currentTailIndex != stopIndex) {
2446 new ((*this->tailBlock)[currentTailIndex])
2448 T, decltype(*itemFirst),
2450 itemFirst)))>::eval(*itemFirst));
2459 auto constructedStopIndex = currentTailIndex;
2460 auto lastBlockEnqueued = this->tailBlock;
2462 pr_blockIndexFront = originalBlockIndexFront;
2463 pr_blockIndexSlotsUsed = originalBlockIndexSlotsUsed;
2465 startBlock ==
nullptr ? firstAllocatedBlock : startBlock;
2467 if (!details::is_trivially_destructible<T>::value) {
2468 auto block = startBlock;
2471 block = firstAllocatedBlock;
2473 currentTailIndex = startTailIndex;
2476 (currentTailIndex & ~static_cast<index_t>(
BLOCK_SIZE - 1)) +
2478 if (details::circular_less_than<index_t>(constructedStopIndex,
2480 stopIndex = constructedStopIndex;
2482 while (currentTailIndex != stopIndex) {
2483 (*block)[currentTailIndex++]->~T();
2485 if (block == lastBlockEnqueued) {
2488 block = block->next;
2495 if (this->tailBlock == endBlock) {
2496 assert(currentTailIndex == newTailIndex);
2499 this->tailBlock = this->tailBlock->next;
2503 T, decltype(*itemFirst),
2505 firstAllocatedBlock !=
nullptr) {
2507 ->front.store((pr_blockIndexFront - 1) & (pr_blockIndexSize - 1),
2515 template <
typename It>
2516 size_t dequeue_bulk(It &itemFirst,
size_t max) {
2519 auto desiredCount =
static_cast<size_t>(
2522 if (details::circular_less_than<size_t>(0, desiredCount)) {
2523 desiredCount = desiredCount < max ? desiredCount : max;
2526 auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(
2532 static_cast<size_t>(tail - (myDequeueCount - overcommit));
2533 if (details::circular_less_than<size_t>(0, actualCount)) {
2534 actualCount = desiredCount < actualCount ? desiredCount : actualCount;
2535 if (actualCount < desiredCount) {
2536 this->dequeueOvercommit.fetch_add(desiredCount - actualCount,
2547 auto localBlockIndexHead =
2550 auto headBase = localBlockIndex->entries[localBlockIndexHead].base;
2551 auto firstBlockBaseIndex =
2552 firstIndex & ~static_cast<index_t>(
BLOCK_SIZE - 1);
2553 auto offset =
static_cast<size_t>(
2554 static_cast<typename std::make_signed<index_t>::type
>(
2555 firstBlockBaseIndex - headBase) /
2558 (localBlockIndexHead + offset) & (localBlockIndex->size - 1);
2561 auto index = firstIndex;
2563 auto firstIndexInBlock = index;
2564 auto endIndex = (index & ~static_cast<index_t>(
BLOCK_SIZE - 1)) +
2567 details::circular_less_than<index_t>(
2568 firstIndex +
static_cast<index_t>(actualCount), endIndex)
2569 ? firstIndex +
static_cast<index_t>(actualCount)
2571 auto block = localBlockIndex->entries[indexIndex].block;
2574 std::move((*(*block)[index])))) {
2575 while (index != endIndex) {
2576 auto &el = *((*block)[index]);
2577 *itemFirst++ = std::move(el);
2583 while (index != endIndex) {
2584 auto &el = *((*block)[index]);
2585 *itemFirst = std::move(el);
2597 block = localBlockIndex->entries[indexIndex].block;
2598 while (index != endIndex) {
2599 (*block)[index++]->~T();
2601 block->ConcurrentQueue::Block::template set_many_empty<
2604 static_cast<size_t>(endIndex - firstIndexInBlock));
2605 indexIndex = (indexIndex + 1) & (localBlockIndex->size - 1);
2607 firstIndexInBlock = index;
2608 endIndex = (index & ~static_cast<index_t>(
BLOCK_SIZE - 1)) +
2611 details::circular_less_than<index_t>(
2612 firstIndex +
static_cast<index_t>(actualCount),
2614 ? firstIndex +
static_cast<index_t>(actualCount)
2616 }
while (index != firstIndex + actualCount);
2621 block->ConcurrentQueue::Block::template set_many_empty<
2624 static_cast<size_t>(endIndex - firstIndexInBlock));
2625 indexIndex = (indexIndex + 1) & (localBlockIndex->size - 1);
2626 }
while (index != firstIndex + actualCount);
2632 this->dequeueOvercommit.fetch_add(desiredCount,
2641 struct BlockIndexEntry {
2646 struct BlockIndexHeader {
2650 BlockIndexEntry *entries;
2654 bool new_block_index(
size_t numberOfFilledSlotsToExpose) {
2655 auto prevBlockSizeMask = pr_blockIndexSize - 1;
2658 pr_blockIndexSize <<= 1;
2659 auto newRawPtr =
static_cast<char *
>((Traits::malloc)(
2660 sizeof(BlockIndexHeader) + std::alignment_of<BlockIndexEntry>::value -
2661 1 +
sizeof(BlockIndexEntry) * pr_blockIndexSize));
2662 if (newRawPtr ==
nullptr) {
2663 pr_blockIndexSize >>= 1;
2667 auto newBlockIndexEntries =
reinterpret_cast<BlockIndexEntry *
>(
2668 details::align_for<BlockIndexEntry>(newRawPtr +
2669 sizeof(BlockIndexHeader)));
2673 if (pr_blockIndexSlotsUsed != 0) {
2675 (pr_blockIndexFront - pr_blockIndexSlotsUsed) & prevBlockSizeMask;
2677 newBlockIndexEntries[j++] = pr_blockIndexEntries[i];
2678 i = (i + 1) & prevBlockSizeMask;
2679 }
while (i != pr_blockIndexFront);
2683 auto header =
new (newRawPtr) BlockIndexHeader;
2684 header->size = pr_blockIndexSize;
2685 header->front.store(numberOfFilledSlotsToExpose - 1,
2687 header->entries = newBlockIndexEntries;
2688 header->prev = pr_blockIndexRaw;
2691 pr_blockIndexFront = j;
2692 pr_blockIndexEntries = newBlockIndexEntries;
2693 pr_blockIndexRaw = newRawPtr;
2700 std::atomic<BlockIndexHeader *> blockIndex;
2704 size_t pr_blockIndexSlotsUsed;
2705 size_t pr_blockIndexSize;
2706 size_t pr_blockIndexFront;
2707 BlockIndexEntry *pr_blockIndexEntries;
2708 void *pr_blockIndexRaw;
2710 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
2718 friend struct MemStats;
2728 : ProducerBase(parent, false),
2730 blockIndex(nullptr) {
2741 #ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
2744 details::ThreadExitNotifier::unsubscribe(&threadExitListener);
2751 Block *block =
nullptr;
2753 bool forceFreeLastBlock =
2756 while (index != tail) {
2759 if (block !=
nullptr) {
2761 this->parent->add_block_to_free_list(block);
2764 block = get_block_index_entry_for_index(index)->value.load(
2768 ((*block)[index])->~T();
2774 if (this->tailBlock !=
nullptr &&
2775 (forceFreeLastBlock ||
2777 this->parent->add_block_to_free_list(this->tailBlock);
2782 if (localBlockIndex !=
nullptr) {
2783 for (
size_t i = 0; i != localBlockIndex->capacity; ++i) {
2784 localBlockIndex->index[i]->~BlockIndexEntry();
2787 auto prev = localBlockIndex->prev;
2788 localBlockIndex->~BlockIndexHeader();
2789 (Traits::free)(localBlockIndex);
2790 localBlockIndex = prev;
2791 }
while (localBlockIndex !=
nullptr);
2795 template <AllocationMode allocMode,
typename U>
2796 inline bool enqueue(U &&element) {
2799 index_t newTailIndex = 1 + currentTailIndex;
2803 assert(!details::circular_less_than<index_t>(currentTailIndex, head));
2804 if (!details::circular_less_than<index_t>(
2811 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2812 debug::DebugLock lock(mutex);
2815 BlockIndexEntry *idxEntry;
2816 if (!insert_block_index_entry<allocMode>(idxEntry, currentTailIndex)) {
2823 ->ConcurrentQueue::template requisition_block<allocMode>();
2824 if (newBlock ==
nullptr) {
2825 rewind_block_index_tail();
2830 newBlock->owner =
this;
2833 ->ConcurrentQueue::Block::template reset_empty<implicit_context>();
2836 T, U,
new (
nullptr) T(std::forward<U>(element)))) {
2840 new ((*newBlock)[currentTailIndex]) T(std::forward<U>(element));
2843 rewind_block_index_tail();
2845 this->parent->add_block_to_free_list(newBlock);
2853 this->tailBlock = newBlock;
2856 T, U,
new (
nullptr) T(std::forward<U>(element)))) {
2863 new ((*this->tailBlock)[currentTailIndex]) T(std::forward<U>(element));
2869 template <
typename U>
2870 bool dequeue(U &element) {
2875 if (details::circular_less_than<index_t>(
2881 index_t myDequeueCount = this->dequeueOptimisticCount.fetch_add(
2885 myDequeueCount - overcommit, tail))) {
2890 auto entry = get_block_index_entry_for_index(index);
2894 auto &el = *((*block)[index]);
2897 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2901 debug::DebugLock lock(producer->mutex);
2906 BlockIndexEntry *entry;
2910 (*block)[index]->~T();
2911 if (block->ConcurrentQueue::Block::template set_empty<
2912 implicit_context>(index)) {
2914 parent->add_block_to_free_list(block);
2917 } guard = {block, index, entry, this->parent};
2919 element = std::move(el);
2921 element = std::move(el);
2924 if (block->ConcurrentQueue::Block::template set_empty<
2925 implicit_context>(index)) {
2927 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2928 debug::DebugLock lock(mutex);
2934 this->parent->add_block_to_free_list(
2948 template <AllocationMode allocMode,
typename It>
2961 auto startBlock = this->tailBlock;
2962 Block *firstAllocatedBlock =
nullptr;
2963 auto endBlock = this->tailBlock;
2966 size_t blockBaseDiff =
2967 ((startTailIndex + count - 1) &
2969 ((startTailIndex - 1) & ~static_cast<index_t>(
BLOCK_SIZE - 1));
2972 if (blockBaseDiff > 0) {
2973 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
2974 debug::DebugLock lock(mutex);
2981 BlockIndexEntry *idxEntry =
2985 bool indexInserted =
false;
2987 assert(!details::circular_less_than<index_t>(currentTailIndex, head));
2989 !details::circular_less_than<index_t>(
2995 !(indexInserted = insert_block_index_entry<allocMode>(
2996 idxEntry, currentTailIndex)) ||
2998 this->parent->ConcurrentQueue::template requisition_block<
2999 allocMode>()) ==
nullptr) {
3002 if (indexInserted) {
3003 rewind_block_index_tail();
3008 for (
auto block = firstAllocatedBlock; block !=
nullptr;
3009 block = block->next) {
3011 idxEntry = get_block_index_entry_for_index(currentTailIndex);
3013 rewind_block_index_tail();
3015 this->parent->add_blocks_to_free_list(firstAllocatedBlock);
3016 this->tailBlock = startBlock;
3022 newBlock->owner =
this;
3024 newBlock->ConcurrentQueue::Block::template reset_empty<
3025 implicit_context>();
3026 newBlock->next =
nullptr;
3035 firstAllocatedBlock !=
nullptr) {
3036 assert(this->tailBlock !=
nullptr);
3037 this->tailBlock->next = newBlock;
3039 this->tailBlock = newBlock;
3040 endBlock = newBlock;
3041 firstAllocatedBlock =
3042 firstAllocatedBlock ==
nullptr ? newBlock : firstAllocatedBlock;
3043 }
while (blockBaseDiff > 0);
3047 index_t newTailIndex = startTailIndex +
static_cast<index_t>(count);
3048 currentTailIndex = startTailIndex;
3049 this->tailBlock = startBlock;
3051 firstAllocatedBlock !=
nullptr || count == 0);
3053 firstAllocatedBlock !=
nullptr) {
3054 this->tailBlock = firstAllocatedBlock;
3058 (currentTailIndex & ~static_cast<index_t>(
BLOCK_SIZE - 1)) +
3060 if (details::circular_less_than<index_t>(newTailIndex, stopIndex)) {
3061 stopIndex = newTailIndex;
3064 T, decltype(*itemFirst),
3066 while (currentTailIndex != stopIndex) {
3067 new ((*this->tailBlock)[currentTailIndex++]) T(*itemFirst++);
3071 while (currentTailIndex != stopIndex) {
3072 new ((*this->tailBlock)[currentTailIndex])
3074 T, decltype(*itemFirst),
3076 itemFirst)))>::eval(*itemFirst));
3082 auto constructedStopIndex = currentTailIndex;
3083 auto lastBlockEnqueued = this->tailBlock;
3085 if (!details::is_trivially_destructible<T>::value) {
3086 auto block = startBlock;
3089 block = firstAllocatedBlock;
3091 currentTailIndex = startTailIndex;
3094 (currentTailIndex & ~static_cast<index_t>(
BLOCK_SIZE - 1)) +
3096 if (details::circular_less_than<index_t>(constructedStopIndex,
3098 stopIndex = constructedStopIndex;
3100 while (currentTailIndex != stopIndex) {
3101 (*block)[currentTailIndex++]->~T();
3103 if (block == lastBlockEnqueued) {
3106 block = block->next;
3112 for (
auto block = firstAllocatedBlock; block !=
nullptr;
3113 block = block->next) {
3115 auto idxEntry = get_block_index_entry_for_index(currentTailIndex);
3117 rewind_block_index_tail();
3119 this->parent->add_blocks_to_free_list(firstAllocatedBlock);
3120 this->tailBlock = startBlock;
3125 if (this->tailBlock == endBlock) {
3126 assert(currentTailIndex == newTailIndex);
3129 this->tailBlock = this->tailBlock->next;
3135 template <
typename It>
3136 size_t dequeue_bulk(It &itemFirst,
size_t max) {
3139 auto desiredCount =
static_cast<size_t>(
3142 if (details::circular_less_than<size_t>(0, desiredCount)) {
3143 desiredCount = desiredCount < max ? desiredCount : max;
3146 auto myDequeueCount = this->dequeueOptimisticCount.fetch_add(
3151 static_cast<size_t>(tail - (myDequeueCount - overcommit));
3152 if (details::circular_less_than<size_t>(0, actualCount)) {
3153 actualCount = desiredCount < actualCount ? desiredCount : actualCount;
3154 if (actualCount < desiredCount) {
3155 this->dequeueOvercommit.fetch_add(desiredCount - actualCount,
3165 auto index = firstIndex;
3166 BlockIndexHeader *localBlockIndex;
3168 get_block_index_index_for_index(index, localBlockIndex);
3170 auto blockStartIndex = index;
3171 auto endIndex = (index & ~static_cast<index_t>(
BLOCK_SIZE - 1)) +
3174 details::circular_less_than<index_t>(
3175 firstIndex +
static_cast<index_t>(actualCount), endIndex)
3176 ? firstIndex +
static_cast<index_t>(actualCount)
3179 auto entry = localBlockIndex->index[indexIndex];
3183 std::move((*(*block)[index])))) {
3184 while (index != endIndex) {
3185 auto &el = *((*block)[index]);
3186 *itemFirst++ = std::move(el);
3192 while (index != endIndex) {
3193 auto &el = *((*block)[index]);
3194 *itemFirst = std::move(el);
3202 entry = localBlockIndex->index[indexIndex];
3204 while (index != endIndex) {
3205 (*block)[index++]->~T();
3208 if (block->ConcurrentQueue::Block::template set_many_empty<
3211 static_cast<size_t>(endIndex - blockStartIndex))) {
3212 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
3213 debug::DebugLock lock(mutex);
3216 this->parent->add_block_to_free_list(block);
3219 (indexIndex + 1) & (localBlockIndex->capacity - 1);
3221 blockStartIndex = index;
3222 endIndex = (index & ~static_cast<index_t>(
BLOCK_SIZE - 1)) +
3225 details::circular_less_than<index_t>(
3226 firstIndex +
static_cast<index_t>(actualCount),
3228 ? firstIndex +
static_cast<index_t>(actualCount)
3230 }
while (index != firstIndex + actualCount);
3235 if (block->ConcurrentQueue::Block::template set_many_empty<
3238 static_cast<size_t>(endIndex - blockStartIndex))) {
3240 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
3241 debug::DebugLock lock(mutex);
3249 this->parent->add_block_to_free_list(
3252 indexIndex = (indexIndex + 1) & (localBlockIndex->capacity - 1);
3253 }
while (index != firstIndex + actualCount);
3257 this->dequeueOvercommit.fetch_add(desiredCount,
3268 static const index_t INVALID_BLOCK_BASE = 1;
3270 struct BlockIndexEntry {
3271 std::atomic<index_t> key;
3272 std::atomic<Block *> value;
3275 struct BlockIndexHeader {
3277 std::atomic<size_t> tail;
3278 BlockIndexEntry *entries;
3279 BlockIndexEntry **index;
3280 BlockIndexHeader *prev;
3283 template <AllocationMode allocMode>
3284 inline bool insert_block_index_entry(BlockIndexEntry *&idxEntry,
3286 auto localBlockIndex =
3289 if (localBlockIndex ==
nullptr) {
3295 (localBlockIndex->capacity - 1);
3296 idxEntry = localBlockIndex->index[newTail];
3305 if (allocMode == CannotAlloc || !new_block_index()) {
3310 (localBlockIndex->capacity - 1);
3311 idxEntry = localBlockIndex->index[newTail];
3313 INVALID_BLOCK_BASE);
3319 inline void rewind_block_index_tail() {
3321 localBlockIndex->tail.store(
3323 (localBlockIndex->capacity - 1),
3327 inline BlockIndexEntry *get_block_index_entry_for_index(
3329 BlockIndexHeader *localBlockIndex;
3330 auto idx = get_block_index_index_for_index(index, localBlockIndex);
3331 return localBlockIndex->index[idx];
3334 inline size_t get_block_index_index_for_index(
3335 index_t index, BlockIndexHeader *&localBlockIndex)
const {
3336 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
3337 debug::DebugLock lock(mutex);
3339 index &= ~static_cast<index_t>(
BLOCK_SIZE - 1);
3344 assert(tailBase != INVALID_BLOCK_BASE);
3347 auto offset =
static_cast<size_t>(
3348 static_cast<typename std::make_signed<index_t>::type
>(index -
3351 size_t idx = (tail + offset) & (localBlockIndex->capacity - 1);
3354 localBlockIndex->index[idx]->value.load(
3359 bool new_block_index() {
3361 size_t prevCapacity = prev ==
nullptr ? 0 : prev->capacity;
3362 auto entryCount = prev ==
nullptr ? nextBlockIndexCapacity : prevCapacity;
3363 auto raw =
static_cast<char *
>((Traits::malloc)(
3364 sizeof(BlockIndexHeader) + std::alignment_of<BlockIndexEntry>::value -
3365 1 +
sizeof(BlockIndexEntry) * entryCount +
3366 std::alignment_of<BlockIndexEntry *>::value - 1 +
3367 sizeof(BlockIndexEntry *) * nextBlockIndexCapacity));
3368 if (raw ==
nullptr) {
3372 auto header =
new (raw) BlockIndexHeader;
3373 auto entries =
reinterpret_cast<BlockIndexEntry *
>(
3374 details::align_for<BlockIndexEntry>(raw +
sizeof(BlockIndexHeader)));
3375 auto index =
reinterpret_cast<BlockIndexEntry **
>(
3376 details::align_for<BlockIndexEntry *>(
3377 reinterpret_cast<char *
>(entries) +
3378 sizeof(BlockIndexEntry) * entryCount));
3379 if (prev !=
nullptr) {
3381 auto prevPos = prevTail;
3384 prevPos = (prevPos + 1) & (prev->capacity - 1);
3385 index[i++] = prev->index[prevPos];
3386 }
while (prevPos != prevTail);
3387 assert(i == prevCapacity);
3389 for (
size_t i = 0; i != entryCount; ++i) {
3390 new (entries + i) BlockIndexEntry;
3392 index[prevCapacity + i] = entries + i;
3394 header->prev = prev;
3395 header->entries = entries;
3396 header->index = index;
3397 header->capacity = nextBlockIndexCapacity;
3398 header->tail.store((prevCapacity - 1) & (nextBlockIndexCapacity - 1),
3403 nextBlockIndexCapacity <<= 1;
3409 size_t nextBlockIndexCapacity;
3410 std::atomic<BlockIndexHeader *> blockIndex;
3412 #ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
3414 details::ThreadExitListener threadExitListener;
3419 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
3426 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODBLOCKINDEX
3427 mutable debug::DebugMutex mutex;
3430 friend struct MemStats;
3438 void populate_initial_block_list(
size_t blockCount) {
3439 initialBlockPoolSize = blockCount;
3440 if (initialBlockPoolSize == 0) {
3441 initialBlockPool =
nullptr;
3445 initialBlockPool = create_array<Block>(blockCount);
3446 if (initialBlockPool ==
nullptr) {
3447 initialBlockPoolSize = 0;
3449 for (
size_t i = 0; i < initialBlockPoolSize; ++i) {
3450 initialBlockPool[i].dynamicallyAllocated =
false;
3454 inline Block *try_get_block_from_initial_pool() {
3456 initialBlockPoolSize) {
3462 return index < initialBlockPoolSize ? (initialBlockPool + index) :
nullptr;
3465 inline void add_block_to_free_list(Block *block) {
3467 block->owner =
nullptr;
3469 freeList.add(block);
3472 inline void add_blocks_to_free_list(Block *block) {
3473 while (block !=
nullptr) {
3474 auto next = block->next;
3475 add_block_to_free_list(block);
3480 inline Block *try_get_block_from_free_list() {
return freeList.try_get(); }
3484 template <AllocationMode canAlloc>
3485 Block *requisition_block() {
3486 auto block = try_get_block_from_initial_pool();
3487 if (block !=
nullptr) {
3491 block = try_get_block_from_free_list();
3492 if (block !=
nullptr) {
3496 if (canAlloc == CanAlloc) {
3497 return create<Block>();
3506 size_t allocatedBlocks;
3509 size_t ownedBlocksExplicit;
3510 size_t ownedBlocksImplicit;
3511 size_t implicitProducers;
3512 size_t explicitProducers;
3513 size_t elementsEnqueued;
3514 size_t blockClassBytes;
3515 size_t queueClassBytes;
3516 size_t implicitBlockIndexBytes;
3517 size_t explicitBlockIndexBytes;
3523 MemStats stats = {0};
3525 stats.elementsEnqueued = q->size_approx();
3527 auto block = q->freeList.head_unsafe();
3528 while (block !=
nullptr) {
3529 ++stats.allocatedBlocks;
3535 ptr !=
nullptr; ptr = ptr->next_prod()) {
3537 stats.implicitProducers += implicit ? 1 : 0;
3538 stats.explicitProducers += implicit ? 0 : 1;
3546 if (hash !=
nullptr) {
3547 for (
size_t i = 0; i != hash->capacity; ++i) {
3549 ImplicitProducer::INVALID_BLOCK_BASE &&
3552 ++stats.allocatedBlocks;
3553 ++stats.ownedBlocksImplicit;
3556 stats.implicitBlockIndexBytes +=
3558 sizeof(
typename ImplicitProducer::BlockIndexEntry);
3559 for (; hash !=
nullptr; hash = hash->prev) {
3560 stats.implicitBlockIndexBytes +=
3561 sizeof(
typename ImplicitProducer::BlockIndexHeader) +
3563 sizeof(
typename ImplicitProducer::BlockIndexEntry *);
3566 for (; details::circular_less_than<index_t>(head, tail);
3574 auto tailBlock = prod->tailBlock;
3575 bool wasNonEmpty =
false;
3576 if (tailBlock !=
nullptr) {
3577 auto block = tailBlock;
3579 ++stats.allocatedBlocks;
3580 if (!block->ConcurrentQueue::Block::template is_empty<
3581 explicit_context>() ||
3584 wasNonEmpty = wasNonEmpty || block != tailBlock;
3586 ++stats.ownedBlocksExplicit;
3587 block = block->next;
3588 }
while (block != tailBlock);
3591 while (index !=
nullptr) {
3592 stats.explicitBlockIndexBytes +=
3593 sizeof(
typename ExplicitProducer::BlockIndexHeader) +
3595 sizeof(
typename ExplicitProducer::BlockIndexEntry);
3596 index =
static_cast<typename ExplicitProducer::BlockIndexHeader *
>(
3602 auto freeOnInitialPool =
3604 q->initialBlockPoolSize
3606 : q->initialBlockPoolSize -
3608 stats.allocatedBlocks += freeOnInitialPool;
3609 stats.freeBlocks += freeOnInitialPool;
3611 stats.blockClassBytes =
sizeof(Block) * stats.allocatedBlocks;
3619 MemStats getMemStats() {
return MemStats::getFor(
this); }
3622 friend struct MemStats;
3629 ProducerBase *recycle_or_create_producer(
bool isExplicit) {
3631 return recycle_or_create_producer(isExplicit, recycled);
3634 ProducerBase *recycle_or_create_producer(
bool isExplicit,
bool &recycled) {
3635 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
3636 debug::DebugLock lock(implicitProdMutex);
3640 ptr !=
nullptr; ptr = ptr->next_prod()) {
3642 ptr->isExplicit == isExplicit) {
3643 bool expected =
true;
3644 if (ptr->inactive.compare_exchange_strong(
3656 return add_producer(
3657 isExplicit ?
static_cast<ProducerBase *
>(create<ExplicitProducer>(
this))
3658 : create<ImplicitProducer>(
this));
3661 ProducerBase *add_producer(ProducerBase *producer) {
3663 if (producer ==
nullptr) {
3672 producer->next = prevTail;
3673 }
while (!producerListTail.compare_exchange_weak(
3677 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
3678 if (producer->isExplicit) {
3683 }
while (!explicitProducers.compare_exchange_weak(
3691 }
while (!implicitProducers.compare_exchange_weak(
3700 void reown_producers() {
3705 ptr !=
nullptr; ptr = ptr->next_prod()) {
3714 struct ImplicitProducerKVP {
3715 std::atomic<details::thread_id_t> key;
3719 ImplicitProducerKVP() : value(nullptr) {}
3724 value = other.value;
3727 inline ImplicitProducerKVP &
operator=(ImplicitProducerKVP &&other)
3734 if (
this != &other) {
3741 template <
typename XT,
typename XTraits>
3747 struct ImplicitProducerHash {
3749 ImplicitProducerKVP *entries;
3750 ImplicitProducerHash *prev;
3753 inline void populate_initial_implicit_producer_hash() {
3757 auto hash = &initialImplicitProducerHash;
3759 hash->entries = &initialImplicitProducerHashEntries[0];
3761 initialImplicitProducerHashEntries[i].key.store(
3764 hash->prev =
nullptr;
3772 initialImplicitProducerHashEntries.swap(
3773 other.initialImplicitProducerHashEntries);
3774 initialImplicitProducerHash.entries =
3775 &initialImplicitProducerHashEntries[0];
3776 other.initialImplicitProducerHash.entries =
3777 &other.initialImplicitProducerHashEntries[0];
3780 other.implicitProducerHashCount);
3784 &other.initialImplicitProducerHash) {
3785 implicitProducerHash.store(&initialImplicitProducerHash,
3788 ImplicitProducerHash *hash;
3790 hash->prev != &other.initialImplicitProducerHash;
3791 hash = hash->prev) {
3794 hash->prev = &initialImplicitProducerHash;
3797 &initialImplicitProducerHash) {
3798 other.implicitProducerHash.store(&other.initialImplicitProducerHash,
3801 ImplicitProducerHash *hash;
3803 hash->prev != &initialImplicitProducerHash; hash = hash->prev) {
3806 hash->prev = &other.initialImplicitProducerHash;
3824 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
3825 debug::DebugLock lock(implicitProdMutex);
3832 for (
auto hash = mainHash; hash !=
nullptr; hash = hash->prev) {
3834 auto index = hashedId;
3837 index &= hash->capacity - 1;
3841 if (probedKey ==
id) {
3848 auto value = hash->entries[index].value;
3849 if (hash != mainHash) {
3852 index &= mainHash->capacity - 1;
3856 #ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
3858 if ((probedKey == empty &&
3859 mainHash->entries[index].key.compare_exchange_strong(
3862 (probedKey == reusable &&
3863 mainHash->entries[index].key.compare_exchange_strong(
3867 if ((probedKey == empty &&
3868 mainHash->entries[index].key.compare_exchange_strong(
3872 mainHash->entries[index].value = value;
3892 if (newCount >= (mainHash->capacity >> 1) &&
3893 !implicitProducerHashResizeInProgress.test_and_set(
3901 if (newCount >= (mainHash->capacity >> 1)) {
3902 auto newCapacity = mainHash->capacity << 1;
3903 while (newCount >= (newCapacity >> 1)) {
3906 auto raw =
static_cast<char *
>(
3907 (Traits::malloc)(
sizeof(ImplicitProducerHash) +
3908 std::alignment_of<ImplicitProducerKVP>::value -
3909 1 +
sizeof(ImplicitProducerKVP) * newCapacity));
3910 if (raw ==
nullptr) {
3913 implicitProducerHashResizeInProgress.clear(
3918 auto newHash =
new (raw) ImplicitProducerHash;
3919 newHash->capacity = newCapacity;
3920 newHash->entries =
reinterpret_cast<ImplicitProducerKVP *
>(
3921 details::align_for<ImplicitProducerKVP>(
3922 raw +
sizeof(ImplicitProducerHash)));
3923 for (
size_t i = 0; i != newCapacity; ++i) {
3924 new (newHash->entries + i) ImplicitProducerKVP;
3928 newHash->prev = mainHash;
3941 if (newCount < (mainHash->capacity >> 1) + (mainHash->capacity >> 2)) {
3944 recycle_or_create_producer(
false, recycled));
3945 if (producer ==
nullptr) {
3953 #ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
3954 producer->threadExitListener.callback =
3955 &ConcurrentQueue::implicit_producer_thread_exited_callback;
3956 producer->threadExitListener.userData = producer;
3957 details::ThreadExitNotifier::subscribe(&producer->threadExitListener);
3960 auto index = hashedId;
3962 index &= mainHash->capacity - 1;
3967 #ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
3969 if ((probedKey == empty &&
3970 mainHash->entries[index].key.compare_exchange_strong(
3973 (probedKey == reusable &&
3974 mainHash->entries[index].key.compare_exchange_strong(
3978 if ((probedKey == empty &&
3979 mainHash->entries[index].key.compare_exchange_strong(
3983 mainHash->entries[index].value = producer;
3998 #ifdef MOODYCAMEL_CPP11_THREAD_LOCAL_SUPPORTED
4001 details::ThreadExitNotifier::unsubscribe(&producer->threadExitListener);
4004 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
4005 debug::DebugLock lock(implicitProdMutex);
4008 assert(hash !=
nullptr);
4017 for (; hash !=
nullptr; hash = hash->prev) {
4018 auto index = hashedId;
4020 index &= hash->capacity - 1;
4022 if (probedKey ==
id) {
4040 static void implicit_producer_thread_exited_callback(
void *userData) {
4042 auto queue = producer->parent;
4043 queue->implicit_producer_thread_exited(producer);
4051 template <
typename U>
4052 static inline U *create_array(
size_t count) {
4054 auto p =
static_cast<U *
>((Traits::malloc)(
sizeof(U) * count));
4059 for (
size_t i = 0; i != count; ++i) {
4065 template <
typename U>
4066 static inline void destroy_array(U *p,
size_t count) {
4069 for (
size_t i = count; i != 0;) {
4076 template <
typename U>
4077 static inline U *create() {
4078 auto p = (Traits::malloc)(
sizeof(U));
4079 return p !=
nullptr ?
new (p) U :
nullptr;
4082 template <
typename U,
typename A1>
4083 static inline U *create(A1 &&a1) {
4084 auto p = (Traits::malloc)(
sizeof(U));
4085 return p !=
nullptr ?
new (p) U(std::forward<A1>(a1)) :
nullptr;
4088 template <
typename U>
4089 static inline void destroy(U *p) {
4097 std::atomic<ProducerBase *> producerListTail;
4098 std::atomic<std::uint32_t> producerCount;
4100 std::atomic<size_t> initialBlockPoolIndex;
4101 Block *initialBlockPool;
4102 size_t initialBlockPoolSize;
4104 #if !MCDBGQ_USEDEBUGFREELIST
4105 FreeList<Block> freeList;
4107 debug::DebugFreeList<Block> freeList;
4110 std::atomic<ImplicitProducerHash *> implicitProducerHash;
4112 implicitProducerHashCount;
4113 ImplicitProducerHash initialImplicitProducerHash;
4114 std::array<ImplicitProducerKVP, INITIAL_IMPLICIT_PRODUCER_HASH_SIZE>
4115 initialImplicitProducerHashEntries;
4116 std::atomic_flag implicitProducerHashResizeInProgress;
4118 std::atomic<std::uint32_t> nextExplicitConsumerId;
4119 std::atomic<std::uint32_t> globalExplicitConsumerOffset;
4121 #if MCDBGQ_NOLOCKFREE_IMPLICITPRODHASH
4122 debug::DebugMutex implicitProdMutex;
4125 #ifdef MOODYCAMEL_QUEUE_INTERNAL_DEBUG
4126 std::atomic<ExplicitProducer *> explicitProducers;
4127 std::atomic<ImplicitProducer *> implicitProducers;
4131 template <
typename T,
typename Traits>
4133 : producer(queue.recycle_or_create_producer(true)) {
4134 if (producer !=
nullptr) {
4135 producer->token =
this;
4139 template <
typename T,
typename Traits>
4142 ->recycle_or_create_producer(true)) {
4143 if (producer !=
nullptr) {
4144 producer->token =
this;
4148 template <
typename T,
typename Traits>
4150 : itemsConsumedFromCurrent(0),
4151 currentProducer(nullptr),
4152 desiredProducer(nullptr) {
4155 lastKnownGlobalOffset = -1;
4158 template <
typename T,
typename Traits>
4160 : itemsConsumedFromCurrent(0),
4161 currentProducer(nullptr),
4162 desiredProducer(nullptr) {
4166 lastKnownGlobalOffset = -1;
4169 template <
typename T,
typename Traits>
4183 template <
typename T,
typename Traits>
4184 inline void swap(
typename ConcurrentQueue<T, Traits>::ImplicitProducerKVP &a,
4192 #if defined(__GNUC__)
4193 #pragma GCC diagnostic pop
Definition: blockingconcurrentqueue.h:373
Definition: concurrentqueue.h:802
bool try_enqueue(producer_token_t const &token, T &&item)
Definition: concurrentqueue.h:1164
bool enqueue(producer_token_t const &token, T const &item)
Definition: concurrentqueue.h:1098
bool enqueue(producer_token_t const &token, T &&item)
Definition: concurrentqueue.h:1106
ConcurrentQueue & operator=(ConcurrentQueue const &) MOODYCAMEL_DELETE_FUNCTION
bool try_dequeue(U &item)
Definition: concurrentqueue.h:1197
ConcurrentQueue(ConcurrentQueue const &) MOODYCAMEL_DELETE_FUNCTION
size_t try_dequeue_bulk_from_producer(producer_token_t const &producer, It itemFirst, size_t max)
Definition: concurrentqueue.h:1395
static const size_t MAX_SUBQUEUE_SIZE
Definition: concurrentqueue.h:828
ConcurrentQueue(size_t capacity=6 *BLOCK_SIZE)
Definition: concurrentqueue.h:886
bool try_enqueue_bulk(producer_token_t const &token, It itemFirst, size_t count)
Definition: concurrentqueue.h:1187
bool enqueue(T &&item)
Definition: concurrentqueue.h:1089
static const size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
Definition: concurrentqueue.h:817
bool enqueue_bulk(It itemFirst, size_t count)
Definition: concurrentqueue.h:1118
size_t try_dequeue_bulk(consumer_token_t &token, It itemFirst, size_t max)
Definition: concurrentqueue.h:1331
bool try_dequeue_from_producer(producer_token_t const &producer, U &item)
Definition: concurrentqueue.h:1383
bool enqueue_bulk(producer_token_t const &token, It itemFirst, size_t count)
Definition: concurrentqueue.h:1130
bool try_dequeue(consumer_token_t &token, U &item)
Definition: concurrentqueue.h:1257
friend class ConcurrentQueueTests
Definition: concurrentqueue.h:1436
::moodycamel::ProducerToken producer_token_t
Definition: concurrentqueue.h:804
friend struct ExplicitProducer
Definition: concurrentqueue.h:1431
void swap(ConcurrentQueue &other) MOODYCAMEL_NOEXCEPT
Definition: concurrentqueue.h:1038
size_t try_dequeue_bulk(It itemFirst, size_t max)
Definition: concurrentqueue.h:1313
friend struct ConsumerToken
Definition: concurrentqueue.h:1430
static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE
Definition: concurrentqueue.h:819
ConcurrentQueue & operator=(ConcurrentQueue &&other) MOODYCAMEL_NOEXCEPT
Definition: concurrentqueue.h:1028
static const size_t IMPLICIT_INITIAL_INDEX_SIZE
Definition: concurrentqueue.h:815
bool try_enqueue(T const &item)
Definition: concurrentqueue.h:1139
static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD
Definition: concurrentqueue.h:811
friend struct ImplicitProducer
Definition: concurrentqueue.h:1433
Traits::size_t size_t
Definition: concurrentqueue.h:808
::moodycamel::ConsumerToken consumer_token_t
Definition: concurrentqueue.h:805
bool enqueue(T const &item)
Definition: concurrentqueue.h:1078
static bool is_lock_free()
Definition: concurrentqueue.h:1418
friend struct ProducerToken
Definition: concurrentqueue.h:1429
bool try_enqueue(producer_token_t const &token, T const &item)
Definition: concurrentqueue.h:1157
ConcurrentQueue(size_t minCapacity, size_t maxExplicitProducers, size_t maxImplicitProducers)
Definition: concurrentqueue.h:910
Traits::index_t index_t
Definition: concurrentqueue.h:807
bool try_dequeue_non_interleaved(U &item)
Definition: concurrentqueue.h:1242
~ConcurrentQueue()
Definition: concurrentqueue.h:933
static const size_t EXPLICIT_INITIAL_INDEX_SIZE
Definition: concurrentqueue.h:813
static const size_t BLOCK_SIZE
Definition: concurrentqueue.h:810
ConcurrentQueue(ConcurrentQueue &&other) MOODYCAMEL_NOEXCEPT
Definition: concurrentqueue.h:987
size_t size_approx() const
Definition: concurrentqueue.h:1406
bool try_enqueue_bulk(It itemFirst, size_t count)
Definition: concurrentqueue.h:1176
bool try_enqueue(T &&item)
Definition: concurrentqueue.h:1149
#define MOODYCAMEL_NOEXCEPT
Definition: concurrentqueue.h:252
#define MOODYCAMEL_NOEXCEPT_CTOR(type, valueType, expr)
Definition: concurrentqueue.h:253
#define MOODYCAMEL_THREADLOCAL
Definition: concurrentqueue.h:174
#define MOODYCAMEL_DELETE_FUNCTION
Definition: concurrentqueue.h:289
#define MOODYCAMEL_CATCH(...)
Definition: concurrentqueue.h:201
#define MOODYCAMEL_NOEXCEPT_ASSIGN(type, valueType, expr)
Definition: concurrentqueue.h:254
#define MOODYCAMEL_RETHROW
Definition: concurrentqueue.h:202
#define MOODYCAMEL_TRY
Definition: concurrentqueue.h:200
static const thread_id_t invalid_thread_id2
Definition: concurrentqueue.h:180
long long y
Definition: concurrentqueue.h:338
static bool circular_less_than(T a, T b)
Definition: concurrentqueue.h:501
static bool() likely(bool x)
Definition: concurrentqueue.h:302
static void swap_relaxed(std::atomic< T > &left, std::atomic< T > &right)
Definition: concurrentqueue.h:546
std::max_align_t std_max_align_t
Definition: concurrentqueue.h:329
static auto deref_noexcept(It &it) MOODYCAMEL_NOEXCEPT -> decltype(*it)
Definition: concurrentqueue.h:575
static const thread_id_t invalid_thread_id
Definition: concurrentqueue.h:179
static char * align_for(char *ptr)
Definition: concurrentqueue.h:519
static size_t hash_thread_id(thread_id_t id)
Definition: concurrentqueue.h:490
void * z
Definition: concurrentqueue.h:339
static thread_id_t thread_id()
Definition: concurrentqueue.h:183
static bool() unlikely(bool x)
Definition: concurrentqueue.h:303
static T const & nomove(T const &x)
Definition: concurrentqueue.h:554
std_max_align_t x
Definition: concurrentqueue.h:337
std::uintptr_t thread_id_t
Definition: concurrentqueue.h:178
static T ceil_to_pow_2(T x)
Definition: concurrentqueue.h:527
Definition: concurrentqueue.h:336
Definition: atomicops.h:69
void swap(BlockingConcurrentQueue< T, Traits > &a, BlockingConcurrentQueue< T, Traits > &b) MOODYCAMEL_NOEXCEPT
Definition: blockingconcurrentqueue.h:929
@ memory_order_acq_rel
Definition: atomicops.h:75
@ memory_order_acquire
Definition: atomicops.h:73
@ memory_order_relaxed
Definition: atomicops.h:72
@ memory_order_release
Definition: atomicops.h:74
void swap(ConsumerToken &a, ConsumerToken &b) MOODYCAMEL_NOEXCEPT
Definition: concurrentqueue.h:4180
Definition: concurrentqueue.h:349
static const size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
Definition: concurrentqueue.h:394
static void free(void *ptr)
Definition: concurrentqueue.h:424
static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE
Definition: concurrentqueue.h:399
static const size_t EXPLICIT_INITIAL_INDEX_SIZE
Definition: concurrentqueue.h:382
static const size_t IMPLICIT_INITIAL_INDEX_SIZE
Definition: concurrentqueue.h:387
static void * malloc(size_t size)
Definition: concurrentqueue.h:422
static const size_t BLOCK_SIZE
Definition: concurrentqueue.h:370
std::size_t index_t
Definition: concurrentqueue.h:363
static const size_t MAX_SUBQUEUE_SIZE
Definition: concurrentqueue.h:406
static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD
Definition: concurrentqueue.h:377
std::size_t size_t
Definition: concurrentqueue.h:351
Definition: concurrentqueue.h:748
ConsumerToken & operator=(ConsumerToken const &) MOODYCAMEL_DELETE_FUNCTION
ConsumerToken(ConsumerToken const &) MOODYCAMEL_DELETE_FUNCTION
ConsumerToken(ConcurrentQueue< T, Traits > &q)
Definition: concurrentqueue.h:4150
ConsumerToken & operator=(ConsumerToken &&other) MOODYCAMEL_NOEXCEPT
Definition: concurrentqueue.h:762
friend class ConcurrentQueueTests
Definition: concurrentqueue.h:783
ConsumerToken(ConsumerToken &&other) MOODYCAMEL_NOEXCEPT
Definition: concurrentqueue.h:755
void swap(ConsumerToken &other) MOODYCAMEL_NOEXCEPT
Definition: concurrentqueue.h:767
Definition: concurrentqueue.h:687
bool valid() const
Definition: concurrentqueue.h:725
~ProducerToken()
Definition: concurrentqueue.h:727
friend class ConcurrentQueueTests
Definition: concurrentqueue.h:742
ProducerToken(ConcurrentQueue< T, Traits > &queue)
Definition: concurrentqueue.h:4133
ProducerToken & operator=(ProducerToken &&other) MOODYCAMEL_NOEXCEPT
Definition: concurrentqueue.h:702
details::ConcurrentQueueProducerTypelessBase * producer
Definition: concurrentqueue.h:745
ProducerToken(ProducerToken const &) MOODYCAMEL_DELETE_FUNCTION
void swap(ProducerToken &other) MOODYCAMEL_NOEXCEPT
Definition: concurrentqueue.h:707
ProducerToken(ProducerToken &&other) MOODYCAMEL_NOEXCEPT
Definition: concurrentqueue.h:694
ProducerToken & operator=(ProducerToken const &) MOODYCAMEL_DELETE_FUNCTION
Definition: concurrentqueue.h:451
std::atomic< bool > inactive
Definition: concurrentqueue.h:453
ConcurrentQueueProducerTypelessBase * next
Definition: concurrentqueue.h:452
ProducerToken * token
Definition: concurrentqueue.h:454
ConcurrentQueueProducerTypelessBase()
Definition: concurrentqueue.h:456
static std::uint64_t hash(std::uint64_t h)
Definition: concurrentqueue.h:478
Definition: concurrentqueue.h:461
static std::uint32_t hash(std::uint32_t h)
Definition: concurrentqueue.h:462
Definition: concurrentqueue.h:315
static const T value
Definition: concurrentqueue.h:318
Definition: concurrentqueue.h:488
Definition: concurrentqueue.h:582
static auto eval(U &&x) -> decltype(std::forward< U >(x))
Definition: concurrentqueue.h:569
Definition: concurrentqueue.h:559
static T const & eval(T const &x)
Definition: concurrentqueue.h:561
Definition: concurrentqueue.h:651
@ value
Definition: concurrentqueue.h:652
Definition: concurrentqueue.h:676
Definition: concurrentqueue.h:80
thread_id_t thread_id_numeric_size_t
Definition: concurrentqueue.h:81
thread_id_t thread_id_hash_t
Definition: concurrentqueue.h:82
static thread_id_hash_t prehash(thread_id_t const &x)
Definition: concurrentqueue.h:84