JODA  0.13.1 (59b41972)
JSON On-Demand Analysis
Queue.h
Go to the documentation of this file.
1 //
2 // Created by Nico on 22/02/2019.
3 //
4 
5 #ifndef JODA_QUEUE_H
6 #define JODA_QUEUE_H
7 #include <glog/logging.h>
9 #include <cstdint>
10 
11 #define JODA_FLAG_T uint32_t
13 // Basic Queue
15 
16 #define JODA_QUEUE_WAIT_TIME 1ms
17 template <class payload_t, JODA_FLAG_T flag = 0,
18  class queue_trait = moodycamel::ConcurrentQueueDefaultTraits>
23 
24  JODA_SHARED_QUEUE(size_t minCapacity, size_t maxExplicitProducers,
25  size_t maxImplicit = 0)
26  : queue(minCapacity, maxExplicitProducers, maxImplicit) {}
28 
31 
32  constexpr auto getFlag() { return flag; };
33 
35  std::atomic<unsigned long> inQueue{};
36  std::atomic<unsigned long> registered{};
37  std::atomic<unsigned long> finished{};
38  std::atomic<size_t> added{};
39  std::atomic<size_t> removed{};
40  std::atomic<bool> finishedWriting{};
41 
42  bool isFinished() {
45  }
46 
47  std::pair<size_t, size_t> getStatistics() {
48  std::pair<size_t, size_t> ret = {added.load(std::memory_order_acquire),
50  return ret;
51  };
52 
54  auto i = registered.fetch_add(1);
55  DLOG(INFO) << "Registered " << i + 1 << "th producer in queue " << flag;
56  DCHECK(i >= 0);
57  }
58 
60  auto i = registered.fetch_sub(1);
61  DLOG(INFO) << "Unregistered " << i << "th producer in queue " << flag;
62  DCHECK(i >= 0);
63  }
64 
66  auto i = finished.fetch_add(1, std::memory_order_release);
67  DLOG(INFO) << i + 1 << " Producer finished in queue " << flag;
68  if (finished.load(std::memory_order_acquire) >= registered.load()) {
70  DLOG(INFO) << "Queue " << flag << " finished";
71  }
72  }
73 
74  void send(typename queue_t::producer_token_t& ptok, payload_t&& e) {
75  using namespace std::chrono_literals;
76  auto te = std::move(e);
77  while (!queue.try_enqueue(ptok, std::move(te))) {
78  std::this_thread::sleep_for(JODA_QUEUE_WAIT_TIME);
79  }
80  inQueue.fetch_add(1, std::memory_order_release);
81  added.fetch_add(1, std::memory_order_relaxed);
82  }
83 
84  void send(payload_t&& e) {
85  using namespace std::chrono_literals;
86  auto te = std::move(e);
87  while (!queue.try_enqueue(std::move(te))) {
88  std::this_thread::sleep_for(JODA_QUEUE_WAIT_TIME);
89  }
90  inQueue.fetch_add(1, std::memory_order_release);
91  added.fetch_add(1, std::memory_order_relaxed);
92  }
93 
94  template <typename It>
95  void send(typename queue_t::producer_token_t& ptok, It e, size_t count) {
96  using namespace std::chrono_literals;
97  while (!queue.try_enqueue_bulk(ptok, e, count)) {
98  std::this_thread::sleep_for(JODA_QUEUE_WAIT_TIME);
99  }
100  inQueue.fetch_add(count, std::memory_order_release);
101  added.fetch_add(count, std::memory_order_relaxed);
102  }
103 
104  template <typename It>
105  void send(It e, size_t count) {
106  using namespace std::chrono_literals;
107  while (!queue.try_enqueue_bulk(e, count)) {
108  std::this_thread::sleep_for(JODA_QUEUE_WAIT_TIME);
109  }
110  inQueue.fetch_add(count, std::memory_order_release);
111  added.fetch_add(count, std::memory_order_relaxed);
112  }
113 
114  void retrieve(payload_t& e) {
115  using namespace std::chrono_literals;
116  while (!queue.try_dequeue(e)) {
117  std::this_thread::sleep_for(JODA_QUEUE_WAIT_TIME);
118  if (isFinished()) return;
119  }
120  inQueue.fetch_sub(1, std::memory_order_release);
121  removed.fetch_add(1, std::memory_order_relaxed);
122  }
123 
124  void retrieve(typename queue_t::consumer_token_t& ctok, payload_t& e) {
125  using namespace std::chrono_literals;
126  while (!queue.try_dequeue(ctok, e)) {
127  std::this_thread::sleep_for(JODA_QUEUE_WAIT_TIME);
128  if (isFinished()) return;
129  }
130  inQueue.fetch_sub(1, std::memory_order_release);
131  removed.fetch_add(1, std::memory_order_relaxed);
132  }
133 
134  template <typename It>
135  size_t retrieve(It e, size_t count) {
136  auto i = queue.try_dequeue_bulk(e, count);
137  if (i > 0) {
138  inQueue.fetch_sub(i, std::memory_order_release);
139  removed.fetch_add(i, std::memory_order_relaxed);
140  }
141  return i;
142  }
143  template <typename It>
144  size_t retrieve(typename queue_t::consumer_token_t& ctok, It e,
145  size_t count) {
146  auto i = queue.try_dequeue_bulk(ctok, e, count);
147  if (i > 0) {
148  inQueue.fetch_sub(i, std::memory_order_release);
149  removed.fetch_add(i, std::memory_order_relaxed);
150  }
151  return i;
152  }
153 };
154 
156  typedef bool payload_t;
158 
159  static std::unique_ptr<queue_t> getQueue() {
160  return std::make_unique<queue_t>();
161  }
162  static std::unique_ptr<queue_t> getQueue(size_t minCapacity,
163  size_t maxProducers) {
164  return std::make_unique<queue_t>(minCapacity, maxProducers);
165  }
166 };
167 
169 
170 #endif // JODA_QUEUE_H
#define JODA_QUEUE_WAIT_TIME
Definition: Queue.h:16
JODA_NULL_QUEUE NullQueue
Definition: Queue.h:168
#define JODA_FLAG_T
Definition: Queue.h:11
Definition: concurrentqueue.h:802
bool try_dequeue(U &item)
Definition: concurrentqueue.h:1197
size_t try_dequeue_bulk(It itemFirst, size_t max)
Definition: concurrentqueue.h:1313
bool try_enqueue(T const &item)
Definition: concurrentqueue.h:1139
bool try_enqueue_bulk(It itemFirst, size_t count)
Definition: concurrentqueue.h:1176
@ memory_order_acquire
Definition: atomicops.h:73
@ memory_order_relaxed
Definition: atomicops.h:72
@ memory_order_release
Definition: atomicops.h:74
Definition: Queue.h:155
static std::unique_ptr< queue_t > getQueue()
Definition: Queue.h:159
bool payload_t
Definition: Queue.h:156
static std::unique_ptr< queue_t > getQueue(size_t minCapacity, size_t maxProducers)
Definition: Queue.h:162
Definition: Queue.h:19
JODA_SHARED_QUEUE(JODA_SHARED_QUEUE &c)=delete
void send(typename queue_t::producer_token_t &ptok, It e, size_t count)
Definition: Queue.h:95
std::atomic< unsigned long > finished
Definition: Queue.h:37
void retrieve(typename queue_t::consumer_token_t &ctok, payload_t &e)
Definition: Queue.h:124
queue_t::producer_token_t ptok_t
Definition: Queue.h:21
queue_t queue
Definition: Queue.h:32
void producerFinished()
Definition: Queue.h:65
constexpr auto getFlag()
Definition: Queue.h:32
void send(It e, size_t count)
Definition: Queue.h:105
std::atomic< unsigned long > inQueue
Definition: Queue.h:35
queue_t::consumer_token_t ctok_t
Definition: Queue.h:22
void retrieve(payload_t &e)
Definition: Queue.h:114
std::atomic< unsigned long > registered
Definition: Queue.h:36
size_t retrieve(typename queue_t::consumer_token_t &ctok, It e, size_t count)
Definition: Queue.h:144
moodycamel::ConcurrentQueue< payload_t, queue_trait > queue_t
Definition: Queue.h:20
JODA_SHARED_QUEUE(size_t minCapacity, size_t maxExplicitProducers, size_t maxImplicit=0)
Definition: Queue.h:24
bool isFinished()
Definition: Queue.h:42
std::atomic< bool > finishedWriting
Definition: Queue.h:40
std::pair< size_t, size_t > getStatistics()
Definition: Queue.h:47
JODA_SHARED_QUEUE(JODA_SHARED_QUEUE &&c)=delete
std::atomic< size_t > added
Definition: Queue.h:38
void registerProducer()
Definition: Queue.h:53
void send(payload_t &&e)
Definition: Queue.h:84
size_t retrieve(It e, size_t count)
Definition: Queue.h:135
void unregisterProducer()
Definition: Queue.h:59
std::atomic< size_t > removed
Definition: Queue.h:39
JODA_SHARED_QUEUE()
Definition: Queue.h:27
void send(typename queue_t::producer_token_t &ptok, payload_t &&e)
Definition: Queue.h:74
Definition: concurrentqueue.h:349
Definition: concurrentqueue.h:748
Definition: concurrentqueue.h:687