Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2021-02-14 13:13:48

0001 #include "DQMServices/Core/src/DQMService.h"
0002 #include "DQMServices/Core/interface/DQMNet.h"
0003 #include "DQMServices/Core/interface/DQMStore.h"
0004 #include "DQMServices/Core/interface/DQMScope.h"
0005 #include "DQMServices/Core/interface/MonitorElement.h"
0006 #include "FWCore/ServiceRegistry/interface/Service.h"
0007 #include "classlib/utils/Error.h"
0008 #include <mutex>
0009 #include <iostream>
0010 #include <string>
0011 #include <memory>
0012 #include "TBufferFile.h"
0013 
0014 // -------------------------------------------------------------------
0015 static std::recursive_mutex s_mutex;
0016 
0017 /// Acquire lock and access to the DQM core from a thread other than
0018 /// the "main" CMSSW processing thread, such as in extra XDAQ threads.
0019 DQMScope::DQMScope() { s_mutex.lock(); }
0020 
0021 /// Release access lock to the DQM core.
0022 DQMScope::~DQMScope() { s_mutex.unlock(); }
0023 
0024 // -------------------------------------------------------------------
0025 DQMService::DQMService(const edm::ParameterSet &pset, edm::ActivityRegistry &ar)
0026     : store_(&*edm::Service<DQMStore>()), net_(nullptr), lastFlush_(0), publishFrequency_(5.0) {
0027   ar.watchPostEvent(this, &DQMService::flush);
0028   ar.watchPostStreamEndLumi(this, &DQMService::flush);
0029 
0030   std::string host = pset.getUntrackedParameter<std::string>("collectorHost", "");
0031   int port = pset.getUntrackedParameter<int>("collectorPort", 9090);
0032   bool verbose = pset.getUntrackedParameter<bool>("verbose", false);
0033   publishFrequency_ = pset.getUntrackedParameter<double>("publishFrequency", publishFrequency_);
0034 
0035   if (!host.empty() && port > 0) {
0036     net_ = new DQMBasicNet;
0037     net_->debug(verbose);
0038     net_->updateToCollector(host, port);
0039     net_->start();
0040   }
0041 }
0042 
0043 DQMService::~DQMService() { shutdown(); }
0044 
0045 // Flush updates to the network layer at the end of each event.  This
0046 // is the only point at which the main application and the network
0047 // layer interact outside initialisation and exit.
0048 void DQMService::flushStandalone() {
0049   // Avoid sending updates excessively often.
0050   uint64_t version = lat::Time::current().ns();
0051   double vtime = version * 1e-9;
0052   if (vtime - lastFlush_ < publishFrequency_)
0053     return;
0054 
0055   // OK, send an update.
0056   if (net_) {
0057     DQMNet::Object o;
0058     std::set<std::string> seen;
0059     std::string fullpath;
0060 
0061     // Lock the network layer so we can modify the data.
0062     net_->lock();
0063     bool updated = false;
0064 
0065     auto mes = store_->getAllContents("");
0066     for (MonitorElement *me : mes) {
0067       auto fullpath = me->getFullname();
0068       seen.insert(fullpath);
0069       if (!me->wasUpdated())
0070         continue;
0071 
0072       o.lastreq = 0;
0073       o.hash = DQMNet::dqmhash(fullpath.c_str(), fullpath.size());
0074       o.flags = me->data_.flags;
0075       o.version = version;
0076       o.dirname = me->data_.dirname.substr(0, me->data_.dirname.size() - 1);
0077       o.objname = me->data_.objname;
0078       assert(o.rawdata.empty());
0079       assert(o.scalar.empty());
0080       assert(o.qdata.empty());
0081 
0082       // Pack object and reference, scalar and quality data.
0083 
0084       switch (me->kind()) {
0085         case MonitorElement::Kind::INT:
0086         case MonitorElement::Kind::REAL:
0087         case MonitorElement::Kind::STRING:
0088           me->packScalarData(o.scalar, "");
0089           break;
0090         default: {
0091           TBufferFile buffer(TBufferFile::kWrite);
0092           buffer.WriteObject(me->getTH1());
0093           // placeholder for (no longer supported) reference
0094           buffer.WriteObjectAny(nullptr, nullptr);
0095           o.rawdata.resize(buffer.Length());
0096           memcpy(&o.rawdata[0], buffer.Buffer(), buffer.Length());
0097           DQMNet::packQualityData(o.qdata, me->data_.qreports);
0098           break;
0099         }
0100       }
0101 
0102       net_->updateLocalObject(o);
0103       DQMNet::DataBlob().swap(o.rawdata);
0104       std::string().swap(o.scalar);
0105       std::string().swap(o.qdata);
0106       updated = true;
0107     }
0108 
0109     // Find removed contents and clear the network cache.
0110     if (net_->removeLocalExcept(seen))
0111       updated = true;
0112 
0113     // Unlock the network layer.
0114     net_->unlock();
0115 
0116     // Tell network to flush if we updated something.
0117     if (updated)
0118       net_->sendLocalChanges();
0119   }
0120 
0121   lastFlush_ = lat::Time::current().ns() * 1e-9;
0122 }
0123 void DQMService::flush(edm::StreamContext const &sc) {
0124   // Call a function independent to the framework
0125   flushStandalone();
0126 }
0127 
0128 // Disengage the network service.
0129 void DQMService::shutdown() {
0130   // If we have a network, let it go.
0131   if (net_)
0132     net_->shutdown();
0133 }