5 #ifndef JODA_JSONTEXTPARSER_H
6 #define JODA_JSONTEXTPARSER_H
13 #include <rapidjson/error/en.h>
23 template <
class Scheduler = DefaultContainerScheduler<false>>
25 :
public IWorkerThread<JsonTextParserQueue, JsonContainerQueue, size_t> {
29 oqueue->registerProducer();
30 DLOG(INFO) <<
"Started JSONTextParser";
34 oqueue->unregisterProducer();
35 DLOG(INFO) <<
"Stopped JSONTextParser";
44 auto tok = IQueue::ctok_t(
iqueue->queue);
46 std::vector<IPayload> buff;
48 Scheduler sched(
oqueue, sourceSize);
50 if (!
iqueue->isFinished()) {
51 DCHECK(buff.empty()) <<
"Buffer should be empty before receiving";
53 iqueue->retrieve(tok, std::back_inserter(buff), buff.capacity());
54 DCHECK_EQ(count, buff.size()) <<
"# of elements in buffer and # of "
55 "returned elements should match";
56 if (count == 0)
continue;
58 for (
auto &text : buff) {
59 if (text.first ==
nullptr) {
60 LOG(WARNING) <<
"Got empty Origin for document";
63 if (text.second.empty()) {
64 LOG(WARNING) <<
"Got empty line for document";
69 auto cont = sched.getContainerForDoc(text.second);
70 auto doc = sched.getNewDoc(cont);
71 doc->Parse(text.second.c_str());
72 if (doc->HasParseError()) {
73 LOG(WARNING) << std::string(rapidjson::GetParseError_En(
74 doc->GetParseError()))
75 <<
" with Origin: " << text.first->toString();
80 sched.scheduleDocument(cont, std::move(doc), std::move(text.first),
Definition: IOThreadPool.h:182
IQueue * iqueue
Definition: IOThreadPool.h:218
bool shouldRun
Definition: IOThreadPool.h:221
OQueue * oqueue
Definition: IOThreadPool.h:219
WConf conf
Definition: IOThreadPool.h:220
OQueueStruct::queue_t OQueue
Definition: IOThreadPool.h:189
IQueueStruct::queue_t IQueue
Definition: IOThreadPool.h:188
static size_t parse_bulk_size
Definition: config.h:61
static size_t storageRetrievalThreads
Definition: config.h:54
Definition: JSONTextParser.h:25
static const size_t recommendedThreads()
Definition: JSONTextParser.h:38
void work() override
Definition: JSONTextParser.h:43
JSONTextParser(IQueue *iqueue, OQueue *oqueue, size_t sourceSize)
Definition: JSONTextParser.h:27
~JSONTextParser() override
Definition: JSONTextParser.h:33
Definition: IImportSource.h:12