File indexing completed on 2024-12-01 23:40:07
0001 import sys
0002 import json
0003 import os
0004 import copy
0005 import multiprocessing
0006 import time
0007 import re
0008
0009 MAXWORKFLOWLENGTH = 81
0010
def performInjectionOptionTest(opt):
    """Check and adjust the options object before injecting to wmagent.

    Exits with code -1 when ``opt.show`` is set, or when the mode is
    'submit' while ``opt.nProcs`` is 0.  Otherwise the options are adjusted
    in place according to ``opt.wmcontrol``:

    * 'init'  -> ``opt.nProcs`` is forced to 0
    * 'test'  -> ``opt.dryRun`` is forced to True
    * 'force' -> a warning is printed and ``opt.dryRun`` is forced to True
    """
    if opt.show:
        print('Not injecting to wmagent in --show mode. Need to run the worklfows.')
        sys.exit(-1)

    injectionMode = opt.wmcontrol
    if injectionMode == 'init':
        opt.nProcs = 0
    elif injectionMode == 'test':
        opt.dryRun = True
    elif injectionMode == 'submit':
        if opt.nProcs == 0:
            print('Not injecting to wmagent in -j 0 mode. Need to run the worklfows.')
            sys.exit(-1)
    elif injectionMode == 'force':
        print("This is an expert setting, you'd better know what you're doing")
        opt.dryRun = True
0027
def upload_to_couch_oneArg(arguments):
    """Single-argument adapter around ``modules.wma.upload_to_couch``.

    ``arguments`` is a 5-item sequence
    ``(filePath, labelInCouch, user, group, where)``; ``where`` is passed on
    as the ``url`` keyword and ``test_mode`` is always False.  Returns the
    value returned by ``upload_to_couch``.
    """
    from modules.wma import upload_to_couch

    filePath, labelInCouch, user, group, where = arguments
    return upload_to_couch(filePath,
                           labelInCouch,
                           user,
                           group,
                           test_mode=False,
                           url=where)
