JODA  0.13.1 (59b41972)
JSON On-Demand Analysis
JSONMetaParser.h
Go to the documentation of this file.
1 //
2 // Created by Nico on 27/11/2018.
3 //
4 
5 #ifndef JODA_JSONMETAPARSER_H
6 #define JODA_JSONMETAPARSER_H
7 
13 
14 namespace joda::docparsing {
22 template <class Scheduler = DefaultContainerScheduler<true>>
24  : public IWorkerThread<JsonTextParserQueue, JsonContainerQueue, size_t> {
25  public:
26  JSONMetaParser(IQueue *iqueue, OQueue *oqueue, size_t contSize)
27  : IWorkerThread(iqueue, oqueue, contSize), contSize(this->conf) {
28  oqueue->registerProducer();
29  DLOG(INFO) << "Started JSONMetaParser";
30  };
31 
32  ~JSONMetaParser() override {
33  oqueue->unregisterProducer();
34  DLOG(INFO) << "Stopped JSONMetaParser";
35  };
36 
37  protected:
38  void work() override {
39  auto tok = IQueue::ctok_t(iqueue->queue);
40  // auto ptok = OQueue::ptok_t(oqueue->queue);
41  std::vector<IPayload> buff;
42  buff.reserve(config::parse_bulk_size);
43  Scheduler sched(oqueue, contSize);
44  while (shouldRun) {
45  if (!iqueue->isFinished()) {
46  DCHECK(buff.empty()) << "Buffer should be empty before receiving";
47  auto count =
48  iqueue->retrieve(tok, std::back_inserter(buff), buff.capacity());
49  DCHECK_EQ(count, buff.size()) << "# of elements in buffer and # of "
50  "returned elements should match";
51  if (count == 0) continue;
52 
53  for (auto &text : buff) {
54  if (text.first == nullptr) {
55  LOG(WARNING) << "Got empty Origin for document";
56  continue;
57  }
58  if (text.second.empty()) {
59  LOG(WARNING) << "Got empty line for document";
60  continue;
61  }
62 
63  // Insert doc in container
64  auto cont = sched.getContainerForDoc(text.second);
65  sched.scheduleDocument(cont, nullptr, std::move(text.first),
66  text.second.size());
67  }
68  buff.clear();
69  } else {
70  shouldRun = false;
71  }
72  }
73  sched.finalize();
74  };
75 
76  private:
77  size_t contSize;
78 };
79 } // namespace joda::docparsing
80 
81 #endif // JODA_JSONMETAPARSER_H
Definition: IOThreadPool.h:182
OQueueStruct::queue_t OQueue
Definition: IOThreadPool.h:189
IQueueStruct::queue_t IQueue
Definition: IOThreadPool.h:188
static size_t parse_bulk_size
Definition: config.h:61
Definition: JSONMetaParser.h:24
JSONMetaParser(IQueue *iqueue, OQueue *oqueue, size_t contSize)
Definition: JSONMetaParser.h:26
void work() override
Definition: JSONMetaParser.h:38
~JSONMetaParser() override
Definition: JSONMetaParser.h:32
Definition: IImportSource.h:12