File indexing completed on 2024-04-06 12:10:06
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018
0019
0020
0021
0022
0023
0024
0025
0026
0027
0028
0029
0030
0031
0032
0033
0034
0035
0036
0037
0038
0039
0040
0041
0042
0043
0044
0045
0046
0047
0048
0049
0050
0051
0052
0053
0054
0055
0056
0057
0058
0059
0060
0061
0062
0063
0064
0065
0066
0067
0068
0069
0070
0071
0072
0073
0074
0075
0076
0077
0078
0079
0080 #include <sys/types.h>
0081 #include <sys/stat.h>
0082 #include <fcntl.h>
0083 #include <vector>
0084 #include <set>
0085 #include <string>
0086 #include <iostream>
0087 #include <memory>
0088 #include <thread>
0089 #include <mutex>
0090 #include <list>
0091 #include "DQMServices/Core/interface/ROOTFilePB.pb.h"
0092 #include <google/protobuf/io/coded_stream.h>
0093 #include <google/protobuf/io/gzip_stream.h>
0094 #include <google/protobuf/io/zero_copy_stream_impl.h>
0095 #include <TROOT.h>
0096 #include <TFile.h>
0097 #include <TBufferFile.h>
0098 #include <TObject.h>
0099 #include <TObjString.h>
0100 #include <TH1.h>
0101 #include <TProfile.h>
0102 #include <TKey.h>
0103 #include <TClass.h>
0104
0105 #include <sys/prctl.h>
0106 #include <sys/wait.h>
0107 #include <csignal>
0108
// Print a debug message on stdout when the global verbosity `debug` is at
// least `x`. `msg` may be a chain of stream insertions (`a << b << c`).
// The body is wrapped in do { } while (0) so that the macro expands to exactly
// one statement and can never capture an `else` that follows the call site.
#define DEBUG(x, msg)                              \
  do {                                             \
    if (debug >= (x))                              \
      std::cout << "DEBUG: " << msg << std::flush; \
  } while (0)

// Global verbosity level: incremented by --debug / -d, reset by --no-debug.
int debug = 0;
0114
0115 struct MicroME {
0116 MicroME(TObject *o, const std::string &dir, const std::string &obj, uint32_t flags = 0)
0117 : obj(o), dirname(dir), objname(obj), flags(flags) {}
0118
0119 mutable TObject *obj;
0120
0121 const std::string dirname;
0122 const std::string objname;
0123
0124 uint32_t flags;
0125
0126 bool operator<(const MicroME &rhs) const {
0127 const MicroME &lhs = *this;
0128 int diff = lhs.dirname.compare(rhs.dirname);
0129 return (diff < 0 ? true : diff == 0 ? lhs.objname < rhs.objname : false);
0130 };
0131
0132 void add(TObject *obj_to_add) const {
0133 DEBUG(1, "Merging: " << obj->GetName() << " << " << obj_to_add->GetName() << std::endl);
0134
0135 if (dynamic_cast<TH1 *>(obj) && dynamic_cast<TH1 *>(obj_to_add)) {
0136 dynamic_cast<TH1 *>(obj)->Add(dynamic_cast<TH1 *>(obj_to_add));
0137 } else if (dynamic_cast<TObjString *>(obj) && dynamic_cast<TObjString *>(obj_to_add)) {
0138 } else {
0139 DEBUG(1, "Cannot merge (different types): " << obj->GetName() << " << " << obj_to_add->GetName() << std::endl);
0140 }
0141 };
0142
0143 const std::string fullname() const { return dirname + '/' + objname; };
0144 };
0145
0146 using MEStore = std::set<MicroME>;
0147
// Operation selected on the command line.
enum TaskType {
  TASK_ADD,
  TASK_DUMP,
  TASK_CONVERT,
  TASK_ENCODE
};

