Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:31:51

0001 #include "Utilities/StorageFactory/interface/StorageFactory.h"
0002 #include "Utilities/StorageFactory/interface/StorageAccount.h"
0003 #include "Utilities/StorageFactory/interface/Storage.h"
0004 #include "FWCore/PluginManager/interface/PluginManager.h"
0005 #include "FWCore/PluginManager/interface/standard.h"
0006 #include "FWCore/Utilities/interface/Exception.h"
0007 #include <memory>
0008 #include <mutex>
0009 #include <condition_variable>
0010 #include <thread>
0011 #include <iostream>
0012 #include <vector>
0013 
0014 typedef std::lock_guard<std::mutex> ScopedLock;
0015 
0016 //<<<<<< PRIVATE DEFINES                                                >>>>>>
0017 //<<<<<< PRIVATE CONSTANTS                                              >>>>>>
0018 //<<<<<< PRIVATE TYPES                                                  >>>>>>
0019 //<<<<<< PRIVATE VARIABLE DEFINITIONS                                   >>>>>>
0020 //<<<<<< PUBLIC VARIABLE DEFINITIONS                                    >>>>>>
0021 //<<<<<< CLASS STRUCTURE INITIALIZATION                                 >>>>>>
0022 //<<<<<< PRIVATE FUNCTION DEFINITIONS                                   >>>>>>
0023 //<<<<<< PUBLIC FUNCTION DEFINITIONS                                    >>>>>>
0024 //<<<<<< MEMBER FUNCTION DEFINITIONS                                    >>>>>>
0025 
0026 using namespace edm::storage;
0027 
0028 struct IsDoneWriting {
0029   int &flag_;
0030   IsDoneWriting(int &flag) : flag_(flag) {}
0031   bool operator()() { return flag_ == 0; }
0032 };
0033 
0034 class OutputDropBox {
0035 public:
0036   OutputDropBox() : outbuf(1000000), nout(0), ce(""), writing(0) {}
0037 
0038   // called by main
0039   bool set(std::vector<char> &ibuf, IOSize n) {
0040     // wait that all threads finish write to swap
0041     std::unique_lock<std::mutex> gl(lock);
0042     done.wait(gl, IsDoneWriting(writing));
0043 
0044     bool ret = true;
0045     // if error in previous write return....
0046     if (ce.empty()) {
0047       outbuf.swap(ibuf);
0048       nout = n;
0049     } else
0050       ret = false;
0051 
0052     undo();  // clean al bits, set writing to its size...
0053 
0054     // notify threads buffer is ready
0055     doit.notify_all();
0056     return ret;
0057   }
0058 
0059   // called by thread
0060   bool write(Storage *os, int id) {
0061     std::unique_lock<std::mutex> gl(lock);
0062     // wait if box empty or this thread already consumed...
0063     if (m_done[id])
0064       doit.wait(gl);
0065 
0066     bool ret = true;
0067     if (nout == 0)
0068       // notify thread to exit....
0069       ret = false;
0070     else {
0071       try {
0072         os->write(&outbuf[0], nout);
0073       } catch (cms::Exception &lce) {
0074         ce = lce.explainSelf();
0075       }
0076     }
0077 
0078     {
0079       // declare it finishes
0080       ScopedLock wl(wlock);
0081       m_done[id] = true;
0082       writing--;
0083     }
0084     done.notify_all();
0085     return ret;
0086   }
0087 
0088   // add a writer (called by thread itself), return thread index....
0089   int addWriter(void) {
0090     ScopedLock wl(wlock);
0091     m_done.push_back(true);
0092     return m_done.size() - 1;
0093   }
0094 
0095   // clear bits (declare box ready...)
0096   void undo() {
0097     ScopedLock wl(wlock);
0098     writing = m_done.size();
0099     std::fill(m_done.begin(), m_done.end(), false);
0100   }
0101 
0102   std::vector<bool> m_done;
0103   std::vector<char> outbuf;
0104   IOSize nout;
0105   std::string ce;
0106   int writing;
0107   // writing lock
0108   mutable std::mutex wlock;
0109   // swap lock
0110   mutable std::mutex lock;
0111   mutable std::condition_variable doit;
0112   mutable std::condition_variable done;
0113 };
0114 
0115 class InputDropBox {
0116 public:
0117   InputDropBox() : end(false), inbuf(1000000), nin(0), ce("") {}
0118 
0119   // called by main
0120   IOSize get(std::vector<char> &ibuf) {
0121     std::unique_lock<std::mutex> gl(lock);
0122     if (end)
0123       // if thread is over return...
0124       return 0;
0125 
0126     if (nin == 0)
0127       // wait the thread to finish to read before swapping
0128       done.wait(gl);
0129 
0130     IOSize ret = 0;
0131     // if error in previous write return....
0132     if (ce.empty()) {
0133       inbuf.swap(ibuf);
0134       ret = nin;
0135     }
0136     nin = 0;
0137     // notify threads buffer is ready
0138     doit.notify_all();
0139     return ret;
0140   }
0141 
0142   // called by thread
0143   bool read(Storage *os) {
0144     std::unique_lock<std::mutex> gl(lock);
0145     if (nin != 0)
0146       // wait if box empty or this thread already consumed...
0147       doit.wait(gl);
0148 
0149     bool ret = true;
0150 
0151     if (inbuf.empty()) {
0152       // inbuf empty notify thread to exit....
0153       end = true;
0154       ret = false;
0155     } else {
0156       try {
0157         nin = os->read(&inbuf[0], inbuf.size());
0158         if (nin == 0) {
0159           end = true;
0160           ret = false;  // stop thread
0161         }
0162       } catch (cms::Exception &e) {
0163         ce = e.explainSelf();
0164       }
0165     }
0166 
0167     done.notify_all();
0168     return ret;
0169   }
0170 
0171   bool end;
0172   std::vector<char> inbuf;
0173   IOSize nin;
0174   std::string ce;
0175   // swap lock
0176   mutable std::mutex lock;
0177   mutable std::condition_variable doit;
0178   mutable std::condition_variable done;
0179 };
0180 
0181 static InputDropBox inbox;
0182 static OutputDropBox dropbox;
0183 
0184 static void writeThread(Storage *os) {
0185   int myid = dropbox.addWriter();
0186 
0187   std::cout << "start writing thread " << myid << std::endl;
0188   while (dropbox.write(os, myid))
0189     ;
0190   std::cout << "end writing thread " << myid << std::endl;
0191 }
0192 
0193 static void readThread(Storage *os) {
0194   std::cout << "start reading thread" << std::endl;
0195   while (inbox.read(os))
0196     ;
0197   std::cout << "end reading thread" << std::endl;
0198 }
0199 
0200 int main(int argc, char **argv) try {
0201   edmplugin::PluginManager::configure(edmplugin::standard::config());
0202 
0203   if (argc < 3) {
0204     std::cerr << "usage: " << argv[0] << " INFILE OUTFILE...\n";
0205     return EXIT_FAILURE;
0206   }
0207 
0208   std::shared_ptr<Storage> is;
0209   std::vector<std::unique_ptr<Storage> > os(argc - 2);
0210   std::vector<std::thread> threads;
0211   bool readThreadActive = true;
0212   bool writeThreadActive = true;
0213   IOOffset size = -1;
0214 
0215   StorageFactory::getToModify()->enableAccounting(true);
0216   bool exists = StorageFactory::getToModify()->check(argv[1], &size);
0217   std::cerr << "input file exists = " << exists << ", size = " << size << "\n";
0218   if (!exists)
0219     return EXIT_SUCCESS;
0220 
0221   try {
0222     is = StorageFactory::getToModify()->open(argv[1]);
0223     if (readThreadActive)
0224       threads.emplace_back(&readThread, is.get());
0225   } catch (cms::Exception &e) {
0226     std::cerr << "error in opening input file " << argv[1] << ":\n" << e.explainSelf() << std::endl;
0227     return EXIT_FAILURE;
0228   }
0229 
0230   // open output files and create threads, one thread per output
0231   for (int i = 0; i < argc - 2; i++)
0232     try {
0233       os[i] = StorageFactory::getToModify()->open(argv[i + 2],
0234                                                   IOFlags::OpenWrite | IOFlags::OpenCreate | IOFlags::OpenTruncate);
0235       if (writeThreadActive)
0236         threads.emplace_back(&writeThread, os[i].get());
0237     } catch (cms::Exception &e) {
0238       std::cerr << "error in opening output file " << argv[i + 2] << ":\n" << e.explainSelf() << std::endl;
0239       return EXIT_FAILURE;
0240     }
0241 
0242   std::vector<char> inbuf(1048576);
0243   std::vector<char> outbuf(1048576);
0244   IOSize n;
0245 
0246   while ((n = readThreadActive ? inbox.get(inbuf) : is->read(&inbuf[0], inbuf.size()))) {
0247     //free reading thread
0248     inbuf.swap(outbuf);
0249     // wait threads have finished to write
0250     // drop buffer in thread
0251     if (writeThreadActive) {
0252       if (!dropbox.set(outbuf, n))
0253         break;
0254     } else
0255       for (size_t i = 0; i < os.size(); i++)
0256         os[i]->write(&outbuf[0], n);
0257   }
0258 
0259   std::cout << "main end reading" << std::endl;
0260 
0261   // tell thread to end
0262   inbuf.clear();
0263 
0264   if (readThreadActive)
0265     inbox.get(inbuf);
0266   if (writeThreadActive)
0267     dropbox.set(outbuf, 0);
0268 
0269   if (writeThreadActive || readThreadActive) {
0270     for (auto &t : threads) {
0271       t.join();
0272     }
0273   }
0274 
0275   std::cout << StorageAccount::summaryText(true) << std::endl;
0276   return EXIT_SUCCESS;
0277 } catch (cms::Exception const &e) {
0278   std::cerr << e.explainSelf() << std::endl;
0279   return EXIT_FAILURE;
0280 } catch (std::exception const &e) {
0281   std::cerr << e.what() << std::endl;
0282   return EXIT_FAILURE;
0283 }