File indexing completed on 2024-04-06 12:31:51
0001 #include "Utilities/StorageFactory/interface/StorageFactory.h"
0002 #include "Utilities/StorageFactory/interface/StorageAccount.h"
0003 #include "Utilities/StorageFactory/interface/Storage.h"
0004 #include "FWCore/PluginManager/interface/PluginManager.h"
0005 #include "FWCore/PluginManager/interface/standard.h"
0006 #include "FWCore/Utilities/interface/Exception.h"
0007 #include <memory>
0008 #include <mutex>
0009 #include <condition_variable>
0010 #include <thread>
0011 #include <iostream>
0012 #include <vector>
0013
0014 typedef std::lock_guard<std::mutex> ScopedLock;
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024
0025
0026 using namespace edm::storage;
0027
0028 struct IsDoneWriting {
0029 int &flag_;
0030 IsDoneWriting(int &flag) : flag_(flag) {}
0031 bool operator()() { return flag_ == 0; }
0032 };
0033
0034 class OutputDropBox {
0035 public:
0036 OutputDropBox() : outbuf(1000000), nout(0), ce(""), writing(0) {}
0037
0038
0039 bool set(std::vector<char> &ibuf, IOSize n) {
0040
0041 std::unique_lock<std::mutex> gl(lock);
0042 done.wait(gl, IsDoneWriting(writing));
0043
0044 bool ret = true;
0045
0046 if (ce.empty()) {
0047 outbuf.swap(ibuf);
0048 nout = n;
0049 } else
0050 ret = false;
0051
0052 undo();
0053
0054
0055 doit.notify_all();
0056 return ret;
0057 }
0058
0059
0060 bool write(Storage *os, int id) {
0061 std::unique_lock<std::mutex> gl(lock);
0062
0063 if (m_done[id])
0064 doit.wait(gl);
0065
0066 bool ret = true;
0067 if (nout == 0)
0068
0069 ret = false;
0070 else {
0071 try {
0072 os->write(&outbuf[0], nout);
0073 } catch (cms::Exception &lce) {
0074 ce = lce.explainSelf();
0075 }
0076 }
0077
0078 {
0079
0080 ScopedLock wl(wlock);
0081 m_done[id] = true;
0082 writing--;
0083 }
0084 done.notify_all();
0085 return ret;
0086 }
0087
0088
0089 int addWriter(void) {
0090 ScopedLock wl(wlock);
0091 m_done.push_back(true);
0092 return m_done.size() - 1;
0093 }
0094
0095
0096 void undo() {
0097 ScopedLock wl(wlock);
0098 writing = m_done.size();
0099 std::fill(m_done.begin(), m_done.end(), false);
0100 }
0101
0102 std::vector<bool> m_done;
0103 std::vector<char> outbuf;
0104 IOSize nout;
0105 std::string ce;
0106 int writing;
0107
0108 mutable std::mutex wlock;
0109
0110 mutable std::mutex lock;
0111 mutable std::condition_variable doit;
0112 mutable std::condition_variable done;
0113 };
0114
0115 class InputDropBox {
0116 public:
0117 InputDropBox() : end(false), inbuf(1000000), nin(0), ce("") {}
0118
0119
0120 IOSize get(std::vector<char> &ibuf) {
0121 std::unique_lock<std::mutex> gl(lock);
0122 if (end)
0123
0124 return 0;
0125
0126 if (nin == 0)
0127
0128 done.wait(gl);
0129
0130 IOSize ret = 0;
0131
0132 if (ce.empty()) {
0133 inbuf.swap(ibuf);
0134 ret = nin;
0135 }
0136 nin = 0;
0137
0138 doit.notify_all();
0139 return ret;
0140 }
0141
0142
0143 bool read(Storage *os) {
0144 std::unique_lock<std::mutex> gl(lock);
0145 if (nin != 0)
0146
0147 doit.wait(gl);
0148
0149 bool ret = true;
0150
0151 if (inbuf.empty()) {
0152
0153 end = true;
0154 ret = false;
0155 } else {
0156 try {
0157 nin = os->read(&inbuf[0], inbuf.size());
0158 if (nin == 0) {
0159 end = true;
0160 ret = false;
0161 }
0162 } catch (cms::Exception &e) {
0163 ce = e.explainSelf();
0164 }
0165 }
0166
0167 done.notify_all();
0168 return ret;
0169 }
0170
0171 bool end;
0172 std::vector<char> inbuf;
0173 IOSize nin;
0174 std::string ce;
0175
0176 mutable std::mutex lock;
0177 mutable std::condition_variable doit;
0178 mutable std::condition_variable done;
0179 };
0180
0181 static InputDropBox inbox;
0182 static OutputDropBox dropbox;
0183
0184 static void writeThread(Storage *os) {
0185 int myid = dropbox.addWriter();
0186
0187 std::cout << "start writing thread " << myid << std::endl;
0188 while (dropbox.write(os, myid))
0189 ;
0190 std::cout << "end writing thread " << myid << std::endl;
0191 }
0192
0193 static void readThread(Storage *os) {
0194 std::cout << "start reading thread" << std::endl;
0195 while (inbox.read(os))
0196 ;
0197 std::cout << "end reading thread" << std::endl;
0198 }
0199
0200 int main(int argc, char **argv) try {
0201 edmplugin::PluginManager::configure(edmplugin::standard::config());
0202
0203 if (argc < 3) {
0204 std::cerr << "usage: " << argv[0] << " INFILE OUTFILE...\n";
0205 return EXIT_FAILURE;
0206 }
0207
0208 std::shared_ptr<Storage> is;
0209 std::vector<std::unique_ptr<Storage> > os(argc - 2);
0210 std::vector<std::thread> threads;
0211 bool readThreadActive = true;
0212 bool writeThreadActive = true;
0213 IOOffset size = -1;
0214
0215 StorageFactory::getToModify()->enableAccounting(true);
0216 bool exists = StorageFactory::getToModify()->check(argv[1], &size);
0217 std::cerr << "input file exists = " << exists << ", size = " << size << "\n";
0218 if (!exists)
0219 return EXIT_SUCCESS;
0220
0221 try {
0222 is = StorageFactory::getToModify()->open(argv[1]);
0223 if (readThreadActive)
0224 threads.emplace_back(&readThread, is.get());
0225 } catch (cms::Exception &e) {
0226 std::cerr << "error in opening input file " << argv[1] << ":\n" << e.explainSelf() << std::endl;
0227 return EXIT_FAILURE;
0228 }
0229
0230
0231 for (int i = 0; i < argc - 2; i++)
0232 try {
0233 os[i] = StorageFactory::getToModify()->open(argv[i + 2],
0234 IOFlags::OpenWrite | IOFlags::OpenCreate | IOFlags::OpenTruncate);
0235 if (writeThreadActive)
0236 threads.emplace_back(&writeThread, os[i].get());
0237 } catch (cms::Exception &e) {
0238 std::cerr << "error in opening output file " << argv[i + 2] << ":\n" << e.explainSelf() << std::endl;
0239 return EXIT_FAILURE;
0240 }
0241
0242 std::vector<char> inbuf(1048576);
0243 std::vector<char> outbuf(1048576);
0244 IOSize n;
0245
0246 while ((n = readThreadActive ? inbox.get(inbuf) : is->read(&inbuf[0], inbuf.size()))) {
0247
0248 inbuf.swap(outbuf);
0249
0250
0251 if (writeThreadActive) {
0252 if (!dropbox.set(outbuf, n))
0253 break;
0254 } else
0255 for (size_t i = 0; i < os.size(); i++)
0256 os[i]->write(&outbuf[0], n);
0257 }
0258
0259 std::cout << "main end reading" << std::endl;
0260
0261
0262 inbuf.clear();
0263
0264 if (readThreadActive)
0265 inbox.get(inbuf);
0266 if (writeThreadActive)
0267 dropbox.set(outbuf, 0);
0268
0269 if (writeThreadActive || readThreadActive) {
0270 for (auto &t : threads) {
0271 t.join();
0272 }
0273 }
0274
0275 std::cout << StorageAccount::summaryText(true) << std::endl;
0276 return EXIT_SUCCESS;
0277 } catch (cms::Exception const &e) {
0278 std::cerr << e.explainSelf() << std::endl;
0279 return EXIT_FAILURE;
0280 } catch (std::exception const &e) {
0281 std::cerr << e.what() << std::endl;
0282 return EXIT_FAILURE;
0283 }