5 #ifndef JODA_READERPARSER_H
6 #define JODA_READERPARSER_H
8 #include <glog/logging.h>
17 #include <boost/functional/hash.hpp>
18 #include "../../../../storage/container/include/joda/container/ContainerFlags.h"
// Identifier made of a (ReaderFlag, ParserFlag) pair.
// The template definitions further down compare `.first` with a queue's or
// reader's input flag (Queue::getFlag(), Reader::getIQueueFlags()) and
// `.second` with the reader's output flag (Reader::getOQueueFlags()).
25 typedef std::pair<ReaderFlag, ParserFlag>
ReaderID;
// Identifier made of a (ParserFlag, ContainerFlag) pair.
// It is the type hashed by boost::hash<ParserID> for the `parserIds`
// parameter of startParser().
26 typedef std::pair<ParserFlag, ContainerFlag>
ParserID;
// Declaration of parse().
//   sources - read-only list of owned IImportSource objects.
//   storage - shared JSONStorage handle, passed by non-const reference.
// The body is not part of this header listing; the index at the end of the
// listing places the definition in ReaderParser.cpp.
30 void parse(
const std::vector<std::unique_ptr<IImportSource>> &sources,
31 std::shared_ptr<JSONStorage> &storage);
// Declaration of startParser().
//   sources          - read-only list of owned IImportSource objects.
//   readerIds        - collection hashed with boost::hash<ReaderID>.
//   parserIds        - collection hashed with boost::hash<ParserID>.
//   containerQueue   - owning pointer to a JsonContainerQueue::queue_t,
//                      passed by reference.
//   jsonContInserter - std::future<void>, passed by reference.
// NOTE(review): listing lines 46 and 48 are missing, so the container types
// (and constness) of readerIds / parserIds are not visible here - confirm
// against the real header.
45 void startParser(
const std::vector<std::unique_ptr<IImportSource>> &sources,
47 boost::hash<ReaderID>> &readerIds,
49 boost::hash<ParserID>> &parserIds,
50 std::unique_ptr<JsonContainerQueue::queue_t> &containerQueue,
51 std::future<void> &jsonContInserter);
54 const std::vector<std::unique_ptr<IImportSource>> &sources,
56 boost::hash<ReaderParser::ReaderID>> &readerIds,
57 std::tuple<std::unique_ptr<JsonTextParserQueue::queue_t>,
58 std::vector<size_t>, std::vector<size_t>> &textParserQueue,
59 std::tuple<std::unique_ptr<JsonTextStreamParserQueue::queue_t>,
60 std::vector<size_t>, std::vector<size_t>> &streamParserQueue,
61 std::vector<size_t> &threadIDs, std::future<void> &jsonContInserter);
// Declaration of the createParserFromConfig() member template.
//   T    - a class template (template-template parameter); the definition
//          below instantiates it as T<> and T<SimilarityScheduler<...>>.
//   Args - types of the arguments, which are taken by value and handed on
//          to the constructed object.
// Returns an owning pointer to the IThreadUser interface.
63 template <
template <
class...>
class T,
class... Args>
64 inline std::unique_ptr<IThreadUser> createParserFromConfig(Args... args);
// Declaration of the createReaderQueue() member template.
//   Queue     - queue descriptor type; the definition uses Queue::getFlag(),
//               Queue::getQueue(...) and the nested Queue::queue_t.
//   readerIds - collection hashed with boost::hash<ReaderParser::ReaderID>.
// The return type is deduced (`auto`) from the definition.
// NOTE(review): listing line 68 (start of the readerIds parameter type) is
// missing from this view.
66 template <
class Queue>
67 inline auto createReaderQueue(
69 boost::hash<ReaderParser::ReaderID>> &readerIds);
// Declaration of the createReader() member template.
//   Reader     - reader type wrapped in the returned IOThreadPool<Reader>.
//   RQueueCont - type of the input-queue holder `rqueue` (accessed through
//                `.first` in the definition).
//   PQueueCont - type of the output-queue holder `pqueue` (accessed through
//                std::get<0> / std::get<1> in the definition).
//   readerIds  - collection hashed with boost::hash<ReaderParser::ReaderID>.
//   threadIDs  - vector that the definition appends an ID to.
// NOTE(review): listing line 74 (start of the readerIds parameter type) is
// missing from this view.
71 template <
class Reader,
class RQueueCont,
class PQueueCont>
72 inline std::unique_ptr<IOThreadPool<Reader>> createReader(
73 RQueueCont &rqueue, PQueueCont &pqueue,
75 boost::hash<ReaderParser::ReaderID>> &readerIds,
76 std::vector<size_t> &threadIDs);
// Thread limit, default-initialised to 1.
// NOTE(review): presumably the value behind getMaxThreads()/setMaxThreads()
// named in the index at the end of this listing - their bodies are not shown.
78 size_t maxThreads = 1;
// Definition of ReaderParser::createParserFromConfig().
// The visible lines show three alternative `return` statements, each building
// an IOThreadPool around a different instantiation of T and forwarding
// `args...` to it, followed by a DCHECK(false) fallthrough.
// NOTE(review): the conditions selecting between the returns (listing lines
// 83-84, 86, 88-89, 91-92) are missing from this view; presumably they switch
// on the configured similarity measure - confirm against the real header.
81 template <
template <
class...>
class T,
class... Args>
82 std::unique_ptr<IThreadUser> ReaderParser::createParserFromConfig(
// Variant 1: T instantiated with no template arguments.
85 return std::make_unique<IOThreadPool<T<>>>(args...);
// Variant 2: T instantiated with a PathJaccard-based SimilarityScheduler.
87 return std::make_unique<IOThreadPool<T<SimilarityScheduler<PathJaccard>>>>(
// Variant 3: the instantiated type is cut off in this listing.
90 return std::make_unique<
// Reached only if none of the returns above was taken. DCHECK is glog's
// debug-build check, so this only fires in debug builds.
93 DCHECK(
false) <<
"Similarity Measure missing in Parser construction";
// Definition of ReaderParser::createReaderQueue().
// Declares a pair of owning pointers (a Queue::queue_t and its ptok_t token),
// then scans readerIds. For an entry whose `.first` equals Queue::getFlag(),
// it creates the queue via Queue::getQueue(...) and a ptok_t built from
// `queue.first->queue`.
// NOTE(review): listing lines 100, 104, 108, 110 and 112-117 are missing, so
// the pair variable's declarator, the first getQueue() argument, the target
// of the ptok_t assignment (presumably `queue.second`) and the return
// statement are not visible.
98 template <
typename Queue>
99 auto ReaderParser::createReaderQueue(
101 boost::hash<ReaderParser::ReaderID>> &readerIds) {
102 std::pair<std::unique_ptr<typename Queue::queue_t>,
103 std::unique_ptr<typename Queue::queue_t::ptok_t>>
105 for (
const auto &readerId : readerIds) {
// Only entries whose reader flag matches this queue type trigger creation.
106 if (readerId.first == Queue::getFlag()) {
107 queue.first = Queue::getQueue(
// Last visible argument: the hardware thread count reported by the runtime.
109 std::thread::hardware_concurrency());
// The token is constructed from the member `queue` of the new queue object.
111 std::make_unique<typename Queue::queue_t::ptok_t>(queue.first->queue);
// Definition of ReaderParser::createReader().
// Scans readerIds for an entry whose (first, second) equals
// (Reader::getIQueueFlags(), Reader::getOQueueFlags()). On a match it checks
// (debug builds only, via DCHECK) that the input and output queues exist and
// have matching flags, builds an IOThreadPool<Reader>, and appends an ID `x`
// to both threadIDs and std::get<1>(pqueue).
// NOTE(review): listing lines 121, 137, 139-141 and everything after 143 are
// missing. The constructor arguments, the origin of `x` (presumably the
// value from ThreadManager::registerThreadUser, which the trailing index
// lists as returning size_t) and the return statement are not visible.
118 template <
class Reader,
class RQueueCont,
class PQueueCont>
119 std::unique_ptr<IOThreadPool<Reader>> ReaderParser::createReader(
120 RQueueCont &rqueue, PQueueCont &pqueue,
122 boost::hash<ReaderParser::ReaderID>> &readerIds,
123 std::vector<size_t> &threadIDs) {
// Used only in the diagnostic messages below; typeid(...).name() returns an
// implementation-defined string that may be mangled.
124 const auto ReaderName =
typeid(Reader).name();
125 for (
const auto &readerId : readerIds) {
// Both the input-queue flag and the output-queue flag must match.
126 if (readerId.first == Reader::getIQueueFlags() &&
127 readerId.second == Reader::getOQueueFlags()) {
// Input queue must have been created ...
128 DCHECK(rqueue.first !=
nullptr)
129 <<
"Input queue missing for " << ReaderName;
// ... and must carry the flag this reader consumes.
130 DCHECK((
ReaderFlag)rqueue.first->getFlag() == Reader::getIQueueFlags())
131 <<
"Input queue type mismatching for " << ReaderName;
// Output queue (element 0 of pqueue) must have been created ...
132 DCHECK(std::get<0>(pqueue) !=
nullptr)
133 <<
"Output queue missing for " << ReaderName;
// ... and must carry the flag this reader produces.
134 DCHECK((
ParserFlag)std::get<0>(pqueue)->getFlag() ==
135 Reader::getOQueueFlags())
136 <<
"Output queue type mismatching for " << ReaderName;
// Constructor arguments are cut off in this listing.
138 auto tmp = std::make_unique<IOThreadPool<Reader>>(
// Record the ID in the caller's list and in element 1 of pqueue.
142 threadIDs.push_back(x);
143 std::get<1>(pqueue).push_back(x);
ParserFlag
Definition: ParserFlags.h:13
ReaderFlag
Definition: ReaderFlags.h:11
ThreadManager g_ThreadManagerInstance
Definition: ThreadManager.cpp:8
Definition: IOThreadPool.h:18
size_t registerThreadUser(IThreadUser *user)
Definition: ThreadManager.cpp:21
@ NO_SIMILARITY
Definition: config.h:68
static size_t readingThreads
Definition: config.h:56
static Sim_Measures sim_measure
Definition: config.h:70
static size_t read_bulk_size
Definition: config.h:59
Definition: ReaderParser.h:23
size_t getParsedConts() const
Definition: ReaderParser.cpp:25
size_t getMaxThreads() const
Definition: ReaderParser.cpp:13
bool isParsing() const
Definition: ReaderParser.cpp:29
size_t parsedConts
Definition: ReaderParser.h:41
std::pair< ReaderFlag, ParserFlag > ReaderID
Definition: ReaderParser.h:25
std::pair< ParserFlag, ContainerFlag > ParserID
Definition: ReaderParser.h:26
size_t getParsedDocs() const
Definition: ReaderParser.cpp:21
void setMaxThreads(size_t maxThreads)
Definition: ReaderParser.cpp:17
size_t parsedDocs
Definition: ReaderParser.h:40
void parse(const std::vector< std::unique_ptr< IImportSource >> &sources, std::shared_ptr< JSONStorage > &storage)
Definition: ReaderParser.cpp:31
bool parsing
Definition: ReaderParser.h:42
Definition: IImportSource.h:12