File indexing completed on 2024-04-06 12:12:10
0001
0002
0003
0004
0005
0006
0007
0008
0009
0010 #include "FWCore/Framework/interface/EventSetupRecordIOVQueue.h"
0011 #include "FWCore/Framework/interface/EventSetupRecordProvider.h"
0012 #include "FWCore/Concurrency/interface/WaitingTask.h"
0013 #include "FWCore/Utilities/interface/EDMException.h"
0014 #include "FWCore/Utilities/interface/thread_safety_macros.h"
0015
0016 #include <exception>
0017
0018 namespace edm {
0019 namespace eventsetup {
0020
0021 EventSetupRecordIOVQueue::EventSetupRecordIOVQueue(unsigned int nConcurrentIOVs)
0022 : iovQueue_(nConcurrentIOVs),
0023 isAvailable_(nConcurrentIOVs),
0024
0025
0026
0027
0028
0029 cacheIdentifier_(1) {
0030 for (auto& i : isAvailable_) {
0031 i.store(true);
0032 }
0033 }
0034
0035 EventSetupRecordIOVQueue::~EventSetupRecordIOVQueue() { assert(endIOVCalled_); }
0036
0037 void EventSetupRecordIOVQueue::endIOVAsync(edm::WaitingTaskHolder iEndTask) {
0038 endIOVTasks_.reset();
0039 if (endIOVTaskHolder_.hasTask()) {
0040 endIOVTasks_.add(std::move(iEndTask));
0041 { WaitingTaskHolder tmp{std::move(endIOVTaskHolder_)}; }
0042 }
0043 endIOVCalled_ = true;
0044 }
0045
0046 void EventSetupRecordIOVQueue::setNewIntervalForAnySubProcess() {
0047 bool newIntervalForAnySubProcess = false;
0048 for (auto const& recordProvider : recordProviders_) {
0049 if (recordProvider->intervalStatus() == EventSetupRecordProvider::IntervalStatus::NewInterval) {
0050 newIntervalForAnySubProcess = true;
0051 break;
0052 }
0053 }
0054 for (auto& recordProvider : recordProviders_) {
0055 recordProvider->setNewIntervalForAnySubProcess(newIntervalForAnySubProcess);
0056 }
0057 }
0058
0059 void EventSetupRecordIOVQueue::checkForNewIOVs(WaitingTaskHolder const& taskToStartAfterIOVInit,
0060 WaitingTaskList& endIOVWaitingTasks,
0061 bool newEventSetupImpl) {
0062 for (auto& recordProvider : recordProviders_) {
0063 if (recordProvider->newIntervalForAnySubProcess()) {
0064 startNewIOVAsync(taskToStartAfterIOVInit, endIOVWaitingTasks);
0065 return;
0066 }
0067 }
0068
0069 for (auto& recordProvider : recordProviders_) {
0070 if (recordProvider->intervalStatus() != EventSetupRecordProvider::IntervalStatus::Invalid) {
0071 endIOVWaitingTasks.add(endIOVTaskHolder_);
0072 break;
0073 }
0074 }
0075
0076 for (auto& recordProvider : recordProviders_) {
0077 recordProvider->continueIOV(newEventSetupImpl);
0078 }
0079 }
0080
0081 void EventSetupRecordIOVQueue::startNewIOVAsync(WaitingTaskHolder const& taskToStartAfterIOVInit,
0082 WaitingTaskList& endIOVWaitingTasks) {
0083 ++cacheIdentifier_;
0084 {
0085
0086
0087 WaitingTaskHolder tmp{std::move(endIOVTaskHolder_)};
0088 }
0089 auto taskHolder = std::make_shared<WaitingTaskHolder>(taskToStartAfterIOVInit);
0090 auto startIOVForRecord =
0091 [this, taskHolder, &endIOVWaitingTasks](edm::LimitedTaskQueue::Resumer iResumer) mutable {
0092
0093 CMS_SA_ALLOW try {
0094 unsigned int iovIndex = 0;
0095 auto nConcurrentIOVs = isAvailable_.size();
0096 for (; iovIndex < nConcurrentIOVs; ++iovIndex) {
0097 bool expected = true;
0098 if (isAvailable_[iovIndex].compare_exchange_strong(expected, false)) {
0099 break;
0100 }
0101 }
0102
0103 if (iovIndex == nConcurrentIOVs) {
0104 throw edm::Exception(edm::errors::LogicError)
0105 << "EventSetupRecordIOVQueue::startNewIOVAsync\n"
0106 << "Couldn't find available IOV slot. This should never happen.\n"
0107 << "Contact a Framework Developer\n";
0108 }
0109 for (auto recordProvider : recordProviders_) {
0110 recordProvider->initializeForNewIOV(iovIndex, cacheIdentifier_);
0111 }
0112
0113 auto endIOVWaitingTask = make_waiting_task([this, iResumer, iovIndex](std::exception_ptr const*) mutable {
0114 for (auto recordProvider : recordProviders_) {
0115 recordProvider->endIOV(iovIndex);
0116 }
0117 isAvailable_[iovIndex].store(true);
0118 iResumer.resume();
0119 endIOVTasks_.doneWaiting(std::exception_ptr());
0120
0121
0122 });
0123 endIOVTaskHolder_ = WaitingTaskHolder{*taskHolder->group(), endIOVWaitingTask};
0124 endIOVWaitingTasks.add(endIOVTaskHolder_);
0125 } catch (...) {
0126 taskHolder->doneWaiting(std::current_exception());
0127 return;
0128 }
0129 taskHolder->doneWaiting(std::exception_ptr{});
0130 };
0131 iovQueue_.pushAndPause(*taskToStartAfterIOVInit.group(), std::move(startIOVForRecord));
0132 }
0133
0134 void EventSetupRecordIOVQueue::addRecProvider(EventSetupRecordProvider* recProvider) {
0135 recordProviders_.push_back(recProvider);
0136 }
0137
0138 }
0139 }