0038
0039
class MatrixInjector(object):
    """Build TaskChain request dictionaries for workflows and upload/submit them.

    Typical sequence: ``prepare`` fills ``self.chainDicts`` (one request
    dictionary per workflow number), ``upload`` replaces the configuration
    file paths stored in those dictionaries by the IDs returned by
    ``uploadConf``, and ``submit`` prints the requests and, outside test
    mode, hands them to ``makeRequest``.
    """

    def __init__(self, opt, mode='init', options=''):
        """Collect the settings and default dictionaries used by ``prepare``.

        opt     -- options object; the attributes read here are keep,
                   memoryOffset, memPerCore, numberEventsInLuminosityBlock,
                   nStreams, batchName, gpu, GPUMemoryMB, CUDACapabilities,
                   CUDARuntime, GPUName, CUDADriverVersion,
                   CUDARuntimeVersion, testbed, dbsUrl, label and nThreads
        mode    -- every value other than 'submit' and 'force' selects test mode
        options -- comma separated string; 'dqm:<value>' and 'wma:<value>'
                   entries set the DQM GUI url and the wmagent host

        Exits with code -18 when WMCORE_ROOT is not set and test mode is off.
        """
        # uploadConf increments this counter to produce fake IDs in test mode
        self.count = 1040

        self.dqmgui = None
        self.wmagent = None
        for k in options.split(','):
            if k.startswith('dqm:'):
                self.dqmgui = k.split(':', 1)[-1]
            elif k.startswith('wma:'):
                self.wmagent = k.split(':', 1)[-1]

        self.testMode = ((mode != 'submit') and (mode != 'force'))
        self.version = 1
        self.keep = opt.keep
        self.memoryOffset = opt.memoryOffset
        self.memPerCore = opt.memPerCore
        self.numberEventsInLuminosityBlock = opt.numberEventsInLuminosityBlock
        self.numberOfStreams = 0
        if opt.nStreams > 0:
            self.numberOfStreams = opt.nStreams
        self.batchName = ''
        self.batchTime = str(int(time.time()))
        if opt.batchName:
            self.batchName = '__' + opt.batchName + '-' + self.batchTime

        # GPU related settings, copied into the task dictionaries by prepare
        self.RequiresGPU = opt.gpu
        self.GPUMemoryMB = opt.GPUMemoryMB
        self.CUDACapabilities = opt.CUDACapabilities
        self.CUDARuntime = opt.CUDARuntime

        self.GPUName = opt.GPUName
        self.CUDADriverVersion = opt.CUDADriverVersion
        self.CUDARuntimeVersion = opt.CUDARuntimeVersion

        # wmagent host: 'wma:' option first, then the environment, then a default
        if not self.wmagent:
            self.wmagent = os.getenv('WMAGENT_REQMGR')

        if not self.wmagent:
            if not opt.testbed:
                self.wmagent = 'cmsweb.cern.ch'
            else:
                self.wmagent = 'cmsweb-testbed.cern.ch'

        # DBS url: command line first, then the environment, then a default
        if opt.dbsUrl is not None:
            self.DbsUrl = opt.dbsUrl
        elif os.getenv('CMS_DBSREADER_URL') is not None:
            self.DbsUrl = os.getenv('CMS_DBSREADER_URL')
        else:
            if not opt.testbed:
                self.DbsUrl = "https://cmsweb-prod.cern.ch/dbs/prod/global/DBSReader"
            else:
                self.DbsUrl = "https://cmsweb-testbed.cern.ch/dbs/int/global/DBSReader"

        if not self.dqmgui:
            self.dqmgui = "https://cmsweb.cern.ch/dqm/relval"

        self.couch = 'https://' + self.wmagent + '/couchdb'

        # maps a configuration file name to the ID obtained when it was uploaded
        self.couchCache = {}
        self.user = os.getenv('USER')
        self.group = 'ppd'
        # NOTE: raises AttributeError when CMSSW_VERSION is not set
        self.label = 'RelValSet_' + os.getenv('CMSSW_VERSION').replace('-', '') + '_v' + str(self.version)
        self.speciallabel = ''
        if opt.label:
            self.speciallabel = '_' + opt.label
        # request names longer than MAXWORKFLOWLENGTH, reported by submit
        self.longWFName = []

        if not os.getenv('WMCORE_ROOT'):
            print('\n\twmclient is not setup properly. Will not be able to upload or submit requests.\n')
            if not self.testMode:
                print('\n\t QUIT\n')
                sys.exit(-18)
        else:
            print('\n\tFound wmclient\n')

        # request level defaults; the 'nowm*' keys are working entries that
        # prepare removes before the dictionary is stored
        self.defaultChain = {
            "RequestType": "TaskChain",
            "SubRequestType": "RelVal",
            "RequestPriority": 500000,
            "Requestor": self.user,
            "Group": self.group,
            "CMSSWVersion": os.getenv('CMSSW_VERSION'),
            "Campaign": os.getenv('CMSSW_VERSION'),
            "ScramArch": os.getenv('SCRAM_ARCH'),
            "ProcessingVersion": self.version,
            "GlobalTag": None,
            "ConfigCacheUrl": self.couch,
            "DbsUrl": self.DbsUrl,
            "TaskChain": None,
            "nowmTasklist": [],
            "Multicore": 1,
            "Memory": 3000,
            "SizePerEvent": 1234,
            "TimePerEvent": 10,
            "PrepID": os.getenv('CMSSW_VERSION')
        }

        # merged into the request when a HARVEST task gets linked to its input
        self.defaultHarvest = {
            "EnableHarvesting": "True",
            "DQMUploadUrl": self.dqmgui,
            "DQMConfigCacheID": None,
            "Multicore": 1
        }

        # template of a first task that is not fed by an input dataset
        self.defaultScratch = {
            "TaskName": None,
            "ConfigCacheID": None,
            "GlobalTag": None,
            "SplittingAlgo": "EventBased",
            "EventsPerJob": None,
            "EventsPerLumi": None,
            "RequestNumEvents": None,
            "Seeding": "AutomaticSeeding",
            "PrimaryDataset": None,
            "nowmIO": {},
            "Multicore": opt.nThreads,
            "EventStreams": self.numberOfStreams,
            "KeepOutput": False
        }
        # template of a task that reads an input dataset
        self.defaultInput = {
            "TaskName": "DigiHLT",
            "ConfigCacheID": None,
            "GlobalTag": None,
            "InputDataset": None,
            "SplittingAlgo": "LumiBased",
            "LumisPerJob": 10,
            "nowmIO": {},
            "Multicore": opt.nThreads,
            "EventStreams": self.numberOfStreams,
            "KeepOutput": False
        }
        # template of a task that reads the output of a previous task
        self.defaultTask = {
            "TaskName": None,
            "InputTask": None,
            "InputFromOutputModule": None,
            "ConfigCacheID": None,
            "GlobalTag": None,
            "SplittingAlgo": "LumiBased",
            "LumisPerJob": 10,
            "nowmIO": {},
            "Multicore": opt.nThreads,
            "EventStreams": self.numberOfStreams,
            "KeepOutput": False,
            "RequiresGPU": None,
            "GPUParams": None
        }
        self.defaultGPUParams = {
            "GPUMemoryMB": self.GPUMemoryMB,
            "CUDACapabilities": self.CUDACapabilities,
            "CUDARuntime": self.CUDARuntime
        }
        # the optional GPU parameters are only forwarded when they are set
        if self.GPUName:
            self.defaultGPUParams.update({"GPUName": self.GPUName})
        if self.CUDADriverVersion:
            self.defaultGPUParams.update({"CUDADriverVersion": self.CUDADriverVersion})
        if self.CUDARuntimeVersion:
            self.defaultGPUParams.update({"CUDARuntimeVersion": self.CUDARuntimeVersion})

        # filled by prepare: workflow number -> request dictionary
        self.chainDicts = {}

    @staticmethod
    def get_wmsplit():
        """
        Return a "wmsplit" dictionary that contain non-default LumisPerJob values

        The fixed entries are always returned.  Two more entries per 'Run4'
        upgrade key containing 'PU' are added when ``upgradeWorkflowComponents``
        can be imported; a failure there is reported on stdout and does not
        discard the fixed entries.
        """
        wmsplit = {
            'DIGIHI': 5,
            'RECOHI': 5,
            'HLTD': 5,
            'RECODreHLT': 2,
            'DIGIPU': 4,
            'DIGIPU1': 4,
            'RECOPU1': 1,
            'DIGIUP15_PU50': 1,
            'RECOUP15_PU50': 1,
            'DIGIUP15_PU25': 1,
            'RECOUP15_PU25': 1,
            'DIGIUP15_PU25HS': 1,
            'RECOUP15_PU25HS': 1,
            'DIGIHIMIX': 5,
            'RECOHIMIX': 5,
            'RECODSplit': 1,
            'SingleMuPt10_UP15_ID': 1,
            'DIGIUP15_ID': 1,
            'RECOUP15_ID': 1,
            'TTbar_13_ID': 1,
            'SingleMuPt10FS_ID': 1,
            'TTbarFS_ID': 1,
            'RECODR2_50nsreHLT': 5,
            'RECODR2_25nsreHLT': 5,
            'RECODR2_2016reHLT': 5,
            'RECODR2_50nsreHLT_HIPM': 5,
            'RECODR2_25nsreHLT_HIPM': 5,
            'RECODR2_2016reHLT_HIPM': 1,
            'RECODR2_2016reHLT_skimSingleMu': 1,
            'RECODR2_2016reHLT_skimDoubleEG': 1,
            'RECODR2_2016reHLT_skimMuonEG': 1,
            'RECODR2_2016reHLT_skimJetHT': 1,
            'RECODR2_2016reHLT_skimMET': 1,
            'RECODR2_2016reHLT_skimSinglePh': 1,
            'RECODR2_2016reHLT_skimMuOnia': 1,
            'RECODR2_2016reHLT_skimSingleMu_HIPM': 1,
            'RECODR2_2016reHLT_skimDoubleEG_HIPM': 1,
            'RECODR2_2016reHLT_skimMuonEG_HIPM': 1,
            'RECODR2_2016reHLT_skimJetHT_HIPM': 1,
            'RECODR2_2016reHLT_skimMET_HIPM': 1,
            'RECODR2_2016reHLT_skimSinglePh_HIPM': 1,
            'RECODR2_2016reHLT_skimMuOnia_HIPM': 1,
            'RECODR2_2017reHLT_Prompt': 1,
            'RECODR2_2017reHLT_skimSingleMu_Prompt_Lumi': 1,
            'RECODR2_2017reHLT_skimDoubleEG_Prompt': 1,
            'RECODR2_2017reHLT_skimMET_Prompt': 1,
            'RECODR2_2017reHLT_skimMuOnia_Prompt': 1,
            'RECODR2_2017reHLT_Prompt_L1TEgDQM': 1,
            'RECODR2_2018reHLT_Prompt': 1,
            'RECODR2_2018reHLT_skimSingleMu_Prompt_Lumi': 1,
            'RECODR2_2018reHLT_skimDoubleEG_Prompt': 1,
            'RECODR2_2018reHLT_skimJetHT_Prompt': 1,
            'RECODR2_2018reHLT_skimMET_Prompt': 1,
            'RECODR2_2018reHLT_skimMuOnia_Prompt': 1,
            'RECODR2_2018reHLT_skimEGamma_Prompt_L1TEgDQM': 1,
            'RECODR2_2018reHLT_skimMuonEG_Prompt': 1,
            'RECODR2_2018reHLT_skimCharmonium_Prompt': 1,
            'RECODR2_2018reHLT_skimJetHT_Prompt_HEfail': 1,
            'RECODR2_2018reHLT_skimJetHT_Prompt_BadHcalMitig': 1,
            'RECODR2_2018reHLTAlCaTkCosmics_Prompt': 1,
            'RECODR2_2018reHLT_skimDisplacedJet_Prompt': 1,
            'RECODR2_2018reHLT_ZBPrompt': 1,
            'RECODR2_2018reHLT_Offline': 1,
            'RECODR2_2018reHLT_skimSingleMu_Offline_Lumi': 1,
            'RECODR2_2018reHLT_skimDoubleEG_Offline': 1,
            'RECODR2_2018reHLT_skimJetHT_Offline': 1,
            'RECODR2_2018reHLT_skimMET_Offline': 1,
            'RECODR2_2018reHLT_skimMuOnia_Offline': 1,
            'RECODR2_2018reHLT_skimEGamma_Offline_L1TEgDQM': 1,
            'RECODR2_2018reHLT_skimMuonEG_Offline': 1,
            'RECODR2_2018reHLT_skimCharmonium_Offline': 1,
            'RECODR2_2018reHLT_skimJetHT_Offline_HEfail': 1,
            'RECODR2_2018reHLT_skimJetHT_Offline_BadHcalMitig': 1,
            'RECODR2_2018reHLTAlCaTkCosmics_Offline': 1,
            'RECODR2_2018reHLT_skimDisplacedJet_Offline': 1,
            'RECODR2_2018reHLT_ZBOffline': 1,
            'HLTDR2_50ns': 1,
            'HLTDR2_25ns': 1,
            'HLTDR2_2016': 1,
            'HLTDR2_2017': 1,
            'HLTDR2_2018': 1,
            'HLTDR2_2018_BadHcalMitig': 1,
            'Hadronizer': 1,
            'DIGIUP15': 1,
            'RECOUP15': 1,
            'RECOAODUP15': 5,
            'DBLMINIAODMCUP15NODQM': 5,
            'Digi': 5,
            'Reco': 5,
            'DigiPU': 1,
            'RecoPU': 1,
            'RECOHID11': 1,
            'DIGIUP17': 1,
            'RECOUP17': 1,
            'DIGIUP17_PU25': 1,
            'RECOUP17_PU25': 1,
            'DIGICOS_UP16': 1,
            'RECOCOS_UP16': 1,
            'DIGICOS_UP17': 1,
            'RECOCOS_UP17': 1,
            'DIGICOS_UP18': 1,
            'RECOCOS_UP18': 1,
            'DIGICOS_UP21': 1,
            'RECOCOS_UP21': 1,
            'HYBRIDRepackHI2015VR': 1,
            'HYBRIDZSHI2015': 1,
            'RECOHID15': 1,
            'RECOHID18': 1,
            'HLTDR3_2023': 1,
            'RECONANORUN3_reHLT': 1,
            'HARVESTRUN3': 1,
        }

        # Only the import and the lookup of the upgrade keys can fail, so only
        # they are guarded: the fixed entries above stay available to prepare
        # (which reads wmsplit['Hadronizer'] unconditionally) in any case.
        try:
            from .upgradeWorkflowComponents import upgradeKeys
            for key in upgradeKeys['Run4']:
                if 'PU' not in key:
                    continue

                wmsplit['DigiTriggerPU_' + key] = 1
                wmsplit['RecoGlobalPU_' + key] = 1
        except Exception as ex:
            print('Exception while building a wmsplit dictionary: %s' % (str(ex)))

        return wmsplit

    @staticmethod
    def _load_step_io(wfDir, step):
        """Return the parsed JSON of ``<wfDir>/<step>.io``, or None on failure.

        A missing, unreadable or malformed file is reported on stdout.
        """
        ioPath = '%s/%s.io' % (wfDir, step)
        try:
            with open(ioPath) as ioFile:
                return json.load(ioFile)
        except (IOError, OSError, ValueError):
            print("Failed to find", ioPath, ".The workflows were probably not run on cfg not created")
            return None

    def _link_tasks(self, chainDict):
        """Set InputTask/InputFromOutputModule of each task from the .io data.

        For every task whose 'nowmIO' has a 'primary' entry, the earlier tasks
        are searched for an output containing that primary file name.  A
        linked task whose name starts with 'HARVEST' also merges the
        harvesting defaults into ``chainDict``.
        """
        tasks = chainDict['nowmTasklist']
        for i_second in reversed(range(len(tasks))):
            t_second = tasks[i_second]
            if 'primary' not in t_second['nowmIO']:
                continue

            primary = t_second['nowmIO']['primary'][0].replace('file:', '')
            # Only the scan over one task's outputs stops at the first match;
            # earlier tasks are still inspected, so when several tasks match
            # the assignment made last (earliest task) is the one that stays.
            for i_input in reversed(range(0, i_second)):
                t_input = tasks[i_input]
                for (om, o) in t_input['nowmIO'].items():
                    if primary not in o:
                        continue

                    # task names longer than 50 characters are replaced by a
                    # short fixed name for the GenSim and Hadronizer cases
                    if len(t_input['TaskName']) > 50:
                        if t_input['TaskName'].find('GenSim') != -1:
                            t_input['TaskName'] = 'GenSimFull'
                        if t_input['TaskName'].find('Hadronizer') != -1:
                            t_input['TaskName'] = 'HadronizerFull'
                    t_second['InputTask'] = t_input['TaskName']
                    t_second['InputFromOutputModule'] = om

                    if t_second['TaskName'].startswith('HARVEST'):
                        # harvesting settings live in the request, not the task
                        chainDict.update(copy.deepcopy(self.defaultHarvest))
                        if "_RD" in t_second['TaskName']:
                            chainDict['DQMHarvestUnit'] = "multiRun"
                        chainDict['DQMConfigCacheID'] = t_second['ConfigCacheID']
                    break

    def prepare(self, mReader, directories, mode='init'):
        """Build one request dictionary per workflow into ``self.chainDicts``.

        mReader     -- object with a ``workFlowSteps`` mapping; as used here a
                       key is indexable with the workflow number first, and a
                       value holds the '+'-joined name at [1], the per-step
                       commands (or input descriptors) at [2] and the step
                       names at [3]
        directories -- mapping of workflow number to the directory holding the
                       ``<step>.io`` and ``<step>.py`` files
        mode        -- not used by this method

        Returns 0 on success, -15 when a ``<step>.io`` file cannot be loaded
        and -12 when the first step has no '--relval' splitting information.
        """
        wmsplit = MatrixInjector.get_wmsplit()
        # hard-wired: acquisition era and processing string are kept per task
        acqEra = False
        for (n, wfDir) in directories.items():
            chainDict = copy.deepcopy(self.defaultChain)
            print("inspecting", wfDir)
            nextHasDSInput = None
            for (x, s) in mReader.workFlowSteps.items():
                if x[0] != n:
                    continue

                index = 0
                splitForThisWf = None
                thisLabel = self.speciallabel

                if any("HARVESTGEN" in step for step in s[3]):
                    chainDict['TimePerEvent'] = 0.01
                    thisLabel = thisLabel + "_gen"

                if any("DBLMINIAODMCUP15NODQM" in step for step in s[3]):
                    thisLabel = thisLabel + "_dblMiniAOD"
                processStrPrefix = ''
                setPrimaryDs = None
                nanoedmGT = ''
                for step in s[3]:
                    command = s[2][index]

                    if 'INPUT' in step or (not isinstance(command, str)):
                        # not a task: remember the input for the next step
                        nextHasDSInput = command

                    else:
                        if index == 0:
                            # first step and no input dataset: start from scratch
                            task = copy.deepcopy(self.defaultScratch)
                            chainDict['nowmTasklist'].append(task)
                            stepIO = self._load_step_io(wfDir, step)
                            if stepIO is None:
                                return -15
                            task['nowmIO'] = stepIO

                            task['PrimaryDataset'] = 'RelVal' + s[1].split('+')[0]
                            if '--relval' not in command:
                                print('Impossible to create task from scratch without splitting information with --relval')
                                return -12

                            # '<total events>,<events per job>' follows the last --relval
                            arg = command.split()
                            ns = list(map(int, arg[len(arg) - arg[-1::-1].index('--relval')].split(',')))
                            task['RequestNumEvents'] = ns[0]
                            task['EventsPerJob'] = ns[1]
                            task['EventsPerLumi'] = ns[1]

                            # a smaller value given in the command takes precedence
                            if 'numberEventsInLuminosityBlock' in command:
                                nEventsInLuminosityBlock = re.findall(r'process.source.numberEventsInLuminosityBlock=cms.untracked.uint32\(([ 0-9 ]*)\)', command, re.DOTALL)
                                # the name may be present in a form the pattern
                                # does not match, leaving the list empty
                                if (nEventsInLuminosityBlock
                                        and nEventsInLuminosityBlock[-1].isdigit()
                                        and int(nEventsInLuminosityBlock[-1]) < ns[1]):
                                    task['EventsPerLumi'] = int(nEventsInLuminosityBlock[-1])
                            if (self.numberEventsInLuminosityBlock > 0 and self.numberEventsInLuminosityBlock <= ns[1]):
                                task['EventsPerLumi'] = self.numberEventsInLuminosityBlock
                            if 'FASTSIM' in command or '--fast' in command:
                                thisLabel += '_FastSim'
                            if 'lhe' in command:
                                task['LheInputFiles'] = True

                        elif nextHasDSInput:
                            # the previous entry described an input dataset
                            task = copy.deepcopy(self.defaultInput)
                            chainDict['nowmTasklist'].append(task)
                            stepIO = self._load_step_io(wfDir, step)
                            if stepIO is None:
                                return -15
                            task['nowmIO'] = stepIO
                            task['InputDataset'] = nextHasDSInput.dataSet
                            if 'DQMHLTonRAWAOD' in step:
                                task['IncludeParents'] = True
                            splitForThisWf = nextHasDSInput.split
                            task['LumisPerJob'] = splitForThisWf
                            if step in wmsplit:
                                task['LumisPerJob'] = wmsplit[step]

                            if len(nextHasDSInput.run):
                                task['RunWhitelist'] = nextHasDSInput.run
                            if len(nextHasDSInput.ls):
                                task['LumiList'] = nextHasDSInput.ls

                            if '--data' in command and nextHasDSInput.label:
                                thisLabel += '_RelVal_%s' % nextHasDSInput.label
                            if 'filter' in task['nowmIO']:
                                print("This has an input DS and a filter sequence: very likely to be the PyQuen sample")
                                processStrPrefix = 'PU_'
                                setPrimaryDs = 'RelVal' + s[1].split('+')[0]
                                if setPrimaryDs:
                                    task['PrimaryDataset'] = setPrimaryDs
                            nextHasDSInput = None
                            if 'GPU' in step and self.RequiresGPU != 'forbidden':
                                task['RequiresGPU'] = self.RequiresGPU
                                task['GPUParams'] = json.dumps(self.defaultGPUParams)
                        else:
                            # neither the first step nor fed by an input dataset
                            task = copy.deepcopy(self.defaultTask)
                            chainDict['nowmTasklist'].append(task)
                            stepIO = self._load_step_io(wfDir, step)
                            if stepIO is None:
                                return -15
                            task['nowmIO'] = stepIO
                            if splitForThisWf:
                                task['LumisPerJob'] = splitForThisWf
                            if step in wmsplit:
                                task['LumisPerJob'] = wmsplit[step]
                            if 'GPU' in step and self.RequiresGPU != 'forbidden':
                                task['RequiresGPU'] = self.RequiresGPU
                                task['GPUParams'] = json.dumps(self.defaultGPUParams)

                        # Hadronizer steps always use the dedicated splitting
                        if 'Hadronizer' in step:
                            task['LumisPerJob'] = wmsplit['Hadronizer']

                        task['TaskName'] = step
                        if setPrimaryDs:
                            task['PrimaryDataset'] = setPrimaryDs
                        # file path for now; upload() replaces it by the uploaded ID
                        task['ConfigCacheID'] = '%s/%s.py' % (wfDir, step)
                        task['GlobalTag'] = task['nowmIO']['GT']
                        # the request level tag ends up being the one of the last task
                        chainDict['GlobalTag'] = task['nowmIO']['GT']
                        if 'NANOEDM' in step:
                            nanoedmGT = task['nowmIO']['GT']
                        if 'NANOMERGE' in step:
                            chainDict['GlobalTag'] = nanoedmGT
                        if 'pileup' in task['nowmIO']:
                            task['MCPileup'] = task['nowmIO']['pileup']
                        # the trailing blank keeps '--pileup_input' from matching
                        if '--pileup ' in command:
                            pileupString = command.split()[command.split().index('--pileup') + 1]
                            processStrPrefix = 'PU_'
                            if pileupString.find('25ns') > 0:
                                processStrPrefix = 'PU25ns_'
                            elif pileupString.find('50ns') > 0:
                                processStrPrefix = 'PU50ns_'
                            elif 'nopu' in pileupString.lower():
                                processStrPrefix = ''
                        if 'premix_stage2' in command and '--pileup_input' in command:
                            pileupInput = command.split()[command.split().index('--pileup_input') + 1]
                            if pileupInput.find('25ns') > 0:
                                processStrPrefix = 'PUpmx25ns_'
                            elif pileupInput.find('50ns') > 0:
                                processStrPrefix = 'PUpmx50ns_'

                        if acqEra:
                            # not reachable while acqEra is hard-wired to False
                            chainDict.setdefault('AcquisitionEra', {})[step] = chainDict['CMSSWVersion']
                            chainDict.setdefault('ProcessingString', {})[step] = processStrPrefix + task['GlobalTag'].replace('::All', '').replace('-', '_') + thisLabel
                            if 'NANOMERGE' in step:
                                chainDict['ProcessingString'][step] = processStrPrefix + nanoedmGT.replace('::All', '').replace('-', '_') + thisLabel
                        else:
                            task['AcquisitionEra'] = chainDict['CMSSWVersion']
                            task['ProcessingString'] = processStrPrefix + task['GlobalTag'].replace('::All', '').replace('-', '_') + thisLabel
                            if 'NANOMERGE' in step:
                                task['ProcessingString'] = processStrPrefix + nanoedmGT.replace('::All', '').replace('-', '_') + thisLabel

                        if self.batchName:
                            task['Campaign'] = task['AcquisitionEra'] + self.batchName

                        if 'DBLMINIAODMCUP15NODQM' in step:
                            task['ProcessingString'] = task['ProcessingString'] + '_miniAOD'

                        if task['Multicore']:
                            # base memory plus a fixed amount per additional core
                            task['Memory'] = self.memoryOffset + int(task['Multicore'] - 1) * self.memPerCore

                    index += 1

                chainDict['RequestString'] = 'RV' + chainDict['CMSSWVersion'] + s[1].split('+')[0]
                if processStrPrefix or thisLabel:
                    chainDict['RequestString'] += '_' + processStrPrefix + thisLabel

                # remember names that are too long; submit() reports them
                self.candidateWFName = self.user + '_' + chainDict['RequestString']
                if len(self.candidateWFName) > MAXWORKFLOWLENGTH:
                    self.longWFName.append(self.candidateWFName)

                chainDict['PrepID'] = chainDict['CMSSWVersion'] + '__' + self.batchTime + '-' + s[1].split('+')[0]
                if self.batchName:
                    chainDict['PrepID'] = chainDict['CMSSWVersion'] + self.batchName + '-' + s[1].split('+')[0]
                    if 'HIN' in self.batchName:
                        chainDict['SubRequestType'] = "HIRelVal"

                self._link_tasks(chainDict)

                if acqEra:
                    chainDict['AcquisitionEra'] = list(chainDict['AcquisitionEra'].values())[0]
                    chainDict['ProcessingString'] = list(chainDict['ProcessingString'].values())[0]
                else:
                    chainDict['AcquisitionEra'] = chainDict['nowmTasklist'][0]['AcquisitionEra']
                    chainDict['ProcessingString'] = chainDict['nowmTasklist'][0]['ProcessingString']

                chainDict['Campaign'] = chainDict['AcquisitionEra'] + self.batchName

                # self.keep may hold task indices (handled here) and/or task
                # names (handled in the loop below)
                itask = 0
                if self.keep:
                    for i in self.keep:
                        if isinstance(i, int) and i < len(chainDict['nowmTasklist']):
                            chainDict['nowmTasklist'][i]['KeepOutput'] = True
                for t in chainDict['nowmTasklist']:
                    # HARVEST tasks do not become Task<N> entries
                    if t['TaskName'].startswith('HARVEST'):
                        continue
                    if not self.keep:
                        t['KeepOutput'] = True
                    elif t['TaskName'] in self.keep:
                        t['KeepOutput'] = True
                    if t['TaskName'].startswith('HYBRIDRepackHI2015VR'):
                        t['KeepOutput'] = False
                    t.pop('nowmIO')
                    itask += 1
                    chainDict['Task%d' % (itask)] = t

                # number of Task<N> entries
                chainDict['TaskChain'] = itask

                chainDict.pop('nowmTasklist')
                self.chainDicts[n] = chainDict

        return 0

    def uploadConf(self, filePath, label, where):
        """Upload one configuration file and return its cache ID.

        In test mode nothing is uploaded and the incremented ``self.count`` is
        returned.  Otherwise a file name that was uploaded before is answered
        from ``self.couchCache``; a new one is uploaded in a separate process.
        Exits with code -16 when ``modules.wma`` cannot be imported.
        """
        labelInCouch = self.label + '_' + label
        cacheName = filePath.split('/')[-1]
        if self.testMode:
            self.count += 1
            print('\tFake upload of', filePath, 'to couch with label', labelInCouch)
            return self.count

        try:
            # availability check only; the upload itself runs in the worker
            from modules.wma import upload_to_couch, DATABASE_NAME
        except Exception:
            print('\n\tUnable to find wmcontrol modules. Please include it in your python path\n')
            print('\n\t QUIT\n')
            sys.exit(-16)

        if cacheName in self.couchCache:
            print("Not re-uploading", filePath, "to", where, "for", label)
            cacheId = self.couchCache[cacheName]
        else:
            print("Loading", filePath, "to", where, "for", label)
            # the upload runs in its own process; the pool is released
            # afterwards so that worker processes do not pile up
            pool = multiprocessing.Pool(1)
            try:
                cacheIds = pool.map(upload_to_couch_oneArg, [(filePath, labelInCouch, self.user, self.group, where)])
            finally:
                pool.close()
                pool.join()
            cacheId = cacheIds[0]
            self.couchCache[cacheName] = cacheId
        return cacheId

    def upload(self):
        """Upload the configurations of all prepared requests.

        Every ``Task<N>`` entry gets its 'ConfigCacheID' replaced by the value
        returned by ``uploadConf``; the same is done for 'DQMConfigCacheID'
        when the request has one.
        """
        for (n, d) in self.chainDicts.items():
            # iterate over a snapshot of the keys; only values are replaced
            for it in list(d):
                if it.startswith("Task") and it != 'TaskChain':
                    couchID = self.uploadConf(d[it]['ConfigCacheID'],
                                              str(n) + d[it]['TaskName'],
                                              d['ConfigCacheUrl']
                                              )
                    print(d[it]['ConfigCacheID'], " uploaded to couchDB for", str(n), "with ID", couchID)
                    d[it]['ConfigCacheID'] = couchID
                if it == 'DQMConfigCacheID':
                    couchID = self.uploadConf(d['DQMConfigCacheID'],
                                              str(n) + 'harvesting',
                                              d['ConfigCacheUrl']
                                              )
                    print(d['DQMConfigCacheID'], "uploaded to couchDB for", str(n), "with ID", couchID)
                    d['DQMConfigCacheID'] = couchID

    def submit(self):
        """Print every prepared request and, outside test mode, submit it.

        Exits with code -17 when the wmcontrol modules cannot be imported and
        test mode is off.  In test mode a warning listing the request names
        longer than MAXWORKFLOWLENGTH is printed at the end.
        """
        try:
            from modules.wma import makeRequest, approveRequest
            from wmcontrol import random_sleep
            print('\n\tFound wmcontrol\n')
        except Exception:
            print('\n\tUnable to find wmcontrol modules. Please include it in your python path\n')
            if not self.testMode:
                print('\n\t QUIT\n')
                sys.exit(-17)

        import pprint
        for (n, d) in self.chainDicts.items():
            if self.testMode:
                print("Only viewing request", n)
                # pprint writes to stdout itself and returns None
                pprint.pprint(d)
            else:
                print("For eyes before submitting", n)
                pprint.pprint(d)
                print("Submitting", n, "...........")
                makeRequest(self.wmagent, d, encodeDict=True)
                print("...........", n, "submitted")
                random_sleep()
        if self.testMode and len(self.longWFName) > 0:
            print("\n*** WARNING: "+str(len(self.longWFName))+" workflows have too long names for submission (>"+str(MAXWORKFLOWLENGTH)+ "characters) ***")
            print('\n'.join(self.longWFName))