Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:10:06

0001 /** Copyright (c) 2013 "Marco Rovere"
0002 
0003 This code is free software: you can redistribute it and/or modify it
0004 under the terms of the GNU General Public License as published by the
0005 Free Software Foundation, either version 3 of the License, or (at your
0006 option) any later version.
0007 
0008 This program is distributed in the hope that it will be useful, but
0009 WITHOUT ANY WARRANTY; without even the implied warranty of
0010 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
0011 General Public License for more details.
0012 
0013 You should have received a copy of the GNU General Public License
0014 along with this program.  If not, see
0015 <http://www.gnu.org/licenses/>.
0016 
0017 This code is simple: its sole purpose is to either dump or add
0018 ProtocolBuffer-gzipped files that are meant to replace ordinary ROOT
0019 files containing only hierarchies of histograms, arranged in
0020 arbitrarily complex levels of directories. The merging logic is such
0021 that plots present in all files are added, while plots present in some
0022 of the files are anyway tracked and added, if similar ones are found
0023 in other files.
0024 
0025 The logic of the merging algorithm is trivial and fully rely on the
0026 ordered nature of the ProtocolBuffer files read in input. An internal
0027 set container of MicroME is used to host the final results. The
0028 relational ordering of the set must be guaranteed to match the one
0029 used to order the ProtocolBuffer files for optimal performance and
0030 correctness.
0031 
0032 A dependency on protocolbuffer is needed and should be alrady included
0033 out of the box into any recent CMSSW release.
0034 
0035 In case the protoclBuffer package is not avaialble, you need to
0036 install it as an external toolfile. Therefore, in order to be able to
0037 compile and run the code, you need to locally install protocol buffer
0038 2.4.1 and add it as a scram tool to your preferred CMSSW development
0039 area.
0040 
0041 The toolfile I used is:
0042 
0043 <tool name="protocolbuf" version="2.4.1">
0044   <client>
0045     <environment name="PROTOCOLBUF_CLIENT_BASE" default="/afs/cern.ch/work/r/rovere/protocolbuf"/>
0046     <environment name="LIBDIR"  value="$PROTOCOLBUF_CLIENT_BASE/lib"/>
0047     <environment name="INCLUDE" value="$PROTOCOLBUF_CLIENT_BASE/include"/>
0048     <environment name="PATH"    value="$PROTOCOLBUF_CLIENT_BASE/bin"/>
0049     <lib name="protobuf"/>
0050     <use name="zlib"/>
0051   </client>
0052   <runtime name="PATH"    value="$PROTOCOLBUF_CLIENT_BASE/bin"/>
0053 </tool>
0054 
0055 To register it into your development area you can simply do:
0056 
0057 scram setup protocolbuf.xml
0058 
0059 To verify the correctness of the information, do:
0060 
0061 scram tool info protocolbuf. You should see an output similar to the
0062 following:
0063 
0064 Tool info as configured in location /afs/cern.ch/work/r/rovere/fastHistoMergingPB/CMSSW_7_0_X_2013-07-08-0200
0065 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
0066 
0067 Name : protocolbuf
0068 Version : 2.4.1
0069 ++++++++++++++++++++
0070 SCRAM_PROJECT=no
0071 PROTOCOLBUF_CLIENT_BASE=/afs/cern.ch/work/r/rovere/protocolbuf
0072 LIB=protobuf
0073 LIBDIR=/afs/cern.ch/work/r/rovere/protocolbuf/lib
0074 INCLUDE=/afs/cern.ch/work/r/rovere/protocolbuf/include
0075 USE=zlib
0076 PATH=/afs/cern.ch/work/r/rovere/protocolbuf/bin
0077 
0078 */
0079 
0080 #include <sys/types.h>
0081 #include <sys/stat.h>
0082 #include <fcntl.h>
0083 #include <vector>
0084 #include <set>
0085 #include <string>
0086 #include <iostream>
0087 #include <memory>
0088 #include <thread>
0089 #include <mutex>
0090 #include <list>
0091 #include "DQMServices/Core/interface/ROOTFilePB.pb.h"
0092 #include <google/protobuf/io/coded_stream.h>
0093 #include <google/protobuf/io/gzip_stream.h>
0094 #include <google/protobuf/io/zero_copy_stream_impl.h>
0095 #include <TROOT.h>
0096 #include <TFile.h>
0097 #include <TBufferFile.h>
0098 #include <TObject.h>
0099 #include <TObjString.h>
0100 #include <TH1.h>
0101 #include <TProfile.h>
0102 #include <TKey.h>
0103 #include <TClass.h>
0104 
0105 #include <sys/prctl.h>
0106 #include <sys/wait.h>
0107 #include <csignal>
0108 
0109 #define DEBUG(x, msg) \
0110   if (debug >= x)     \
0111   std::cout << "DEBUG: " << msg << std::flush
0112 
0113 int debug = 0;
0114 
0115 struct MicroME {
0116   MicroME(TObject *o, const std::string &dir, const std::string &obj, uint32_t flags = 0)
0117       : obj(o), dirname(dir), objname(obj), flags(flags) {}
0118 
0119   mutable TObject *obj;
0120 
0121   const std::string dirname;
0122   const std::string objname;
0123 
0124   uint32_t flags;
0125 
0126   bool operator<(const MicroME &rhs) const {
0127     const MicroME &lhs = *this;
0128     int diff = lhs.dirname.compare(rhs.dirname);
0129     return (diff < 0 ? true : diff == 0 ? lhs.objname < rhs.objname : false);
0130   };
0131 
0132   void add(TObject *obj_to_add) const {
0133     DEBUG(1, "Merging: " << obj->GetName() << " << " << obj_to_add->GetName() << std::endl);
0134 
0135     if (dynamic_cast<TH1 *>(obj) && dynamic_cast<TH1 *>(obj_to_add)) {
0136       dynamic_cast<TH1 *>(obj)->Add(dynamic_cast<TH1 *>(obj_to_add));
0137     } else if (dynamic_cast<TObjString *>(obj) && dynamic_cast<TObjString *>(obj_to_add)) {
0138     } else {
0139       DEBUG(1, "Cannot merge (different types): " << obj->GetName() << " << " << obj_to_add->GetName() << std::endl);
0140     }
0141   };
0142 
0143   const std::string fullname() const { return dirname + '/' + objname; };
0144 };
0145 
0146 using MEStore = std::set<MicroME>;
0147 
0148 enum TaskType { TASK_ADD, TASK_DUMP, TASK_CONVERT, TASK_ENCODE };
0149 
0150 enum ErrType { ERR_BADCFG = 1, ERR_NOFILE };
0151 
0152 using google::protobuf::io::ArrayInputStream;
0153 using google::protobuf::io::CodedInputStream;
0154 using google::protobuf::io::FileInputStream;
0155 using google::protobuf::io::FileOutputStream;
0156 using google::protobuf::io::GzipInputStream;
0157 using google::protobuf::io::GzipOutputStream;
0158 
0159 /** Extract the next serialised ROOT object from @a buf. Returns null
0160 if there are no more objects in the buffer, or a null pointer was
0161 serialised at this location. */
0162 inline TObject *extractNextObject(TBufferFile &buf) {
0163   if (buf.Length() == buf.BufferSize())
0164     return nullptr;
0165 
0166   buf.InitMap();
0167   return reinterpret_cast<TObject *>(buf.ReadObjectAny(nullptr));
0168 }
0169 
0170 static void get_info(const dqmstorepb::ROOTFilePB::Histo &h, std::string &dirname, std::string &objname, TObject **obj) {
0171   size_t slash = h.full_pathname().rfind('/');
0172   size_t dirpos = (slash == std::string::npos ? 0 : slash);
0173   size_t namepos = (slash == std::string::npos ? 0 : slash + 1);
0174   dirname.assign(h.full_pathname(), 0, dirpos);
0175   objname.assign(h.full_pathname(), namepos, std::string::npos);
0176   TBufferFile buf(TBufferFile::kRead, h.size(), (void *)h.streamed_histo().data(), kFALSE);
0177   buf.Reset();
0178   *obj = extractNextObject(buf);
0179   if (!*obj) {
0180     std::cerr << "Error reading element: " << h.full_pathname() << std::endl;
0181   }
0182 }
0183 
0184 void writeMessageFD(const dqmstorepb::ROOTFilePB &dqmstore_output_msg, int out_fd) {
0185   FileOutputStream out_stream(out_fd);
0186   GzipOutputStream::Options options;
0187   options.format = GzipOutputStream::GZIP;
0188   options.compression_level = 2;
0189   GzipOutputStream gzip_stream(&out_stream, options);
0190   dqmstore_output_msg.SerializeToZeroCopyStream(&gzip_stream);
0191 
0192   // make sure we flush before close
0193   gzip_stream.Close();
0194   out_stream.Close();
0195 }
0196 
0197 void writeMessage(const dqmstorepb::ROOTFilePB &dqmstore_output_msg, const std::string &output_filename) {
0198   DEBUG(1, "Writing file" << std::endl);
0199 
0200   int out_fd =
0201       ::open(output_filename.c_str(), O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH);
0202 
0203   writeMessageFD(dqmstore_output_msg, out_fd);
0204   ::close(out_fd);
0205 }
0206 
0207 void fillMessage(dqmstorepb::ROOTFilePB &dqmstore_output_msg, const MEStore &micromes) {
0208   auto mi = micromes.begin();
0209   auto me = micromes.end();
0210 
0211   DEBUG(1, "Streaming ROOT objects" << std::endl);
0212   for (; mi != me; ++mi) {
0213     dqmstorepb::ROOTFilePB::Histo *h = dqmstore_output_msg.add_histo();
0214     DEBUG(2, "Streaming ROOT object " << mi->fullname() << "\n");
0215     h->set_full_pathname(mi->fullname());
0216     TBufferFile buffer(TBufferFile::kWrite);
0217     buffer.WriteObject(mi->obj);
0218     h->set_size(buffer.Length());
0219     h->set_flags(mi->flags);
0220     h->set_streamed_histo((const void *)buffer.Buffer(), buffer.Length());
0221     delete mi->obj;
0222   }
0223 }
0224 
0225 void processDirectory(TFile *file, const std::string &curdir, MEStore &micromes) {
0226   DEBUG(1, "Processing directory " << curdir << "\n");
0227   file->cd(curdir.c_str());
0228   TKey *key;
0229   TIter next(gDirectory->GetListOfKeys());
0230   while ((key = (TKey *)next())) {
0231     TObject *obj = key->ReadObj();
0232     if (dynamic_cast<TDirectory *>(obj)) {
0233       std::string subdir;
0234       subdir.reserve(curdir.size() + strlen(obj->GetName()) + 2);
0235       subdir += curdir;
0236       if (!curdir.empty())
0237         subdir += '/';
0238       subdir += obj->GetName();
0239       processDirectory(file, subdir, micromes);
0240     } else if ((dynamic_cast<TH1 *>(obj)) || (dynamic_cast<TObjString *>(obj))) {
0241       if (dynamic_cast<TH1 *>(obj)) {
0242         dynamic_cast<TH1 *>(obj)->SetDirectory(nullptr);
0243       }
0244 
0245       DEBUG(2, curdir << "/" << obj->GetName() << "\n");
0246       MicroME mme(obj, curdir, obj->GetName());
0247 
0248       micromes.insert(mme);
0249     }
0250   }
0251 }
0252 
0253 int encodeFile(const std::string &output_filename, const std::vector<std::string> &filenames) {
0254   assert(filenames.size() == 1);
0255   TFile input(filenames[0].c_str());
0256   DEBUG(0, "Encoding file " << filenames[0] << std::endl);
0257   MEStore micromes;
0258   dqmstorepb::ROOTFilePB dqmstore_message;
0259 
0260   processDirectory(&input, "", micromes);
0261   fillMessage(dqmstore_message, micromes);
0262   writeMessage(dqmstore_message, output_filename);
0263 
0264   return 0;
0265 }
0266 
0267 int convertFile(const std::string &output_filename, const std::vector<std::string> &filenames) {
0268   assert(filenames.size() == 1);
0269   TFile output(output_filename.c_str(), "RECREATE");
0270   DEBUG(0, "Converting file " << filenames[0] << std::endl);
0271   dqmstorepb::ROOTFilePB dqmstore_message;
0272 
0273   int filedescriptor = ::open(filenames[0].c_str(), O_RDONLY);
0274   FileInputStream fin(filedescriptor);
0275   GzipInputStream input(&fin);
0276   CodedInputStream input_coded(&input);
0277   input_coded.SetTotalBytesLimit(1024 * 1024 * 1024);
0278   if (!dqmstore_message.ParseFromCodedStream(&input_coded)) {
0279     std::cout << "Fatal Error opening file " << filenames[0] << std::endl;
0280     return ERR_NOFILE;
0281   }
0282   ::close(filedescriptor);
0283 
0284   for (int i = 0; i < dqmstore_message.histo_size(); i++) {
0285     const dqmstorepb::ROOTFilePB::Histo &h = dqmstore_message.histo(i);
0286     DEBUG(1, h.full_pathname() << std::endl);
0287     DEBUG(1, h.size() << std::endl);
0288     TBufferFile buf(TBufferFile::kRead, h.size(), (void *)h.streamed_histo().data(), kFALSE);
0289     buf.Reset();
0290     TObject *obj = extractNextObject(buf);
0291     std::string path, objname;
0292     get_info(h, path, objname, &obj);
0293     gDirectory->cd("/");
0294     // Find the first path component.
0295     size_t start = 0;
0296     size_t end = path.find('/', start);
0297     if (end == std::string::npos)
0298       end = path.size();
0299     while (true) {
0300       std::string part(path, start, end - start);
0301       if (!gDirectory->Get(part.c_str()))
0302         gDirectory->mkdir(part.c_str());
0303       gDirectory->cd(part.c_str());
0304       // Stop if we reached the end, ignoring any trailing '/'.
0305       if (end + 1 >= path.size())
0306         break;
0307       // Find the next path component.
0308       start = end + 1;
0309       end = path.find('/', start);
0310       if (end == std::string::npos)
0311         end = path.size();
0312     }
0313     obj->Write();
0314     DEBUG(1, obj->GetName() << std::endl);
0315   }
0316   output.Close();
0317   return 0;
0318 }
0319 
0320 int dumpFiles(const std::vector<std::string> &filenames) {
0321   assert(!filenames.empty());
0322   for (int i = 0, e = filenames.size(); i != e; ++i) {
0323     DEBUG(0, "Dumping file " << filenames[i] << std::endl);
0324     dqmstorepb::ROOTFilePB dqmstore_message;
0325 
0326     int filedescriptor = ::open(filenames[0].c_str(), O_RDONLY);
0327     FileInputStream fin(filedescriptor);
0328     GzipInputStream input(&fin);
0329     CodedInputStream input_coded(&input);
0330     input_coded.SetTotalBytesLimit(1024 * 1024 * 1024);
0331     if (!dqmstore_message.ParseFromCodedStream(&input_coded)) {
0332       std::cout << "Fatal Error opening file " << filenames[0] << std::endl;
0333       return ERR_NOFILE;
0334     }
0335     ::close(filedescriptor);
0336 
0337     for (int i = 0; i < dqmstore_message.histo_size(); i++) {
0338       const dqmstorepb::ROOTFilePB::Histo &h = dqmstore_message.histo(i);
0339       DEBUG(1, h.full_pathname() << std::endl);
0340       DEBUG(1, h.size() << std::endl);
0341       TBufferFile buf(TBufferFile::kRead, h.size(), (void *)h.streamed_histo().data(), kFALSE);
0342       buf.Reset();
0343       TObject *obj = extractNextObject(buf);
0344       DEBUG(1, obj->GetName() << std::endl);
0345       DEBUG(1, "Flags: " << h.flags() << std::endl);
0346     }
0347   }
0348 
0349   return 0;
0350 }
0351 
0352 int addFile(MEStore &micromes, int fd) {
0353   dqmstorepb::ROOTFilePB dqmstore_msg;
0354 
0355   FileInputStream fin(fd);
0356   GzipInputStream input(&fin);
0357   CodedInputStream input_coded(&input);
0358   input_coded.SetTotalBytesLimit(1024 * 1024 * 1024);
0359   if (!dqmstore_msg.ParseFromCodedStream(&input_coded)) {
0360     std::cout << "Fatal decoding stream: " << fd << std::endl;
0361     return ERR_NOFILE;
0362   }
0363 
0364   auto hint = micromes.begin();
0365   for (int i = 0; i < dqmstore_msg.histo_size(); i++) {
0366     std::string path;
0367     std::string objname;
0368     TObject *obj = nullptr;
0369     const dqmstorepb::ROOTFilePB::Histo &h = dqmstore_msg.histo(i);
0370     get_info(h, path, objname, &obj);
0371 
0372     MicroME mme(nullptr, path, objname, h.flags());
0373     auto ir = micromes.insert(hint, mme);
0374     if (ir->obj != nullptr) {
0375       // new element was not added
0376       // so we merge
0377 
0378       ir->add(obj);
0379       delete obj;
0380       DEBUG(2, "Merged MicroME " << mme.fullname() << std::endl);
0381     } else {
0382       ir->obj = obj;
0383       DEBUG(2, "Inserted MicroME " << mme.fullname() << std::endl);
0384     }
0385 
0386     hint = ir;
0387     ++hint;
0388   }
0389 
0390   return 0;
0391 }
0392 
0393 // The idea is to preload root library (before forking).
0394 // Which is significant for performance and especially memory usage,
0395 // because root aakes a long time to init (and somehow manages to launch a subshell).
0396 void tryRootPreload() {
0397   // write a single histogram
0398   TH1F obj_th1f("preload_th1f", "preload_th1f", 2, 0, 1);
0399 
0400   TBufferFile write_buffer(TBufferFile::kWrite);
0401   write_buffer.WriteObject(&obj_th1f);
0402 
0403   dqmstorepb::ROOTFilePB preload_file;
0404   dqmstorepb::ROOTFilePB::Histo *hw = preload_file.add_histo();
0405   hw->set_size(write_buffer.Length());
0406   hw->set_flags(0);
0407   hw->set_streamed_histo((const void *)write_buffer.Buffer(), write_buffer.Length());
0408 
0409   // now load this th1f
0410   const dqmstorepb::ROOTFilePB::Histo &hr = preload_file.histo(0);
0411   std::string path;
0412   std::string objname;
0413   TObject *obj = nullptr;
0414   get_info(hr, path, objname, &obj);
0415   delete obj;
0416 
0417   // all done
0418 }
0419 
0420 /* fork_id represents the position in a node (node number). */
0421 void addFilesWithFork(int parent_fd,
0422                       const int fork_id,
0423                       const int fork_total,
0424                       const std::vector<std::string> &filenames) {
0425   DEBUG(1, "Start process: " << fork_id << " parent: " << (fork_id / 2) << std::endl);
0426 
0427   std::list<std::pair<int, int> > children;
0428 
0429   // if this node has a subtree, start it
0430   for (int i = 0; i < 2; ++i) {
0431     int child_id = fork_id * 2 + i;
0432     if (child_id > fork_total)
0433       continue;
0434 
0435     int fd[2];
0436     ::pipe(fd);
0437 
0438     int child_pid = ::fork();
0439     if (child_pid == 0) {
0440       ::prctl(PR_SET_PDEATHSIG, SIGKILL);
0441       ::close(fd[0]);  // close read end
0442 
0443       addFilesWithFork(fd[1], child_id, fork_total, filenames);
0444       ::close(fd[1]);
0445 
0446       ::_exit(0);
0447     } else {
0448       ::close(fd[1]);  // close write end
0449       children.push_back(std::make_pair(fd[0], child_pid));
0450     }
0451   }
0452 
0453   // merge all my files
0454   MEStore microme;
0455 
0456   // select the filenames to process
0457   // with threads=1, this just selects all the files
0458   for (unsigned int fi = fork_id - 1; fi < filenames.size(); fi += fork_total) {
0459     const std::string &file = filenames[fi];
0460     DEBUG(1, "Adding file " << file << std::endl);
0461 
0462     int filedescriptor;
0463     if ((filedescriptor = ::open(file.c_str(), O_RDONLY)) == -1) {
0464       std::cout << "Fatal Error opening file " << file << std::endl;
0465 
0466       exit(ERR_NOFILE);
0467     }
0468 
0469     addFile(microme, filedescriptor);
0470     ::close(filedescriptor);
0471   }
0472 
0473   // merge all children
0474   for (auto &chpair : children) {
0475     int fd = chpair.first;
0476     addFile(microme, fd);
0477     ::close(fd);
0478 
0479     // collect the child, not necessary, but avoids <defunct>
0480     int status;
0481     ::waitpid(chpair.second, &status, 0);
0482   }
0483 
0484   // output everything to fd
0485   dqmstorepb::ROOTFilePB dqmstore_output_msg;
0486   fillMessage(dqmstore_output_msg, microme);
0487   writeMessageFD(dqmstore_output_msg, parent_fd);
0488 };
0489 
0490 int addFiles(const std::string &output_filename, const std::vector<std::string> &filenames, int nthreads) {
0491   tryRootPreload();
0492 
0493   DEBUG(1, "Writing file" << std::endl);
0494   int out_fd =
0495       ::open(output_filename.c_str(), O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH);
0496 
0497   addFilesWithFork(out_fd, 1, nthreads, filenames);
0498   ::close(out_fd);
0499 
0500   return 0;
0501 }
0502 
0503 static int showusage() {
0504   static const std::string app_name("fasthadd");
0505 
0506   std::cerr << "Usage: " << app_name << " [--[no-]debug] TASK OPTIONS\n\n  " << app_name
0507             << " [OPTIONS] add [-j NUM_THREADS] -o OUTPUT_FILE [DAT FILE...]\n  " << app_name
0508             << " [OPTIONS] convert -o ROOT_FILE DAT_FILE\n  " << app_name
0509             << " [OPTIONS] encode -o DAT_FILE ROOT_FILE\n  " << app_name << " [OPTIONS] dump [DAT FILE...]\n  ";
0510   return ERR_BADCFG;
0511 }
0512 
0513 int main(int argc, char *argv[]) {
0514   int arg;
0515   int ret = 0;
0516   int jobs = 1;
0517   std::string output_file;
0518   std::vector<std::string> filenames;
0519   TaskType task;
0520 
0521   filenames.reserve(argc);
0522 
0523   for (arg = 1; arg < argc; ++arg) {
0524     if (!strcmp(argv[arg], "--no-debug"))
0525       debug = 0;
0526     else if (!strcmp(argv[arg], "--debug") || !strcmp(argv[arg], "-d"))
0527       debug++;
0528     else
0529       break;
0530   }
0531 
0532   if (arg < argc) {
0533     if (!strcmp(argv[arg], "add")) {
0534       ++arg;
0535       task = TASK_ADD;
0536     } else if (!strcmp(argv[arg], "dump")) {
0537       ++arg;
0538       task = TASK_DUMP;
0539     } else if (!strcmp(argv[arg], "convert")) {
0540       ++arg;
0541       task = TASK_CONVERT;
0542     } else if (!strcmp(argv[arg], "encode")) {
0543       ++arg;
0544       task = TASK_ENCODE;
0545     } else {
0546       std::cerr << "Unknown action: " << argv[arg] << std::endl;
0547       return showusage();
0548     }
0549   } else {
0550     std::cerr << "Not enough arguments\n";
0551     return showusage();
0552   }
0553 
0554   if (task == TASK_ADD) {
0555     if ((arg != argc) && (strcmp(argv[arg], "-j") == 0)) {
0556       jobs = atoi(argv[arg + 1]);
0557 
0558       if ((jobs < 1) || (jobs > 128)) {
0559         std::cerr << "Invalid argument for -j\n";
0560         return showusage();
0561       };
0562 
0563       arg += 2;
0564     }
0565   }
0566 
0567   if (task == TASK_ADD || task == TASK_CONVERT || task == TASK_ENCODE) {
0568     if (arg == argc) {
0569       std::cerr << "add|convert|encode actions requires a -o option to be set\n";
0570       return showusage();
0571     }
0572     if (!strcmp(argv[arg], "-o")) {
0573       if (arg < argc - 1) {
0574         output_file = argv[++arg];
0575       } else {
0576         std::cerr << " -o option requires a value\n";
0577         return showusage();
0578       }
0579     }
0580   } else if (task == TASK_DUMP) {
0581     if (arg == argc) {
0582       std::cerr << "Missing input file(s)\n";
0583       return showusage();
0584     }
0585     for (; arg < argc; ++arg) {
0586       filenames.emplace_back(argv[arg]);
0587     }
0588   }
0589 
0590   if (task == TASK_ADD || task == TASK_CONVERT || task == TASK_ENCODE) {
0591     if (++arg == argc) {
0592       std::cerr << "Missing input file(s)\n";
0593       return showusage();
0594     }
0595     for (; arg < argc; ++arg) {
0596       filenames.emplace_back(argv[arg]);
0597     }
0598   }
0599 
0600   if (task == TASK_ADD)
0601     ret = addFiles(output_file, filenames, jobs);
0602   else if (task == TASK_DUMP)
0603     ret = dumpFiles(filenames);
0604   else if (task == TASK_CONVERT)
0605     ret = convertFile(output_file, filenames);
0606   else if (task == TASK_ENCODE)
0607     ret = encodeFile(output_file, filenames);
0608 
0609   google::protobuf::ShutdownProtobufLibrary();
0610   return ret;
0611 }