Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:12:10

0001 // -*- C++ -*-
0002 //
0003 // Package:     Framework
0004 // Class  :     EventSetupRecordIOVQueue
0005 //
0006 //
0007 // Author:      W. David Dagenhart
0008 // Created:     22 Feb 2019
0009 
0010 #include "FWCore/Framework/interface/EventSetupRecordIOVQueue.h"
0011 #include "FWCore/Framework/interface/EventSetupRecordProvider.h"
0012 #include "FWCore/Concurrency/interface/WaitingTask.h"
0013 #include "FWCore/Utilities/interface/EDMException.h"
0014 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0015 
0016 #include <exception>
0017 
0018 namespace edm {
0019   namespace eventsetup {
0020 
0021     EventSetupRecordIOVQueue::EventSetupRecordIOVQueue(unsigned int nConcurrentIOVs)
0022         : iovQueue_(nConcurrentIOVs),
0023           isAvailable_(nConcurrentIOVs),
0024           // start valid cacheIdentifiers at 1 and only increment them
0025           // so that the EventSetup system will never return the value 0
0026           // as a cacheIdentifier. Then clients which store a cache identifier
0027           // identifying the state of their own cache can store a 0 when their
0028           // cache is uninitialized.
0029           cacheIdentifier_(1) {
0030       for (auto& i : isAvailable_) {
0031         i.store(true);
0032       }
0033     }
0034 
0035     EventSetupRecordIOVQueue::~EventSetupRecordIOVQueue() { assert(endIOVCalled_); }
0036 
0037     void EventSetupRecordIOVQueue::endIOVAsync(edm::WaitingTaskHolder iEndTask) {
0038       endIOVTasks_.reset();
0039       if (endIOVTaskHolder_.hasTask()) {
0040         endIOVTasks_.add(std::move(iEndTask));
0041         { WaitingTaskHolder tmp{std::move(endIOVTaskHolder_)}; }
0042       }
0043       endIOVCalled_ = true;
0044     }
0045 
0046     void EventSetupRecordIOVQueue::setNewIntervalForAnySubProcess() {
0047       bool newIntervalForAnySubProcess = false;
0048       for (auto const& recordProvider : recordProviders_) {
0049         if (recordProvider->intervalStatus() == EventSetupRecordProvider::IntervalStatus::NewInterval) {
0050           newIntervalForAnySubProcess = true;
0051           break;
0052         }
0053       }
0054       for (auto& recordProvider : recordProviders_) {
0055         recordProvider->setNewIntervalForAnySubProcess(newIntervalForAnySubProcess);
0056       }
0057     }
0058 
0059     void EventSetupRecordIOVQueue::checkForNewIOVs(WaitingTaskHolder const& taskToStartAfterIOVInit,
0060                                                    WaitingTaskList& endIOVWaitingTasks,
0061                                                    bool newEventSetupImpl) {
0062       for (auto& recordProvider : recordProviders_) {
0063         if (recordProvider->newIntervalForAnySubProcess()) {
0064           startNewIOVAsync(taskToStartAfterIOVInit, endIOVWaitingTasks);
0065           return;
0066         }
0067       }
0068 
0069       for (auto& recordProvider : recordProviders_) {
0070         if (recordProvider->intervalStatus() != EventSetupRecordProvider::IntervalStatus::Invalid) {
0071           endIOVWaitingTasks.add(endIOVTaskHolder_);
0072           break;
0073         }
0074       }
0075 
0076       for (auto& recordProvider : recordProviders_) {
0077         recordProvider->continueIOV(newEventSetupImpl);
0078       }
0079     }
0080 
0081     void EventSetupRecordIOVQueue::startNewIOVAsync(WaitingTaskHolder const& taskToStartAfterIOVInit,
0082                                                     WaitingTaskList& endIOVWaitingTasks) {
0083       ++cacheIdentifier_;
0084       {
0085         // Let the old IOV end when all the lumis using it are done.
0086         // otherwise we'll deadlock when there is only one thread.
0087         WaitingTaskHolder tmp{std::move(endIOVTaskHolder_)};
0088       }
0089       auto taskHolder = std::make_shared<WaitingTaskHolder>(taskToStartAfterIOVInit);
0090       auto startIOVForRecord =
0091           [this, taskHolder, &endIOVWaitingTasks](edm::LimitedTaskQueue::Resumer iResumer) mutable {
0092             // Caught exception is propagated via WaitingTaskHolder
0093             CMS_SA_ALLOW try {
0094               unsigned int iovIndex = 0;
0095               auto nConcurrentIOVs = isAvailable_.size();
0096               for (; iovIndex < nConcurrentIOVs; ++iovIndex) {
0097                 bool expected = true;
0098                 if (isAvailable_[iovIndex].compare_exchange_strong(expected, false)) {
0099                   break;
0100                 }
0101               }
0102               // Should never fail, just a sanity check
0103               if (iovIndex == nConcurrentIOVs) {
0104                 throw edm::Exception(edm::errors::LogicError)
0105                     << "EventSetupRecordIOVQueue::startNewIOVAsync\n"
0106                     << "Couldn't find available IOV slot. This should never happen.\n"
0107                     << "Contact a Framework Developer\n";
0108               }
0109               for (auto recordProvider : recordProviders_) {
0110                 recordProvider->initializeForNewIOV(iovIndex, cacheIdentifier_);
0111               }
0112 
0113               auto endIOVWaitingTask = make_waiting_task([this, iResumer, iovIndex](std::exception_ptr const*) mutable {
0114                 for (auto recordProvider : recordProviders_) {
0115                   recordProvider->endIOV(iovIndex);
0116                 }
0117                 isAvailable_[iovIndex].store(true);
0118                 iResumer.resume();
0119                 endIOVTasks_.doneWaiting(std::exception_ptr());
0120                 // There is nothing in this task to catch an exception
0121                 // because it is extremely unlikely to throw.
0122               });
0123               endIOVTaskHolder_ = WaitingTaskHolder{*taskHolder->group(), endIOVWaitingTask};
0124               endIOVWaitingTasks.add(endIOVTaskHolder_);
0125             } catch (...) {
0126               taskHolder->doneWaiting(std::current_exception());
0127               return;
0128             }
0129             taskHolder->doneWaiting(std::exception_ptr{});
0130           };
0131       iovQueue_.pushAndPause(*taskToStartAfterIOVInit.group(), std::move(startIOVForRecord));
0132     }
0133 
0134     void EventSetupRecordIOVQueue::addRecProvider(EventSetupRecordProvider* recProvider) {
0135       recordProviders_.push_back(recProvider);
0136     }
0137 
0138   }  // namespace eventsetup
0139 }  // namespace edm