JODA  0.13.1 (59b41972)
JSON On-Demand Analysis
ReaderParser.h
Go to the documentation of this file.
1 //
2 // Created by Nico Schäfer on 07/02/18.
3 //
4 
5 #ifndef JODA_READERPARSER_H
6 #define JODA_READERPARSER_H
7 
8 #include <glog/logging.h>
17 #include <boost/functional/hash.hpp>
18 #include "../../../../storage/container/include/joda/container/ContainerFlags.h"
19 #include "IImportSource.h"
20 #include "joda/config/config.h"
21 
22 namespace joda::docparsing {
23 class ReaderParser {
24  public:
25  typedef std::pair<ReaderFlag, ParserFlag> ReaderID;
26  typedef std::pair<ParserFlag, ContainerFlag> ParserID;
27 
28  ReaderParser() = default;
29 
30  void parse(const std::vector<std::unique_ptr<IImportSource>> &sources,
31  std::shared_ptr<JSONStorage> &storage);
32 
33  size_t getMaxThreads() const;
34  void setMaxThreads(size_t maxThreads);
35  size_t getParsedDocs() const;
36  size_t getParsedConts() const;
37  bool isParsing() const;
38 
39  protected:
40  size_t parsedDocs = 0;
41  size_t parsedConts = 0;
42  bool parsing = false;
43 
44  private:
45  void startParser(const std::vector<std::unique_ptr<IImportSource>> &sources,
46  std::unordered_set<ReaderParser::ReaderID,
47  boost::hash<ReaderID>> &readerIds,
48  std::unordered_set<ReaderParser::ParserID,
49  boost::hash<ParserID>> &parserIds,
50  std::unique_ptr<JsonContainerQueue::queue_t> &containerQueue,
51  std::future<void> &jsonContInserter);
52 
53  void startReaders(
54  const std::vector<std::unique_ptr<IImportSource>> &sources,
55  std::unordered_set<ReaderParser::ReaderID,
56  boost::hash<ReaderParser::ReaderID>> &readerIds,
57  std::tuple<std::unique_ptr<JsonTextParserQueue::queue_t>,
58  std::vector<size_t>, std::vector<size_t>> &textParserQueue,
59  std::tuple<std::unique_ptr<JsonTextStreamParserQueue::queue_t>,
60  std::vector<size_t>, std::vector<size_t>> &streamParserQueue,
61  std::vector<size_t> &threadIDs, std::future<void> &jsonContInserter);
62 
63  template <template <class...> class T, class... Args>
64  inline std::unique_ptr<IThreadUser> createParserFromConfig(Args... args);
65 
66  template <class Queue>
67  inline auto createReaderQueue(
68  std::unordered_set<ReaderParser::ReaderID,
69  boost::hash<ReaderParser::ReaderID>> &readerIds);
70 
71  template <class Reader, class RQueueCont, class PQueueCont>
72  inline std::unique_ptr<IOThreadPool<Reader>> createReader(
73  RQueueCont &rqueue, PQueueCont &pqueue,
74  std::unordered_set<ReaderParser::ReaderID,
75  boost::hash<ReaderParser::ReaderID>> &readerIds,
76  std::vector<size_t> &threadIDs);
77 
78  size_t maxThreads = 1;
79 };
80 
81 template <template <class...> class T, class... Args>
82 std::unique_ptr<IThreadUser> ReaderParser::createParserFromConfig(
83  Args... args) {
85  return std::make_unique<IOThreadPool<T<>>>(args...);
86  } else if (config::sim_measure == config::Sim_Measures::PATH_JACCARD) {
87  return std::make_unique<IOThreadPool<T<SimilarityScheduler<PathJaccard>>>>(
88  args...);
89  } else if (config::sim_measure == config::Sim_Measures::ATTRIBUTE_JACCARD) {
90  return std::make_unique<
92  } else {
93  DCHECK(false) << "Similarity Measure missing in Parser construction";
94  }
95  return nullptr;
96 }
97 
98 template <typename Queue>
99 auto ReaderParser::createReaderQueue(
100  std::unordered_set<ReaderParser::ReaderID,
101  boost::hash<ReaderParser::ReaderID>> &readerIds) {
102  std::pair<std::unique_ptr<typename Queue::queue_t>,
103  std::unique_ptr<typename Queue::queue_t::ptok_t>>
104  queue;
105  for (const auto &readerId : readerIds) {
106  if (readerId.first == Queue::getFlag()) {
107  queue.first = Queue::getQueue(
108  config::read_bulk_size * (std::thread::hardware_concurrency()), 0,
109  std::thread::hardware_concurrency());
110  queue.second =
111  std::make_unique<typename Queue::queue_t::ptok_t>(queue.first->queue);
112  break;
113  }
114  }
115  return queue;
116 }
117 
118 template <class Reader, class RQueueCont, class PQueueCont>
119 std::unique_ptr<IOThreadPool<Reader>> ReaderParser::createReader(
120  RQueueCont &rqueue, PQueueCont &pqueue,
121  std::unordered_set<ReaderParser::ReaderID,
122  boost::hash<ReaderParser::ReaderID>> &readerIds,
123  std::vector<size_t> &threadIDs) {
124  const auto ReaderName = typeid(Reader).name();
125  for (const auto &readerId : readerIds) {
126  if (readerId.first == Reader::getIQueueFlags() &&
127  readerId.second == Reader::getOQueueFlags()) {
128  DCHECK(rqueue.first != nullptr)
129  << "Input queue missing for " << ReaderName;
130  DCHECK((ReaderFlag)rqueue.first->getFlag() == Reader::getIQueueFlags())
131  << "Input queue type mismatching for " << ReaderName;
132  DCHECK(std::get<0>(pqueue) != nullptr)
133  << "Output queue missing for " << ReaderName;
134  DCHECK((ParserFlag)std::get<0>(pqueue)->getFlag() ==
135  Reader::getOQueueFlags())
136  << "Output queue type mismatching for " << ReaderName;
137 
138  auto tmp = std::make_unique<IOThreadPool<Reader>>(
139  rqueue.first.get(), std::get<0>(pqueue).get(), config::readingThreads,
140  nullptr);
141  auto x = g_ThreadManagerInstance.registerThreadUser(tmp.get());
142  threadIDs.push_back(x);
143  std::get<1>(pqueue).push_back(x);
144  return tmp;
145  }
146  }
147  return nullptr;
148 }
149 } // namespace joda::docparsing
150 
151 #endif // JODA_READERPARSER_H
ParserFlag
Definition: ParserFlags.h:13
ReaderFlag
Definition: ReaderFlags.h:11
ThreadManager g_ThreadManagerInstance
Definition: ThreadManager.cpp:8
Definition: IOThreadPool.h:18
size_t registerThreadUser(IThreadUser *user)
Definition: ThreadManager.cpp:21
@ NO_SIMILARITY
Definition: config.h:68
static size_t readingThreads
Definition: config.h:56
static Sim_Measures sim_measure
Definition: config.h:70
static size_t read_bulk_size
Definition: config.h:59
Definition: ReaderParser.h:23
size_t getParsedConts() const
Definition: ReaderParser.cpp:25
size_t getMaxThreads() const
Definition: ReaderParser.cpp:13
bool isParsing() const
Definition: ReaderParser.cpp:29
size_t parsedConts
Definition: ReaderParser.h:41
std::pair< ReaderFlag, ParserFlag > ReaderID
Definition: ReaderParser.h:25
std::pair< ParserFlag, ContainerFlag > ParserID
Definition: ReaderParser.h:26
size_t getParsedDocs() const
Definition: ReaderParser.cpp:21
void setMaxThreads(size_t maxThreads)
Definition: ReaderParser.cpp:17
size_t parsedDocs
Definition: ReaderParser.h:40
void parse(const std::vector< std::unique_ptr< IImportSource >> &sources, std::shared_ptr< JSONStorage > &storage)
Definition: ReaderParser.cpp:31
bool parsing
Definition: ReaderParser.h:42
Definition: IImportSource.h:12