Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:10:34

0001 //#define USE_STORAGE_MANAGER
0002 
0003 #ifdef USE_STORAGE_MANAGER
0004 #include "Utilities/StorageFactory/interface/Storage.h"
0005 #include "Utilities/StorageFactory/interface/StorageFactory.h"
0006 #else  //USE_STORAGE_MANAGER not defined
0007 #ifndef _LARGEFILE64_SOURCE
0008 #define _LARGEFILE64_SOURCE
0009 #endif  //_LARGEFILE64_SOURCE not defined
0010 #define _FILE_OFFSET_BITS 64
0011 #include <cstdio>
0012 #endif  //USE_STORAGE_MANAGER defined
0013 
0014 #include "FWCore/Framework/interface/one/EDProducer.h"
0015 #include "FWCore/Utilities/interface/InputTag.h"
0016 #include "FWCore/Utilities/interface/EDGetToken.h"
0017 #include "EventFilter/EcalRawToDigi/interface/MatacqRawEvent.h"
0018 #include "EventFilter/EcalRawToDigi/src/MatacqDataFormatter.h"
0019 #include "FWCore/MessageLogger/interface/MessageLogger.h"
0020 #include "DataFormats/FEDRawData/interface/FEDRawDataCollection.h"
0021 
0022 #include <string>
0023 #include <cinttypes>
0024 #include <fstream>
0025 #include <memory>
0026 
0027 #include <sys/time.h>
0028 
0029 struct NullOut {
0030   NullOut& operator<<(std::ostream& (*pf)(std::ostream&)) { return *this; }
0031   template <typename T>
0032   inline NullOut& operator<<(const T& a) {
0033     return *this;
0034   }
0035 };
0036 
0037 class MatacqProducer : public edm::one::EDProducer<> {
0038 public:
0039   enum calibTrigType_t { laserType = 4, ledType = 5, tpType = 6, pedType = 7 };
0040 
0041 private:
0042 #ifdef USE_STORAGE_MANAGER
0043   typedef IOOffset filepos_t;
0044   typedef std::unique_ptr<Storage> FILE_t;
0045 #else
0046   typedef off_t filepos_t;
0047   typedef FILE* FILE_t;
0048 #endif
0049   struct MatacqEventId {
0050     MatacqEventId() : run(0), orbit(0) {}
0051     MatacqEventId(uint32_t r, uint32_t o) : run(r), orbit(o) {}
0052 
0053     /** Run number
0054      */
0055     uint32_t run;
0056 
0057     /** Orbit id
0058      */
0059     uint32_t orbit;
0060 
0061     bool operator<(const MatacqEventId& a) {
0062       return (this->run < a.run) || ((this->run == a.run) && (this->orbit < a.orbit));
0063     }
0064 
0065     bool operator>(const MatacqEventId& a) {
0066       return (this->run > a.run) || ((this->run == a.run) && (this->orbit > a.orbit));
0067     }
0068 
0069     bool operator==(const MatacqEventId& a) { return !((*this) < a || (*this) > a); }
0070   };
0071 
0072   /** Estimates matacq event position in a file from its orbit id. This
0073    * estimator requires that every event in the file has the same length. A
0074    * linear extrapolation of pos=f(orbit) function from first and last event
0075    * is performed. It gives only a rough estimate, relevant only to initiliaze
0076    * the event search.
0077    */
0078   class PosEstimator {
0079     //Note: a better estimate could be obtained by using segment of linear
0080     //functions. In such implementation, the estimator must be updated
0081     //each time a point with wrong estimate has been found.
0082   public:
0083     PosEstimator() : eventLength_(0), orbitStepMean_(0), firstOrbit_(0), invalid_(true), verbosity_(0) {}
0084     void init(MatacqProducer* mp);
0085     bool invalid() const { return invalid_; }
0086     int64_t pos(int orb) const;
0087     int eventLength() const { return eventLength_; }
0088     int firstOrbit() const { return firstOrbit_; }
0089     void verbosity(int verb) { verbosity_ = verb; }
0090 
0091   private:
0092     int eventLength_;
0093     int orbitStepMean_;
0094     int firstOrbit_;
0095     bool invalid_;
0096     int verbosity_;
0097   };
0098 
0099 public:
0100   /** Constructor
0101    * @param params seletive readout parameters
0102    */
0103   explicit MatacqProducer(const edm::ParameterSet& params);
0104 
0105   /** Destructor
0106    */
0107   ~MatacqProducer() override;
0108 
0109   /** Produces the EDM products
0110    * @param CMS event
0111    * @param eventSetup event conditions
0112    */
0113   void produce(edm::Event& event, const edm::EventSetup& eventSetup) override;
0114 
0115 private:
0116   /** Add matacq digi to the event
0117    * @param event the event
0118    * @param digiInstanceName_ name to give to the matacq digi instance
0119    */
0120   void addMatacqData(edm::Event& event);
0121 
0122   /** Retrieve the file containing a given matacq event
0123    * @param runNumber Number of the run the matacq event is looking from
0124    * @param orbitId Id of the orbit of the matacq event
0125    * @param fileChange if not null pointer, set to true if the file changed.
0126    * @return true if file retrieval succeeded, false otherwise.
0127    * found.
0128    */
0129   bool getMatacqFile(uint32_t runNumber, uint32_t orbitId, bool* fileChange = nullptr);
0130 
0131   bool getMatacqEvent(uint32_t runNumber, int32_t orbitId, bool fileChange);
0132   /*,bool doWrap = false, std::streamoff maxPos = -1);*/
0133 
0134   uint32_t getRunNumber(edm::Event& ev) const;
0135   uint32_t getOrbitId(edm::Event& ev) const;
0136 
0137   bool getOrbitRange(uint32_t& firstOrb, uint32_t& lastOrb);
0138 
0139   int getCalibTriggerType(edm::Event& ev) const;
0140 
0141   /** Loading orbit correction table from file. @see orbitOffsetFile_
0142    */
0143   void loadOrbitOffset();
0144 
0145   /** Move input file read pointer. On failure file is rewind.
0146    * @param buf buffer to store read data
0147    * @param n   size of data block
0148    * @param mess text to insert in the eventual error message.
0149    * @return true on success, false on failure
0150    */
0151   bool mseek(filepos_t offset, int whence = SEEK_SET, const char* mess = nullptr);
0152 
0153   bool mtell(filepos_t& pos);
0154 
0155   /** Read a data block from input file. On failure file position is restored
0156    * and if position restoring fails, file is rewind.
0157    * @param buf buffer to store read data
0158    * @param n   size of data block
0159    * @param mess text to insert in the eventual error message.
0160    * @param peek if true file position is restored after the data read
0161    * @return true on success, false on failure
0162    */
0163   bool mread(char* buf, size_t n, const char* mess = nullptr, bool peek = false);
0164 
0165   bool mcheck(const std::string& name);
0166 
0167   bool mopen(const std::string& name);
0168 
0169   void mclose();
0170 
0171   bool misOpened();
0172 
0173   bool meof();
0174 
0175   bool mrewind();
0176 
0177   bool msize(filepos_t& s);
0178 
0179   void newRun(int prevRun, int newRun);
0180 
0181   static std::string runSubDir(uint32_t runNumber);
0182 
0183 private:
0184   std::vector<std::string> fileNames_;
0185 
0186   /** Instance name to use for the produced Matacq digi collection
0187    */
0188   std::string digiInstanceName_;
0189 
0190   /** Instance name to use for the produced Matacq raw data collection
0191    */
0192   std::string rawInstanceName_;
0193 
0194   /** Parameter to switch module timing.
0195    */
0196   bool timing_;
0197 
0198   /** Parameter to disable matacq data production. For timing purpose.
0199    */
0200   bool disabled_;
0201 
0202   /** Verbosity level
0203    */
0204   int verbosity_;
0205 
0206   /** Swictch for Matacq digi producion
0207    */
0208   bool produceDigis_;
0209 
0210   /** Switch for Matacq FED raw data production
0211    */
0212   bool produceRaw_;
0213 
0214   /** Name of the raw data collection the Matacq data must be merge to
0215    * if merging is enabled.
0216    */
0217   edm::InputTag inputRawCollection_;
0218 
0219   /** EDM token to access the raw data collection the Matacq data must be merge to
0220    * if merging is enabled.
0221    */
0222   edm::EDGetTokenT<FEDRawDataCollection> inputRawCollectionToken_;
0223 
0224   /** Switch for merging Matacq raw data with existing raw data
0225    * collection.
0226    */
0227   bool mergeRaw_;
0228 
0229   /** When true look for matacq data independently of trigger type.
0230    */
0231   bool ignoreTriggerType_;
0232 
0233   MatacqRawEvent matacq_;
0234 
0235   /** Stream of currently opened matacq file
0236    */
0237   FILE_t inFile_;
0238 
0239   static const int bufferSize = 30000;  //must greater or equal to maximum
0240   //                                      matacq event size.
0241   std::vector<unsigned char> data_;
0242   MatacqDataFormatter formatter_;
0243   const static int orbitTolerance_;
0244   uint32_t openedFileRunNumber_;
0245   int32_t lastOrb_;
0246   int fastRetrievalThresh_;
0247 
0248   PosEstimator posEstim_;
0249 
0250   timeval startTime_;
0251 
0252   /** File name of table with orbit offset between
0253    * matacq event and DCC. Used to recover data suffering from orbit
0254    * miss-synchonization
0255    */
0256   std::string orbitOffsetFile_;
0257 
0258   /** Orbit offset table. @see orbitOffsetFile_
0259    */
0260   std::map<uint32_t, uint32_t> orbitOffset_;
0261 
0262   /** Switch for orbit ID correction. @see orbitOffsetFile_
0263    */
0264   bool doOrbitOffset_;
0265 
0266   /** Name of currently opened matacq file
0267    */
0268   std::string inFileName_;
0269 
0270   static const int matacqFedId_ = 655;
0271 
0272   struct stats_t {
0273     double nEvents;
0274     double nLaserEventsWithMatacq;
0275     double nNonLaserEventsWithMatacq;
0276   } stats_;
0277 
0278   const static stats_t stats_init;
0279   /** Log file name
0280    */
0281   std::string logFileName_;
0282 
0283   /** Log file
0284    */
0285   std::ofstream logFile_;
0286 
0287   /** counter for event skipping
0288    */
0289   int eventSkipCounter_;
0290 
0291   /** Number of events to skip in case of error
0292    */
0293   int onErrorDisablingEvtCnt_;
0294 
0295   /** Name of file to log timing
0296    */
0297   std::string timeLogFile_;
0298   /** Buffer for timing
0299    */
0300   timeval timer_;
0301 
0302   /** Output stream to log code timing
0303    */
0304   std::ofstream timeLog_;
0305 
0306   /** Switch for code timing.
0307    */
0308   bool logTiming_;
0309 
0310   /** Number of the currently processed run
0311    */
0312   uint32_t runNumber_;
0313 };
0314 
0315 #include <csignal>
0316 #include <cstdio>
0317 #include <iomanip>
0318 #include <iostream>
0319 
0320 #include <glob.h>
0321 #include <sys/stat.h>
0322 #include <sys/types.h>
0323 #include <unistd.h>
0324 
0325 #include <fmt/printf.h>
0326 #include <boost/algorithm/string.hpp>
0327 
0328 #include "DataFormats/EcalDigi/interface/EcalDigiCollections.h"
0329 #include "DataFormats/EcalDigi/interface/EcalMatacqDigi.h"
0330 #include "DataFormats/FEDRawData/interface/FEDNumbering.h"
0331 #include "DataFormats/FEDRawData/interface/FEDRawData.h"
0332 #include "EventFilter/EcalRawToDigi/src/Majority.h"
0333 #include "FWCore/Framework/interface/ESHandle.h"
0334 #include "FWCore/Framework/interface/Event.h"
0335 #include "FWCore/Framework/interface/EventSetup.h"
0336 #include "FWCore/ParameterSet/interface/ParameterSet.h"
0337 #include "Geometry/Records/interface/IdealGeometryRecord.h"
0338 
0339 using namespace std;
0340 using namespace boost;
0341 using namespace edm;
0342 
0343 // #undef LogInfo
0344 // #define LogInfo(a) cout << "INFO " << a << ": "
0345 // #undef LogWarning
0346 // #define LogWarning(a) cout << "WARN " << a << ": "
0347 // #undef LogDebug
0348 // #define LogDebug(a) cout << "DBG " << a << ": "
0349 
0350 //verbose mode for matacq event retrieval debugging:
0351 //static const bool searchDbg = false;
0352 
0353 //laser freq is 1 every 112 orbit => >80 orbit
0354 const int MatacqProducer::orbitTolerance_ = 80;
0355 
0356 const MatacqProducer::stats_t MatacqProducer::stats_init = {0, 0, 0};
0357 
0358 static std::string now() {
0359   struct timeval t;
0360   gettimeofday(&t, nullptr);
0361 
0362   char buf[256];
0363   strftime(buf, sizeof(buf), "%F %R %S s", localtime(&t.tv_sec));
0364   buf[sizeof(buf) - 1] = 0;
0365 
0366   stringstream buf2;
0367   buf2 << buf << " " << ((t.tv_usec + 500) / 1000) << " ms";
0368 
0369   return buf2.str();
0370 }
0371 
0372 MatacqProducer::MatacqProducer(const edm::ParameterSet& params)
0373     : fileNames_(params.getParameter<std::vector<std::string> >("fileNames")),
0374       digiInstanceName_(params.getParameter<string>("digiInstanceName")),
0375       rawInstanceName_(params.getParameter<string>("rawInstanceName")),
0376       timing_(params.getUntrackedParameter<bool>("timing", false)),
0377       disabled_(params.getParameter<bool>("disabled")),
0378       verbosity_(params.getUntrackedParameter<int>("verbosity", 0)),
0379       produceDigis_(params.getParameter<bool>("produceDigis")),
0380       produceRaw_(params.getParameter<bool>("produceRaw")),
0381       inputRawCollection_(params.getParameter<edm::InputTag>("inputRawCollection")),
0382       mergeRaw_(params.getParameter<bool>("mergeRaw")),
0383       ignoreTriggerType_(params.getParameter<bool>("ignoreTriggerType")),
0384       matacq_(nullptr, 0),
0385       inFile_(nullptr),
0386       data_(bufferSize),
0387       openedFileRunNumber_(0),
0388       lastOrb_(0),
0389       fastRetrievalThresh_(0),
0390       orbitOffsetFile_(params.getUntrackedParameter<std::string>("orbitOffsetFile", "")),
0391       inFileName_(""),
0392       stats_(stats_init),
0393       logFileName_(params.getUntrackedParameter<std::string>("logFileName", "matacqProducer.log")),
0394       eventSkipCounter_(0),
0395       onErrorDisablingEvtCnt_(params.getParameter<int>("onErrorDisablingEvtCnt")),
0396       timeLogFile_(params.getUntrackedParameter<std::string>("timeLogFile", "")),
0397       runNumber_(0) {
0398   if (verbosity_ >= 4)
0399     cout << "[Matacq " << now() << "] in MatacqProducer ctor" << endl;
0400 
0401   gettimeofday(&timer_, nullptr);
0402 
0403   if (!timeLogFile_.empty()) {
0404     timeLog_.open(timeLogFile_.c_str());
0405     if (timeLog_.fail()) {
0406       cout << "[LaserSorter " << now() << "] "
0407            << "Failed to open file " << timeLogFile_ << " to log timing.\n";
0408       logTiming_ = false;
0409     } else {
0410       logTiming_ = true;
0411     }
0412   }
0413 
0414   posEstim_.verbosity(verbosity_);
0415 
0416   logFile_.open(logFileName_.c_str(), ios::app | ios::out);
0417 
0418   if (logFile_.bad()) {
0419     throw cms::Exception("FileOpen") << "Failed to open file " << logFileName_ << " for logging.\n";
0420   }
0421 
0422   inputRawCollectionToken_ = consumes<FEDRawDataCollection>(params.getParameter<InputTag>("inputRawCollection"));
0423 
0424   if (produceDigis_) {
0425     if (verbosity_ > 0)
0426       cout << "[Matacq " << now()
0427            << "] registering new "
0428               "EcalMatacqDigiCollection product with instance name '"
0429            << digiInstanceName_ << "'\n";
0430     produces<EcalMatacqDigiCollection>(digiInstanceName_);
0431   }
0432 
0433   if (produceRaw_) {
0434     if (verbosity_ > 0)
0435       cout << "[Matacq " << now()
0436            << "] registering new FEDRawDataCollection "
0437               "product with instance name '"
0438            << rawInstanceName_ << "'\n";
0439     produces<FEDRawDataCollection>(rawInstanceName_);
0440   }
0441 
0442   startTime_.tv_sec = startTime_.tv_usec = 0;
0443   if (!orbitOffsetFile_.empty()) {
0444     doOrbitOffset_ = true;
0445     loadOrbitOffset();
0446   } else {
0447     doOrbitOffset_ = false;
0448   }
0449   if (verbosity_ >= 4)
0450     cout << "[Matacq " << now() << "] exiting MatacqProducer ctor" << endl;
0451 }
0452 
0453 void MatacqProducer::produce(edm::Event& event, const edm::EventSetup& eventSetup) {
0454   if (verbosity_ >= 4)
0455     cout << "[Matacq " << now() << "] in MatacqProducer::produce" << endl;
0456   if (logTiming_) {
0457     timeval t;
0458     gettimeofday(&t, nullptr);
0459 
0460     timeLog_ << t.tv_sec << "." << setfill('0') << setw(3) << (t.tv_usec + 500) / 1000 << setfill(' ') << "\t"
0461              << (t.tv_usec - timer_.tv_usec) * 1. + (t.tv_sec - timer_.tv_sec) * 1.e6 << "\t";
0462     timer_ = t;
0463   }
0464 
0465   if (startTime_.tv_sec == 0)
0466     gettimeofday(&startTime_, nullptr);
0467   ++stats_.nEvents;
0468   if (disabled_)
0469     return;
0470   const uint32_t runNumber = getRunNumber(event);
0471   if (runNumber != runNumber_) {
0472     newRun(runNumber_, runNumber);
0473   }
0474   addMatacqData(event);
0475 
0476   if (logTiming_) {
0477     timeval t;
0478     gettimeofday(&t, nullptr);
0479     timeLog_ << (t.tv_usec - timer_.tv_usec) * 1. + (t.tv_sec - timer_.tv_sec) * 1.e6 << "\n";
0480     timer_ = t;
0481   }
0482 }
0483 
0484 void MatacqProducer::addMatacqData(edm::Event& event) {
0485   edm::Handle<FEDRawDataCollection> sourceColl;
0486   event.getByToken(inputRawCollectionToken_, sourceColl);
0487 
0488   std::unique_ptr<FEDRawDataCollection> rawColl;
0489   if (produceRaw_) {
0490     if (mergeRaw_) {
0491       rawColl = std::make_unique<FEDRawDataCollection>(*sourceColl);
0492     } else {
0493       rawColl = std::make_unique<FEDRawDataCollection>();
0494     }
0495   }
0496 
0497   auto digiColl = std::make_unique<EcalMatacqDigiCollection>();
0498 
0499   if (eventSkipCounter_ == 0) {
0500     if (sourceColl->FEDData(matacqFedId_).size() > 4 && !produceRaw_) {
0501       //input raw data collection already contains matacqData
0502       formatter_.interpretRawData(sourceColl->FEDData(matacqFedId_), *digiColl);
0503     } else {
0504       bool isLaserEvent = (getCalibTriggerType(event) == laserType);
0505 
0506       //      cout << "---> " << (ignoreTriggerType_?"yes":"no") << " " << getCalibTriggerType(event) << endl;
0507 
0508       if (isLaserEvent || ignoreTriggerType_) {
0509         const uint32_t runNumber = getRunNumber(event);
0510         const uint32_t orbitId = getOrbitId(event);
0511 
0512         LogInfo("Matacq") << "Run " << runNumber << "\t Orbit " << orbitId << "\n";
0513 
0514         bool fileChange;
0515         if (doOrbitOffset_) {
0516           map<uint32_t, uint32_t>::iterator it = orbitOffset_.find(runNumber);
0517           if (it == orbitOffset_.end()) {
0518             LogWarning("Matacq") << "Orbit offset not found for run " << runNumber
0519                                  << ". No orbit correction will be applied.";
0520           }
0521         }
0522 
0523         if (getMatacqFile(runNumber, orbitId, &fileChange)) {
0524           //matacq file retrieval succeeded
0525           LogInfo("Matacq") << "Matacq data file found for "
0526                             << "run " << runNumber << " orbit " << orbitId;
0527           if (getMatacqEvent(runNumber, orbitId, fileChange)) {
0528             if (produceDigis_) {
0529               formatter_.interpretRawData(matacq_, *digiColl);
0530             }
0531             if (produceRaw_) {
0532               uint32_t dataLen64 = matacq_.getParsedLen();
0533               if (dataLen64 > bufferSize * 8 || matacq_.getDccLen() != dataLen64) {
0534                 LogWarning("Matacq") << " Error in Matacq event fragment length! "
0535                                      << "DCC len: " << matacq_.getDccLen()
0536                                      << "*8 Bytes, Parsed len: " << matacq_.getParsedLen() << "*8 Bytes.  "
0537                                      << "Matacq data will not be included for this event.\n";
0538               } else {
0539                 rawColl->FEDData(matacqFedId_).resize(dataLen64 * 8);
0540                 copy(data_.begin(), data_.begin() + dataLen64 * 8, rawColl->FEDData(matacqFedId_).data());
0541               }
0542             }
0543             LogInfo("Matacq") << "Associating matacq data with orbit id " << matacq_.getOrbitId()
0544                               << " to dcc event with orbit id " << orbitId << std::endl;
0545             if (isLaserEvent) {
0546               ++stats_.nLaserEventsWithMatacq;
0547             } else {
0548               ++stats_.nNonLaserEventsWithMatacq;
0549             }
0550           } else {
0551             if (isLaserEvent) {
0552               LogWarning("Matacq") << "No matacq data found for laser event "
0553                                    << "of run " << runNumber << " orbit " << orbitId;
0554             }
0555           }
0556         } else {
0557           LogWarning("Matacq") << "No matacq file found for event " << event.id();
0558         }
0559       }
0560     }
0561     if (eventSkipCounter_ > 0) {  //error occured for this events
0562       //                       and some events will be skipped following
0563       //                       to this error.
0564       LogInfo("Matacq") << " [" << now() << "] " << eventSkipCounter_
0565                         << " next events will be skipped, following to an "
0566                         << "error on the last processed event, "
0567                         << "which is expected to be persistant.";
0568     }
0569   } else {
0570     --eventSkipCounter_;
0571   }
0572 
0573   if (produceRaw_) {
0574     if (verbosity_ > 1)
0575       cout << "[Matacq " << now() << "] "
0576            << "Adding FEDRawDataCollection collection "
0577            << " to event.\n";
0578     event.put(std::move(rawColl), rawInstanceName_);
0579   }
0580 
0581   if (produceDigis_) {
0582     if (verbosity_ > 1)
0583       cout << "[Matacq " << now() << "] "
0584            << "Adding EcalMatacqDigiCollection collection "
0585            << " to event.\n";
0586     event.put(std::move(digiColl), digiInstanceName_);
0587   }
0588 }
0589 
0590 // #if 0
0591 // bool
0592 // MatacqProducer::getMatacqEvent(std::ifstream& f,
0593 //                 uint32_t runNumber,
0594 //                 uint32_t orbitId,
0595 //                 bool doWrap,
0596 //                 std::streamoff maxPos){
0597 //   bool found = false;
0598 //   streampos startPos = f.tellg();
0599 
0600 //   while(!f.eof()
0601 //  && !found
0602 //  && (maxPos<0 || f.tellg()<=maxPos)){
0603 //     const streamsize headerSize = 8*8;
0604 //     f.read((char*)&data_[0], headerSize);
0605 //     if(f.eof()) break;
0606 //     int32_t orb = MatacqRawEvent::getOrbitId(&data_[0], headerSize);
0607 //     uint32_t len = MatacqRawEvent::getDccLen(&data_[0], headerSize);
0608 //     uint32_t run = MatacqRawEvent::getRunNum(&data_[0], headerSize);
0609 //     //     cout << "Matacq: orbit = " << orb
0610 //     //    << " len = " << len
0611 //     //    << " run = " << run << endl;
0612 //     if((abs(orb-(int32_t)orbitId) < orbitTolerance_)
0613 //        && (runNumber==0 || runNumber==run)){
0614 //       found = true;
0615 //       //reads the rest of the event:
0616 //       if(data_.size() < len*8){
0617 //  throw cms::Exception("Matacq") << "Buffer overflow";
0618 //       }
0619 //       f.read((char*)&data_[0]+headerSize, len*8-headerSize);
0620 //       matacq_ = MatacqRawEvent((unsigned char*)&data_[0], len*8);
0621 //     } else{
0622 //       //moves to next event:
0623 //       f.seekg(len*8 - headerSize, ios::cur);
0624 //     }
0625 //   }
0626 
0627 //   f.clear(); //clears eof error to allow seekg
0628 //   if(doWrap && !found){
0629 //     f.seekg(0, ios::beg);
0630 //     found =  getMatacqEvent(f, runNumber, orbitId, false, startPos);
0631 //   }
0632 //   return found;
0633 // }
0634 //#endif
0635 
0636 bool MatacqProducer::getMatacqEvent(uint32_t runNumber, int32_t orbitId, bool fileChange) {
0637   filepos_t startPos;
0638   if (!mtell(startPos))
0639     return false;
0640 
0641   int32_t startOrb = -1;
0642   const size_t headerSize = 8 * 8;
0643   if (mread((char*)&data_[0], headerSize, "Reading matacq header", true)) {
0644     startOrb = MatacqRawEvent::getOrbitId(&data_[0], headerSize);
0645     if (startOrb < 0)
0646       startOrb = 0;
0647   } else {
0648     if (verbosity_ > 2) {
0649       cout << "[Matacq " << now()
0650            << "] Failed to read matacq header. Moved to start of "
0651               " the file.\n";
0652     }
0653     mrewind();
0654     if (mread((char*)&data_[0], headerSize, "Reading matacq header", true)) {
0655       startPos = 0;
0656       startOrb = MatacqRawEvent::getOrbitId(&data_[0], headerSize);
0657     } else {
0658       if (verbosity_ > 2)
0659         cout << "[Matacq " << now() << "] Looks like matacq file is empty"
0660              << "\n";
0661       return false;
0662     }
0663   }
0664 
0665   if (verbosity_ > 2)
0666     cout << "[Matacq " << now() << "] Last read orbit: " << lastOrb_ << " looking for orbit " << orbitId
0667          << ". Current file position: " << startPos << " Orbit at current position: " << startOrb << "\n";
0668 
0669   //  f.clear();
0670   bool didCoarseMove = false;
0671 
0672   //FIXME: case where posEtim_.invalid() is false
0673   if (!posEstim_.invalid() && (abs(lastOrb_ - orbitId) > fastRetrievalThresh_)) {
0674     filepos_t pos = posEstim_.pos(orbitId);
0675 
0676     //    struct stat st;
0677     filepos_t fsize = -1;
0678     //    if(0==stat(inFileName_.c_str(), &st)){
0679     if (msize(fsize)) {
0680       //      const int64_t fsize = st.st_size;
0681       if (0 != posEstim_.eventLength() && pos > fsize) {
0682         //estimated position is beyong end of file
0683         //-> move to beginning of last event:
0684         int64_t evtSize = posEstim_.eventLength() * sizeof(uint64_t);
0685         pos = ((int64_t)fsize / evtSize - 1) * evtSize;
0686         if (verbosity_ > 2) {
0687           cout << "[Matacq " << now()
0688                << "] Estimated position was beyond end of file. "
0689                   "Changed to "
0690                << pos << "\n";
0691         }
0692       }
0693     } else {
0694       LogWarning("Matacq") << "Failed to access file " << inFileName_ << ".";
0695     }
0696     if (pos >= 0) {
0697       if (verbosity_ > 2)
0698         cout << "[Matacq " << now() << "] jumping to estimated position " << pos << "\n";
0699       mseek(pos, SEEK_SET, "Jumping to estimated event position");
0700       if (mread((char*)&data_[0], headerSize, "Reading matacq header", true)) {
0701         didCoarseMove = true;
0702       } else {
0703         //estimated position might have been beyond the end of the file,
0704         //try, with original position:
0705         didCoarseMove = false;
0706         if (!mread((char*)&data_[0], headerSize, "Reading event header", true)) {
0707           return false;
0708         }
0709       }
0710     } else {
0711       if (verbosity_)
0712         cout << "[Matacq " << now()
0713              << "] Event orbit outside of orbit range "
0714                 "of matacq data file events\n";
0715       return false;
0716     }
0717   }
0718 
0719   int32_t orb = MatacqRawEvent::getOrbitId(&data_[0], headerSize);
0720 
0721   if (didCoarseMove) {
0722     //autoadjustement of threshold for coarse move:
0723     if (abs(orb - orbitId) > fastRetrievalThresh_) {
0724       if (verbosity_ > 2)
0725         cout << "[Matacq " << now() << "] Fast retrieval threshold increased from " << fastRetrievalThresh_;
0726       fastRetrievalThresh_ = 2 * abs(orb - orbitId);
0727       if (verbosity_ > 2)
0728         cout << " to " << fastRetrievalThresh_ << "\n";
0729     }
0730 
0731     //if coarse move did not improve situation, rolls back:
0732     if (startOrb > 0 && (abs(orb - orbitId) > abs(startOrb - orbitId))) {
0733       if (verbosity_ > 2)
0734         cout << "[Matacq " << now() << "] Estimation (-> orbit " << orb
0735              << ") "
0736                 "was worst than original position (-> orbit "
0737              << startOrb << "). Restoring position (" << startPos << ").\n";
0738       mseek(startPos, SEEK_SET);
0739       mread((char*)&data_[0], headerSize, "Reading event header", true);
0740       orb = MatacqRawEvent::getOrbitId(&data_[0], headerSize);
0741     }
0742   }
0743 
0744   bool searchBackward = (orb > orbitId) ? true : false;
0745   //BEWARE: len must be signed, because we are using latter in the code (-len)
0746   //expression
0747   int len = (int)MatacqRawEvent::getDccLen(&data_[0], headerSize);
0748 
0749   if (len == 0) {
0750     cout << "[Matacq " << now() << "] read DCC length is null! Cancels matacq event search "
0751          << " and move matacq file pointer to beginning of the file. "
0752          << "(" << __FILE__ << ":" << __LINE__ << ")."
0753          << "\n";
0754     //rewind(f);
0755     mrewind();
0756     return false;
0757   }
0758 
0759   enum state_t { searching, found, failed } state = searching;
0760 
0761   while (state == searching) {
0762     orb = MatacqRawEvent::getOrbitId(&data_[0], headerSize);
0763     len = (int)MatacqRawEvent::getDccLen(&data_[0], headerSize);
0764     uint32_t run = MatacqRawEvent::getRunNum(&data_[0], headerSize);
0765     if (verbosity_ > 3) {
0766       filepos_t pos = -1;
0767       mtell(pos);
0768       cout << "[Matacq " << now() << "] Header read at file position " << pos << ":  orbit = " << orb
0769            << " len = " << len << "x8 Byte"
0770            << " run = " << run << "\n";
0771     }
0772     if ((abs(orb - orbitId) < orbitTolerance_) && (runNumber == 0 || runNumber == run)) {
0773       state = found;
0774       lastOrb_ = orb;
0775       //reads the rest of the event:
0776       if ((int)data_.size() < len * 8) {
0777         throw cms::Exception("Matacq") << "Buffer overflow";
0778       }
0779       if (verbosity_ > 2)
0780         cout << "[Matacq " << now()
0781              << "] Event found. Reading "
0782                 " matacq event."
0783              << "\n";
0784       if (!mread((char*)&data_[0], len * 8, "Reading matacq event")) {
0785         if (verbosity_ > 2)
0786           cout << "[Matacq " << now() << "] Failed to read matacq event."
0787                << "\n";
0788         state = failed;
0789       }
0790       matacq_ = MatacqRawEvent((unsigned char*)&data_[0], len * 8);
0791     } else {
0792       if ((searchBackward && (orb < orbitId)) || (!searchBackward && (orb > orbitId))) {  //search ended
0793         lastOrb_ = orb;
0794         state = failed;
0795         if (verbosity_ > 2)
0796           cout << "[Matacq " << now() << "] No matacq data found for run " << run << ", orbit ID " << orbitId << "."
0797                << "\n";
0798       } else {
0799         off_t offset = (searchBackward ? -len : len) * 8;
0800         lastOrb_ = orb;
0801         if (verbosity_ > 3) {
0802           cout << "[Matacq " << now() << "] In matacq file, moving " << abs(offset) << " byte "
0803                << (offset > 0 ? "forward" : "backward") << ".\n";
0804         }
0805 
0806         if (mseek(offset, SEEK_CUR, (searchBackward ? "Moving to previous event" : "Moving to next event")) &&
0807             mread((char*)&data_[0], headerSize, "Reading event header", true)) {
0808         } else {
0809           if (!searchBackward)
0810             mseek(-len * 8, SEEK_CUR, "Moving to start of last complete event");
0811           state = failed;
0812         }
0813       }
0814     }
0815   }
0816 
0817   if (state == found) {
0818     filepos_t pos = -1;
0819     filepos_t fsize = -1;
0820     mtell(pos);
0821     msize(fsize);
0822     if (pos == fsize - 1) {  //last byte.
0823       if (verbosity_ > 2) {
0824         cout << "[Matacq " << now()
0825              << "] Event found was at the end of the file. Moving "
0826                 "stream position to beginning of this event."
0827              << "\n";
0828       }
0829       mseek(-(int)len * 8 - 1, SEEK_CUR, "Moving to beginning of last matacq event");
0830     }
0831   }
0832   return (state == found);
0833 }
0834 
0835 bool MatacqProducer::getMatacqFile(uint32_t runNumber, uint32_t orbitId, bool* fileChange) {
0836   if (openedFileRunNumber_ != 0 && openedFileRunNumber_ == runNumber) {
0837     uint32_t firstOrb, lastOrb;
0838     bool goodRange = getOrbitRange(firstOrb, lastOrb);
0839     //    if(orbitId < firstOrb || orbitId > lastOrb) continue;
0840     if (goodRange && firstOrb <= orbitId && orbitId <= lastOrb) {
0841       if (fileChange != nullptr)
0842         *fileChange = false;
0843       return misOpened();
0844     }
0845   }
0846 
0847   if (fileNames_.empty())
0848     return false;
0849 
0850   const string runNumberFormat = "%08d{,_*}";
0851   string sRunNumber = fmt::sprintf(runNumberFormat, runNumber);
0852   //cout << "Run number string: " << sRunNumber << "\n";
0853   bool found = false;
0854   string fname;
0855   uint32_t maxOrb = 0;
0856   //we make two iterations to handle the case where the event is procesed
0857   //before the matacq data are available. In such case we would have
0858   //orbitId > maxOrb (maxOrb: orbit of last written matacq event)
0859   //  for(int itry = 0; itry < 2 && (orbitId > maxOrb); ++itry){
0860   for (int itry = 0; itry < 1 && (orbitId > maxOrb); ++itry) {
0861     if (itry > 0) {
0862       int n_sec = 1;
0863       std::cout << "[Matacq " << now() << "] Event orbit id (" << orbitId
0864                 << ") goes "
0865                    "beyound the range of available one. Waiting for "
0866                 << n_sec
0867                 << " seconds in case "
0868                    "it was not written yet to disk.";
0869       sleep(n_sec);
0870     }
0871 
0872     for (unsigned i = 0; i < fileNames_.size() && !found; ++i) {
0873       fname = fileNames_[i];
0874       boost::algorithm::replace_all(fname, "%run_subdir%", runSubDir(runNumber));
0875       boost::algorithm::replace_all(fname, "%run_number%", sRunNumber);
0876 
0877       glob_t g;
0878       int rc = glob(fname.c_str(), GLOB_BRACE, nullptr, &g);
0879       if (rc) {
0880         if (verbosity_ > 1) {
0881           switch (rc) {
0882             case GLOB_NOSPACE:
0883               std::cout << "[Matacq " << now()
0884                         << "] Running out of memory while calling glob function to look for matacq file paths\n";
0885               break;
0886             case GLOB_ABORTED:
0887               std::cout << "[Matacq " << now()
0888                         << "] Read error while calling glob function to look for matacq file paths\n";
0889               break;
0890             case GLOB_NOMATCH:
0891               //ok. No message to report.
0892               break;
0893           }
0894           continue;
0895         }
0896       }  //rc
0897       for (unsigned iglob = 0; iglob < g.gl_pathc; ++iglob) {
0898         char* thePath = g.gl_pathv[iglob];
0899         //FIXME: add sanity check on the path
0900         static std::atomic<int> nOpenErrors{0};
0901         const int maxOpenErrors = 50;
0902         if (!mopen(thePath) && nOpenErrors < maxOpenErrors) {
0903           std::cout << "[Matacq " << now() << "] Failed to open file " << thePath;
0904           ++nOpenErrors;
0905           if (nOpenErrors == maxOpenErrors) {
0906             std::cout << nOpenErrors << "This is the " << maxOpenErrors
0907                       << "th occurence of this error. Report of this error is now disabled.\n";
0908           } else {
0909             std::cout << "\n";
0910           }
0911         }
0912         uint32_t firstOrb;
0913         uint32_t lastOrb;
0914         bool goodRange = getOrbitRange(firstOrb, lastOrb);
0915         std::cout << "Get orbit range " << (goodRange ? "succeeded" : "failed") << ". Range: " << firstOrb << "..."
0916                   << lastOrb << "\n";
0917         if (goodRange && lastOrb > maxOrb)
0918           maxOrb = lastOrb;
0919         if (goodRange && firstOrb <= orbitId && orbitId <= lastOrb) {
0920           found = true;
0921           //continue;
0922           fname = thePath;
0923           if (verbosity_ > 1)
0924             std::cout << "[Matacq " << now() << "] Switching to file " << fname << "\n";
0925           break;
0926         }
0927       }  //next iglob
0928       globfree(&g);
0929     }  //next filenames
0930   }    //next itry
0931 
0932   if (found) {
0933     LogInfo("Matacq") << "Uses matacq data file: '" << fname << "'\n";
0934   } else {
0935     if (verbosity_ >= 0)
0936       cout << "[Matacq " << now()
0937            << "] no matacq file found "
0938               "for run "
0939            << runNumber << ", orbit " << orbitId << "\n";
0940     eventSkipCounter_ = onErrorDisablingEvtCnt_;
0941     openedFileRunNumber_ = 0;
0942     if (fileChange != nullptr)
0943       *fileChange = false;
0944     return false;
0945   }
0946 
0947   if (found) {
0948     openedFileRunNumber_ = runNumber;
0949     lastOrb_ = 0;
0950     posEstim_.init(this);
0951     if (fileChange != nullptr)
0952       *fileChange = true;
0953     return true;
0954   } else {
0955     return false;
0956   }
0957 }
0958 
0959 uint32_t MatacqProducer::getRunNumber(edm::Event& ev) const { return ev.run(); }
0960 
0961 uint32_t MatacqProducer::getOrbitId(edm::Event& ev) const {
0962   //on CVS HEAD (June 4, 08), class Event has a method orbitNumber()
0963   //we could use here. The code would be shorten to:
0964   //return ev.orbitNumber();
0965   //we have to deal with what we have in current CMSSW releases:
0966   edm::Handle<FEDRawDataCollection> rawdata;
0967   ev.getByToken(inputRawCollectionToken_, rawdata);
0968   if (!(rawdata.isValid())) {
0969     throw cms::Exception("NotFound") << "No FED raw data collection found. ECAL raw data are "
0970                                         "required to retrieve the orbit ID";
0971   }
0972 
0973   int orbit = 0;
0974   for (int id = 601; id <= 654; ++id) {
0975     if (!FEDNumbering::inRange(id))
0976       continue;
0977     const FEDRawData& data = rawdata->FEDData(id);
0978     const int orbitIdOffset64 = 3;
0979     if (data.size() >= 8 * (orbitIdOffset64 + 1)) {  //orbit id is in 4th 64-bit word
0980       const unsigned char* pOrbit = data.data() + orbitIdOffset64 * 8;
0981       int thisOrbit = pOrbit[0] | (pOrbit[1] << 8) | (pOrbit[2] << 16) | (pOrbit[3] << 24);
0982       if (orbit != 0 && thisOrbit != 0 && abs(orbit - thisOrbit) > orbitTolerance_) {
0983         //throw cms::Exception("EventCorruption")
0984         //  << "Orbit ID inconsitency in DCC headers";
0985         LogWarning("EventCorruption") << "Orbit ID inconsitency in DCC headers";
0986         orbit = 0;
0987         break;
0988       }
0989       if (thisOrbit != 0)
0990         orbit = thisOrbit;
0991     }
0992   }
0993 
0994   if (orbit == 0) {
0995     //    throw cms::Exception("NotFound")
0996     //  << "Failed to retrieve orbit ID of event "<< ev.id();
0997     LogWarning("NotFound") << "Failed to retrieve orbit ID of event " << ev.id();
0998   }
0999   return orbit;
1000 }
1001 
1002 int MatacqProducer::getCalibTriggerType(edm::Event& ev) const {
1003   edm::Handle<FEDRawDataCollection> rawdata;
1004   ev.getByToken(inputRawCollectionToken_, rawdata);
1005   if (!(rawdata.isValid())) {
1006     throw cms::Exception("NotFound") << "No FED raw data collection found. ECAL raw data are "
1007                                         "required to retrieve the trigger type";
1008   }
1009 
1010   Majority<int> stat;
1011   for (int id = 601; id <= 654; ++id) {
1012     if (!FEDNumbering::inRange(id))
1013       continue;
1014     const FEDRawData& data = rawdata->FEDData(id);
1015     const int detailedTrigger32 = 5;
1016     if (data.size() >= 4 * (detailedTrigger32 + 1)) {
1017       const unsigned char* pTType = data.data() + detailedTrigger32 * 4;
1018       int tType = pTType[1] & 0x7;
1019       stat.add(tType);
1020     }
1021   }
1022   double p;
1023   int tType = stat.result(&p);
1024   if (p < 0) {
1025     //throw cms::Exception("NotFound") << "No ECAL DCC data found\n";
1026     LogWarning("NotFound") << "No ECAL DCC data found\n";
1027     tType = -1;
1028   }
1029   if (p < .8) {
1030     //throw cms::Exception("EventCorruption") << "Inconsitency in detailed trigger type indicated in ECAL DCC data headers\n";
1031     LogWarning("EventCorruption") << "Inconsitency in detailed trigger type indicated in ECAL DCC data headers\n";
1032     tType = -1;
1033   }
1034   return tType;
1035 }
1036 
1037 void MatacqProducer::PosEstimator::init(MatacqProducer* mp) {
1038   mp->mrewind();
1039 
1040   const size_t headerSize = 8 * 8;
1041   unsigned char data[headerSize];
1042   if (!mp->mread((char*)data, headerSize)) {
1043     if (verbosity_)
1044       cout << "[Matacq " << now() << "] reached end of file!\n";
1045     firstOrbit_ = eventLength_ = orbitStepMean_ = 0;
1046     return;
1047   } else {
1048     firstOrbit_ = MatacqRawEvent::getOrbitId(data, headerSize);
1049     eventLength_ = MatacqRawEvent::getDccLen(data, headerSize);
1050     if (verbosity_ > 1)
1051       cout << "[Matacq " << now() << "] First event orbit: " << firstOrbit_ << " event length: " << eventLength_
1052            << "*8 byte\n";
1053   }
1054 
1055   mp->mrewind();
1056 
1057   if (eventLength_ == 0) {
1058     if (verbosity_)
1059       cout << "[Matacq " << now() << "] event length is null!" << endl;
1060     return;
1061   }
1062 
1063   filepos_t s = -1;
1064   mp->msize(s);
1065 
1066   if (s == -1) {
1067     if (verbosity_)
1068       cout << "[Matacq " << now() << "] File is missing!" << endl;
1069     orbitStepMean_ = 0;
1070     return;
1071   } else if (s == 0) {
1072     if (verbosity_)
1073       cout << "[Matacq " << now() << "] File is empty!" << endl;
1074     orbitStepMean_ = 0;
1075     return;
1076   }
1077 
1078   //number of complete events:
1079   const unsigned nEvents = s / eventLength_ / 8;
1080 
1081   if (verbosity_ > 1)
1082     cout << "[Matacq " << now() << "] File size: " << s << " Number of events: " << nEvents << endl;
1083 
1084   //position of last complete events:
1085   off_t last = (nEvents - 1) * (off_t)eventLength_ * 8;
1086   mp->mseek(last,
1087             SEEK_SET,
1088             "Moving to beginning of last complete "
1089             "matacq event");
1090   if (!mp->mread((char*)data, headerSize, "Reading matacq header", true)) {
1091     LogWarning("Matacq") << "Fast matacq event retrieval failure. "
1092                             "Falling back to safe retrieval mode.";
1093     orbitStepMean_ = 0;
1094   }
1095 
1096   int32_t lastOrb = MatacqRawEvent::getOrbitId(data, headerSize);
1097   int32_t lastLen = MatacqRawEvent::getDccLen(data, headerSize);
1098 
1099   if (verbosity_ > 1)
1100     cout << "[Matacq " << now() << "] Last event orbit: " << lastOrb << " last event length: " << lastLen << endl;
1101 
1102   //some consistency check
1103   if (lastLen != eventLength_) {
1104     LogWarning("Matacq")
1105         //throw cms::Exception("Matacq")
1106         << "Fast matacq event retrieval failure: it looks like "
1107            "the matacq file contains events of different sizes.";
1108     //      " Falling back to safe retrieval mode.";
1109     invalid_ = false;      //true;
1110     orbitStepMean_ = 112;  //0;
1111     return;
1112   }
1113 
1114   orbitStepMean_ = (lastOrb - firstOrbit_) / nEvents;
1115 
1116   if (verbosity_ > 1)
1117     cout << "[Matacq " << now() << "] Orbit step mean: " << orbitStepMean_ << "\n";
1118 
1119   invalid_ = false;
1120 }
1121 
1122 int64_t MatacqProducer::PosEstimator::pos(int orb) const {
1123   if (orb < firstOrbit_)
1124     return -1;
1125   uint64_t r = orbitStepMean_ != 0 ? (((uint64_t)(orb - firstOrbit_)) / orbitStepMean_) * eventLength_ * 8 : 0;
1126   if (verbosity_ > 2)
1127     cout << "[Matacq " << now() << "] Estimated Position for orbit  " << orb << ": " << r << endl;
1128   return r;
1129 }
1130 
1131 MatacqProducer::~MatacqProducer() {
1132   mclose();
1133   timeval t;
1134   gettimeofday(&t, nullptr);
1135   if (logTiming_ && startTime_.tv_sec != 0) {
1136     //not using logger, to allow timing with different logging options
1137     cout << "[Matacq " << now()
1138          << "] Time elapsed between first event and "
1139             "destruction of MatacqProducer: "
1140          << ((t.tv_sec - startTime_.tv_sec) * 1. + (t.tv_usec - startTime_.tv_usec) * 1.e-6) << "s\n";
1141   }
1142 }
1143 
1144 void MatacqProducer::loadOrbitOffset() {
1145   std::ifstream f(orbitOffsetFile_.c_str());
1146   if (f.bad()) {
1147     throw cms::Exception("Matacq") << "Failed to open orbit ID correction file '" << orbitOffsetFile_ << "'\n";
1148   }
1149 
1150   cout << "[Matacq " << now() << "] "
1151        << "Offset to substract to Matacq events Orbit ID: \n"
1152        << "#Run Number\t Offset\n";
1153 
1154   string s;
1155   stringstream buf;
1156   while (f.eof()) {
1157     getline(f, s);
1158     if (s[0] == '#') {  //comment
1159       //skip line:
1160       f.ignore(numeric_limits<streamsize>::max(), '\n');
1161       continue;
1162     }
1163     buf.str("");
1164     buf << s;
1165     int run;
1166     int orbit;
1167     buf >> run;
1168     buf >> orbit;
1169     if (buf.bad()) {
1170       throw cms::Exception("Matacq") << "Syntax error in Orbit offset file '" << orbitOffsetFile_ << "'";
1171     }
1172     cout << run << "\t" << orbit << "\n";
1173     orbitOffset_.insert(pair<int, int>(run, orbit));
1174   }
1175 }
1176 
1177 #ifdef USE_STORAGE_MANAGER
1178 bool MatacqProducer::mseek(filepos_t offset, int whence, const char* mess) {
1179   if (0 == inFile_.get())
1180     return false;
1181   try {
1182     Storage::Relative wh;
1183     if (whence == SEEK_SET)
1184       wh = Storage::SET;
1185     else if (whence == SEEK_CUR)
1186       wh = Storage::CURRENT;
1187     else if (whence == SEEK_END)
1188       wh = Storage::END;
1189     else
1190       throw cms::Exception("Bug") << "Bug found in " << __FILE__ << ": " << __LINE__ << "\n";
1191 
1192     inFile_->position(offset, wh);
1193   } catch (cms::Exception& e) {
1194     if (verbosity_) {
1195       cout << "[Matacq " << now() << "] ";
1196       if (mess)
1197         cout << mess << ". ";
1198       cout << "Random access error on input matacq file. ";
1199       if (whence == SEEK_SET)
1200         cout << "Failed to seek absolute position " << offset;
1201       else if (whence == SEEK_CUR)
1202         cout << "Failed to move " << offset << " bytes forward";
1203       else if (whence == SEEK_END)
1204         cout << "Failed to seek position at " << offset << " bytes before end of file";
1205       cout << ". Reopening file. " << e.what() << "\n";
1206       mopen(inFileName_);
1207       return false;
1208     }
1209   }
1210   return true;
1211 }
1212 
1213 bool MatacqProducer::mtell(filepos_t& pos) {
1214   if (0 == inFile_.get())
1215     return false;
1216   pos = inFile_->position();
1217   return true;
1218 }
1219 
1220 bool MatacqProducer::mread(char* buf, size_t n, const char* mess, bool peek) {
1221   if (0 == inFile_.get())
1222     return false;
1223 
1224   filepos_t pos = -1;
1225   if (!mtell(pos))
1226     return false;
1227 
1228   bool rc = false;
1229   try {
1230     rc = (n == inFile_->xread(buf, n));
1231   } catch (cms::Exception& e) {
1232     if (verbosity_) {
1233       cout << "[Matacq " << now() << "] ";
1234       if (mess)
1235         cout << mess << ". ";
1236       cout << "Read failure from input matacq file: " << e.what() << "\n";
1237     }
1238     //recovering from error:
1239     mopen(inFileName_);
1240     mseek(pos);
1241     return false;
1242   }
1243   if (peek) {  //asked to restore original file position
1244     mseek(pos);
1245   }
1246   return rc;
1247 }
1248 
1249 bool MatacqProducer::msize(filepos_t& s) {
1250   if (inFile_.get() == 0)
1251     return false;
1252   s = inFile_.get()->size();
1253   return true;
1254 }
1255 
1256 bool MatacqProducer::mrewind() {
1257   Storage* file = inFile_.get();
1258   if (file == 0)
1259     return false;
1260   try {
1261     file->rewind();
1262   } catch (cms::Exception e) {
1263     if (verbosity_)
1264       cout << "Exception cautgh while rewinding file " << inFileName_ << ": " << e.what() << ". "
1265            << "File will be reopened.";
1266     return mopen(inFileName_);
1267   }
1268   return true;
1269 }
1270 
1271 bool MatacqProducer::mcheck(const std::string& name) { return StorageFactory::get()->check(name); }
1272 
1273 bool MatacqProducer::mopen(const std::string& name) {
1274   //close already opened file if any:
1275   mclose();
1276 
1277   try {
1278     inFile_ = unique_ptr<Storage>(StorageFactory::get()->open(name, IOFlags::OpenRead));
1279     inFileName_ = name;
1280   } catch (cms::Exception& e) {
1281     LogWarning("Matacq") << e.what();
1282     inFile_.reset();
1283     inFileName_ = "";
1284     return false;
1285   }
1286   return true;
1287 }
1288 
1289 void MatacqProducer::mclose() {
1290   if (inFile_.get() != 0) {
1291     inFile_->close();
1292     inFile_.reset();
1293   }
1294 }
1295 
1296 bool MatacqProducer::misOpened() { return inFile_.get() != 0; }
1297 
1298 bool MatacqProducer::meof() {
1299   if (inFile_.get() == 0)
1300     return true;
1301   return inFile_->eof();
1302 }
1303 
1304 #else  //USE_STORAGE_MANAGER not defined
1305 bool MatacqProducer::mseek(off_t offset, int whence, const char* mess) {
1306   if (nullptr == inFile_)
1307     return false;
1308   const int rc = fseeko(inFile_, offset, whence);
1309   if (rc != 0 && verbosity_) {
1310     cout << "[Matacq " << now() << "] ";
1311     if (mess)
1312       cout << mess << ". ";
1313     cout << "Random access error on input matacq file. "
1314             "Rewind file.\n";
1315     mrewind();
1316   }
1317   return rc == 0;
1318 }
1319 
1320 bool MatacqProducer::mtell(filepos_t& pos) {
1321   if (nullptr == inFile_)
1322     return false;
1323   pos = ftello(inFile_);
1324   return pos != -1;
1325 }
1326 
1327 bool MatacqProducer::mread(char* buf, size_t n, const char* mess, bool peek) {
1328   if (nullptr == inFile_)
1329     return false;
1330   off_t pos = ftello(inFile_);
1331   bool rc = (pos != -1) && (1 == fread(buf, n, 1, inFile_));
1332   if (!rc) {
1333     if (verbosity_) {
1334       cout << "[Matacq " << now() << "] ";
1335       if (mess)
1336         cout << mess << ". ";
1337       cout << "Read failure from input matacq file.\n";
1338     }
1339     clearerr(inFile_);
1340   }
1341   if (peek || !rc) {  //need to restore file position
1342     if (0 != fseeko(inFile_, pos, SEEK_SET)) {
1343       if (verbosity_) {
1344         cout << "[Matacq " << now() << "] ";
1345         if (mess)
1346           cout << mess << ". ";
1347         cout << "Failed to restore file position of "
1348                 "before read error. Rewind file.\n";
1349       }
1350       //rewind(inFile_.get());
1351       mrewind();
1352       lastOrb_ = 0;
1353     }
1354   }
1355   return rc;
1356 }
1357 
1358 bool MatacqProducer::msize(filepos_t& s) {
1359   if (nullptr == inFile_)
1360     return false;
1361   struct stat buf;
1362   if (0 != fstat(fileno(inFile_), &buf)) {
1363     s = 0;
1364     return false;
1365   } else {
1366     s = buf.st_size;
1367     return true;
1368   }
1369 }
1370 
1371 bool MatacqProducer::mrewind() {
1372   if (nullptr == inFile_)
1373     return false;
1374   clearerr(inFile_);
1375   return fseeko(inFile_, 0, SEEK_SET) != 0;
1376 }
1377 
1378 bool MatacqProducer::mcheck(const std::string& name) {
1379   struct stat dummy;
1380   return 0 == stat(name.c_str(), &dummy);
1381   //   if(stat(name.c_str(), &dummy)==0){
1382   //     return true;
1383   //   } else{
1384   //     cout << "[Matacq " << now() << "] Failed to stat file '"
1385   //     << name.c_str() << "'. "
1386   //     << "Error " << errno << ": " << strerror(errno) << "\n";
1387   //     return false;
1388   //   }
1389 }
1390 
1391 bool MatacqProducer::mopen(const std::string& name) {
1392   if (inFile_ != nullptr)
1393     mclose();
1394   inFile_ = fopen(name.c_str(), "r");
1395   if (inFile_ != nullptr) {
1396     inFileName_ = name;
1397     return true;
1398   } else {
1399     inFileName_ = "";
1400     return false;
1401   }
1402 }
1403 
1404 void MatacqProducer::mclose() {
1405   if (inFile_ != nullptr)
1406     fclose(inFile_);
1407   inFile_ = nullptr;
1408 }
1409 
1410 bool MatacqProducer::misOpened() { return inFile_ != nullptr; }
1411 
1412 bool MatacqProducer::meof() {
1413   if (nullptr == inFile_)
1414     return true;
1415   return feof(inFile_) == 0;
1416 }
1417 
1418 #endif  //USE_STORAGE_MANAGER defined
1419 
1420 std::string MatacqProducer::runSubDir(uint32_t runNumber) {
1421   int millions = runNumber / (1000 * 1000);
1422   int thousands = (runNumber - millions * 1000 * 1000) / 1000;
1423   int units = runNumber - millions * 1000 * 1000 - thousands * 1000;
1424   return fmt::sprintf("%03d/%03d/%03d", millions, thousands, units);
1425 }
1426 
1427 void MatacqProducer::newRun(int prevRun, int newRun) {
1428   runNumber_ = newRun;
1429   eventSkipCounter_ = 0;
1430   logFile_ << "[" << now() << "] Event count for run " << runNumber_ << ": "
1431            << "total: " << stats_.nEvents << ", "
1432            << "Laser event with Matacq data: " << stats_.nLaserEventsWithMatacq << ", "
1433            << "Non laser event (according to DCC header) with Matacq data: " << stats_.nNonLaserEventsWithMatacq << "\n"
1434            << flush;
1435 
1436   stats_.nEvents = 0;
1437   stats_.nLaserEventsWithMatacq = 0;
1438   stats_.nNonLaserEventsWithMatacq = 0;
1439 }
1440 
1441 bool MatacqProducer::getOrbitRange(uint32_t& firstOrb, uint32_t& lastOrb) {
1442   filepos_t pos = -1;
1443   filepos_t fsize = -1;
1444   mtell(pos);
1445   msize(fsize);
1446   const unsigned headerSize = 8 * 8;
1447   unsigned char header[headerSize];
1448   //FIXME: Don't we need here to rewind?
1449   mseek(0);
1450   if (!mread((char*)header, headerSize, nullptr, false))
1451     return false;
1452   firstOrb = MatacqRawEvent::getOrbitId(header, headerSize);
1453   int len = (int)MatacqRawEvent::getDccLen(header, headerSize);
1454   //number of complete events. If last event is partially written,
1455   //it won't be included in the count.
1456   unsigned nEvts = fsize / (len * 8);
1457   //Position of last complete event:
1458   filepos_t lastEvtPos = (filepos_t)(nEvts - 1) * len * 8;
1459   //  std::cout << "Move to position : " << lastEvtPos
1460   //        << "(" << (nEvts - 1) << "*" << len << "*" << 64 << ")"
1461   //<< "\n";
1462   mseek(lastEvtPos);
1463   filepos_t tmp;
1464   mtell(tmp);
1465   //std::cout << "New position, sizeof(tmp): " << tmp << "," << sizeof(tmp) << "\n";
1466   mread((char*)header, headerSize, nullptr, false);
1467   lastOrb = MatacqRawEvent::getOrbitId(header, headerSize);
1468 
1469   //restore file position:
1470   mseek(pos);
1471 
1472   return true;
1473 }
1474 
1475 #include "FWCore/Framework/interface/MakerMacros.h"
1476 DEFINE_FWK_MODULE(MatacqProducer);