13 #include <type_traits>
// Local declarations of the Win32 entry points called by the Windows
// Semaphore wrapper below: CreateSemaphoreW, CloseHandle and
// WaitForSingleObject. Each is declared __declspec(dllimport) with the
// __stdcall calling convention. _SECURITY_ATTRIBUTES is only forward-declared;
// in these declarations it appears solely as a pointer parameter type.
// NOTE(review): presumably declared by hand to avoid including <windows.h>
// from this header — confirm.
23 struct _SECURITY_ATTRIBUTES;
24 __declspec(dllimport)
void *__stdcall CreateSemaphoreW(
25 _SECURITY_ATTRIBUTES *lpSemaphoreAttributes,
long lInitialCount,
26 long lMaximumCount,
const wchar_t *lpName);
27 __declspec(dllimport)
int __stdcall CloseHandle(
void *hObject);
28 __declspec(dllimport)
unsigned long __stdcall WaitForSingleObject(
29 void *hHandle,
unsigned long dwMilliseconds);
30 __declspec(dllimport)
int __stdcall ReleaseSemaphore(
void *hSemaphore,
32 long *lpPreviousCount);
34 #elif defined(__MACH__)
35 #include <mach/mach.h>
36 #elif defined(__unix__)
37 #include <semaphore.h>
73 Semaphore(
int initialCount = 0) {
74 assert(initialCount >= 0);
75 const long maxLong = 0x7fffffff;
76 m_hSema = CreateSemaphoreW(
nullptr, initialCount, maxLong,
nullptr);
// Destructor (Windows variant): hands m_hSema, the handle obtained from
// CreateSemaphoreW in the constructor, to CloseHandle. The int result of
// CloseHandle is not checked.
79 ~Semaphore() { CloseHandle(m_hSema); }
82 const unsigned long infinite = 0xffffffff;
83 WaitForSingleObject(m_hSema, infinite);
87 const unsigned long RC_WAIT_TIMEOUT = 0x00000102;
88 return WaitForSingleObject(m_hSema, 0) != RC_WAIT_TIMEOUT;
91 bool timed_wait(std::uint64_t usecs) {
92 const unsigned long RC_WAIT_TIMEOUT = 0x00000102;
93 return WaitForSingleObject(m_hSema, (
unsigned long)(usecs / 1000)) !=
// Windows variant of signal: makes a single ReleaseSemaphore call on m_hSema,
// forwarding `count` (default 1) as the second argument. nullptr is passed for
// the lpPreviousCount out-parameter, so the previous count is not retrieved,
// and the int result of ReleaseSemaphore is ignored.
97 void signal(
int count = 1) { ReleaseSemaphore(m_hSema, count,
nullptr); }
99 #elif defined(__MACH__)
113 Semaphore(
int initialCount = 0) {
114 assert(initialCount >= 0);
115 semaphore_create(mach_task_self(), &m_sema, SYNC_POLICY_FIFO, initialCount);
// Destructor (Mach variant): calls semaphore_destroy for m_sema, passing
// mach_task_self() as the task, mirroring the semaphore_create call in the
// constructor. The result of semaphore_destroy is not checked.
118 ~Semaphore() { semaphore_destroy(mach_task_self(), m_sema); }
// Waits on m_sema via semaphore_wait. The call's result is discarded, so this
// function reports nothing to the caller.
120 void wait() { semaphore_wait(m_sema); }
// Implemented as timed_wait with a timeout of 0 microseconds; returns whatever
// timed_wait returns.
122 bool try_wait() {
return timed_wait(0); }
124 bool timed_wait(std::uint64_t timeout_usecs) {
126 ts.tv_sec =
static_cast<unsigned int>(timeout_usecs / 1000000);
127 ts.tv_nsec = (timeout_usecs % 1000000) * 1000;
131 kern_return_t rc = semaphore_timedwait(m_sema, ts);
133 return rc != KERN_OPERATION_TIMED_OUT && rc != KERN_ABORTED;
// Signals m_sema exactly once via semaphore_signal; the call's result is
// ignored. The signal(int count) overload calls semaphore_signal in a loop.
136 void signal() { semaphore_signal(m_sema); }
138 void signal(
int count) {
139 while (count-- > 0) {
140 semaphore_signal(m_sema);
144 #elif defined(__unix__)
157 Semaphore(
int initialCount = 0) {
158 assert(initialCount >= 0);
159 sem_init(&m_sema, 0, initialCount);
// Destructor (POSIX variant): calls sem_destroy on m_sema, the semaphore that
// the constructor set up with sem_init. The result of sem_destroy is not
// checked.
162 ~Semaphore() { sem_destroy(&m_sema); }
168 rc = sem_wait(&m_sema);
169 }
while (rc == -1 && errno == EINTR);
175 rc = sem_trywait(&m_sema);
176 }
while (rc == -1 && errno == EINTR);
177 return !(rc == -1 && errno == EAGAIN);
180 bool timed_wait(std::uint64_t usecs) {
182 const int usecs_in_1_sec = 1000000;
183 const int nsecs_in_1_sec = 1000000000;
184 clock_gettime(CLOCK_REALTIME, &ts);
185 ts.tv_sec += usecs / usecs_in_1_sec;
186 ts.tv_nsec += (usecs % usecs_in_1_sec) * 1000;
189 if (ts.tv_nsec >= nsecs_in_1_sec) {
190 ts.tv_nsec -= nsecs_in_1_sec;
196 rc = sem_timedwait(&m_sema, &ts);
197 }
while (rc == -1 && errno == EINTR);
198 return !(rc == -1 && errno == ETIMEDOUT);
// Posts m_sema exactly once via sem_post; the call's result is ignored.
201 void signal() { sem_post(&m_sema); }
203 void signal(
int count) {
204 while (count-- > 0) {
210 #error Unsupported platform! (No semaphore wrapper available)
218 typedef std::make_signed<std::size_t>::type
ssize_t;
221 std::atomic<ssize_t> m_count;
224 bool waitWithPartialSpinning(std::int64_t timeout_usecs = -1) {
230 while (--spin >= 0) {
232 if ((oldCount > 0) &&
233 m_count.compare_exchange_strong(oldCount, oldCount - 1,
237 std::atomic_signal_fence(
242 if (oldCount > 0)
return true;
243 if (timeout_usecs < 0) {
247 if (m_sema.timed_wait((std::uint64_t)timeout_usecs))
return true;
255 if (oldCount >= 0 && m_sema.try_wait())
return true;
256 if (oldCount < 0 && m_count.compare_exchange_strong(
264 std::int64_t timeout_usecs = -1) {
268 while (--spin >= 0) {
271 ssize_t newCount = oldCount > max ? oldCount - max : 0;
272 if (m_count.compare_exchange_strong(oldCount, newCount,
275 return oldCount - newCount;
281 if (timeout_usecs < 0)
283 else if (!m_sema.timed_wait((std::uint64_t)timeout_usecs)) {
286 if (oldCount >= 0 && m_sema.try_wait())
break;
288 m_count.compare_exchange_strong(oldCount, oldCount + 1,
301 assert(initialCount >= 0);
306 while (oldCount > 0) {
307 if (m_count.compare_exchange_weak(oldCount, oldCount - 1,
316 if (!
tryWait()) waitWithPartialSpinning();
319 bool wait(std::int64_t timeout_usecs) {
320 return tryWait() || waitWithPartialSpinning(timeout_usecs);
327 while (oldCount > 0) {
328 ssize_t newCount = oldCount > max ? oldCount - max : 0;
329 if (m_count.compare_exchange_weak(oldCount, newCount,
332 return oldCount - newCount;
341 if (result == 0 && max > 0)
342 result = waitManyWithPartialSpinning(max, timeout_usecs);
355 ssize_t toRelease = -oldCount < count ? -oldCount : count;
357 m_sema.signal((
int)toRelease);
363 return count > 0 ? count : 0;
372 template <
typename T,
typename Traits = ConcurrentQueueDefaultTraits>
384 typedef typename std::make_signed<size_t>::type
ssize_t;
416 "BlockingConcurrentQueue must have ConcurrentQueue as its first "
424 size_t maxImplicitProducers)
425 : inner(minCapacity, maxExplicitProducers, maxImplicitProducers),
430 "BlockingConcurrentQueue must have ConcurrentQueue as its first "
450 : inner(std::move(other.inner)),
451 sema(std::move(other.sema)) {}
455 return swap_internal(other);
464 swap_internal(other);
469 if (
this == &other) {
473 inner.
swap(other.inner);
474 sema.swap(other.sema);
538 template <
typename It>
541 inner.
enqueue_bulk(std::forward<It>(itemFirst), count))) {
554 template <
typename It>
558 inner.
enqueue_bulk(token, std::forward<It>(itemFirst), count))) {
620 template <
typename It>
634 template <
typename It>
648 template <
typename U>
650 if (sema->tryWait()) {
663 template <
typename U>
665 if (sema->tryWait()) {
679 template <
typename It>
684 while (count != max) {
685 count += inner.template try_dequeue_bulk<It &>(itemFirst, max - count);
695 template <
typename It>
701 while (count != max) {
703 inner.template try_dequeue_bulk<It &>(token, itemFirst, max - count);
711 template <
typename U>
726 template <
typename U>
728 if (!sema->wait(timeout_usecs)) {
741 template <
typename U,
typename Rep,
typename Period>
743 U &item, std::chrono::duration<Rep, Period>
const &timeout) {
746 std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
752 template <
typename U>
767 template <
typename U>
769 std::int64_t timeout_usecs) {
770 if (!sema->wait(timeout_usecs)) {
783 template <
typename U,
typename Rep,
typename Period>
786 std::chrono::duration<Rep, Period>
const &timeout) {
789 std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
797 template <
typename It>
801 while (count != max) {
802 count += inner.template try_dequeue_bulk<It &>(itemFirst, max - count);
814 template <
typename It>
816 std::int64_t timeout_usecs) {
820 while (count != max) {
821 count += inner.template try_dequeue_bulk<It &>(itemFirst, max - count);
831 template <
typename It,
typename Rep,
typename Period>
833 It itemFirst,
size_t max,
834 std::chrono::duration<Rep, Period>
const &timeout) {
835 return wait_dequeue_bulk_timed<It &>(
837 std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
845 template <
typename It>
850 while (count != max) {
852 inner.template try_dequeue_bulk<It &>(token, itemFirst, max - count);
863 template <
typename It>
866 std::int64_t timeout_usecs) {
870 while (count != max) {
872 inner.template try_dequeue_bulk<It &>(token, itemFirst, max - count);
881 template <
typename It,
typename Rep,
typename Period>
884 std::chrono::duration<Rep, Period>
const &timeout) {
885 return wait_dequeue_bulk_timed<It &>(
886 token, itemFirst, max,
887 std::chrono::duration_cast<std::chrono::microseconds>(timeout).count());
// Returns the value of sema->availableApprox() converted to size_t with a
// C-style cast. The result comes from the semaphore's counter, not from
// `inner`; this function does not touch the underlying queue.
// NOTE(review): availableApprox() returns a signed ssize_t; a negative value
// would wrap to a very large size_t here — confirm it can never be negative.
895 inline size_t size_approx()
const {
return (
size_t)sema->availableApprox(); }
903 template <
typename U>
904 static inline U *create() {
905 auto p = (Traits::malloc)(
sizeof(U));
906 return p !=
nullptr ?
new (p) U :
nullptr;
909 template <
typename U,
typename A1>
910 static inline U *create(A1 &&a1) {
911 auto p = (Traits::malloc)(
sizeof(U));
912 return p !=
nullptr ?
new (p) U(std::forward<A1>(a1)) :
nullptr;
915 template <
typename U>
916 static inline void destroy(U *p) {
924 ConcurrentQueue inner;
925 std::unique_ptr<LightweightSemaphore, void (*)(LightweightSemaphore *)> sema;
928 template <
typename T,
typename Traits>
Definition: blockingconcurrentqueue.h:373
size_t wait_dequeue_bulk_timed(It itemFirst, size_t max, std::chrono::duration< Rep, Period > const &timeout)
Definition: blockingconcurrentqueue.h:832
bool enqueue_bulk(producer_token_t const &token, It itemFirst, size_t count)
Definition: blockingconcurrentqueue.h:555
void wait_dequeue(U &item)
Definition: blockingconcurrentqueue.h:712
bool wait_dequeue_timed(consumer_token_t &token, U &item, std::int64_t timeout_usecs)
Definition: blockingconcurrentqueue.h:768
ConcurrentQueue::consumer_token_t consumer_token_t
Definition: blockingconcurrentqueue.h:380
void swap(BlockingConcurrentQueue &other) MOODYCAMEL_NOEXCEPT
Definition: blockingconcurrentqueue.h:463
static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE
Definition: blockingconcurrentqueue.h:395
bool try_enqueue(T &&item)
Definition: blockingconcurrentqueue.h:583
bool try_enqueue_bulk(producer_token_t const &token, It itemFirst, size_t count)
Definition: blockingconcurrentqueue.h:635
size_t try_dequeue_bulk(It itemFirst, size_t max)
Definition: blockingconcurrentqueue.h:680
BlockingConcurrentQueue(BlockingConcurrentQueue &&other) MOODYCAMEL_NOEXCEPT
Definition: blockingconcurrentqueue.h:449
BlockingConcurrentQueue(size_t minCapacity, size_t maxExplicitProducers, size_t maxImplicitProducers)
Definition: blockingconcurrentqueue.h:423
static const size_t IMPLICIT_INITIAL_INDEX_SIZE
Definition: blockingconcurrentqueue.h:391
bool try_enqueue_bulk(It itemFirst, size_t count)
Definition: blockingconcurrentqueue.h:621
static const size_t EXPLICIT_INITIAL_INDEX_SIZE
Definition: blockingconcurrentqueue.h:389
bool wait_dequeue_timed(U &item, std::chrono::duration< Rep, Period > const &timeout)
Definition: blockingconcurrentqueue.h:742
size_t wait_dequeue_bulk_timed(It itemFirst, size_t max, std::int64_t timeout_usecs)
Definition: blockingconcurrentqueue.h:815
BlockingConcurrentQueue & operator=(BlockingConcurrentQueue const &) MOODYCAMEL_DELETE_FUNCTION
size_t size_approx() const
Definition: blockingconcurrentqueue.h:895
BlockingConcurrentQueue & operator=(BlockingConcurrentQueue &&other) MOODYCAMEL_NOEXCEPT
Definition: blockingconcurrentqueue.h:453
BlockingConcurrentQueue(size_t capacity=6 *BLOCK_SIZE)
Definition: blockingconcurrentqueue.h:410
std::make_signed< size_t >::type ssize_t
Definition: blockingconcurrentqueue.h:384
static const size_t BLOCK_SIZE
Definition: blockingconcurrentqueue.h:386
static const size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
Definition: blockingconcurrentqueue.h:393
bool enqueue(producer_token_t const &token, T &&item)
Definition: blockingconcurrentqueue.h:523
bool try_dequeue(U &item)
Definition: blockingconcurrentqueue.h:649
static const size_t MAX_SUBQUEUE_SIZE
Definition: blockingconcurrentqueue.h:397
size_t wait_dequeue_bulk(It itemFirst, size_t max)
Definition: blockingconcurrentqueue.h:798
bool enqueue(T const &item)
Definition: blockingconcurrentqueue.h:485
bool enqueue_bulk(It itemFirst, size_t count)
Definition: blockingconcurrentqueue.h:539
bool enqueue(producer_token_t const &token, T const &item)
Definition: blockingconcurrentqueue.h:511
void wait_dequeue(consumer_token_t &token, U &item)
Definition: blockingconcurrentqueue.h:753
bool wait_dequeue_timed(U &item, std::int64_t timeout_usecs)
Definition: blockingconcurrentqueue.h:727
static bool is_lock_free()
Definition: blockingconcurrentqueue.h:900
size_t wait_dequeue_bulk_timed(consumer_token_t &token, It itemFirst, size_t max, std::chrono::duration< Rep, Period > const &timeout)
Definition: blockingconcurrentqueue.h:882
bool wait_dequeue_timed(consumer_token_t &token, U &item, std::chrono::duration< Rep, Period > const &timeout)
Definition: blockingconcurrentqueue.h:784
bool enqueue(T &&item)
Definition: blockingconcurrentqueue.h:499
bool try_enqueue(producer_token_t const &token, T &&item)
Definition: blockingconcurrentqueue.h:605
bool try_enqueue(T const &item)
Definition: blockingconcurrentqueue.h:570
bool try_enqueue(producer_token_t const &token, T const &item)
Definition: blockingconcurrentqueue.h:594
ConcurrentQueue::producer_token_t producer_token_t
Definition: blockingconcurrentqueue.h:379
ConcurrentQueue::size_t size_t
Definition: blockingconcurrentqueue.h:383
size_t wait_dequeue_bulk_timed(consumer_token_t &token, It itemFirst, size_t max, std::int64_t timeout_usecs)
Definition: blockingconcurrentqueue.h:864
ConcurrentQueue::index_t index_t
Definition: blockingconcurrentqueue.h:382
size_t wait_dequeue_bulk(consumer_token_t &token, It itemFirst, size_t max)
Definition: blockingconcurrentqueue.h:846
bool try_dequeue(consumer_token_t &token, U &item)
Definition: blockingconcurrentqueue.h:664
static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD
Definition: blockingconcurrentqueue.h:387
BlockingConcurrentQueue(BlockingConcurrentQueue const &) MOODYCAMEL_DELETE_FUNCTION
size_t try_dequeue_bulk(consumer_token_t &token, It itemFirst, size_t max)
Definition: blockingconcurrentqueue.h:696
Definition: concurrentqueue.h:802
bool try_dequeue(U &item)
Definition: concurrentqueue.h:1197
static const size_t MAX_SUBQUEUE_SIZE
Definition: concurrentqueue.h:828
static const size_t INITIAL_IMPLICIT_PRODUCER_HASH_SIZE
Definition: concurrentqueue.h:817
bool enqueue_bulk(It itemFirst, size_t count)
Definition: concurrentqueue.h:1118
void swap(ConcurrentQueue &other) MOODYCAMEL_NOEXCEPT
Definition: concurrentqueue.h:1038
static const std::uint32_t EXPLICIT_CONSUMER_CONSUMPTION_QUOTA_BEFORE_ROTATE
Definition: concurrentqueue.h:819
static const size_t IMPLICIT_INITIAL_INDEX_SIZE
Definition: concurrentqueue.h:815
bool try_enqueue(T const &item)
Definition: concurrentqueue.h:1139
static const size_t EXPLICIT_BLOCK_EMPTY_COUNTER_THRESHOLD
Definition: concurrentqueue.h:811
Traits::size_t size_t
Definition: concurrentqueue.h:808
bool enqueue(T const &item)
Definition: concurrentqueue.h:1078
static bool is_lock_free()
Definition: concurrentqueue.h:1418
Traits::index_t index_t
Definition: concurrentqueue.h:807
static const size_t EXPLICIT_INITIAL_INDEX_SIZE
Definition: concurrentqueue.h:813
static const size_t BLOCK_SIZE
Definition: concurrentqueue.h:810
bool try_enqueue_bulk(It itemFirst, size_t count)
Definition: concurrentqueue.h:1176
Definition: blockingconcurrentqueue.h:216
ssize_t availableApprox() const
Definition: blockingconcurrentqueue.h:361
ssize_t tryWaitMany(ssize_t max)
Definition: blockingconcurrentqueue.h:324
bool wait(std::int64_t timeout_usecs)
Definition: blockingconcurrentqueue.h:319
void wait()
Definition: blockingconcurrentqueue.h:315
std::make_signed< std::size_t >::type ssize_t
Definition: blockingconcurrentqueue.h:218
ssize_t waitMany(ssize_t max, std::int64_t timeout_usecs)
Definition: blockingconcurrentqueue.h:338
ssize_t waitMany(ssize_t max)
Definition: blockingconcurrentqueue.h:346
LightweightSemaphore(ssize_t initialCount=0)
Definition: blockingconcurrentqueue.h:300
bool tryWait()
Definition: blockingconcurrentqueue.h:304
void signal(ssize_t count=1)
Definition: blockingconcurrentqueue.h:352
#define MOODYCAMEL_NOEXCEPT
Definition: concurrentqueue.h:252
#define MOODYCAMEL_THROW(expr)
Definition: concurrentqueue.h:203
#define MOODYCAMEL_DELETE_FUNCTION
Definition: concurrentqueue.h:289
static bool() likely(bool x)
Definition: concurrentqueue.h:302
Definition: atomicops.h:69
void swap(BlockingConcurrentQueue< T, Traits > &a, BlockingConcurrentQueue< T, Traits > &b) MOODYCAMEL_NOEXCEPT
Definition: blockingconcurrentqueue.h:929
@ memory_order_acquire
Definition: atomicops.h:73
@ memory_order_relaxed
Definition: atomicops.h:72
@ memory_order_release
Definition: atomicops.h:74
Definition: concurrentqueue.h:748
Definition: concurrentqueue.h:687