File indexing completed on 2023-03-17 10:41:35
0001 #include "IOPool/Streamer/interface/MsgTools.h"
0002 #include "IOPool/Streamer/interface/StreamerInputFile.h"
0003 #include "CalibCalorimetry/EcalLaserSorting/interface/WatcherStreamFileReader.h"
0004 #include "FWCore/Utilities/interface/Exception.h"
0005
0006 #include <cerrno>
0007 #include <climits>
0008 #include <cstdio>
0009 #include <cstdlib>
0010 #include <cstring>
0011 #include <fcntl.h>
0012 #include <fstream>
0013 #include <libgen.h>
0014 #include <memory>
0015 #include <sys/stat.h>
0016 #include <sys/time.h>
0017 #include <sys/types.h>
0018 #include <unistd.h>
0019
0020
0021 using namespace std;
0022
0023
0024
0025 #if !defined(__linux__) && !(defined(__APPLE__) && __DARWIN_C_LEVEL >= 200809L)
0026
0027
0028 #ifndef SIZE_MAX
0029 #define SIZE_MAX ((size_t)-1)
0030 #endif
0031 #ifndef SSIZE_MAX
0032 #define SSIZE_MAX ((ssize_t)(SIZE_MAX / 2))
0033 #endif
0034 namespace {
/// Fallback implementation of POSIX getline() for platforms that lack it.
///
/// Reads characters from @p fp up to and including the next newline (or up to
/// end of file) into the heap buffer *lineptr, growing that buffer with
/// realloc() as needed and keeping *n equal to its capacity. The stored text
/// is always NUL-terminated.
///
/// @param lineptr  address of the caller's buffer pointer; may point to a null
///                 pointer, in which case a buffer is allocated. The caller
///                 releases the buffer with free().
/// @param n        address of the buffer capacity in bytes.
/// @param fp       stream to read from.
/// @return number of characters stored (newline included, terminator
///         excluded), or -1 when nothing could be read or on error. errno is
///         set to EINVAL for null arguments and to EOVERFLOW when the line
///         cannot be represented in an ssize_t.
ssize_t getline(char** lineptr, size_t* n, FILE* fp) {
  if (lineptr == nullptr || n == nullptr || fp == nullptr) {
    errno = EINVAL;
    return -1;
  }

  if (*lineptr == nullptr || *n == 0) {
    // realloc() rather than malloc(): a caller may legitimately pass a live
    // buffer together with *n == 0, and overwriting the pointer would leak it.
    const size_t initial_size = 120;
    char* fresh = static_cast<char*>(realloc(*lineptr, initial_size));
    if (fresh == nullptr)
      return -1;
    *lineptr = fresh;
    *n = initial_size;
  }

  // Largest capacity we are willing to request: the returned length has to
  // fit in an ssize_t.
  const size_t max_size = SSIZE_MAX < SIZE_MAX ? static_cast<size_t>(SSIZE_MAX) + 1 : SIZE_MAX;

  size_t cur_len = 0;
  for (;;) {
    const int c = getc(fp);
    if (c == EOF)
      break;

    // Keep room for this character plus the terminating NUL.
    if (cur_len + 1 >= *n) {
      // Roughly double the capacity, saturating at max_size instead of
      // letting the multiplication wrap around.
      const size_t needed = (*n > (max_size - 1) / 2) ? max_size : 2 * *n + 1;
      if (cur_len + 1 >= needed) {
        (*lineptr)[cur_len] = '\0';
        errno = EOVERFLOW;
        return -1;
      }

      char* grown = static_cast<char*>(realloc(*lineptr, needed));
      if (grown == nullptr) {
        (*lineptr)[cur_len] = '\0';
        return -1;
      }
      *lineptr = grown;
      *n = needed;
    }

    (*lineptr)[cur_len++] = static_cast<char>(c);

    if (c == '\n')
      break;
  }

  (*lineptr)[cur_len] = '\0';
  // Nothing read at all means end of file (or a read error) was hit at once.
  return cur_len != 0 ? static_cast<ssize_t>(cur_len) : -1;
}
0097 }
0098 #endif
0099
/// Formats the current local time for log prefixes.
///
/// @return a string of the form "YYYY-MM-DD HH:MM SS s N ms", where N is the
///         sub-second part rounded to the nearest millisecond (0..999).
static std::string now() {
  struct timeval t;
  gettimeofday(&t, nullptr);

  // Round to the nearest millisecond; when rounding reaches a full second,
  // carry it into the seconds so that "1000 ms" is never printed.
  time_t seconds = t.tv_sec;
  long millis = (static_cast<long>(t.tv_usec) + 500) / 1000;
  if (millis >= 1000) {
    millis -= 1000;
    ++seconds;
  }

  // localtime_r() writes into a caller-owned struct, unlike localtime(),
  // which returns a pointer to shared static storage.
  char buf[256];
  struct tm local;
  if (localtime_r(&seconds, &local) == nullptr || strftime(buf, sizeof(buf), "%F %R %S s", &local) == 0) {
    // The buffer content is unspecified when strftime() fails.
    buf[0] = '\0';
  }

  std::stringstream out;
  out << buf << " " << millis << " ms";

  return out.str();
}
0113
0114 WatcherStreamFileReader::WatcherStreamFileReader(edm::ParameterSet const& pset)
0115 : inputDir_(pset.getParameter<std::string>("inputDir")),
0116 filePatterns_(pset.getParameter<std::vector<std::string> >("filePatterns")),
0117 inprocessDir_(pset.getParameter<std::string>("inprocessDir")),
0118 processedDir_(pset.getParameter<std::string>("processedDir")),
0119 corruptedDir_(pset.getParameter<std::string>("corruptedDir")),
0120 tokenFile_(pset.getUntrackedParameter<std::string>("tokenFile", "watcherSourceToken")),
0121 timeOut_(pset.getParameter<int>("timeOutInSec")),
0122 end_(false),
0123 verbosity_(pset.getUntrackedParameter<int>("verbosity", 0)) {
0124 struct stat buf;
0125 if (stat(tokenFile_.c_str(), &buf)) {
0126 FILE* f = fopen(tokenFile_.c_str(), "w");
0127 if (f) {
0128 fclose(f);
0129 } else {
0130 throw cms::Exception("WatcherSource") << "Failed to create token file.";
0131 }
0132 }
0133 vector<string> dirs;
0134 dirs.push_back(inprocessDir_);
0135 dirs.push_back(processedDir_);
0136 dirs.push_back(corruptedDir_);
0137
0138 for (unsigned i = 0; i < dirs.size(); ++i) {
0139 const string& dir = dirs[i];
0140 struct stat fileStat;
0141 if (0 == stat(dir.c_str(), &fileStat)) {
0142 if (!S_ISDIR(fileStat.st_mode)) {
0143 throw cms::Exception("[WatcherSource]") << "File " << dir << " exists but is not a directory "
0144 << " as expected.";
0145 }
0146 } else {
0147 if (0 != mkdir(dir.c_str(), 0755)) {
0148 throw cms::Exception("[WatcherSource]") << "Failed to create directory " << dir << " for writing data.";
0149 }
0150 }
0151 }
0152 }
0153
0154 WatcherStreamFileReader::~WatcherStreamFileReader() {}
0155
0156 const bool WatcherStreamFileReader::newHeader() {
0157 edm::StreamerInputFile* inputFile = getInputFile();
0158 return inputFile ? inputFile->newHeader() : false;
0159 }
0160
0161 const InitMsgView* WatcherStreamFileReader::getHeader() {
0162 edm::StreamerInputFile* inputFile = getInputFile();
0163
0164
0165 if (inputFile == nullptr) {
0166 throw cms::Exception("WatcherSource") << "No input file found.";
0167 }
0168
0169 const InitMsgView* header = inputFile->startMessage();
0170
0171 if (header->code() != Header::INIT)
0172 throw cms::Exception("readHeader", "WatcherStreamFileReader")
0173 << "received wrong message type: expected INIT, got " << header->code() << "\n";
0174
0175 return header;
0176 }
0177
0178 const EventMsgView* WatcherStreamFileReader::getNextEvent() {
0179 if (end_) {
0180 closeFile();
0181 return nullptr;
0182 }
0183
0184 edm::StreamerInputFile* inputFile;
0185
0186
0187 while ((inputFile = getInputFile()) != nullptr && inputFile->next() != edm::StreamerInputFile::Next::kEvent) {
0188 closeFile();
0189 }
0190
0191 return inputFile == nullptr ? nullptr : inputFile->currentRecord();
0192 }
0193
0194 edm::StreamerInputFile* WatcherStreamFileReader::getInputFile() {
0195 char* lineptr = nullptr;
0196 size_t n = 0;
0197 static stringstream cmd;
0198 static bool cmdSet = false;
0199 static char curDir[PATH_MAX > 0 ? PATH_MAX : 4096];
0200
0201 if (!cmdSet) {
0202 cmd.str("");
0203
0204
0205
0206
0207
0208 cmd << "/bin/find " << inputDir_ << " -maxdepth 2 -print | egrep '(";
0209
0210 if (filePatterns_.empty())
0211 return nullptr;
0212 if (getcwd(curDir, sizeof(curDir)) == nullptr) {
0213 throw cms::Exception("WatcherSource") << "Failed to retreived working directory path: " << strerror(errno);
0214 }
0215
0216 for (unsigned i = 0; i < filePatterns_.size(); ++i) {
0217 if (i > 0)
0218 cmd << "|";
0219
0220
0221
0222 cmd << filePatterns_[i];
0223 }
0224 cmd << ")' | sort";
0225
0226 cout << "[WatcherSource " << now() << "]"
0227 << " Command to retrieve input files: " << cmd.str() << "\n";
0228 cmdSet = true;
0229 }
0230
0231 struct stat buf;
0232
0233 if (stat(tokenFile_.c_str(), &buf) != 0) {
0234 end_ = true;
0235 }
0236
0237 bool waiting = false;
0238 static bool firstWait = true;
0239 timeval waitStart;
0240
0241 if (!end_ && streamerInputFile_.get() == nullptr) {
0242 fileName_.assign("");
0243
0244
0245 while (filesInQueue_.empty()) {
0246 if (stat(tokenFile_.c_str(), &buf) != 0) {
0247 end_ = true;
0248 break;
0249 }
0250 FILE* s = popen(cmd.str().c_str(), "r");
0251 if (s == nullptr) {
0252 throw cms::Exception("WatcherSource") << "Failed to retrieve list of input file: " << strerror(errno);
0253 }
0254
0255 ssize_t len;
0256 while (!feof(s)) {
0257 if ((len = getline(&lineptr, &n, s)) > 0) {
0258
0259 lineptr[len - 1] = 0;
0260 string fileName;
0261 if (lineptr[0] != '/') {
0262 if (!inputDir_.empty() && inputDir_[0] != '/') {
0263 fileName.assign(curDir);
0264 fileName.append("/");
0265 fileName.append(inputDir_);
0266 } else {
0267 fileName.assign(inputDir_);
0268 }
0269 fileName.append("/");
0270 }
0271 fileName.append(lineptr);
0272 filesInQueue_.push_back(fileName);
0273 if (verbosity_)
0274 cout << "[WatcherSource " << now() << "]"
0275 << " File to process: '" << fileName << "'\n";
0276 }
0277 }
0278 while (!feof(s))
0279 fgetc(s);
0280 pclose(s);
0281 if (filesInQueue_.empty()) {
0282 if (!waiting) {
0283 cout << "[WatcherSource " << now() << "]"
0284 << " No file found. Waiting for new file...\n";
0285 cout << flush;
0286 waiting = true;
0287 gettimeofday(&waitStart, nullptr);
0288 } else if (!firstWait) {
0289 timeval t;
0290 gettimeofday(&t, nullptr);
0291 float dt = (t.tv_sec - waitStart.tv_sec) * 1. + (t.tv_usec - waitStart.tv_usec) * 1.e-6;
0292 if ((timeOut_ >= 0) && (dt > timeOut_)) {
0293 cout << "[WatcherSource " << now() << "]"
0294 << " Having waited for new file for " << (int)dt << " sec. "
0295 << "Timeout exceeded. Exits.\n";
0296
0297 end_ = true;
0298 break;
0299 }
0300 }
0301 }
0302 sleep(1);
0303 }
0304 firstWait = false;
0305 free(lineptr);
0306 lineptr = nullptr;
0307
0308 while (streamerInputFile_.get() == nullptr && !filesInQueue_.empty()) {
0309 fileName_ = filesInQueue_.front();
0310 filesInQueue_.pop_front();
0311 int fd = open(fileName_.c_str(), 0);
0312 if (fd != 0) {
0313 struct stat buf;
0314 off_t size = -1;
0315
0316 time_t t = time(nullptr);
0317 for (;;) {
0318 fstat(fd, &buf);
0319 if (verbosity_)
0320 cout << "file size: " << buf.st_size << ", prev size: " << size << "\n";
0321 if (buf.st_size == size)
0322 break;
0323 else
0324 size = buf.st_size;
0325 if (difftime(t, buf.st_mtime) > 60)
0326 break;
0327 sleep(1);
0328 }
0329
0330 if (fd != 0 && buf.st_size == 0) {
0331
0332 stringstream c;
0333 c << "/bin/mv -f \"" << fileName_ << "\" \"" << corruptedDir_ << "/.\"";
0334 if (verbosity_)
0335 cout << "[WatcherSource " << now() << "]"
0336 << " Executing " << c.str() << "\n";
0337 int i = system(c.str().c_str());
0338 if (i != 0) {
0339
0340 cout << "[WatcherSource " << now() << "] "
0341 << "Failed to move empty file '" << fileName_ << "'"
0342 << " to corrupted directory '" << corruptedDir_ << "'\n";
0343 }
0344 continue;
0345 }
0346
0347 close(fd);
0348
0349 vector<char> buf1(fileName_.size() + 1);
0350 copy(fileName_.begin(), fileName_.end(), buf1.begin());
0351 buf1[buf1.size() - 1] = 0;
0352
0353 vector<char> buf2(fileName_.size() + 1);
0354 copy(fileName_.begin(), fileName_.end(), buf2.begin());
0355 buf2[buf2.size() - 1] = 0;
0356
0357 string dirnam(dirname(&buf1[0]));
0358 string filenam(basename(&buf2[0]));
0359
0360 string dest = inprocessDir_ + "/" + filenam;
0361
0362 if (verbosity_)
0363 cout << "[WatcherSource " << now() << "]"
0364 << " Moving file " << fileName_ << " to " << dest << "\n";
0365
0366
0367
0368
0369
0370 if (0 != rename(fileName_.c_str(), dest.c_str())) {
0371
0372 throw cms::Exception("WatcherSource") << "Failed to move file '" << fileName_ << "' "
0373 << "to processing directory " << inprocessDir_ << ": "
0374 << strerror(errno) << " (Error no " << errno << ")";
0375 }
0376
0377 fileName_ = dest;
0378
0379 cout << "[WatcherSource " << now() << "]"
0380 << " Opening file " << fileName_ << "\n"
0381 << flush;
0382 streamerInputFile_ = std::make_unique<edm::StreamerInputFile>(fileName_);
0383
0384 ofstream f(".watcherfile");
0385 f << fileName_;
0386 } else {
0387 cout << "[WatcherSource " << now() << "]"
0388 << " Failed to open file " << fileName_ << endl;
0389 }
0390 }
0391 }
0392 return streamerInputFile_.get();
0393 }
0394
0395 void WatcherStreamFileReader::closeFile() {
0396 if (streamerInputFile_.get() == nullptr)
0397 return;
0398
0399 streamerInputFile_.reset();
0400 stringstream cmd;
0401
0402
0403
0404
0405
0406
0407 vector<char> buf(fileName_.size() + 1);
0408 copy(fileName_.begin(), fileName_.end(), buf.begin());
0409 buf[buf.size() - 1] = 0;
0410 string dest = processedDir_ + "/" + basename(&buf[0]);
0411 if (verbosity_)
0412 cout << "[WatcherSource " << now() << "]"
0413 << " Moving " << fileName_ << " to " << dest << "... ";
0414 int i = rename(fileName_.c_str(), dest.c_str());
0415 if (i != 0) {
0416 throw cms::Exception("WatcherSource")
0417 << "Failed to move processed file '" << fileName_ << "'"
0418 << " to processed directory '" << processedDir_ << ": " << strerror(errno) << " (Error no " << errno << ")";
0419
0420
0421 end_ = true;
0422 }
0423 if (verbosity_)
0424 cout << "Done at " << now() << "\n";
0425
0426
0427 }