JODA  0.13.1 (59b41972)
JSON On-Demand Analysis
JSONTextParser.h
Go to the documentation of this file.
1 //
2 // Created by Nico on 27/11/2018.
3 //
4 
5 #ifndef JODA_JSONTEXTPARSER_H
6 #define JODA_JSONTEXTPARSER_H
7 
9 #include <joda/config/config.h>
13 #include <rapidjson/error/en.h>
15 
16 namespace joda::docparsing {
23 template <class Scheduler = DefaultContainerScheduler<false>>
25  : public IWorkerThread<JsonTextParserQueue, JsonContainerQueue, size_t> {
26  public:
27  JSONTextParser(IQueue *iqueue, OQueue *oqueue, size_t sourceSize)
28  : IWorkerThread(iqueue, oqueue, sourceSize), sourceSize(this->conf) {
29  oqueue->registerProducer();
30  DLOG(INFO) << "Started JSONTextParser";
31  }
32 
33  ~JSONTextParser() override {
34  oqueue->unregisterProducer();
35  DLOG(INFO) << "Stopped JSONTextParser";
36  };
37 
38  static const size_t recommendedThreads() {
40  };
41 
42  protected:
43  void work() override {
44  auto tok = IQueue::ctok_t(iqueue->queue);
45  // auto ptok = OQueue::ptok_t(oqueue->queue);
46  std::vector<IPayload> buff;
47  buff.reserve(config::parse_bulk_size);
48  Scheduler sched(oqueue, sourceSize);
49  while (shouldRun) {
50  if (!iqueue->isFinished()) {
51  DCHECK(buff.empty()) << "Buffer should be empty before receiving";
52  auto count =
53  iqueue->retrieve(tok, std::back_inserter(buff), buff.capacity());
54  DCHECK_EQ(count, buff.size()) << "# of elements in buffer and # of "
55  "returned elements should match";
56  if (count == 0) continue;
57 
58  for (auto &text : buff) {
59  if (text.first == nullptr) {
60  LOG(WARNING) << "Got empty Origin for document";
61  continue;
62  }
63  if (text.second.empty()) {
64  LOG(WARNING) << "Got empty line for document";
65  continue;
66  }
67 
68  // Parse line
69  auto cont = sched.getContainerForDoc(text.second);
70  auto doc = sched.getNewDoc(cont);
71  doc->Parse(text.second.c_str());
72  if (doc->HasParseError()) {
73  LOG(WARNING) << std::string(rapidjson::GetParseError_En(
74  doc->GetParseError()))
75  << " with Origin: " << text.first->toString();
76 
77  continue;
78  }
79  // Insert doc in container
80  sched.scheduleDocument(cont, std::move(doc), std::move(text.first),
81  text.second.size());
82  }
83  buff.clear();
84  } else {
85  shouldRun = false;
86  }
87  }
88  sched.finalize();
89  };
90 
91  private:
92  size_t sourceSize;
93 };
94 } // namespace joda::docparsing
95 
96 #endif // JODA_JSONTEXTPARSER_H
Definition: IOThreadPool.h:182
OQueueStruct::queue_t OQueue
Definition: IOThreadPool.h:189
IQueueStruct::queue_t IQueue
Definition: IOThreadPool.h:188
static size_t parse_bulk_size
Definition: config.h:61
static size_t storageRetrievalThreads
Definition: config.h:54
Definition: JSONTextParser.h:25
static const size_t recommendedThreads()
Definition: JSONTextParser.h:38
void work() override
Definition: JSONTextParser.h:43
JSONTextParser(IQueue *iqueue, OQueue *oqueue, size_t sourceSize)
Definition: JSONTextParser.h:27
~JSONTextParser() override
Definition: JSONTextParser.h:33
Definition: IImportSource.h:12