5 #ifndef JODA_JSONMETAPARSER_H
6 #define JODA_JSONMETAPARSER_H
22 template <
class Scheduler = DefaultContainerScheduler<true>>
24 :
public IWorkerThread<JsonTextParserQueue, JsonContainerQueue, size_t> {
28 oqueue->registerProducer();
29 DLOG(INFO) <<
"Started JSONMetaParser";
33 oqueue->unregisterProducer();
34 DLOG(INFO) <<
"Stopped JSONMetaParser";
39 auto tok = IQueue::ctok_t(
iqueue->queue);
41 std::vector<IPayload> buff;
43 Scheduler sched(
oqueue, contSize);
45 if (!
iqueue->isFinished()) {
46 DCHECK(buff.empty()) <<
"Buffer should be empty before receiving";
48 iqueue->retrieve(tok, std::back_inserter(buff), buff.capacity());
49 DCHECK_EQ(count, buff.size()) <<
"# of elements in buffer and # of "
50 "returned elements should match";
51 if (count == 0)
continue;
53 for (
auto &text : buff) {
54 if (text.first ==
nullptr) {
55 LOG(WARNING) <<
"Got empty Origin for document";
58 if (text.second.empty()) {
59 LOG(WARNING) <<
"Got empty line for document";
64 auto cont = sched.getContainerForDoc(text.second);
65 sched.scheduleDocument(cont,
nullptr, std::move(text.first),
Definition: IOThreadPool.h:182
IQueue * iqueue
Definition: IOThreadPool.h:218
bool shouldRun
Definition: IOThreadPool.h:221
OQueue * oqueue
Definition: IOThreadPool.h:219
WConf conf
Definition: IOThreadPool.h:220
OQueueStruct::queue_t OQueue
Definition: IOThreadPool.h:189
IQueueStruct::queue_t IQueue
Definition: IOThreadPool.h:188
static size_t parse_bulk_size
Definition: config.h:61
Definition: IImportSource.h:12