5 #ifndef JODA_JSONSTREAMPARSER_H
6 #define JODA_JSONSTREAMPARSER_H
13 #include <rapidjson/error/en.h>
24 template <
class Scheduler = DefaultContainerScheduler<false>>
26 JsonContainerQueue, size_t> {
30 oqueue->registerProducer();
31 DLOG(INFO) <<
"Started JSONStreamParser";
35 oqueue->unregisterProducer();
36 DLOG(INFO) <<
"Stopped JSONTextParser";
45 auto tok = IQueue::ctok_t(
iqueue->queue);
46 auto ptok = OQueue::ptok_t(
oqueue->queue);
48 Scheduler sched(
oqueue, sourceSize);
50 if (!
iqueue->isFinished()) {
52 iqueue->retrieve(tok, stream);
53 if (stream.first ==
nullptr) {
54 LOG(WARNING) <<
"Got empty stream origin";
57 if (stream.second ==
nullptr) {
58 LOG(WARNING) <<
"Got empty stream";
63 size_t begin, end = 0;
65 begin = stream.second->tellg();
68 LOG(WARNING) <<
"Stream closed prematurely";
72 rapidjson::IStreamWrapper isw(*stream.second);
73 while (*stream.second) {
74 std::unique_ptr<RJDocument> doc;
75 typename Scheduler::ContainerIdentifier cont;
82 doc = std::move(sched.getNewDoc(0));
83 doc->ParseStream<rapidjson::kParseStopWhenDoneFlag>(isw);
84 if (doc->HasParseError()) {
85 if (doc->GetParseError() != rapidjson::kParseErrorDocumentEmpty)
86 LOG(WARNING) << std::string(rapidjson::GetParseError_En(
87 doc->GetParseError()))
88 <<
" from stream " << stream.first->getStreamName()
89 <<
" : " << begin <<
"-" << isw.Tell();
94 auto tmpDoc = std::make_unique<RJDocument>();
95 tmpDoc->ParseStream<rapidjson::kParseStopWhenDoneFlag>(isw);
96 if (doc->HasParseError()) {
97 if (doc->GetParseError() != rapidjson::kParseErrorDocumentEmpty)
98 LOG(WARNING) << std::string(rapidjson::GetParseError_En(
99 doc->GetParseError()))
100 <<
" from stream " << stream.first->getStreamName()
101 <<
" : " << begin <<
"-" << isw.Tell();
105 cont = sched.getContainerForDoc(*tmpDoc);
106 doc = std::move(sched.getNewDoc(cont));
107 doc->CopyFrom(*tmpDoc, doc->GetAllocator(),
true);
112 auto ptr = stream.first->clone();
114 std::unique_ptr<IDPositionOrigin> orig;
115 if (tmp !=
nullptr) {
119 if (orig ==
nullptr) {
120 LOG(ERROR) <<
"Unsupported origin retrieved.";
125 orig->setIndex(index);
128 sched.scheduleDocument(cont, std::move(doc), std::move(orig),
132 if (stream.second->bad()) {
133 LOG(ERROR) <<
"Error streaming: " << strerror(errno);
Definition: IDPositionOrigin.h:16
void setStart(long start)
Definition: IDPositionOrigin.cpp:16
Definition: IOThreadPool.h:182
IQueue * iqueue
Definition: IOThreadPool.h:218
bool shouldRun
Definition: IOThreadPool.h:221
OQueue * oqueue
Definition: IOThreadPool.h:219
WConf conf
Definition: IOThreadPool.h:220
OQueueStruct::queue_t OQueue
Definition: IOThreadPool.h:189
IQueueStruct::queue_t IQueue
Definition: IOThreadPool.h:188
IQueueStruct::payload_t IPayload
Definition: IOThreadPool.h:190
static size_t storageRetrievalThreads
Definition: config.h:54
Definition: DefaultContainerScheduler.h:19
Definition: JSONStreamParser.h:26
JSONStreamParser(IQueue *iqueue, OQueue *oqueue, size_t sourceSize)
Definition: JSONStreamParser.h:28
void work() override
Definition: JSONStreamParser.h:44
~JSONStreamParser() override
Definition: JSONStreamParser.h:34
static const size_t recommendedThreads()
Definition: JSONStreamParser.h:39
Definition: IImportSource.h:12