// Non-zero codes returned by the tool on failure.
enum ErrType {
  ERR_BADCFG = 1,
  ERR_NOFILE
};
0151
0152 using google::protobuf::io::ArrayInputStream;
0153 using google::protobuf::io::CodedInputStream;
0154 using google::protobuf::io::FileInputStream;
0155 using google::protobuf::io::FileOutputStream;
0156 using google::protobuf::io::GzipInputStream;
0157 using google::protobuf::io::GzipOutputStream;
0158
0159
0160
0161
0162 inline TObject *extractNextObject(TBufferFile &buf) {
0163 if (buf.Length() == buf.BufferSize())
0164 return nullptr;
0165
0166 buf.InitMap();
0167 return reinterpret_cast<TObject *>(buf.ReadObjectAny(nullptr));
0168 }
0169
0170 static void get_info(const dqmstorepb::ROOTFilePB::Histo &h, std::string &dirname, std::string &objname, TObject **obj) {
0171 size_t slash = h.full_pathname().rfind('/');
0172 size_t dirpos = (slash == std::string::npos ? 0 : slash);
0173 size_t namepos = (slash == std::string::npos ? 0 : slash + 1);
0174 dirname.assign(h.full_pathname(), 0, dirpos);
0175 objname.assign(h.full_pathname(), namepos, std::string::npos);
0176 TBufferFile buf(TBufferFile::kRead, h.size(), (void *)h.streamed_histo().data(), kFALSE);
0177 buf.Reset();
0178 *obj = extractNextObject(buf);
0179 if (!*obj) {
0180 std::cerr << "Error reading element: " << h.full_pathname() << std::endl;
0181 }
0182 }
0183
0184 void writeMessageFD(const dqmstorepb::ROOTFilePB &dqmstore_output_msg, int out_fd) {
0185 FileOutputStream out_stream(out_fd);
0186 GzipOutputStream::Options options;
0187 options.format = GzipOutputStream::GZIP;
0188 options.compression_level = 2;
0189 GzipOutputStream gzip_stream(&out_stream, options);
0190 dqmstore_output_msg.SerializeToZeroCopyStream(&gzip_stream);
0191
0192
0193 gzip_stream.Close();
0194 out_stream.Close();
0195 }
0196
0197 void writeMessage(const dqmstorepb::ROOTFilePB &dqmstore_output_msg, const std::string &output_filename) {
0198 DEBUG(1, "Writing file" << std::endl);
0199
0200 int out_fd =
0201 ::open(output_filename.c_str(), O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH);
0202
0203 writeMessageFD(dqmstore_output_msg, out_fd);
0204 ::close(out_fd);
0205 }
0206
0207 void fillMessage(dqmstorepb::ROOTFilePB &dqmstore_output_msg, const MEStore µmes) {
0208 auto mi = micromes.begin();
0209 auto me = micromes.end();
0210
0211 DEBUG(1, "Streaming ROOT objects" << std::endl);
0212 for (; mi != me; ++mi) {
0213 dqmstorepb::ROOTFilePB::Histo *h = dqmstore_output_msg.add_histo();
0214 DEBUG(2, "Streaming ROOT object " << mi->fullname() << "\n");
0215 h->set_full_pathname(mi->fullname());
0216 TBufferFile buffer(TBufferFile::kWrite);
0217 buffer.WriteObject(mi->obj);
0218 h->set_size(buffer.Length());
0219 h->set_flags(mi->flags);
0220 h->set_streamed_histo((const void *)buffer.Buffer(), buffer.Length());
0221 delete mi->obj;
0222 }
0223 }
0224
0225 void processDirectory(TFile *file, const std::string &curdir, MEStore µmes) {
0226 DEBUG(1, "Processing directory " << curdir << "\n");
0227 file->cd(curdir.c_str());
0228 TKey *key;
0229 TIter next(gDirectory->GetListOfKeys());
0230 while ((key = (TKey *)next())) {
0231 TObject *obj = key->ReadObj();
0232 if (dynamic_cast<TDirectory *>(obj)) {
0233 std::string subdir;
0234 subdir.reserve(curdir.size() + strlen(obj->GetName()) + 2);
0235 subdir += curdir;
0236 if (!curdir.empty())
0237 subdir += '/';
0238 subdir += obj->GetName();
0239 processDirectory(file, subdir, micromes);
0240 } else if ((dynamic_cast<TH1 *>(obj)) || (dynamic_cast<TObjString *>(obj))) {
0241 if (dynamic_cast<TH1 *>(obj)) {
0242 dynamic_cast<TH1 *>(obj)->SetDirectory(nullptr);
0243 }
0244
0245 DEBUG(2, curdir << "/" << obj->GetName() << "\n");
0246 MicroME mme(obj, curdir, obj->GetName());
0247
0248 micromes.insert(mme);
0249 }
0250 }
0251 }
0252
0253 int encodeFile(const std::string &output_filename, const std::vector<std::string> &filenames) {
0254 assert(filenames.size() == 1);
0255 TFile input(filenames[0].c_str());
0256 DEBUG(0, "Encoding file " << filenames[0] << std::endl);
0257 MEStore micromes;
0258 dqmstorepb::ROOTFilePB dqmstore_message;
0259
0260 processDirectory(&input, "", micromes);
0261 fillMessage(dqmstore_message, micromes);
0262 writeMessage(dqmstore_message, output_filename);
0263
0264 return 0;
0265 }
0266
0267 int convertFile(const std::string &output_filename, const std::vector<std::string> &filenames) {
0268 assert(filenames.size() == 1);
0269 TFile output(output_filename.c_str(), "RECREATE");
0270 DEBUG(0, "Converting file " << filenames[0] << std::endl);
0271 dqmstorepb::ROOTFilePB dqmstore_message;
0272
0273 int filedescriptor = ::open(filenames[0].c_str(), O_RDONLY);
0274 FileInputStream fin(filedescriptor);
0275 GzipInputStream input(&fin);
0276 CodedInputStream input_coded(&input);
0277 input_coded.SetTotalBytesLimit(1024 * 1024 * 1024);
0278 if (!dqmstore_message.ParseFromCodedStream(&input_coded)) {
0279 std::cout << "Fatal Error opening file " << filenames[0] << std::endl;
0280 return ERR_NOFILE;
0281 }
0282 ::close(filedescriptor);
0283
0284 for (int i = 0; i < dqmstore_message.histo_size(); i++) {
0285 const dqmstorepb::ROOTFilePB::Histo &h = dqmstore_message.histo(i);
0286 DEBUG(1, h.full_pathname() << std::endl);
0287 DEBUG(1, h.size() << std::endl);
0288 TBufferFile buf(TBufferFile::kRead, h.size(), (void *)h.streamed_histo().data(), kFALSE);
0289 buf.Reset();
0290 TObject *obj = extractNextObject(buf);
0291 std::string path, objname;
0292 get_info(h, path, objname, &obj);
0293 gDirectory->cd("/");
0294
0295 size_t start = 0;
0296 size_t end = path.find('/', start);
0297 if (end == std::string::npos)
0298 end = path.size();
0299 while (true) {
0300 std::string part(path, start, end - start);
0301 if (!gDirectory->Get(part.c_str()))
0302 gDirectory->mkdir(part.c_str());
0303 gDirectory->cd(part.c_str());
0304
0305 if (end + 1 >= path.size())
0306 break;
0307
0308 start = end + 1;
0309 end = path.find('/', start);
0310 if (end == std::string::npos)
0311 end = path.size();
0312 }
0313 obj->Write();
0314 DEBUG(1, obj->GetName() << std::endl);
0315 }
0316 output.Close();
0317 return 0;
0318 }
0319
0320 int dumpFiles(const std::vector<std::string> &filenames) {
0321 assert(!filenames.empty());
0322 for (int i = 0, e = filenames.size(); i != e; ++i) {
0323 DEBUG(0, "Dumping file " << filenames[i] << std::endl);
0324 dqmstorepb::ROOTFilePB dqmstore_message;
0325
0326 int filedescriptor = ::open(filenames[0].c_str(), O_RDONLY);
0327 FileInputStream fin(filedescriptor);
0328 GzipInputStream input(&fin);
0329 CodedInputStream input_coded(&input);
0330 input_coded.SetTotalBytesLimit(1024 * 1024 * 1024);
0331 if (!dqmstore_message.ParseFromCodedStream(&input_coded)) {
0332 std::cout << "Fatal Error opening file " << filenames[0] << std::endl;
0333 return ERR_NOFILE;
0334 }
0335 ::close(filedescriptor);
0336
0337 for (int i = 0; i < dqmstore_message.histo_size(); i++) {
0338 const dqmstorepb::ROOTFilePB::Histo &h = dqmstore_message.histo(i);
0339 DEBUG(1, h.full_pathname() << std::endl);
0340 DEBUG(1, h.size() << std::endl);
0341 TBufferFile buf(TBufferFile::kRead, h.size(), (void *)h.streamed_histo().data(), kFALSE);
0342 buf.Reset();
0343 TObject *obj = extractNextObject(buf);
0344 DEBUG(1, obj->GetName() << std::endl);
0345 DEBUG(1, "Flags: " << h.flags() << std::endl);
0346 }
0347 }
0348
0349 return 0;
0350 }
0351
0352 int addFile(MEStore µmes, int fd) {
0353 dqmstorepb::ROOTFilePB dqmstore_msg;
0354
0355 FileInputStream fin(fd);
0356 GzipInputStream input(&fin);
0357 CodedInputStream input_coded(&input);
0358 input_coded.SetTotalBytesLimit(1024 * 1024 * 1024);
0359 if (!dqmstore_msg.ParseFromCodedStream(&input_coded)) {
0360 std::cout << "Fatal decoding stream: " << fd << std::endl;
0361 return ERR_NOFILE;
0362 }
0363
0364 auto hint = micromes.begin();
0365 for (int i = 0; i < dqmstore_msg.histo_size(); i++) {
0366 std::string path;
0367 std::string objname;
0368 TObject *obj = nullptr;
0369 const dqmstorepb::ROOTFilePB::Histo &h = dqmstore_msg.histo(i);
0370 get_info(h, path, objname, &obj);
0371
0372 MicroME mme(nullptr, path, objname, h.flags());
0373 auto ir = micromes.insert(hint, mme);
0374 if (ir->obj != nullptr) {
0375
0376
0377
0378 ir->add(obj);
0379 delete obj;
0380 DEBUG(2, "Merged MicroME " << mme.fullname() << std::endl);
0381 } else {
0382 ir->obj = obj;
0383 DEBUG(2, "Inserted MicroME " << mme.fullname() << std::endl);
0384 }
0385
0386 hint = ir;
0387 ++hint;
0388 }
0389
0390 return 0;
0391 }
0392
0393
0394
0395
0396 void tryRootPreload() {
0397
0398 TH1F obj_th1f("preload_th1f", "preload_th1f", 2, 0, 1);
0399
0400 TBufferFile write_buffer(TBufferFile::kWrite);
0401 write_buffer.WriteObject(&obj_th1f);
0402
0403 dqmstorepb::ROOTFilePB preload_file;
0404 dqmstorepb::ROOTFilePB::Histo *hw = preload_file.add_histo();
0405 hw->set_size(write_buffer.Length());
0406 hw->set_flags(0);
0407 hw->set_streamed_histo((const void *)write_buffer.Buffer(), write_buffer.Length());
0408
0409
0410 const dqmstorepb::ROOTFilePB::Histo &hr = preload_file.histo(0);
0411 std::string path;
0412 std::string objname;
0413 TObject *obj = nullptr;
0414 get_info(hr, path, objname, &obj);
0415 delete obj;
0416
0417
0418 }
0419
0420
0421 void addFilesWithFork(int parent_fd,
0422 const int fork_id,
0423 const int fork_total,
0424 const std::vector<std::string> &filenames) {
0425 DEBUG(1, "Start process: " << fork_id << " parent: " << (fork_id / 2) << std::endl);
0426
0427 std::list<std::pair<int, int> > children;
0428
0429
0430 for (int i = 0; i < 2; ++i) {
0431 int child_id = fork_id * 2 + i;
0432 if (child_id > fork_total)
0433 continue;
0434
0435 int fd[2];
0436 ::pipe(fd);
0437
0438 int child_pid = ::fork();
0439 if (child_pid == 0) {
0440 ::prctl(PR_SET_PDEATHSIG, SIGKILL);
0441 ::close(fd[0]);
0442
0443 addFilesWithFork(fd[1], child_id, fork_total, filenames);
0444 ::close(fd[1]);
0445
0446 ::_exit(0);
0447 } else {
0448 ::close(fd[1]);
0449 children.push_back(std::make_pair(fd[0], child_pid));
0450 }
0451 }
0452
0453
0454 MEStore microme;
0455
0456
0457
0458 for (unsigned int fi = fork_id - 1; fi < filenames.size(); fi += fork_total) {
0459 const std::string &file = filenames[fi];
0460 DEBUG(1, "Adding file " << file << std::endl);
0461
0462 int filedescriptor;
0463 if ((filedescriptor = ::open(file.c_str(), O_RDONLY)) == -1) {
0464 std::cout << "Fatal Error opening file " << file << std::endl;
0465
0466 exit(ERR_NOFILE);
0467 }
0468
0469 addFile(microme, filedescriptor);
0470 ::close(filedescriptor);
0471 }
0472
0473
0474 for (auto &chpair : children) {
0475 int fd = chpair.first;
0476 addFile(microme, fd);
0477 ::close(fd);
0478
0479
0480 int status;
0481 ::waitpid(chpair.second, &status, 0);
0482 }
0483
0484
0485 dqmstorepb::ROOTFilePB dqmstore_output_msg;
0486 fillMessage(dqmstore_output_msg, microme);
0487 writeMessageFD(dqmstore_output_msg, parent_fd);
0488 };
0489
0490 int addFiles(const std::string &output_filename, const std::vector<std::string> &filenames, int nthreads) {
0491 tryRootPreload();
0492
0493 DEBUG(1, "Writing file" << std::endl);
0494 int out_fd =
0495 ::open(output_filename.c_str(), O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH);
0496
0497 addFilesWithFork(out_fd, 1, nthreads, filenames);
0498 ::close(out_fd);
0499
0500 return 0;
0501 }
0502
0503 static int showusage() {
0504 static const std::string app_name("fasthadd");
0505
0506 std::cerr << "Usage: " << app_name << " [--[no-]debug] TASK OPTIONS\n\n " << app_name
0507 << " [OPTIONS] add [-j NUM_THREADS] -o OUTPUT_FILE [DAT FILE...]\n " << app_name
0508 << " [OPTIONS] convert -o ROOT_FILE DAT_FILE\n " << app_name
0509 << " [OPTIONS] encode -o DAT_FILE ROOT_FILE\n " << app_name << " [OPTIONS] dump [DAT FILE...]\n ";
0510 return ERR_BADCFG;
0511 }
0512
0513 int main(int argc, char *argv[]) {
0514 int arg;
0515 int ret = 0;
0516 int jobs = 1;
0517 std::string output_file;
0518 std::vector<std::string> filenames;
0519 TaskType task;
0520
0521 filenames.reserve(argc);
0522
0523 for (arg = 1; arg < argc; ++arg) {
0524 if (!strcmp(argv[arg], "--no-debug"))
0525 debug = 0;
0526 else if (!strcmp(argv[arg], "--debug") || !strcmp(argv[arg], "-d"))
0527 debug++;
0528 else
0529 break;
0530 }
0531
0532 if (arg < argc) {
0533 if (!strcmp(argv[arg], "add")) {
0534 ++arg;
0535 task = TASK_ADD;
0536 } else if (!strcmp(argv[arg], "dump")) {
0537 ++arg;
0538 task = TASK_DUMP;
0539 } else if (!strcmp(argv[arg], "convert")) {
0540 ++arg;
0541 task = TASK_CONVERT;
0542 } else if (!strcmp(argv[arg], "encode")) {
0543 ++arg;
0544 task = TASK_ENCODE;
0545 } else {
0546 std::cerr << "Unknown action: " << argv[arg] << std::endl;
0547 return showusage();
0548 }
0549 } else {
0550 std::cerr << "Not enough arguments\n";
0551 return showusage();
0552 }
0553
0554 if (task == TASK_ADD) {
0555 if ((arg != argc) && (strcmp(argv[arg], "-j") == 0)) {
0556 jobs = atoi(argv[arg + 1]);
0557
0558 if ((jobs < 1) || (jobs > 128)) {
0559 std::cerr << "Invalid argument for -j\n";
0560 return showusage();
0561 };
0562
0563 arg += 2;
0564 }
0565 }
0566
0567 if (task == TASK_ADD || task == TASK_CONVERT || task == TASK_ENCODE) {
0568 if (arg == argc) {
0569 std::cerr << "add|convert|encode actions requires a -o option to be set\n";
0570 return showusage();
0571 }
0572 if (!strcmp(argv[arg], "-o")) {
0573 if (arg < argc - 1) {
0574 output_file = argv[++arg];
0575 } else {
0576 std::cerr << " -o option requires a value\n";
0577 return showusage();
0578 }
0579 }
0580 } else if (task == TASK_DUMP) {
0581 if (arg == argc) {
0582 std::cerr << "Missing input file(s)\n";
0583 return showusage();
0584 }
0585 for (; arg < argc; ++arg) {
0586 filenames.emplace_back(argv[arg]);
0587 }
0588 }
0589
0590 if (task == TASK_ADD || task == TASK_CONVERT || task == TASK_ENCODE) {
0591 if (++arg == argc) {
0592 std::cerr << "Missing input file(s)\n";
0593 return showusage();
0594 }
0595 for (; arg < argc; ++arg) {
0596 filenames.emplace_back(argv[arg]);
0597 }
0598 }
0599
0600 if (task == TASK_ADD)
0601 ret = addFiles(output_file, filenames, jobs);
0602 else if (task == TASK_DUMP)
0603 ret = dumpFiles(filenames);
0604 else if (task == TASK_CONVERT)
0605 ret = convertFile(output_file, filenames);
0606 else if (task == TASK_ENCODE)
0607 ret = encodeFile(output_file, filenames);
0608
0609 google::protobuf::ShutdownProtobufLibrary();
0610 return ret;
0611 }