JODA  0.13.1 (59b41972)
JSON On-Demand Analysis
IOThreadPool.h
Go to the documentation of this file.
1 //
2 // Created by Nico on 28/01/2019.
3 //
4 
5 #ifndef JODA_IOTHREADPOOL_H
6 #define JODA_IOTHREADPOOL_H
7 
8 #include <algorithm>
9 #include <future>
10 #include <vector>
11 
12 #include "IThreadUser.h"
13 
17 template <class Worker>
18 class IOThreadPool : public IThreadUser {
19  public:
20  /*
21  * Types
22  */
23  typedef typename Worker::IQueue IQueue;
24  typedef typename Worker::OQueue OQueue;
25  typedef typename Worker::IPayload IPayload;
26  typedef typename Worker::OPayload OPayload;
27  typedef typename Worker::WConf WConf;
28 
29  /*
30  * Constructors
31  */
32  virtual ~IOThreadPool() = default;
33 
42  explicit IOThreadPool(IQueue* iqueue, OQueue* oqueue, WConf conf)
44  iqueue(iqueue),
45  oqueue(oqueue),
46  workerConfig(conf) {
47  while (threads.size() < getMaxThreads()) {
48  addThread();
49  if (!threads.back()->isRunning()) break;
50  }
51  }
52 
61  explicit IOThreadPool(IQueue* iqueue, OQueue* oqueue, size_t maxThreads,
62  WConf conf)
64  iqueue(iqueue),
65  oqueue(oqueue),
66  workerConfig(conf) {
67  while (threads.size() < getMaxThreads()) {
68  addThread();
69  if (!threads.back()->isRunning()) break;
70  }
71  }
72 
77  void forceThreads(size_t threads) override {
78  while (threads < getUsedThreads()) {
79  addThread();
80  }
81  while (threads > getUsedThreads()) {
82  removeThread();
83  }
84  }
85 
90  size_t getUsedThreads() const override {
91  size_t count = 0;
92  for (const auto& thread : threads) {
93  if (!thread->requestedToStop()) {
94  count++;
95  }
96  }
97  return count;
98  }
99 
103  void wait() {
104  for (auto& thread : threads) {
105  thread->waitFor();
106  }
107  clearThreads();
108  }
109 
114  size_t getMaxThreads() const override { return IThreadUser::getMaxThreads(); }
115 
120  void setMaxThreads(size_t maxThreads) override {
122  }
123 
128  size_t recommendedThreads() const override {
129  return Worker::recommendedThreads();
130  }
131 
137  for (const auto& item : threads) {
138  if (item->running()) return true;
139  }
140  return false;
141  }
142 
143  void setWorkerConfig(const WConf& workerConfig) {
144  IOThreadPool::workerConfig = workerConfig;
145  }
146 
147  protected:
148  private:
149  std::vector<std::unique_ptr<Worker>> threads{};
150  IQueue* iqueue;
151  OQueue* oqueue;
152  WConf workerConfig;
153 
154  void clearThreads() {
155  threads.erase(std::remove_if(threads.begin(), threads.end(),
156  [](const std::unique_ptr<Worker>& o) {
157  return !o->isRunning();
158  }),
159  threads.end());
160  }
161  void removeThread() {
162  clearThreads();
163  this->threads.back()->stop();
164  }
165 
166  void addThread() {
167  clearThreads();
168  this->threads.emplace_back(
169  std::make_unique<Worker>(iqueue, oqueue, workerConfig));
170  this->threads.back()->start();
171  }
172 };
173 
174 /*
175  * Thread
176  */
177 
181 template <class IQueueStruct, class OQueueStruct, class WConfig>
183  public:
184  /*
185  * Types
186  */
187 
188  typedef typename IQueueStruct::queue_t IQueue;
189  typedef typename OQueueStruct::queue_t OQueue;
190  typedef typename IQueueStruct::payload_t IPayload;
191  typedef typename OQueueStruct::payload_t OPayload;
192  typedef WConfig WConf;
193 
194  static constexpr auto getIQueueFlags() { return IQueueStruct::getFlag(); };
195 
196  static constexpr auto getOQueueFlags() { return OQueueStruct::getFlag(); };
197 
199  : iqueue(iqueue), oqueue(oqueue), conf(conf){};
201  : iqueue(iqueue), oqueue(oqueue), conf(std::move(conf)){};
204  IWorkerThread(const IWorkerThread&) = delete;
206  virtual ~IWorkerThread() { fut.wait(); };
207 
208  void start() {
209  fut = std::async(std::launch::async, [this] { return this->run(); });
210  }
211  void waitFor() { fut.wait(); };
212  void stop() { shouldRun = false; };
213  bool requestedToStop() const { return shouldRun; };
214  bool isRunning() const { return running; };
215  static const size_t recommendedThreads() { return 1; }
216 
217  protected:
221  bool shouldRun = true;
222  virtual void work() = 0;
223 
224  private:
225  void run() {
226  this->work();
227  running = false;
228  };
229 
230  std::future<void> fut;
231 
232  bool running = true;
233 };
234 
235 #endif // JODA_IOTHREADPOOL_H
Definition: IOThreadPool.h:18
virtual ~IOThreadPool()=default
size_t recommendedThreads() const override
Definition: IOThreadPool.h:128
bool hasRunningThreads()
Definition: IOThreadPool.h:136
size_t getUsedThreads() const override
Definition: IOThreadPool.h:90
size_t getMaxThreads() const override
Definition: IOThreadPool.h:114
IOThreadPool(IQueue *iqueue, OQueue *oqueue, size_t maxThreads, WConf conf)
Definition: IOThreadPool.h:61
void setWorkerConfig(const WConf &workerConfig)
Definition: IOThreadPool.h:143
IOThreadPool(IQueue *iqueue, OQueue *oqueue, WConf conf)
Definition: IOThreadPool.h:42
void forceThreads(size_t threads) override
Definition: IOThreadPool.h:77
Worker::WConf WConf
Definition: IOThreadPool.h:27
void setMaxThreads(size_t maxThreads) override
Definition: IOThreadPool.h:120
Worker::OPayload OPayload
Definition: IOThreadPool.h:26
Worker::OQueue OQueue
Definition: IOThreadPool.h:24
void wait()
Definition: IOThreadPool.h:103
Worker::IPayload IPayload
Definition: IOThreadPool.h:25
Worker::IQueue IQueue
Definition: IOThreadPool.h:23
Definition: IThreadUser.h:13
virtual void setMaxThreads(size_t maxThreads)
Definition: IThreadUser.h:23
virtual size_t getMaxThreads() const
Definition: IThreadUser.h:21
size_t maxThreads
Definition: IThreadUser.h:30
Definition: IOThreadPool.h:182
IQueue * iqueue
Definition: IOThreadPool.h:218
IWorkerThread(IQueue *iqueue, OQueue *oqueue, WConf &&conf)
Definition: IOThreadPool.h:200
bool shouldRun
Definition: IOThreadPool.h:221
static constexpr auto getOQueueFlags()
Definition: IOThreadPool.h:196
virtual void work()=0
static constexpr auto getIQueueFlags()
Definition: IOThreadPool.h:194
IWorkerThread(IWorkerThread &&)=default
OQueue * oqueue
Definition: IOThreadPool.h:219
void stop()
Definition: IOThreadPool.h:212
static const size_t recommendedThreads()
Definition: IOThreadPool.h:215
WConf conf
Definition: IOThreadPool.h:220
IWorkerThread & operator=(const IWorkerThread &)=delete
void waitFor()
Definition: IOThreadPool.h:211
WConfig WConf
Definition: IOThreadPool.h:192
bool requestedToStop() const
Definition: IOThreadPool.h:213
OQueueStruct::queue_t OQueue
Definition: IOThreadPool.h:189
IWorkerThread(const IWorkerThread &)=delete
OQueueStruct::payload_t OPayload
Definition: IOThreadPool.h:191
IWorkerThread(IQueue *iqueue, OQueue *oqueue, const WConf &conf)
Definition: IOThreadPool.h:198
virtual ~IWorkerThread()
Definition: IOThreadPool.h:206
void start()
Definition: IOThreadPool.h:208
IQueueStruct::queue_t IQueue
Definition: IOThreadPool.h:188
bool isRunning() const
Definition: IOThreadPool.h:214
IQueueStruct::payload_t IPayload
Definition: IOThreadPool.h:190
IWorkerThread & operator=(IWorkerThread &&)=default