JODA  0.13.1 (59b41972)
JSON On-Demand Analysis
QueryThread.h
Go to the documentation of this file.
1 //
2 // Created by Nico Schäfer on 11/7/17.
3 //
4 
5 #ifndef JODA_QUERYTHREAD_H
6 #define JODA_QUERYTHREAD_H
7 
9 #include <joda/misc/Benchmark.h>
10 #include <joda/query/Query.h>
12 
14 
// Data members of the query-thread configuration. The declaration header is
// not part of this listing; the constructors further down name the type
// QueryThreadConfig. Source lines 22 and 26 are also absent from the listing:
// the constructors initialize two more members, `bench` and `aggQueue`, whose
// declarations are therefore not visible here.
//
// Query this configuration was created for. The constructor taking a query
// reads the aggregators and the store join manager from it.
21  std::shared_ptr<joda::query::Query> q;
// Executors owned by this configuration. addExecutor() appends to the list and
// the copy constructor rebuilds it by calling duplicate() on every entry.
23  std::vector<std::unique_ptr<IQueryExecutor>> executors;
// Aggregators owned by this configuration. In the code visible here every
// entry is the result of duplicate() on an existing aggregator.
24  std::vector<std::unique_ptr<joda::query::IAggregator>> aggregators;
// Join manager held through shared ownership; the constructor taking a query
// obtains it from q->getStoreJoinManager().
25  std::shared_ptr<JoinManager> joinManager;
// NOTE(review): presumably the path of a temporary directory. Nothing in this
// listing reads it, so confirm against the code that consumes the config.
27  std::string tmpdir;
28 
// Compiler-generated default constructor.
29  QueryThreadConfig() = default;
30 
// Builds a configuration for the query `q`:
//   - stores the shared pointer in the member `q`,
//   - sets `bench` to nullptr and starts with empty executor/aggregator lists,
//   - takes the join manager from q->getStoreJoinManager(),
//   - appends one duplicate() of every aggregator returned by
//     q->getAggregators() to `aggregators`.
// `q` is dereferenced without a null check, so callers must pass a non-null
// pointer. The constructor is not `explicit`, so it also acts as an implicit
// conversion from an lvalue std::shared_ptr<joda::query::Query>.
// NOTE(review): `aggQueue` and `tmpdir` do not appear in this initializer
// list. `tmpdir` is a std::string and default-constructs to empty; the
// declaration of `aggQueue` is not visible in this listing, so confirm that it
// carries a default member initializer. Otherwise it is left uninitialized
// here and later copied by the copy constructor.
31  QueryThreadConfig(std::shared_ptr<joda::query::Query> &q)
// Inside the initializer list the name `q` in `q->...` refers to the
// parameter, which shadows the member of the same name.
32  : q(q),
33  bench(nullptr),
34  executors(),
35  aggregators(),
36  joinManager(q->getStoreJoinManager()) {
// `this->q` is spelled out because the parameter `q` hides the member.
37  for (const auto &agg : this->q->getAggregators()) {
38  aggregators.emplace_back(agg->duplicate());
39  }
40  }
41 
// Initializer list and body of the copy constructor. Its signature sits on
// source line 42, which is absent from this listing; the reference index at
// the end of the page gives it as QueryThreadConfig(const QueryThreadConfig &o).
// `q`, `bench`, `aggQueue` and `tmpdir` are copied member-wise from `o`.
// `executors` and `aggregators` hold std::unique_ptr elements and cannot be
// copied directly, so they start empty and are refilled in the body with
// duplicate() of each element of `o`.
43  : q(o.q),
44  bench(o.bench),
45  executors(),
46  aggregators(),
// NOTE(review): source line 47 is missing from this listing. `joinManager` is
// not among the visible initializers; it may be initialized on that line.
// Verify against the real header.
48  aggQueue(o.aggQueue),
49  tmpdir(o.tmpdir) {
// Every executor of `o` is cloned through its duplicate() method.
50  for (const auto &exec : o.executors) {
51  executors.emplace_back(exec->duplicate());
52  }
// The aggregators are cloned in the same way.
53  for (const auto &agg : o.aggregators) {
54  aggregators.emplace_back(agg->duplicate());
55  }
56  }
57 
// Appends an executor to `executors`, taking over ownership.
// `exec` is bound as an rvalue reference and moved into the vector, so the
// caller's std::unique_ptr is empty afterwards. No null check is performed; a
// null pointer would be stored as-is.
58  void addExecutor(std::unique_ptr<IQueryExecutor> &&exec) {
59  executors.emplace_back(std::move(exec));
60  }
61 };
62 
// Base-class clause and public interface of the worker class. The class
// header on source line 69 is absent from this listing; the destructor below
// names the class QueryThread. It derives publicly from IWorkerThread,
// instantiated with JsonContainerRefQueue, JsonContainerQueue and
// QueryThreadConfig.
70  : public IWorkerThread<JsonContainerRefQueue, JsonContainerQueue,
71  QueryThreadConfig> {
72  public:
// Destructor; `override` requires the base class to declare a virtual
// destructor. It is defined out of line.
77  ~QueryThread() override;
78 
// Alias for IPayload. The reference index at the end of the page lists
// IPayload as IQueueStruct::payload_t, and a second alias,
// `OPayload OwnedCont`, on source line 80, which is absent from this listing.
79  typedef IPayload ContRef;
81 
// Returns an identifier for this thread as a std::string by value.
// NOTE(review): how the identifier is built is not visible in this header.
86  const std::string getThreadID() const;
87 
88  /*
89  * Aggregation
90  */
91 
// Const predicate.
// NOTE(review): presumably reports whether the configuration holds any
// aggregators; confirm in QueryThread.cpp.
96  bool hasAggregators() const;
97 
98  protected:
// Protected helpers of QueryThread. Only the declarations are visible in this
// listing; the definitions live in QueryThread.cpp, so notes on behaviour are
// marked as assumptions to confirm.
//
// Const predicates.
// NOTE(review): presumably they tell work() whether a projection step is
// needed and whether a view may be created; confirm in QueryThread.cpp.
99  bool hasToProject() const;
100  bool canCreateView() const;
101 
102  /*
103  * Functions
104  */
// Takes a container by mutable reference and returns a shared pointer to an
// immutable DocIndex. The reference index at the end of the page lists
// DocIndex as std::vector<bool>.
// NOTE(review): presumably one flag per document marking whether it was
// selected; confirm.
110  std::shared_ptr<const DocIndex> select(JSONContainer &cont) const;
111 
// Returns a vector of owned RJDocument objects built from `cont`, the index
// `ids` and the allocator `alloc`. The reference index lists RJMemoryPoolAlloc
// as rapidjson::MemoryPoolAllocator<RJBaseAlloc>.
// NOTE(review): presumably projects only the documents flagged in `ids` and
// allocates their values from `alloc`; confirm.
122  std::vector<std::unique_ptr<RJDocument>> defaultProject(
123  JSONContainer &cont, const DocIndex &ids, RJMemoryPoolAlloc &alloc) const;
124 
// Aggregation overload that takes already materialised documents.
130  void aggregate(const std::vector<RapidJsonDocument> &docs) const;
131 
// Aggregation overload that takes a container reference, a selection result
// and a flag.
// NOTE(review): the meaning of `isSelected` (e.g. whether `selectResult` must
// be applied) is not visible here; confirm.
139  void aggregate(ContRef pipelineCont,
140  const std::shared_ptr<const DocIndex> &selectResult,
141  bool isSelected) const;
142 
// Overrides a virtual function of the IWorkerThread base.
// NOTE(review): presumably the thread's main processing routine; confirm.
143  void work() override;
144 
// NOTE(review): presumably reports the RecurringTimer members (bloom_timer,
// select_timer, project_timer, aggregate_timer, copy_timer, serialize_timer,
// sample_view_cost_timer) that the reference index places on source lines
// 146-152; those lines are absent from this listing.
145  void logTimers() const;
153 };
154 
155 #endif // JODA_QUERYTHREAD_H
std::vector< bool > DocIndex
Definition: JSONContainer.h:31
rapidjson::MemoryPoolAllocator< RJBaseAlloc > RJMemoryPoolAlloc
Definition: RJFwd.h:26
Definition: Benchmark.h:27
Definition: IOThreadPool.h:182
OQueueStruct::queue_t OQueue
Definition: IOThreadPool.h:189
OQueueStruct::payload_t OPayload
Definition: IOThreadPool.h:191
IQueueStruct::queue_t IQueue
Definition: IOThreadPool.h:188
IQueueStruct::payload_t IPayload
Definition: IOThreadPool.h:190
Definition: JSONContainer.h:37
Definition: QueryThread.h:71
bool canCreateView() const
Definition: QueryThread.cpp:334
const std::string getThreadID() const
Definition: QueryThread.cpp:275
std::shared_ptr< const DocIndex > select(JSONContainer &cont) const
Definition: QueryThread.cpp:281
bool hasAggregators() const
Definition: QueryThread.cpp:332
OPayload OwnedCont
Definition: QueryThread.h:80
RecurringTimer bloom_timer
Definition: QueryThread.h:146
RecurringTimer copy_timer
Definition: QueryThread.h:150
std::vector< std::unique_ptr< RJDocument > > defaultProject(JSONContainer &cont, const DocIndex &ids, RJMemoryPoolAlloc &alloc) const
Definition: QueryThread.cpp:316
void work() override
Definition: QueryThread.cpp:35
RecurringTimer project_timer
Definition: QueryThread.h:148
bool hasToProject() const
Definition: QueryThread.cpp:338
IPayload ContRef
Definition: QueryThread.h:79
void logTimers() const
Definition: QueryThread.cpp:348
~QueryThread() override
Definition: QueryThread.cpp:25
void aggregate(const std::vector< RapidJsonDocument > &docs) const
Definition: QueryThread.cpp:266
QueryThread(IQueue *iqueue, OQueue *oqueue, WConf &conf)
Definition: QueryThread.cpp:13
RecurringTimer select_timer
Definition: QueryThread.h:147
RecurringTimer aggregate_timer
Definition: QueryThread.h:149
RecurringTimer serialize_timer
Definition: QueryThread.h:151
RecurringTimer sample_view_cost_timer
Definition: QueryThread.h:152
Definition: RecurringTimer.h:14
Definition: Queue.h:19
Definition: QueryThread.h:20
std::string tmpdir
Definition: QueryThread.h:27
std::shared_ptr< joda::query::Query > q
Definition: QueryThread.h:21
QueryThreadConfig()=default
std::vector< std::unique_ptr< IQueryExecutor > > executors
Definition: QueryThread.h:23
joda::query::AggregatorQueue::queue_t * aggQueue
Definition: QueryThread.h:26
QueryThreadConfig(const QueryThreadConfig &o)
Definition: QueryThread.h:42
void addExecutor(std::unique_ptr< IQueryExecutor > &&exec)
Definition: QueryThread.h:58
QueryThreadConfig(std::shared_ptr< joda::query::Query > &q)
Definition: QueryThread.h:31
Benchmark * bench
Definition: QueryThread.h:22
std::shared_ptr< JoinManager > joinManager
Definition: QueryThread.h:25
std::vector< std::unique_ptr< joda::query::IAggregator > > aggregators
Definition: QueryThread.h:24