12 #include <type_traits>
15 #if __cplusplus > 199711L || _MSC_VER >= 1700
32 #ifndef MOODYCAMEL_CACHE_LINE_SIZE
33 #define MOODYCAMEL_CACHE_LINE_SIZE 64
36 #ifndef MOODYCAMEL_EXCEPTIONS_ENABLED
37 #if (defined(_MSC_VER) && defined(_CPPUNWIND)) || \
38 (defined(__GNUC__) && defined(__EXCEPTIONS)) || \
39 (!defined(_MSC_VER) && !defined(__GNUC__))
40 #define MOODYCAMEL_EXCEPTIONS_ENABLED
48 #pragma warning(disable : 4820)
49 #pragma warning(disable : 4127)
54 template <
typename T,
size_t MAX_BLOCK_SIZE = 512>
88 assert(MAX_BLOCK_SIZE == ceilToPow2(MAX_BLOCK_SIZE) &&
89 "MAX_BLOCK_SIZE must be a power of 2");
90 assert(MAX_BLOCK_SIZE >= 2 &&
"MAX_BLOCK_SIZE must be at least 2");
92 Block* firstBlock =
nullptr;
94 largestBlockSize = ceilToPow2(
97 if (largestBlockSize > MAX_BLOCK_SIZE * 2) {
106 size_t initialBlockCount =
107 (maxSize + MAX_BLOCK_SIZE * 2 - 3) / (MAX_BLOCK_SIZE - 1);
108 largestBlockSize = MAX_BLOCK_SIZE;
109 Block* lastBlock =
nullptr;
110 for (
size_t i = 0; i != initialBlockCount; ++i) {
111 auto block = make_block(largestBlockSize);
112 if (block ==
nullptr) {
113 #ifdef MOODYCAMEL_EXCEPTIONS_ENABLED
114 throw std::bad_alloc();
119 if (firstBlock ==
nullptr) {
122 lastBlock->next = block;
125 block->next = firstBlock;
128 firstBlock = make_block(largestBlockSize);
129 if (firstBlock ==
nullptr) {
130 #ifdef MOODYCAMEL_EXCEPTIONS_ENABLED
131 throw std::bad_alloc();
136 firstBlock->next = firstBlock;
138 frontBlock = firstBlock;
139 tailBlock = firstBlock;
153 Block* frontBlock_ = frontBlock;
154 Block* block = frontBlock_;
156 Block* nextBlock = block->next;
157 size_t blockFront = block->front;
158 size_t blockTail = block->tail;
160 for (
size_t i = blockFront; i != blockTail;
161 i = (i + 1) & block->sizeMask) {
162 auto element =
reinterpret_cast<T*
>(block->data + i *
sizeof(T));
167 auto rawBlock = block->rawThis;
171 }
while (block != frontBlock_);
178 return inner_enqueue<CannotAlloc>(element);
185 return inner_enqueue<CannotAlloc>(std::forward<T>(element));
192 return inner_enqueue<CanAlloc>(element);
199 return inner_enqueue<CanAlloc>(std::forward<T>(element));
205 template <
typename U>
208 ReentrantGuard guard(this->dequeuing);
228 Block* frontBlock_ = frontBlock.
load();
229 size_t blockTail = frontBlock_->localTail;
230 size_t blockFront = frontBlock_->front.load();
232 if (blockFront != blockTail ||
233 blockFront != (frontBlock_->localTail = frontBlock_->tail.load())) {
236 non_empty_front_block:
239 reinterpret_cast<T*
>(frontBlock_->data + blockFront *
sizeof(T));
240 result = std::move(*element);
243 blockFront = (blockFront + 1) & frontBlock_->sizeMask;
246 frontBlock_->front = blockFront;
247 }
else if (frontBlock_ != tailBlock.
load()) {
250 frontBlock_ = frontBlock.
load();
251 blockTail = frontBlock_->localTail = frontBlock_->tail.load();
252 blockFront = frontBlock_->front.load();
255 if (blockFront != blockTail) {
257 goto non_empty_front_block;
261 Block* nextBlock = frontBlock_->next;
267 size_t nextBlockFront = nextBlock->front.load();
268 size_t nextBlockTail = nextBlock->localTail = nextBlock->tail.load();
273 assert(nextBlockFront != nextBlockTail);
279 frontBlock = frontBlock_ = nextBlock;
284 reinterpret_cast<T*
>(frontBlock_->data + nextBlockFront *
sizeof(T));
286 result = std::move(*element);
289 nextBlockFront = (nextBlockFront + 1) & frontBlock_->sizeMask;
292 frontBlock_->front = nextBlockFront;
308 ReentrantGuard guard(this->dequeuing);
312 Block* frontBlock_ = frontBlock.
load();
313 size_t blockTail = frontBlock_->localTail;
314 size_t blockFront = frontBlock_->front.load();
316 if (blockFront != blockTail ||
317 blockFront != (frontBlock_->localTail = frontBlock_->tail.load())) {
319 non_empty_front_block:
320 return reinterpret_cast<T*
>(frontBlock_->data + blockFront *
sizeof(T));
321 }
else if (frontBlock_ != tailBlock.
load()) {
323 frontBlock_ = frontBlock.
load();
324 blockTail = frontBlock_->localTail = frontBlock_->tail.load();
325 blockFront = frontBlock_->front.load();
328 if (blockFront != blockTail) {
329 goto non_empty_front_block;
332 Block* nextBlock = frontBlock_->next;
334 size_t nextBlockFront = nextBlock->front.load();
337 assert(nextBlockFront != nextBlock->tail.load());
338 return reinterpret_cast<T*
>(nextBlock->data + nextBlockFront *
sizeof(T));
349 ReentrantGuard guard(this->dequeuing);
353 Block* frontBlock_ = frontBlock.
load();
354 size_t blockTail = frontBlock_->localTail;
355 size_t blockFront = frontBlock_->front.load();
357 if (blockFront != blockTail ||
358 blockFront != (frontBlock_->localTail = frontBlock_->tail.load())) {
361 non_empty_front_block:
363 reinterpret_cast<T*
>(frontBlock_->data + blockFront *
sizeof(T));
366 blockFront = (blockFront + 1) & frontBlock_->sizeMask;
369 frontBlock_->front = blockFront;
370 }
else if (frontBlock_ != tailBlock.
load()) {
372 frontBlock_ = frontBlock.
load();
373 blockTail = frontBlock_->localTail = frontBlock_->tail.load();
374 blockFront = frontBlock_->front.load();
377 if (blockFront != blockTail) {
378 goto non_empty_front_block;
382 Block* nextBlock = frontBlock_->next;
384 size_t nextBlockFront = nextBlock->front.load();
385 size_t nextBlockTail = nextBlock->localTail = nextBlock->tail.load();
388 assert(nextBlockFront != nextBlockTail);
392 frontBlock = frontBlock_ = nextBlock;
397 reinterpret_cast<T*
>(frontBlock_->data + nextBlockFront *
sizeof(T));
400 nextBlockFront = (nextBlockFront + 1) & frontBlock_->sizeMask;
403 frontBlock_->front = nextBlockFront;
416 Block* frontBlock_ = frontBlock.
load();
417 Block* block = frontBlock_;
420 size_t blockFront = block->front.load();
421 size_t blockTail = block->tail.load();
422 result += (blockTail - blockFront) & block->sizeMask;
423 block = block->next.load();
424 }
while (block != frontBlock_);
429 enum AllocationMode { CanAlloc, CannotAlloc };
431 template <AllocationMode canAlloc,
typename U>
432 bool inner_enqueue(U&& element) {
434 ReentrantGuard guard(this->enqueuing);
444 Block* tailBlock_ = tailBlock.
load();
445 size_t blockFront = tailBlock_->localFront;
446 size_t blockTail = tailBlock_->tail.load();
448 size_t nextBlockTail = (blockTail + 1) & tailBlock_->sizeMask;
449 if (nextBlockTail != blockFront ||
450 nextBlockTail != (tailBlock_->localFront = tailBlock_->front.load())) {
453 char* location = tailBlock_->data + blockTail *
sizeof(T);
454 new (location) T(std::forward<U>(element));
457 tailBlock_->tail = nextBlockTail;
460 if (tailBlock_->next.load() != frontBlock) {
471 Block* tailBlockNext = tailBlock_->next.load();
472 size_t nextBlockFront = tailBlockNext->localFront =
473 tailBlockNext->front.load();
474 nextBlockTail = tailBlockNext->tail.load();
479 assert(nextBlockFront == nextBlockTail);
480 tailBlockNext->localFront = nextBlockFront;
482 char* location = tailBlockNext->data + nextBlockTail *
sizeof(T);
483 new (location) T(std::forward<U>(element));
485 tailBlockNext->tail = (nextBlockTail + 1) & tailBlockNext->sizeMask;
488 tailBlock = tailBlockNext;
489 }
else if (canAlloc == CanAlloc) {
491 auto newBlockSize = largestBlockSize >= MAX_BLOCK_SIZE
493 : largestBlockSize * 2;
494 auto newBlock = make_block(newBlockSize);
495 if (newBlock ==
nullptr) {
499 largestBlockSize = newBlockSize;
501 new (newBlock->data) T(std::forward<U>(element));
503 assert(newBlock->front == 0);
504 newBlock->tail = newBlock->localTail = 1;
506 newBlock->next = tailBlock_->next.load();
507 tailBlock_->next = newBlock;
517 tailBlock = newBlock;
518 }
else if (canAlloc == CannotAlloc) {
522 assert(
false &&
"Should be unreachable code");
542 for (
size_t i = 1; i <
sizeof(size_t); i <<= 1) {
549 template <
typename U>
551 const std::size_t alignment = std::alignment_of<U>::value;
553 (alignment - (
reinterpret_cast<std::uintptr_t
>(ptr) % alignment)) %
559 struct ReentrantGuard {
560 ReentrantGuard(
bool& _inSection) : inSection(_inSection) {
562 "ReaderWriterQueue does not support enqueuing or dequeuing "
563 "elements from other elements' ctors and dtors");
567 ~ReentrantGuard() { inSection =
false; }
570 ReentrantGuard& operator=(ReentrantGuard
const&);
580 weak_atomic<size_t> front;
585 sizeof(weak_atomic<size_t>) -
sizeof(
size_t)];
586 weak_atomic<size_t> tail;
590 sizeof(weak_atomic<size_t>) -
594 weak_atomic<Block*> next;
598 const size_t sizeMask;
601 Block(
size_t const& _size,
char* _rawThis,
char* _data)
613 Block& operator=(Block
const&);
619 static Block* make_block(
size_t capacity) {
622 auto size =
sizeof(Block) + std::alignment_of<Block>::value - 1;
623 size +=
sizeof(T) * capacity + std::alignment_of<T>::value - 1;
624 auto newBlockRaw =
static_cast<char*
>(std::malloc(size));
625 if (newBlockRaw ==
nullptr) {
629 auto newBlockAligned = align_for<Block>(newBlockRaw);
630 auto newBlockData = align_for<T>(newBlockAligned +
sizeof(Block));
631 return new (newBlockAligned) Block(capacity, newBlockRaw, newBlockData);
643 size_t largestBlockSize;
652 template <
typename T,
size_t MAX_BLOCK_SIZE = 512>
697 if (inner.
enqueue(std::forward<T>(element))) {
707 template <
typename U>
720 template <
typename U>
735 template <
typename U>
737 if (!sema.
wait(timeout_usecs)) {
747 #if __cplusplus > 199711L || _MSC_VER >= 1700
754 template <
typename U,
typename Rep,
typename Period>
756 U& result, std::chrono::duration<Rep, Period>
const& timeout) {
759 std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
775 bool result = inner.
pop();
793 ReaderWriterQueue inner;
794 spsc_sema::LightweightSemaphore sema;
#define AE_UNUSED(x)
Definition: atomicops.h:45
#define AE_FORCEINLINE
Definition: atomicops.h:54
Definition: readerwriterqueue.h:653
AE_FORCEINLINE bool enqueue(T const &element)
Definition: readerwriterqueue.h:685
AE_FORCEINLINE bool try_enqueue(T &&element)
Definition: readerwriterqueue.h:674
AE_FORCEINLINE size_t size_approx() const
Definition: readerwriterqueue.h:785
AE_FORCEINLINE bool pop()
Definition: readerwriterqueue.h:773
AE_FORCEINLINE T * peek()
Definition: readerwriterqueue.h:768
void wait_dequeue(U &result)
Definition: readerwriterqueue.h:721
AE_FORCEINLINE bool try_enqueue(T const &element)
Definition: readerwriterqueue.h:663
BlockingReaderWriterQueue(size_t maxSize=15)
Definition: readerwriterqueue.h:658
bool try_dequeue(U &result)
Definition: readerwriterqueue.h:708
bool wait_dequeue_timed(U &result, std::int64_t timeout_usecs)
Definition: readerwriterqueue.h:736
AE_FORCEINLINE bool enqueue(T &&element)
Definition: readerwriterqueue.h:696
Definition: readerwriterqueue.h:55
size_t size_approx() const
Definition: readerwriterqueue.h:414
bool pop()
Definition: readerwriterqueue.h:347
ReaderWriterQueue(size_t maxSize=15)
Definition: readerwriterqueue.h:81
AE_FORCEINLINE bool try_enqueue(T const &element)
Definition: readerwriterqueue.h:177
T * peek()
Definition: readerwriterqueue.h:306
AE_FORCEINLINE bool enqueue(T const &element)
Definition: readerwriterqueue.h:191
AE_FORCEINLINE bool enqueue(T &&element)
Definition: readerwriterqueue.h:198
AE_FORCEINLINE bool try_enqueue(T &&element)
Definition: readerwriterqueue.h:184
bool try_dequeue(U &result)
Definition: readerwriterqueue.h:206
~ReaderWriterQueue()
Definition: readerwriterqueue.h:148
bool tryWait()
Definition: atomicops.h:619
void wait()
Definition: atomicops.h:627
ssize_t availableApprox() const
Definition: atomicops.h:644
void signal(ssize_t count=1)
Definition: atomicops.h:635
AE_FORCEINLINE T load() const
Definition: atomicops.h:343
Definition: atomicops.h:69
@ memory_order_acquire
Definition: atomicops.h:73
@ memory_order_sync
Definition: atomicops.h:80
@ memory_order_release
Definition: atomicops.h:74
AE_FORCEINLINE void fence(memory_order order)
Definition: atomicops.h:222
AE_FORCEINLINE void compiler_fence(memory_order order)
Definition: atomicops.h:201
#define MOODYCAMEL_CACHE_LINE_SIZE
Definition: readerwriterqueue.h:33