5 #ifndef JODA_READERPARSER_H 
    6 #define JODA_READERPARSER_H 
    8 #include <glog/logging.h> 
   17 #include <boost/functional/hash.hpp> 
   18 #include "../../../../storage/container/include/joda/container/ContainerFlags.h" 
   25   typedef std::pair<ReaderFlag, ParserFlag> 
ReaderID;
 
   26   typedef std::pair<ParserFlag, ContainerFlag> 
ParserID;
 
   30   void parse(
const std::vector<std::unique_ptr<IImportSource>> &sources,
 
   31              std::shared_ptr<JSONStorage> &storage);
 
   45   void startParser(
const std::vector<std::unique_ptr<IImportSource>> &sources,
 
   47                                       boost::hash<ReaderID>> &readerIds,
 
   49                                       boost::hash<ParserID>> &parserIds,
 
   50                    std::unique_ptr<JsonContainerQueue::queue_t> &containerQueue,
 
   51                    std::future<void> &jsonContInserter);
 
   54       const std::vector<std::unique_ptr<IImportSource>> &sources,
 
   56                          boost::hash<ReaderParser::ReaderID>> &readerIds,
 
   57       std::tuple<std::unique_ptr<JsonTextParserQueue::queue_t>,
 
   58                  std::vector<size_t>, std::vector<size_t>> &textParserQueue,
 
   59       std::tuple<std::unique_ptr<JsonTextStreamParserQueue::queue_t>,
 
   60                  std::vector<size_t>, std::vector<size_t>> &streamParserQueue,
 
   61       std::vector<size_t> &threadIDs, std::future<void> &jsonContInserter);
 
   63   template <
template <
class...> 
class T, 
class... Args>
 
   64   inline std::unique_ptr<IThreadUser> createParserFromConfig(Args... args);
 
   66   template <
class Queue>
 
   67   inline auto createReaderQueue(
 
   69                          boost::hash<ReaderParser::ReaderID>> &readerIds);
 
   71   template <
class Reader, 
class RQueueCont, 
class PQueueCont>
 
   72   inline std::unique_ptr<IOThreadPool<Reader>> createReader(
 
   73       RQueueCont &rqueue, PQueueCont &pqueue,
 
   75                          boost::hash<ReaderParser::ReaderID>> &readerIds,
 
   76       std::vector<size_t> &threadIDs);
 
   78   size_t maxThreads = 1;
 
   81 template <
template <
class...> 
class T, 
class... Args>
 
   82 std::unique_ptr<IThreadUser> ReaderParser::createParserFromConfig(
 
   85     return std::make_unique<IOThreadPool<T<>>>(args...);
 
   87     return std::make_unique<IOThreadPool<T<SimilarityScheduler<PathJaccard>>>>(
 
   90     return std::make_unique<
 
   93     DCHECK(
false) << 
"Similarity Measure missing in Parser construction";
 
   98 template <
typename Queue>
 
   99 auto ReaderParser::createReaderQueue(
 
  101                        boost::hash<ReaderParser::ReaderID>> &readerIds) {
 
  102   std::pair<std::unique_ptr<typename Queue::queue_t>,
 
  103             std::unique_ptr<typename Queue::queue_t::ptok_t>>
 
  105   for (
const auto &readerId : readerIds) {
 
  106     if (readerId.first == Queue::getFlag()) {
 
  107       queue.first = Queue::getQueue(
 
  109           std::thread::hardware_concurrency());
 
  111           std::make_unique<typename Queue::queue_t::ptok_t>(queue.first->queue);
 
  118 template <
class Reader, 
class RQueueCont, 
class PQueueCont>
 
  119 std::unique_ptr<IOThreadPool<Reader>> ReaderParser::createReader(
 
  120     RQueueCont &rqueue, PQueueCont &pqueue,
 
  122                        boost::hash<ReaderParser::ReaderID>> &readerIds,
 
  123     std::vector<size_t> &threadIDs) {
 
  124   const auto ReaderName = 
typeid(Reader).name();
 
  125   for (
const auto &readerId : readerIds) {
 
  126     if (readerId.first == Reader::getIQueueFlags() &&
 
  127         readerId.second == Reader::getOQueueFlags()) {
 
  128       DCHECK(rqueue.first != 
nullptr)
 
  129           << 
"Input queue missing for " << ReaderName;
 
  130       DCHECK((
ReaderFlag)rqueue.first->getFlag() == Reader::getIQueueFlags())
 
  131           << 
"Input queue type mismatching for " << ReaderName;
 
  132       DCHECK(std::get<0>(pqueue) != 
nullptr)
 
  133           << 
"Output queue missing for " << ReaderName;
 
  134       DCHECK((
ParserFlag)std::get<0>(pqueue)->getFlag() ==
 
  135              Reader::getOQueueFlags())
 
  136           << 
"Output queue type mismatching for " << ReaderName;
 
  138       auto tmp = std::make_unique<IOThreadPool<Reader>>(
 
  142       threadIDs.push_back(x);
 
  143       std::get<1>(pqueue).push_back(x);
 
ParserFlag
Definition: ParserFlags.h:13
 
ReaderFlag
Definition: ReaderFlags.h:11
 
ThreadManager g_ThreadManagerInstance
Definition: ThreadManager.cpp:8
 
Definition: IOThreadPool.h:18
 
size_t registerThreadUser(IThreadUser *user)
Definition: ThreadManager.cpp:21
 
@ NO_SIMILARITY
Definition: config.h:68
 
static size_t readingThreads
Definition: config.h:56
 
static Sim_Measures sim_measure
Definition: config.h:70
 
static size_t read_bulk_size
Definition: config.h:59
 
Definition: ReaderParser.h:23
 
size_t getParsedConts() const
Definition: ReaderParser.cpp:25
 
size_t getMaxThreads() const
Definition: ReaderParser.cpp:13
 
bool isParsing() const
Definition: ReaderParser.cpp:29
 
size_t parsedConts
Definition: ReaderParser.h:41
 
std::pair< ReaderFlag, ParserFlag > ReaderID
Definition: ReaderParser.h:25
 
std::pair< ParserFlag, ContainerFlag > ParserID
Definition: ReaderParser.h:26
 
size_t getParsedDocs() const
Definition: ReaderParser.cpp:21
 
void setMaxThreads(size_t maxThreads)
Definition: ReaderParser.cpp:17
 
size_t parsedDocs
Definition: ReaderParser.h:40
 
void parse(const std::vector< std::unique_ptr< IImportSource >> &sources, std::shared_ptr< JSONStorage > &storage)
Definition: ReaderParser.cpp:31
 
bool parsing
Definition: ReaderParser.h:42
 
Definition: IImportSource.h:12