JODA  0.13.1 (59b41972)
JSON On-Demand Analysis
JSONStreamParser.h
Go to the documentation of this file.
1 //
2 // Created by Nico on 09/05/2019.
3 //
4 
5 #ifndef JODA_JSONSTREAMPARSER_H
6 #define JODA_JSONSTREAMPARSER_H
7 
9 #include <joda/config/config.h>
13 #include <rapidjson/error/en.h>
14 
16 
17 namespace joda::docparsing {
24 template <class Scheduler = DefaultContainerScheduler<false>>
25 class JSONStreamParser : public IWorkerThread<JsonTextStreamParserQueue,
26  JsonContainerQueue, size_t> {
27  public:
28  JSONStreamParser(IQueue *iqueue, OQueue *oqueue, size_t sourceSize)
29  : IWorkerThread(iqueue, oqueue, sourceSize), sourceSize(this->conf) {
30  oqueue->registerProducer();
31  DLOG(INFO) << "Started JSONStreamParser";
32  };
33 
34  ~JSONStreamParser() override {
35  oqueue->unregisterProducer();
36  DLOG(INFO) << "Stopped JSONTextParser";
37  };
38 
/**
 * Static accessor returning a size_t; judging by its name it reports how
 * many threads of this parser are recommended.
 *
 * NOTE(review): the function body (source line 40) is missing from this
 * listing. The cross-reference list at the end of the page mentions a
 * `storageRetrievalThreads` value defined in config.h, so the body
 * presumably returns that value - confirm against the original header.
 * The `const` qualifier on the by-value return type has no effect.
 */
39 static const size_t recommendedThreads() {
41 };
42 
43  protected:
44  void work() override {
45  auto tok = IQueue::ctok_t(iqueue->queue);
46  auto ptok = OQueue::ptok_t(oqueue->queue);
47 
48  Scheduler sched(oqueue, sourceSize);
49  while (shouldRun) {
50  if (!iqueue->isFinished()) {
51  IPayload stream;
52  iqueue->retrieve(tok, stream);
53  if (stream.first == nullptr) {
54  LOG(WARNING) << "Got empty stream origin";
55  continue;
56  }
57  if (stream.second == nullptr) {
58  LOG(WARNING) << "Got empty stream";
59  continue;
60  }
61 
62  size_t index = 0;
63  size_t begin, end = 0;
64  if (*stream.second) {
65  begin = stream.second->tellg();
66  end = begin;
67  } else {
68  LOG(WARNING) << "Stream closed prematurely";
69  continue;
70  }
71 
72  rapidjson::IStreamWrapper isw(*stream.second);
73  while (*stream.second) {
74  std::unique_ptr<RJDocument> doc;
75  typename Scheduler::ContainerIdentifier cont;
76  if (std::is_same<
77  Scheduler,
79  std::is_same<
80  Scheduler,
82  doc = std::move(sched.getNewDoc(0));
83  doc->ParseStream<rapidjson::kParseStopWhenDoneFlag>(isw);
84  if (doc->HasParseError()) {
85  if (doc->GetParseError() != rapidjson::kParseErrorDocumentEmpty)
86  LOG(WARNING) << std::string(rapidjson::GetParseError_En(
87  doc->GetParseError()))
88  << " from stream " << stream.first->getStreamName()
89  << " : " << begin << "-" << isw.Tell();
90 
91  continue;
92  }
93  } else {
94  auto tmpDoc = std::make_unique<RJDocument>();
95  tmpDoc->ParseStream<rapidjson::kParseStopWhenDoneFlag>(isw);
96  if (doc->HasParseError()) {
97  if (doc->GetParseError() != rapidjson::kParseErrorDocumentEmpty)
98  LOG(WARNING) << std::string(rapidjson::GetParseError_En(
99  doc->GetParseError()))
100  << " from stream " << stream.first->getStreamName()
101  << " : " << begin << "-" << isw.Tell();
102 
103  continue;
104  }
105  cont = sched.getContainerForDoc(*tmpDoc);
106  doc = std::move(sched.getNewDoc(cont));
107  doc->CopyFrom(*tmpDoc, doc->GetAllocator(), true);
108  }
109 
110  end = isw.Tell();
111 
112  auto ptr = stream.first->clone();
113  auto *tmp = dynamic_cast<IDPositionOrigin *>(ptr.get());
114  std::unique_ptr<IDPositionOrigin> orig;
115  if (tmp != nullptr) {
116  ptr.release();
117  orig.reset(tmp);
118  }
119  if (orig == nullptr) {
120  LOG(ERROR) << "Unsupported origin retrieved.";
121  continue;
122  }
123  orig->setStart(begin);
124  orig->setEnd(end);
125  orig->setIndex(index);
126  index++;
127 
128  sched.scheduleDocument(cont, std::move(doc), std::move(orig),
129  end - begin);
130  begin = end + 1;
131  }
132  if (stream.second->bad()) {
133  LOG(ERROR) << "Error streaming: " << strerror(errno);
134  }
135 
136  } else {
137  shouldRun = false;
138  }
139  }
140  sched.finalize();
141  };
142 
143  private:
144  size_t sourceSize;
145 };
146 } // namespace joda::docparsing
147 
148 #endif // JODA_JSONSTREAMPARSER_H
Definition: IDPositionOrigin.h:16
void setStart(long start)
Definition: IDPositionOrigin.cpp:16
Definition: IOThreadPool.h:182
OQueueStruct::queue_t OQueue
Definition: IOThreadPool.h:189
IQueueStruct::queue_t IQueue
Definition: IOThreadPool.h:188
IQueueStruct::payload_t IPayload
Definition: IOThreadPool.h:190
static size_t storageRetrievalThreads
Definition: config.h:54
Definition: DefaultContainerScheduler.h:19
Definition: JSONStreamParser.h:26
JSONStreamParser(IQueue *iqueue, OQueue *oqueue, size_t sourceSize)
Definition: JSONStreamParser.h:28
void work() override
Definition: JSONStreamParser.h:44
~JSONStreamParser() override
Definition: JSONStreamParser.h:34
static const size_t recommendedThreads()
Definition: JSONStreamParser.h:39
Definition: IImportSource.h:12