Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:10:10

0001 #include "DQMServices/Core/src/DQMService.h"
0002 #include "DQMServices/Core/interface/DQMNet.h"
0003 #include "DQMServices/Core/interface/DQMStore.h"
0004 #include "DQMServices/Core/interface/DQMScope.h"
0005 #include "DQMServices/Core/interface/MonitorElement.h"
0006 #include "FWCore/ServiceRegistry/interface/Service.h"
0007 #include <mutex>
0008 #include <iostream>
0009 #include <string>
0010 #include <memory>
0011 #include "TBufferFile.h"
0012 
0013 // -------------------------------------------------------------------
0014 static std::recursive_mutex s_mutex;
0015 
0016 /// Acquire lock and access to the DQM core from a thread other than
0017 /// the "main" CMSSW processing thread, such as in extra XDAQ threads.
0018 DQMScope::DQMScope() { s_mutex.lock(); }
0019 
0020 /// Release access lock to the DQM core.
0021 DQMScope::~DQMScope() { s_mutex.unlock(); }
0022 
0023 // -------------------------------------------------------------------
0024 DQMService::DQMService(const edm::ParameterSet &pset, edm::ActivityRegistry &ar)
0025     : store_(&*edm::Service<DQMStore>()), net_(nullptr), lastFlush_(0), publishFrequency_(5.0) {
0026   ar.watchPostEvent(this, &DQMService::flush);
0027   ar.watchPostStreamEndLumi(this, &DQMService::flush);
0028 
0029   std::string host = pset.getUntrackedParameter<std::string>("collectorHost", "");
0030   int port = pset.getUntrackedParameter<int>("collectorPort", 9090);
0031   bool verbose = pset.getUntrackedParameter<bool>("verbose", false);
0032   publishFrequency_ = pset.getUntrackedParameter<double>("publishFrequency", publishFrequency_);
0033 
0034   if (!host.empty() && port > 0) {
0035     net_ = new DQMBasicNet;
0036     net_->debug(verbose);
0037     net_->updateToCollector(host, port);
0038     net_->start();
0039   }
0040 }
0041 
0042 DQMService::~DQMService() { shutdown(); }
0043 
0044 // Flush updates to the network layer at the end of each event.  This
0045 // is the only point at which the main application and the network
0046 // layer interact outside initialisation and exit.
0047 void DQMService::flushStandalone() {
0048   // Avoid sending updates excessively often.
0049   uint64_t version = lat::Time::current().ns();
0050   double vtime = version * 1e-9;
0051   if (vtime - lastFlush_ < publishFrequency_)
0052     return;
0053 
0054   // OK, send an update.
0055   if (net_) {
0056     DQMNet::Object o;
0057     std::set<std::string> seen;
0058     std::string fullpath;
0059 
0060     // Lock the network layer so we can modify the data.
0061     net_->lock();
0062     bool updated = false;
0063 
0064     auto mes = store_->getAllContents("");
0065     for (MonitorElement *me : mes) {
0066       auto fullpath = me->getFullname();
0067       seen.insert(fullpath);
0068       if (!me->wasUpdated())
0069         continue;
0070 
0071       o.lastreq = 0;
0072       o.hash = DQMNet::dqmhash(fullpath.c_str(), fullpath.size());
0073       o.flags = me->data_.flags;
0074       o.version = version;
0075       o.dirname = me->data_.dirname.substr(0, me->data_.dirname.size() - 1);
0076       o.objname = me->data_.objname;
0077       assert(o.rawdata.empty());
0078       assert(o.scalar.empty());
0079       assert(o.qdata.empty());
0080 
0081       // Pack object and reference, scalar and quality data.
0082 
0083       switch (me->kind()) {
0084         case MonitorElement::Kind::INT:
0085         case MonitorElement::Kind::REAL:
0086         case MonitorElement::Kind::STRING:
0087           me->packScalarData(o.scalar, "");
0088           break;
0089         default: {
0090           TBufferFile buffer(TBufferFile::kWrite);
0091           buffer.WriteObject(me->getTH1());
0092           // placeholder for (no longer supported) reference
0093           buffer.WriteObjectAny(nullptr, nullptr);
0094           o.rawdata.resize(buffer.Length());
0095           memcpy(&o.rawdata[0], buffer.Buffer(), buffer.Length());
0096           DQMNet::packQualityData(o.qdata, me->data_.qreports);
0097           break;
0098         }
0099       }
0100 
0101       net_->updateLocalObject(o);
0102       DQMNet::DataBlob().swap(o.rawdata);
0103       std::string().swap(o.scalar);
0104       std::string().swap(o.qdata);
0105       updated = true;
0106     }
0107 
0108     // Find removed contents and clear the network cache.
0109     if (net_->removeLocalExcept(seen))
0110       updated = true;
0111 
0112     // Unlock the network layer.
0113     net_->unlock();
0114 
0115     // Tell network to flush if we updated something.
0116     if (updated)
0117       net_->sendLocalChanges();
0118   }
0119 
0120   lastFlush_ = lat::Time::current().ns() * 1e-9;
0121 }
0122 void DQMService::flush(edm::StreamContext const &sc) {
0123   // Call a function independent to the framework
0124   flushStandalone();
0125 }
0126 
0127 // Disengage the network service.
0128 void DQMService::shutdown() {
0129   // If we have a network, let it go.
0130   if (net_)
0131     net_->shutdown();
0132 }