File indexing completed on 2024-09-07 04:34:51
0001 #include "IOPool/Streamer/interface/MsgTools.h"
0002 #include "IOPool/Streamer/interface/StreamerInputFile.h"
0003 #include "CalibCalorimetry/EcalLaserSorting/interface/WatcherStreamFileReader.h"
0004 #include "FWCore/Utilities/interface/Exception.h"
0005
0006 #include <cerrno>
0007 #include <climits>
0008 #include <cstdio>
0009 #include <cstdlib>
0010 #include <cstring>
0011 #include <fcntl.h>
0012 #include <fstream>
0013 #include <libgen.h>
0014 #include <memory>
0015 #include <sys/stat.h>
0016 #include <sys/time.h>
0017 #include <sys/types.h>
0018 #include <unistd.h>
0019
0020
0021 using namespace std;
0022 using namespace edm::streamer;
0023
0024
0025
0026 #if !defined(__linux__) && !(defined(__APPLE__) && __DARWIN_C_LEVEL >= 200809L)
0027
0028
0029 #ifndef SIZE_MAX
0030 #define SIZE_MAX ((size_t) - 1)
0031 #endif
0032 #ifndef SSIZE_MAX
0033 #define SSIZE_MAX ((ssize_t)(SIZE_MAX / 2))
0034 #endif
0035 namespace {
/// Fallback replacement for POSIX getline(), compiled only when the platform
/// guard above selects it.
///
/// Reads characters from @p fp up to and including the next '\n' (or up to
/// end of input) into the malloc-managed buffer *lineptr, growing it with
/// realloc as needed and keeping *n equal to the buffer capacity.
///
/// @param lineptr  in/out: address of the line buffer; allocated here when
///                 *lineptr is null or *n is zero.
/// @param n        in/out: capacity of *lineptr in bytes.
/// @param fp       stream to read from.
/// @return number of characters stored (terminating NUL excluded), or -1 when
///         nothing could be read, on allocation failure, or on invalid
///         arguments (errno is set to EINVAL in the latter case).
ssize_t getline(char** lineptr, size_t* n, FILE* fp) {
  if (lineptr == nullptr || n == nullptr || fp == nullptr) {
    errno = EINVAL;
    return -1;
  }

  // Start with a modest buffer when the caller did not supply one.
  if (*lineptr == nullptr || *n == 0) {
    *n = 120;
    *lineptr = static_cast<char*>(malloc(*n));
    if (*lineptr == nullptr) {
      return -1;
    }
  }

  size_t used = 0;
  while (true) {
    const int ch = getc(fp);
    if (ch == EOF) {
      break;
    }

    // Keep room for this character plus the terminating NUL.
    if (used + 1 >= *n) {
      const size_t capLimit = SSIZE_MAX < SIZE_MAX ? static_cast<size_t>(SSIZE_MAX) + 1 : SIZE_MAX;
      size_t wanted = 2 * *n + 1;
      if (capLimit < wanted) {
        wanted = capLimit;
      }
      // The line no longer fits in the largest representable length.
      if (used + 1 >= wanted) {
        return -1;
      }

      char* const grown = static_cast<char*>(realloc(*lineptr, wanted));
      if (grown == nullptr) {
        return -1;
      }
      *lineptr = grown;
      *n = wanted;
    }

    (*lineptr)[used] = static_cast<char>(ch);
    ++used;

    if (ch == '\n') {
      break;
    }
  }

  (*lineptr)[used] = '\0';
  return used != 0 ? static_cast<ssize_t>(used) : -1;
}
0098 }
0099 #endif
0100
/// Returns the current local time as text for log prefixes, in the form
/// "YYYY-MM-DD HH:MM SS s MMM ms".
///
/// The sub-second part is rounded to the nearest millisecond; when rounding
/// reaches a full second it is carried into the seconds field so the
/// millisecond value always stays within 0..999.
static std::string now() {
  struct timeval tv;
  gettimeofday(&tv, nullptr);

  time_t seconds = tv.tv_sec;
  long millis = (tv.tv_usec + 500) / 1000;
  if (millis >= 1000) {
    // tv_usec >= 999500 rounds up to the next second.
    millis -= 1000;
    ++seconds;
  }

  // localtime_r writes into a caller-provided struct instead of the shared
  // static buffer used by localtime().
  struct tm parts;
  char stamp[256];
  if (localtime_r(&seconds, &parts) == nullptr || strftime(stamp, sizeof(stamp), "%F %R %S s", &parts) == 0) {
    // On failure the buffer content is unspecified: emit an empty date part.
    stamp[0] = '\0';
  }

  std::stringstream out;
  out << stamp << " " << millis << " ms";
  return out.str();
}
0114
0115 WatcherStreamFileReader::WatcherStreamFileReader(edm::ParameterSet const& pset)
0116 : inputDir_(pset.getParameter<std::string>("inputDir")),
0117 filePatterns_(pset.getParameter<std::vector<std::string> >("filePatterns")),
0118 inprocessDir_(pset.getParameter<std::string>("inprocessDir")),
0119 processedDir_(pset.getParameter<std::string>("processedDir")),
0120 corruptedDir_(pset.getParameter<std::string>("corruptedDir")),
0121 tokenFile_(pset.getUntrackedParameter<std::string>("tokenFile", "watcherSourceToken")),
0122 timeOut_(pset.getParameter<int>("timeOutInSec")),
0123 end_(false),
0124 verbosity_(pset.getUntrackedParameter<int>("verbosity", 0)) {
0125 struct stat buf;
0126 if (stat(tokenFile_.c_str(), &buf)) {
0127 FILE* f = fopen(tokenFile_.c_str(), "w");
0128 if (f) {
0129 fclose(f);
0130 } else {
0131 throw cms::Exception("WatcherSource") << "Failed to create token file.";
0132 }
0133 }
0134 vector<string> dirs;
0135 dirs.push_back(inprocessDir_);
0136 dirs.push_back(processedDir_);
0137 dirs.push_back(corruptedDir_);
0138
0139 for (unsigned i = 0; i < dirs.size(); ++i) {
0140 const string& dir = dirs[i];
0141 struct stat fileStat;
0142 if (0 == stat(dir.c_str(), &fileStat)) {
0143 if (!S_ISDIR(fileStat.st_mode)) {
0144 throw cms::Exception("[WatcherSource]") << "File " << dir << " exists but is not a directory "
0145 << " as expected.";
0146 }
0147 } else {
0148 if (0 != mkdir(dir.c_str(), 0755)) {
0149 throw cms::Exception("[WatcherSource]") << "Failed to create directory " << dir << " for writing data.";
0150 }
0151 }
0152 }
0153 }
0154
0155 WatcherStreamFileReader::~WatcherStreamFileReader() {}
0156
0157 const bool WatcherStreamFileReader::newHeader() { return getInputFile() != nullptr; }
0158
0159 const InitMsgView* WatcherStreamFileReader::getHeader() {
0160 StreamerInputFile* inputFile = getInputFile();
0161
0162
0163 if (inputFile == nullptr) {
0164 throw cms::Exception("WatcherSource") << "No input file found.";
0165 }
0166
0167 const InitMsgView* header = inputFile->startMessage();
0168
0169 if (header->code() != Header::INIT)
0170 throw cms::Exception("readHeader", "WatcherStreamFileReader")
0171 << "received wrong message type: expected INIT, got " << header->code() << "\n";
0172
0173 return header;
0174 }
0175
0176 const EventMsgView* WatcherStreamFileReader::getNextEvent() {
0177 if (end_) {
0178 moveJustReadFile();
0179 return nullptr;
0180 }
0181
0182 StreamerInputFile* inputFile;
0183 if ((inputFile = getInputFile()) != nullptr and inputFile->next() == StreamerInputFile::Next::kStop) {
0184 moveJustReadFile();
0185 return nullptr;
0186 }
0187
0188 return inputFile == nullptr ? nullptr : inputFile->currentRecord();
0189 }
0190
0191 StreamerInputFile* WatcherStreamFileReader::getInputFile() {
0192 char* lineptr = nullptr;
0193 size_t n = 0;
0194 static stringstream cmd;
0195 static bool cmdSet = false;
0196 static char curDir[PATH_MAX > 0 ? PATH_MAX : 4096];
0197
0198 if (!cmdSet) {
0199 cmd.str("");
0200
0201
0202
0203
0204
0205 cmd << "/bin/find " << inputDir_ << " -maxdepth 2 -print | egrep '(";
0206
0207 if (filePatterns_.empty())
0208 return nullptr;
0209 if (getcwd(curDir, sizeof(curDir)) == nullptr) {
0210 throw cms::Exception("WatcherSource") << "Failed to retreived working directory path: " << strerror(errno);
0211 }
0212
0213 for (unsigned i = 0; i < filePatterns_.size(); ++i) {
0214 if (i > 0)
0215 cmd << "|";
0216
0217
0218
0219 cmd << filePatterns_[i];
0220 }
0221 cmd << ")' | sort";
0222
0223 cout << "[WatcherSource " << now() << "]"
0224 << " Command to retrieve input files: " << cmd.str() << "\n";
0225 cmdSet = true;
0226 }
0227
0228 struct stat buf;
0229
0230 if (stat(tokenFile_.c_str(), &buf) != 0) {
0231 end_ = true;
0232 }
0233
0234 bool waiting = false;
0235 static bool firstWait = true;
0236 timeval waitStart;
0237
0238 if (!end_ && streamerInputFile_.get() == nullptr) {
0239 fileName_.assign("");
0240
0241
0242 while (filesInQueue_.empty()) {
0243 if (stat(tokenFile_.c_str(), &buf) != 0) {
0244 end_ = true;
0245 break;
0246 }
0247 FILE* s = popen(cmd.str().c_str(), "r");
0248 if (s == nullptr) {
0249 throw cms::Exception("WatcherSource") << "Failed to retrieve list of input file: " << strerror(errno);
0250 }
0251
0252 ssize_t len;
0253 while (!feof(s)) {
0254 if ((len = getline(&lineptr, &n, s)) > 0) {
0255
0256 lineptr[len - 1] = 0;
0257 string fileName;
0258 if (lineptr[0] != '/') {
0259 if (!inputDir_.empty() && inputDir_[0] != '/') {
0260 fileName.assign(curDir);
0261 fileName.append("/");
0262 fileName.append(inputDir_);
0263 } else {
0264 fileName.assign(inputDir_);
0265 }
0266 fileName.append("/");
0267 }
0268 fileName.append(lineptr);
0269 filesInQueue_.push_back(fileName);
0270 if (verbosity_)
0271 cout << "[WatcherSource " << now() << "]"
0272 << " File to process: '" << fileName << "'\n";
0273 }
0274 }
0275 while (!feof(s))
0276 fgetc(s);
0277 pclose(s);
0278 if (filesInQueue_.empty()) {
0279 if (!waiting) {
0280 cout << "[WatcherSource " << now() << "]"
0281 << " No file found. Waiting for new file...\n";
0282 cout << flush;
0283 waiting = true;
0284 gettimeofday(&waitStart, nullptr);
0285 } else if (!firstWait) {
0286 timeval t;
0287 gettimeofday(&t, nullptr);
0288 float dt = (t.tv_sec - waitStart.tv_sec) * 1. + (t.tv_usec - waitStart.tv_usec) * 1.e-6;
0289 if ((timeOut_ >= 0) && (dt > timeOut_)) {
0290 cout << "[WatcherSource " << now() << "]"
0291 << " Having waited for new file for " << (int)dt << " sec. "
0292 << "Timeout exceeded. Exits.\n";
0293
0294 end_ = true;
0295 break;
0296 }
0297 }
0298 }
0299 sleep(1);
0300 }
0301 firstWait = false;
0302 free(lineptr);
0303 lineptr = nullptr;
0304
0305 while (streamerInputFile_.get() == nullptr && !filesInQueue_.empty()) {
0306 fileName_ = filesInQueue_.front();
0307 filesInQueue_.pop_front();
0308 int fd = open(fileName_.c_str(), 0);
0309 if (fd != 0) {
0310 struct stat buf;
0311 off_t size = -1;
0312
0313 time_t t = time(nullptr);
0314 for (;;) {
0315 fstat(fd, &buf);
0316 if (verbosity_)
0317 cout << "file size: " << buf.st_size << ", prev size: " << size << "\n";
0318 if (buf.st_size == size)
0319 break;
0320 else
0321 size = buf.st_size;
0322 if (difftime(t, buf.st_mtime) > 60)
0323 break;
0324 sleep(1);
0325 }
0326
0327 if (fd != 0 && buf.st_size == 0) {
0328
0329 stringstream c;
0330 c << "/bin/mv -f \"" << fileName_ << "\" \"" << corruptedDir_ << "/.\"";
0331 if (verbosity_)
0332 cout << "[WatcherSource " << now() << "]"
0333 << " Executing " << c.str() << "\n";
0334 int i = system(c.str().c_str());
0335 if (i != 0) {
0336
0337 cout << "[WatcherSource " << now() << "] "
0338 << "Failed to move empty file '" << fileName_ << "'"
0339 << " to corrupted directory '" << corruptedDir_ << "'\n";
0340 }
0341 continue;
0342 }
0343
0344 close(fd);
0345
0346 vector<char> buf1(fileName_.size() + 1);
0347 copy(fileName_.begin(), fileName_.end(), buf1.begin());
0348 buf1[buf1.size() - 1] = 0;
0349
0350 vector<char> buf2(fileName_.size() + 1);
0351 copy(fileName_.begin(), fileName_.end(), buf2.begin());
0352 buf2[buf2.size() - 1] = 0;
0353
0354 string dirnam(dirname(&buf1[0]));
0355 string filenam(basename(&buf2[0]));
0356
0357 string dest = inprocessDir_ + "/" + filenam;
0358
0359 if (verbosity_)
0360 cout << "[WatcherSource " << now() << "]"
0361 << " Moving file " << fileName_ << " to " << dest << "\n";
0362
0363
0364
0365
0366
0367 if (0 != rename(fileName_.c_str(), dest.c_str())) {
0368
0369 throw cms::Exception("WatcherSource") << "Failed to move file '" << fileName_ << "' "
0370 << "to processing directory " << inprocessDir_ << ": "
0371 << strerror(errno) << " (Error no " << errno << ")";
0372 }
0373
0374 fileName_ = dest;
0375
0376 cout << "[WatcherSource " << now() << "]"
0377 << " Opening file " << fileName_ << "\n"
0378 << flush;
0379 streamerInputFile_ = std::make_unique<StreamerInputFile>(fileName_);
0380
0381 ofstream f(".watcherfile");
0382 f << fileName_;
0383 } else {
0384 cout << "[WatcherSource " << now() << "]"
0385 << " Failed to open file " << fileName_ << endl;
0386 }
0387 }
0388 }
0389 return streamerInputFile_.get();
0390 }
0391
0392 void WatcherStreamFileReader::closeFile() {}
0393
0394 void WatcherStreamFileReader::moveJustReadFile() {
0395 if (streamerInputFile_.get() == nullptr)
0396 return;
0397
0398 streamerInputFile_.reset();
0399 stringstream cmd;
0400
0401
0402
0403
0404
0405
0406 vector<char> buf(fileName_.size() + 1);
0407 copy(fileName_.begin(), fileName_.end(), buf.begin());
0408 buf[buf.size() - 1] = 0;
0409 string dest = processedDir_ + "/" + basename(&buf[0]);
0410 if (verbosity_)
0411 cout << "[WatcherSource " << now() << "]"
0412 << " Moving " << fileName_ << " to " << dest << "... ";
0413 int i = rename(fileName_.c_str(), dest.c_str());
0414 if (i != 0) {
0415 throw cms::Exception("WatcherSource")
0416 << "Failed to move processed file '" << fileName_ << "'"
0417 << " to processed directory '" << processedDir_ << ": " << strerror(errno) << " (Error no " << errno << ")";
0418
0419
0420 end_ = true;
0421 }
0422 if (verbosity_)
0423 cout << "Done at " << now() << "\n";
0424
0425
0426 }