Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2023-03-17 10:41:35

#include "IOPool/Streamer/interface/MsgTools.h"
#include "IOPool/Streamer/interface/StreamerInputFile.h"
#include "CalibCalorimetry/EcalLaserSorting/interface/WatcherStreamFileReader.h"
#include "FWCore/Utilities/interface/Exception.h"

#include <cerrno>
#include <climits>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <ctime>
#include <fcntl.h>
#include <fstream>
#include <iostream>
#include <libgen.h>
#include <memory>
#include <sstream>
#include <string>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
#include <vector>

//using namespace edm;
using namespace std;
0022 
0023 //std::string WatcherStreamFileReader::fileName_;
0024 
#if !defined(__linux__) && !(defined(__APPLE__) && __DARWIN_C_LEVEL >= 200809L)
/* Fallback getline() for platforms whose C library lacks the POSIX.1-2008
 * getline(). Adapted from glibc's getdelim(). */

#ifndef SIZE_MAX
#define SIZE_MAX ((size_t)-1)
#endif
#ifndef SSIZE_MAX
#define SSIZE_MAX ((ssize_t)(SIZE_MAX / 2))
#endif
namespace {
  /// Reads one line from fp into *lineptr, including the trailing '\n' if one is read.
  /// The buffer is grown with realloc() as needed (*n is updated) and is always NUL-terminated.
  /// @param lineptr in/out buffer; may point to nullptr, in which case a buffer is allocated.
  /// @param n       in/out capacity of *lineptr.
  /// @param fp      stream to read from.
  /// @return number of characters stored (excluding the NUL), or -1 when EOF is hit before
  ///         any character is read, or on error (EINVAL for null arguments, EOVERFLOW when
  ///         the line cannot be represented, or the errno left by a failed allocation).
  ssize_t getline(char** lineptr, size_t* n, FILE* fp) {
    if (lineptr == nullptr || n == nullptr || fp == nullptr) {
      errno = EINVAL;
      return -1;
    }

    if (*lineptr == nullptr || *n == 0) {
      // realloc (as glibc does), not malloc: a caller-supplied buffer with *n == 0
      // must not be leaked. On failure the caller's buffer is left untouched.
      char* initial = static_cast<char*>(realloc(*lineptr, 120));
      if (initial == nullptr)
        return -1;
      *lineptr = initial;
      *n = 120;
    }

    // Largest buffer whose length still fits in the ssize_t return value.
    const size_t needed_max = SSIZE_MAX < SIZE_MAX ? (size_t)SSIZE_MAX + 1 : SIZE_MAX;
    size_t cur_len = 0;

    for (;;) {
      const int c = getc(fp);
      if (c == EOF)
        break;

      // Make enough space for len+1 (for final NUL) bytes.
      if (cur_len + 1 >= *n) {
        size_t needed = 2 * *n + 1; /* Be generous. */
        if (needed_max < needed)
          needed = needed_max;
        if (cur_len + 1 >= needed) {
          errno = EOVERFLOW;
          return -1;
        }

        char* grown = static_cast<char*>(realloc(*lineptr, needed));
        if (grown == nullptr)
          return -1;
        *lineptr = grown;
        *n = needed;
      }

      (*lineptr)[cur_len++] = static_cast<char>(c);
      if (c == '\n')
        break;
    }

    (*lineptr)[cur_len] = '\0';
    return cur_len ? static_cast<ssize_t>(cur_len) : -1;
  }
}  // namespace
#endif
0099 
/// Returns the current local time as "<strftime %F %R %S> s <milliseconds> ms",
/// e.g. "2023-03-17 10:41 35 s 123 ms"; used to timestamp the log lines of this source.
static std::string now() {
  struct timeval t;
  gettimeofday(&t, nullptr);

  // localtime_r: the reentrant variant, which does not hand back a shared static buffer.
  struct tm broken;
  localtime_r(&t.tv_sec, &broken);

  char buf[256];
  // strftime() returns 0 (contents unspecified) if the result does not fit.
  if (strftime(buf, sizeof(buf), "%F %R %S s", &broken) == 0)
    buf[0] = 0;

  // Truncate microseconds to milliseconds so the value stays in [0, 999]; rounding
  // ((usec + 500) / 1000) printed "1000 ms" whenever tv_usec >= 999500.
  std::ostringstream out;
  out << buf << " " << (t.tv_usec / 1000) << " ms";
  return out.str();
}
0113 
0114 WatcherStreamFileReader::WatcherStreamFileReader(edm::ParameterSet const& pset)
0115     : inputDir_(pset.getParameter<std::string>("inputDir")),
0116       filePatterns_(pset.getParameter<std::vector<std::string> >("filePatterns")),
0117       inprocessDir_(pset.getParameter<std::string>("inprocessDir")),
0118       processedDir_(pset.getParameter<std::string>("processedDir")),
0119       corruptedDir_(pset.getParameter<std::string>("corruptedDir")),
0120       tokenFile_(pset.getUntrackedParameter<std::string>("tokenFile", "watcherSourceToken")),
0121       timeOut_(pset.getParameter<int>("timeOutInSec")),
0122       end_(false),
0123       verbosity_(pset.getUntrackedParameter<int>("verbosity", 0)) {
0124   struct stat buf;
0125   if (stat(tokenFile_.c_str(), &buf)) {
0126     FILE* f = fopen(tokenFile_.c_str(), "w");
0127     if (f) {
0128       fclose(f);
0129     } else {
0130       throw cms::Exception("WatcherSource") << "Failed to create token file.";
0131     }
0132   }
0133   vector<string> dirs;
0134   dirs.push_back(inprocessDir_);
0135   dirs.push_back(processedDir_);
0136   dirs.push_back(corruptedDir_);
0137 
0138   for (unsigned i = 0; i < dirs.size(); ++i) {
0139     const string& dir = dirs[i];
0140     struct stat fileStat;
0141     if (0 == stat(dir.c_str(), &fileStat)) {
0142       if (!S_ISDIR(fileStat.st_mode)) {
0143         throw cms::Exception("[WatcherSource]") << "File " << dir << " exists but is not a directory "
0144                                                 << " as expected.";
0145       }
0146     } else {  //directory does not exists, let's try to create it
0147       if (0 != mkdir(dir.c_str(), 0755)) {
0148         throw cms::Exception("[WatcherSource]") << "Failed to create directory " << dir << " for writing data.";
0149       }
0150     }
0151   }
0152 }
0153 
0154 WatcherStreamFileReader::~WatcherStreamFileReader() {}
0155 
0156 const bool WatcherStreamFileReader::newHeader() {
0157   edm::StreamerInputFile* inputFile = getInputFile();
0158   return inputFile ? inputFile->newHeader() : false;
0159 }
0160 
0161 const InitMsgView* WatcherStreamFileReader::getHeader() {
0162   edm::StreamerInputFile* inputFile = getInputFile();
0163 
0164   //TODO: shall better send an exception...
0165   if (inputFile == nullptr) {
0166     throw cms::Exception("WatcherSource") << "No input file found.";
0167   }
0168 
0169   const InitMsgView* header = inputFile->startMessage();
0170 
0171   if (header->code() != Header::INIT)  //INIT Msg
0172     throw cms::Exception("readHeader", "WatcherStreamFileReader")
0173         << "received wrong message type: expected INIT, got " << header->code() << "\n";
0174 
0175   return header;
0176 }
0177 
0178 const EventMsgView* WatcherStreamFileReader::getNextEvent() {
0179   if (end_) {
0180     closeFile();
0181     return nullptr;
0182   }
0183 
0184   edm::StreamerInputFile* inputFile;
0185 
0186   //go to next input file, till no new event is found
0187   while ((inputFile = getInputFile()) != nullptr && inputFile->next() != edm::StreamerInputFile::Next::kEvent) {
0188     closeFile();
0189   }
0190 
0191   return inputFile == nullptr ? nullptr : inputFile->currentRecord();
0192 }
0193 
0194 edm::StreamerInputFile* WatcherStreamFileReader::getInputFile() {
0195   char* lineptr = nullptr;
0196   size_t n = 0;
0197   static stringstream cmd;
0198   static bool cmdSet = false;
0199   static char curDir[PATH_MAX > 0 ? PATH_MAX : 4096];
0200 
0201   if (!cmdSet) {
0202     cmd.str("");
0203     //    cmd << "/bin/ls -rt " << inputDir_ << " | egrep '(";
0204     //by default ls will sort the file alphabetically which will results
0205     //in ordering the files in increasing LB number, which is the desired
0206     //order.
0207     //    cmd << "/bin/ls " << inputDir_ << " | egrep '(";
0208     cmd << "/bin/find " << inputDir_ << " -maxdepth 2 -print | egrep '(";
0209     //TODO: validate patternDir (see ;, &&, ||) and escape special character
0210     if (filePatterns_.empty())
0211       return nullptr;
0212     if (getcwd(curDir, sizeof(curDir)) == nullptr) {
0213       throw cms::Exception("WatcherSource") << "Failed to retreived working directory path: " << strerror(errno);
0214     }
0215 
0216     for (unsigned i = 0; i < filePatterns_.size(); ++i) {
0217       if (i > 0)
0218         cmd << "|";
0219       //     if(filePatterns_[i].size()>0 && filePatterns_[0] != "/"){//relative path
0220       //       cmd << curDir << "/";
0221       //     }
0222       cmd << filePatterns_[i];
0223     }
0224     cmd << ")' | sort";
0225 
0226     cout << "[WatcherSource " << now() << "]"
0227          << " Command to retrieve input files: " << cmd.str() << "\n";
0228     cmdSet = true;
0229   }
0230 
0231   struct stat buf;
0232 
0233   if (stat(tokenFile_.c_str(), &buf) != 0) {
0234     end_ = true;
0235   }
0236 
0237   bool waiting = false;
0238   static bool firstWait = true;
0239   timeval waitStart;
0240   //if no cached input file, look for new files until one is found:
0241   if (!end_ && streamerInputFile_.get() == nullptr) {
0242     fileName_.assign("");
0243 
0244     //check if we have file in the queue, if not look for new files:
0245     while (filesInQueue_.empty()) {
0246       if (stat(tokenFile_.c_str(), &buf) != 0) {
0247         end_ = true;
0248         break;
0249       }
0250       FILE* s = popen(cmd.str().c_str(), "r");
0251       if (s == nullptr) {
0252         throw cms::Exception("WatcherSource") << "Failed to retrieve list of input file: " << strerror(errno);
0253       }
0254 
0255       ssize_t len;
0256       while (!feof(s)) {
0257         if ((len = getline(&lineptr, &n, s)) > 0) {
0258           //remove end-of-line character:
0259           lineptr[len - 1] = 0;
0260           string fileName;
0261           if (lineptr[0] != '/') {
0262             if (!inputDir_.empty() && inputDir_[0] != '/') {  //relative path
0263               fileName.assign(curDir);
0264               fileName.append("/");
0265               fileName.append(inputDir_);
0266             } else {
0267               fileName.assign(inputDir_);
0268             }
0269             fileName.append("/");
0270           }
0271           fileName.append(lineptr);
0272           filesInQueue_.push_back(fileName);
0273           if (verbosity_)
0274             cout << "[WatcherSource " << now() << "]"
0275                  << " File to process: '" << fileName << "'\n";
0276         }
0277       }
0278       while (!feof(s))
0279         fgetc(s);
0280       pclose(s);
0281       if (filesInQueue_.empty()) {
0282         if (!waiting) {
0283           cout << "[WatcherSource " << now() << "]"
0284                << " No file found. Waiting for new file...\n";
0285           cout << flush;
0286           waiting = true;
0287           gettimeofday(&waitStart, nullptr);
0288         } else if (!firstWait) {
0289           timeval t;
0290           gettimeofday(&t, nullptr);
0291           float dt = (t.tv_sec - waitStart.tv_sec) * 1. + (t.tv_usec - waitStart.tv_usec) * 1.e-6;
0292           if ((timeOut_ >= 0) && (dt > timeOut_)) {
0293             cout << "[WatcherSource " << now() << "]"
0294                  << " Having waited for new file for " << (int)dt << " sec. "
0295                  << "Timeout exceeded. Exits.\n";
0296             //remove(tokenFile_.c_str()); //we do not delete the token, otherwise sorting process on the monitoring farm will not be restarted by the runloop.sh script.
0297             end_ = true;
0298             break;
0299           }
0300         }
0301       }
0302       sleep(1);
0303     }  //end of file queue update
0304     firstWait = false;
0305     free(lineptr);
0306     lineptr = nullptr;
0307 
0308     while (streamerInputFile_.get() == nullptr && !filesInQueue_.empty()) {
0309       fileName_ = filesInQueue_.front();
0310       filesInQueue_.pop_front();
0311       int fd = open(fileName_.c_str(), 0);
0312       if (fd != 0) {
0313         struct stat buf;
0314         off_t size = -1;
0315         //check that file transfer is finished, by monitoring its size:
0316         time_t t = time(nullptr);
0317         for (;;) {
0318           fstat(fd, &buf);
0319           if (verbosity_)
0320             cout << "file size: " << buf.st_size << ", prev size: " << size << "\n";
0321           if (buf.st_size == size)
0322             break;
0323           else
0324             size = buf.st_size;
0325           if (difftime(t, buf.st_mtime) > 60)
0326             break;  //file older then 1 min=> tansfer must be finished
0327           sleep(1);
0328         }
0329 
0330         if (fd != 0 && buf.st_size == 0) {  //file is empty. streamer reader
0331           //                   does not like empty file=> skip it
0332           stringstream c;
0333           c << "/bin/mv -f \"" << fileName_ << "\" \"" << corruptedDir_ << "/.\"";
0334           if (verbosity_)
0335             cout << "[WatcherSource " << now() << "]"
0336                  << " Executing " << c.str() << "\n";
0337           int i = system(c.str().c_str());
0338           if (i != 0) {
0339             //throw cms::Exception("WatcherSource")
0340             cout << "[WatcherSource " << now() << "] "
0341                  << "Failed to move empty file '" << fileName_ << "'"
0342                  << " to corrupted directory '" << corruptedDir_ << "'\n";
0343           }
0344           continue;
0345         }
0346 
0347         close(fd);
0348 
0349         vector<char> buf1(fileName_.size() + 1);
0350         copy(fileName_.begin(), fileName_.end(), buf1.begin());
0351         buf1[buf1.size() - 1] = 0;
0352 
0353         vector<char> buf2(fileName_.size() + 1);
0354         copy(fileName_.begin(), fileName_.end(), buf2.begin());
0355         buf2[buf2.size() - 1] = 0;
0356 
0357         string dirnam(dirname(&buf1[0]));
0358         string filenam(basename(&buf2[0]));
0359 
0360         string dest = inprocessDir_ + "/" + filenam;
0361 
0362         if (verbosity_)
0363           cout << "[WatcherSource " << now() << "]"
0364                << " Moving file " << fileName_ << " to " << dest << "\n";
0365 
0366         //  stringstream c;
0367         //c << "/bin/mv -f \"" << fileName_ << "\" \"" << dest
0368         // << "/.\"";
0369 
0370         if (0 != rename(fileName_.c_str(), dest.c_str())) {
0371           //if(0!=system(c.str().c_str())){
0372           throw cms::Exception("WatcherSource") << "Failed to move file '" << fileName_ << "' "
0373                                                 << "to processing directory " << inprocessDir_ << ": "
0374                                                 << strerror(errno) << " (Error no " << errno << ")";
0375         }
0376 
0377         fileName_ = dest;
0378 
0379         cout << "[WatcherSource " << now() << "]"
0380              << " Opening file " << fileName_ << "\n"
0381              << flush;
0382         streamerInputFile_ = std::make_unique<edm::StreamerInputFile>(fileName_);
0383 
0384         ofstream f(".watcherfile");
0385         f << fileName_;
0386       } else {
0387         cout << "[WatcherSource " << now() << "]"
0388              << " Failed to open file " << fileName_ << endl;
0389       }
0390     }  //loop on file queue to find one file which opening succeeded
0391   }
0392   return streamerInputFile_.get();
0393 }
0394 
0395 void WatcherStreamFileReader::closeFile() {
0396   if (streamerInputFile_.get() == nullptr)
0397     return;
0398   //delete the streamer input file:
0399   streamerInputFile_.reset();
0400   stringstream cmd;
0401   //TODO: validation of processDir
0402   //cmd << "/bin/mv -f \"" << fileName_ << "\" \"" << processedDir_ << "/.\"";
0403   //if(verbosity_) cout << "[WatcherSource " << now() << "]"
0404   //<< " Executing (in closeFile())" << cmd.str() << "\n";
0405   //int i = system(cmd.str().c_str());
0406   //cout << "move command done at " << now() << "\n";
0407   vector<char> buf(fileName_.size() + 1);
0408   copy(fileName_.begin(), fileName_.end(), buf.begin());
0409   buf[buf.size() - 1] = 0;
0410   string dest = processedDir_ + "/" + basename(&buf[0]);
0411   if (verbosity_)
0412     cout << "[WatcherSource " << now() << "]"
0413          << " Moving " << fileName_ << " to " << dest << "... ";
0414   int i = rename(fileName_.c_str(), dest.c_str());
0415   if (i != 0) {
0416     throw cms::Exception("WatcherSource")
0417         << "Failed to move processed file '" << fileName_ << "'"
0418         << " to processed directory '" << processedDir_ << ": " << strerror(errno) << " (Error no " << errno << ")";
0419 
0420     //Stop further processing to prevent endless loop:
0421     end_ = true;
0422   }
0423   if (verbosity_)
0424     cout << "Done at " << now() << "\n";
0425 
0426   //  cout << flush;
0427 }