Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-09-07 04:34:51

0001 #include "IOPool/Streamer/interface/MsgTools.h"
0002 #include "IOPool/Streamer/interface/StreamerInputFile.h"
0003 #include "CalibCalorimetry/EcalLaserSorting/interface/WatcherStreamFileReader.h"
0004 #include "FWCore/Utilities/interface/Exception.h"
0005 
0006 #include <cerrno>
0007 #include <climits>
0008 #include <cstdio>
0009 #include <cstdlib>
0010 #include <cstring>
0011 #include <fcntl.h>
0012 #include <fstream>
0013 #include <libgen.h>
0014 #include <memory>
0015 #include <sys/stat.h>
0016 #include <sys/time.h>
0017 #include <sys/types.h>
0018 #include <unistd.h>
0019 
0020 //using namespace edm;
0021 using namespace std;
0022 using namespace edm::streamer;
0023 
0024 //std::string WatcherStreamFileReader::fileName_;
0025 
#if !defined(__linux__) && !(defined(__APPLE__) && __DARWIN_C_LEVEL >= 200809L)
/* getline implementation adapted from glibc/gnulib, for C libraries that do not
   provide the POSIX.1-2008 getline() function. */

#ifndef SIZE_MAX
#define SIZE_MAX ((size_t) - 1)
#endif
#ifndef SSIZE_MAX
#define SSIZE_MAX ((ssize_t)(SIZE_MAX / 2))
#endif
namespace {
  /* Read one line from fp into *lineptr, including the trailing '\n' if one is read,
     and NUL-terminate it.

     *lineptr / *n describe a malloc'ed buffer and its size; the buffer is grown with
     realloc() as needed. Pass *lineptr == NULL (or *n == 0) to let the function
     allocate it; the caller owns (and must free) the buffer afterwards.

     Returns the number of characters stored (not counting the NUL), or -1 when:
       - lineptr, n or fp is NULL (errno = EINVAL);
       - end of file is reached before any character is read;
       - (re)allocation fails;
       - the line would not fit in SSIZE_MAX bytes (errno = EOVERFLOW where defined). */
  ssize_t getline(char** lineptr, size_t* n, FILE* fp) {
    if (lineptr == nullptr || n == nullptr || fp == nullptr) {
      errno = EINVAL;
      return -1;
    }

    if (*lineptr == nullptr || *n == 0) {
      // realloc (not malloc) so a non-null buffer passed with *n == 0 is reused rather
      // than leaked; *n is only updated once the allocation has succeeded.
      const size_t initial_size = 120;
      char* new_lineptr = static_cast<char*>(realloc(*lineptr, initial_size));
      if (new_lineptr == nullptr)
        return -1;
      *lineptr = new_lineptr;
      *n = initial_size;
    }

    size_t cur_len = 0;
    for (;;) {
      const int i = getc(fp);
      if (i == EOF)
        break;

      /* Make enough space for cur_len+1 (for final NUL) bytes.  */
      if (cur_len + 1 >= *n) {
        const size_t needed_max = SSIZE_MAX < SIZE_MAX ? (size_t)SSIZE_MAX + 1 : SIZE_MAX;
        size_t needed = 2 * *n + 1; /* Be generous. */
        if (needed_max < needed)
          needed = needed_max;
        if (cur_len + 1 >= needed) {
#ifdef EOVERFLOW
          errno = EOVERFLOW;
#endif
          return -1;
        }

        char* new_lineptr = static_cast<char*>(realloc(*lineptr, needed));
        if (new_lineptr == nullptr)
          return -1;
        *lineptr = new_lineptr;
        *n = needed;
      }

      (*lineptr)[cur_len] = static_cast<char>(i);
      cur_len++;

      if (i == '\n')
        break;
    }
    (*lineptr)[cur_len] = '\0';
    // Nothing read before EOF => -1, as POSIX getline() does.
    return cur_len ? static_cast<ssize_t>(cur_len) : -1;
  }
}  // namespace
#endif
0100 
/// Format the current local wall-clock time for log messages.
///
/// The result has the form "<YYYY-MM-DD> <HH:MM> <SS> s <mmm> ms", where the
/// millisecond field is truncated from the microseconds and always lies in 0..999.
/// If the local time cannot be determined, the date/time part is left empty.
static std::string now() {
  struct timeval t;
  gettimeofday(&t, nullptr);

  char buf[256] = {0};
  struct tm local;
  // localtime_r() fills a caller-owned struct instead of the shared static buffer
  // used by localtime(), and its null return (conversion failure) is checked before
  // the struct is handed to strftime().
  if (localtime_r(&t.tv_sec, &local) != nullptr) {
    strftime(buf, sizeof(buf), "%F %R %S s", &local);
  }
  buf[sizeof(buf) - 1] = 0;

  std::stringstream buf2;
  // Truncate rather than round: rounding could print "1000 ms" without carrying
  // into the seconds field already printed above.
  buf2 << buf << " " << (t.tv_usec / 1000) << " ms";

  return buf2.str();
}
0114 
0115 WatcherStreamFileReader::WatcherStreamFileReader(edm::ParameterSet const& pset)
0116     : inputDir_(pset.getParameter<std::string>("inputDir")),
0117       filePatterns_(pset.getParameter<std::vector<std::string> >("filePatterns")),
0118       inprocessDir_(pset.getParameter<std::string>("inprocessDir")),
0119       processedDir_(pset.getParameter<std::string>("processedDir")),
0120       corruptedDir_(pset.getParameter<std::string>("corruptedDir")),
0121       tokenFile_(pset.getUntrackedParameter<std::string>("tokenFile", "watcherSourceToken")),
0122       timeOut_(pset.getParameter<int>("timeOutInSec")),
0123       end_(false),
0124       verbosity_(pset.getUntrackedParameter<int>("verbosity", 0)) {
0125   struct stat buf;
0126   if (stat(tokenFile_.c_str(), &buf)) {
0127     FILE* f = fopen(tokenFile_.c_str(), "w");
0128     if (f) {
0129       fclose(f);
0130     } else {
0131       throw cms::Exception("WatcherSource") << "Failed to create token file.";
0132     }
0133   }
0134   vector<string> dirs;
0135   dirs.push_back(inprocessDir_);
0136   dirs.push_back(processedDir_);
0137   dirs.push_back(corruptedDir_);
0138 
0139   for (unsigned i = 0; i < dirs.size(); ++i) {
0140     const string& dir = dirs[i];
0141     struct stat fileStat;
0142     if (0 == stat(dir.c_str(), &fileStat)) {
0143       if (!S_ISDIR(fileStat.st_mode)) {
0144         throw cms::Exception("[WatcherSource]") << "File " << dir << " exists but is not a directory "
0145                                                 << " as expected.";
0146       }
0147     } else {  //directory does not exists, let's try to create it
0148       if (0 != mkdir(dir.c_str(), 0755)) {
0149         throw cms::Exception("[WatcherSource]") << "Failed to create directory " << dir << " for writing data.";
0150       }
0151     }
0152   }
0153 }
0154 
0155 WatcherStreamFileReader::~WatcherStreamFileReader() {}
0156 
0157 const bool WatcherStreamFileReader::newHeader() { return getInputFile() != nullptr; }
0158 
0159 const InitMsgView* WatcherStreamFileReader::getHeader() {
0160   StreamerInputFile* inputFile = getInputFile();
0161 
0162   //TODO: shall better send an exception...
0163   if (inputFile == nullptr) {
0164     throw cms::Exception("WatcherSource") << "No input file found.";
0165   }
0166 
0167   const InitMsgView* header = inputFile->startMessage();
0168 
0169   if (header->code() != Header::INIT)  //INIT Msg
0170     throw cms::Exception("readHeader", "WatcherStreamFileReader")
0171         << "received wrong message type: expected INIT, got " << header->code() << "\n";
0172 
0173   return header;
0174 }
0175 
0176 const EventMsgView* WatcherStreamFileReader::getNextEvent() {
0177   if (end_) {
0178     moveJustReadFile();
0179     return nullptr;
0180   }
0181 
0182   StreamerInputFile* inputFile;
0183   if ((inputFile = getInputFile()) != nullptr and inputFile->next() == StreamerInputFile::Next::kStop) {
0184     moveJustReadFile();
0185     return nullptr;
0186   }
0187 
0188   return inputFile == nullptr ? nullptr : inputFile->currentRecord();
0189 }
0190 
0191 StreamerInputFile* WatcherStreamFileReader::getInputFile() {
0192   char* lineptr = nullptr;
0193   size_t n = 0;
0194   static stringstream cmd;
0195   static bool cmdSet = false;
0196   static char curDir[PATH_MAX > 0 ? PATH_MAX : 4096];
0197 
0198   if (!cmdSet) {
0199     cmd.str("");
0200     //    cmd << "/bin/ls -rt " << inputDir_ << " | egrep '(";
0201     //by default ls will sort the file alphabetically which will results
0202     //in ordering the files in increasing LB number, which is the desired
0203     //order.
0204     //    cmd << "/bin/ls " << inputDir_ << " | egrep '(";
0205     cmd << "/bin/find " << inputDir_ << " -maxdepth 2 -print | egrep '(";
0206     //TODO: validate patternDir (see ;, &&, ||) and escape special character
0207     if (filePatterns_.empty())
0208       return nullptr;
0209     if (getcwd(curDir, sizeof(curDir)) == nullptr) {
0210       throw cms::Exception("WatcherSource") << "Failed to retreived working directory path: " << strerror(errno);
0211     }
0212 
0213     for (unsigned i = 0; i < filePatterns_.size(); ++i) {
0214       if (i > 0)
0215         cmd << "|";
0216       //     if(filePatterns_[i].size()>0 && filePatterns_[0] != "/"){//relative path
0217       //       cmd << curDir << "/";
0218       //     }
0219       cmd << filePatterns_[i];
0220     }
0221     cmd << ")' | sort";
0222 
0223     cout << "[WatcherSource " << now() << "]"
0224          << " Command to retrieve input files: " << cmd.str() << "\n";
0225     cmdSet = true;
0226   }
0227 
0228   struct stat buf;
0229 
0230   if (stat(tokenFile_.c_str(), &buf) != 0) {
0231     end_ = true;
0232   }
0233 
0234   bool waiting = false;
0235   static bool firstWait = true;
0236   timeval waitStart;
0237   //if no cached input file, look for new files until one is found:
0238   if (!end_ && streamerInputFile_.get() == nullptr) {
0239     fileName_.assign("");
0240 
0241     //check if we have file in the queue, if not look for new files:
0242     while (filesInQueue_.empty()) {
0243       if (stat(tokenFile_.c_str(), &buf) != 0) {
0244         end_ = true;
0245         break;
0246       }
0247       FILE* s = popen(cmd.str().c_str(), "r");
0248       if (s == nullptr) {
0249         throw cms::Exception("WatcherSource") << "Failed to retrieve list of input file: " << strerror(errno);
0250       }
0251 
0252       ssize_t len;
0253       while (!feof(s)) {
0254         if ((len = getline(&lineptr, &n, s)) > 0) {
0255           //remove end-of-line character:
0256           lineptr[len - 1] = 0;
0257           string fileName;
0258           if (lineptr[0] != '/') {
0259             if (!inputDir_.empty() && inputDir_[0] != '/') {  //relative path
0260               fileName.assign(curDir);
0261               fileName.append("/");
0262               fileName.append(inputDir_);
0263             } else {
0264               fileName.assign(inputDir_);
0265             }
0266             fileName.append("/");
0267           }
0268           fileName.append(lineptr);
0269           filesInQueue_.push_back(fileName);
0270           if (verbosity_)
0271             cout << "[WatcherSource " << now() << "]"
0272                  << " File to process: '" << fileName << "'\n";
0273         }
0274       }
0275       while (!feof(s))
0276         fgetc(s);
0277       pclose(s);
0278       if (filesInQueue_.empty()) {
0279         if (!waiting) {
0280           cout << "[WatcherSource " << now() << "]"
0281                << " No file found. Waiting for new file...\n";
0282           cout << flush;
0283           waiting = true;
0284           gettimeofday(&waitStart, nullptr);
0285         } else if (!firstWait) {
0286           timeval t;
0287           gettimeofday(&t, nullptr);
0288           float dt = (t.tv_sec - waitStart.tv_sec) * 1. + (t.tv_usec - waitStart.tv_usec) * 1.e-6;
0289           if ((timeOut_ >= 0) && (dt > timeOut_)) {
0290             cout << "[WatcherSource " << now() << "]"
0291                  << " Having waited for new file for " << (int)dt << " sec. "
0292                  << "Timeout exceeded. Exits.\n";
0293             //remove(tokenFile_.c_str()); //we do not delete the token, otherwise sorting process on the monitoring farm will not be restarted by the runloop.sh script.
0294             end_ = true;
0295             break;
0296           }
0297         }
0298       }
0299       sleep(1);
0300     }  //end of file queue update
0301     firstWait = false;
0302     free(lineptr);
0303     lineptr = nullptr;
0304 
0305     while (streamerInputFile_.get() == nullptr && !filesInQueue_.empty()) {
0306       fileName_ = filesInQueue_.front();
0307       filesInQueue_.pop_front();
0308       int fd = open(fileName_.c_str(), 0);
0309       if (fd != 0) {
0310         struct stat buf;
0311         off_t size = -1;
0312         //check that file transfer is finished, by monitoring its size:
0313         time_t t = time(nullptr);
0314         for (;;) {
0315           fstat(fd, &buf);
0316           if (verbosity_)
0317             cout << "file size: " << buf.st_size << ", prev size: " << size << "\n";
0318           if (buf.st_size == size)
0319             break;
0320           else
0321             size = buf.st_size;
0322           if (difftime(t, buf.st_mtime) > 60)
0323             break;  //file older then 1 min=> tansfer must be finished
0324           sleep(1);
0325         }
0326 
0327         if (fd != 0 && buf.st_size == 0) {  //file is empty. streamer reader
0328           //                   does not like empty file=> skip it
0329           stringstream c;
0330           c << "/bin/mv -f \"" << fileName_ << "\" \"" << corruptedDir_ << "/.\"";
0331           if (verbosity_)
0332             cout << "[WatcherSource " << now() << "]"
0333                  << " Executing " << c.str() << "\n";
0334           int i = system(c.str().c_str());
0335           if (i != 0) {
0336             //throw cms::Exception("WatcherSource")
0337             cout << "[WatcherSource " << now() << "] "
0338                  << "Failed to move empty file '" << fileName_ << "'"
0339                  << " to corrupted directory '" << corruptedDir_ << "'\n";
0340           }
0341           continue;
0342         }
0343 
0344         close(fd);
0345 
0346         vector<char> buf1(fileName_.size() + 1);
0347         copy(fileName_.begin(), fileName_.end(), buf1.begin());
0348         buf1[buf1.size() - 1] = 0;
0349 
0350         vector<char> buf2(fileName_.size() + 1);
0351         copy(fileName_.begin(), fileName_.end(), buf2.begin());
0352         buf2[buf2.size() - 1] = 0;
0353 
0354         string dirnam(dirname(&buf1[0]));
0355         string filenam(basename(&buf2[0]));
0356 
0357         string dest = inprocessDir_ + "/" + filenam;
0358 
0359         if (verbosity_)
0360           cout << "[WatcherSource " << now() << "]"
0361                << " Moving file " << fileName_ << " to " << dest << "\n";
0362 
0363         //  stringstream c;
0364         //c << "/bin/mv -f \"" << fileName_ << "\" \"" << dest
0365         // << "/.\"";
0366 
0367         if (0 != rename(fileName_.c_str(), dest.c_str())) {
0368           //if(0!=system(c.str().c_str())){
0369           throw cms::Exception("WatcherSource") << "Failed to move file '" << fileName_ << "' "
0370                                                 << "to processing directory " << inprocessDir_ << ": "
0371                                                 << strerror(errno) << " (Error no " << errno << ")";
0372         }
0373 
0374         fileName_ = dest;
0375 
0376         cout << "[WatcherSource " << now() << "]"
0377              << " Opening file " << fileName_ << "\n"
0378              << flush;
0379         streamerInputFile_ = std::make_unique<StreamerInputFile>(fileName_);
0380 
0381         ofstream f(".watcherfile");
0382         f << fileName_;
0383       } else {
0384         cout << "[WatcherSource " << now() << "]"
0385              << " Failed to open file " << fileName_ << endl;
0386       }
0387     }  //loop on file queue to find one file which opening succeeded
0388   }
0389   return streamerInputFile_.get();
0390 }
0391 
0392 void WatcherStreamFileReader::closeFile() {}
0393 
0394 void WatcherStreamFileReader::moveJustReadFile() {
0395   if (streamerInputFile_.get() == nullptr)
0396     return;
0397   //delete the streamer input file:
0398   streamerInputFile_.reset();
0399   stringstream cmd;
0400   //TODO: validation of processDir
0401   //cmd << "/bin/mv -f \"" << fileName_ << "\" \"" << processedDir_ << "/.\"";
0402   //if(verbosity_) cout << "[WatcherSource " << now() << "]"
0403   //<< " Executing (in closeFile())" << cmd.str() << "\n";
0404   //int i = system(cmd.str().c_str());
0405   //cout << "move command done at " << now() << "\n";
0406   vector<char> buf(fileName_.size() + 1);
0407   copy(fileName_.begin(), fileName_.end(), buf.begin());
0408   buf[buf.size() - 1] = 0;
0409   string dest = processedDir_ + "/" + basename(&buf[0]);
0410   if (verbosity_)
0411     cout << "[WatcherSource " << now() << "]"
0412          << " Moving " << fileName_ << " to " << dest << "... ";
0413   int i = rename(fileName_.c_str(), dest.c_str());
0414   if (i != 0) {
0415     throw cms::Exception("WatcherSource")
0416         << "Failed to move processed file '" << fileName_ << "'"
0417         << " to processed directory '" << processedDir_ << ": " << strerror(errno) << " (Error no " << errno << ")";
0418 
0419     //Stop further processing to prevent endless loop:
0420     end_ = true;
0421   }
0422   if (verbosity_)
0423     cout << "Done at " << now() << "\n";
0424 
0425   //  cout << flush;
0426 }