File indexing completed on 2023-03-17 10:49:04
from __future__ import print_function

import copy
import json
import multiprocessing
import os
import re
import sys
import time

# Upper bound on the length of "<user>_<RequestString>" workflow names.
# MatrixInjector.prepare collects every longer name and MatrixInjector.submit
# prints them as a warning (in test mode) as too long for submission.
MAXWORKFLOWLENGTH = 81
0011
def performInjectionOptionTest(opt):
    """Check and adjust the runTheMatrix options before a WMAgent injection.

    Reads ``opt.show``, ``opt.wmcontrol`` and ``opt.nProcs`` and may rewrite
    ``opt.nProcs`` or ``opt.dryRun`` in place.

    Exits the interpreter with status -1 when injection cannot work:
      * in ``--show`` mode, or
      * in 'submit' mode with ``nProcs == 0`` (the workflows would not be run).

    Mode effects:
      * 'init'  -> ``opt.nProcs`` is forced to 0.
      * 'test'  -> ``opt.dryRun`` is set to True.
      * 'force' -> warns that this is an expert setting, sets ``opt.dryRun``.
    """
    if opt.show:
        print('Not injecting to wmagent in --show mode. Need to run the worklfows.')
        sys.exit(-1)

    mode = opt.wmcontrol
    if mode == 'init':
        opt.nProcs = 0
    elif mode == 'test':
        opt.dryRun = True
    elif mode == 'submit':
        if opt.nProcs == 0:
            print('Not injecting to wmagent in -j 0 mode. Need to run the worklfows.')
            sys.exit(-1)
    elif mode == 'force':
        print("This is an expert setting, you'd better know what you're doing")
        opt.dryRun = True
0028
def upload_to_couch_oneArg(arguments):
    """Upload one configuration file to CouchDB; single-argument adapter.

    Exists so the upload can be dispatched through ``multiprocessing.Pool.map``,
    which passes exactly one argument per call.

    Args:
        arguments: 5-tuple ``(filePath, labelInCouch, user, group, where)``;
            ``where`` is forwarded as the ``url`` of the upload.

    Returns:
        Whatever ``modules.wma.upload_to_couch`` returns (used by the caller
        as the config cache id).
    """
    from modules.wma import upload_to_couch
    config_path, couch_label, requestor, requestor_group, couch_url = arguments
    return upload_to_couch(config_path, couch_label, requestor, requestor_group,
                           test_mode=False, url=couch_url)
0039
0040
class MatrixInjector(object):
    """Turn locally prepared relval workflows into WMAgent TaskChain requests.

    Usage as implemented here:
      * ``prepare`` builds one request dictionary per workflow directory from
        the ``<step>.io`` / ``<step>.py`` files found there.
      * ``upload`` pushes every task configuration to CouchDB and replaces the
        file path with the returned cache id.
      * ``submit`` sends the requests to the request manager, or only prints
        them in test mode.
    """

    def __init__(self, opt, mode='init', options=''):
        """Collect configuration from ``opt``, ``options`` and the environment.

        Args:
            opt: parsed command-line options. Attributes read here:
                keep, memoryOffset, memPerCore, numberEventsInLuminosityBlock,
                nStreams, nThreads, batchName, gpu, GPUMemoryMB,
                CUDACapabilities, CUDARuntime, GPUName, CUDADriverVersion,
                CUDARuntimeVersion, testbed, dbsUrl and label.
            mode: wmcontrol mode. Anything other than 'submit' or 'force'
                puts the injector in test mode (fake uploads, no submission).
            options: comma-separated overrides. 'dqm:<url>' sets the DQM GUI
                and 'wma:<host>' sets the request manager host.
        """
        # First fake CouchDB id handed out by uploadConf in test mode.
        self.count = 1040

        self.dqmgui = None
        self.wmagent = None
        for k in options.split(','):
            if k.startswith('dqm:'):
                self.dqmgui = k.split(':', 1)[-1]
            elif k.startswith('wma:'):
                self.wmagent = k.split(':', 1)[-1]

        self.testMode = ((mode != 'submit') and (mode != 'force'))
        self.version = 1
        self.keep = opt.keep
        self.memoryOffset = opt.memoryOffset
        self.memPerCore = opt.memPerCore
        self.numberEventsInLuminosityBlock = opt.numberEventsInLuminosityBlock
        self.numberOfStreams = 0
        if opt.nStreams > 0:
            self.numberOfStreams = opt.nStreams
        self.batchName = ''
        self.batchTime = str(int(time.time()))
        if opt.batchName:
            self.batchName = '__' + opt.batchName + '-' + self.batchTime

        self.RequiresGPU = opt.gpu
        self.GPUMemoryMB = opt.GPUMemoryMB
        self.CUDACapabilities = opt.CUDACapabilities
        self.CUDARuntime = opt.CUDARuntime

        self.GPUName = opt.GPUName
        self.CUDADriverVersion = opt.CUDADriverVersion
        self.CUDARuntimeVersion = opt.CUDARuntimeVersion

        # Request manager host: 'wma:' option, then $WMAGENT_REQMGR,
        # then the production or testbed default.
        if not self.wmagent:
            self.wmagent = os.getenv('WMAGENT_REQMGR')
        if not self.wmagent:
            if not opt.testbed:
                self.wmagent = 'cmsweb.cern.ch'
            else:
                self.wmagent = 'cmsweb-testbed.cern.ch'

        # DBS reader URL: explicit option, then $CMS_DBSREADER_URL,
        # then the production or testbed default.
        if opt.dbsUrl is not None:
            self.DbsUrl = opt.dbsUrl
        elif os.getenv('CMS_DBSREADER_URL') is not None:
            self.DbsUrl = os.getenv('CMS_DBSREADER_URL')
        else:
            if not opt.testbed:
                self.DbsUrl = "https://cmsweb-prod.cern.ch/dbs/prod/global/DBSReader"
            else:
                self.DbsUrl = "https://cmsweb-testbed.cern.ch/dbs/int/global/DBSReader"

        if not self.dqmgui:
            self.dqmgui = "https://cmsweb.cern.ch/dqm/relval"

        self.couch = 'https://' + self.wmagent + '/couchdb'

        # Maps config file basename -> CouchDB cache id, to avoid re-uploads.
        self.couchCache = {}
        self.user = os.getenv('USER')
        self.group = 'ppd'
        self.label = 'RelValSet_' + os.getenv('CMSSW_VERSION').replace('-', '') + '_v' + str(self.version)
        self.speciallabel = ''
        if opt.label:
            self.speciallabel = '_' + opt.label
        # Workflow names longer than MAXWORKFLOWLENGTH, reported by submit().
        self.longWFName = []

        if not os.getenv('WMCORE_ROOT'):
            print('\n\twmclient is not setup properly. Will not be able to upload or submit requests.\n')
            if not self.testMode:
                print('\n\t QUIT\n')
                sys.exit(-18)
        else:
            print('\n\tFound wmclient\n')

        # Request-level template; deep-copied once per workflow in prepare().
        self.defaultChain = {
            "RequestType": "TaskChain",
            "SubRequestType": "RelVal",
            "RequestPriority": 500000,
            "Requestor": self.user,
            "Group": self.group,
            "CMSSWVersion": os.getenv('CMSSW_VERSION'),
            "Campaign": os.getenv('CMSSW_VERSION'),
            "ScramArch": os.getenv('SCRAM_ARCH'),
            "ProcessingVersion": self.version,
            "GlobalTag": None,
            "ConfigCacheUrl": self.couch,
            "DbsUrl": self.DbsUrl,
            "TaskChain": None,
            "nowmTasklist": [],  # scratch list of task dicts, removed at the end of prepare()
            "Multicore": 1,
            "Memory": 3000,
            "SizePerEvent": 1234,
            "TimePerEvent": 10,
            "PrepID": os.getenv('CMSSW_VERSION')
        }

        # Merged into the request when a HARVEST* task is found.
        self.defaultHarvest = {
            "EnableHarvesting": "True",
            "DQMUploadUrl": self.dqmgui,
            "DQMConfigCacheID": None,
            "Multicore": 1
        }

        # Template for a first step generated from scratch (no input dataset).
        self.defaultScratch = {
            "TaskName": None,
            "ConfigCacheID": None,
            "GlobalTag": None,
            "SplittingAlgo": "EventBased",
            "EventsPerJob": None,
            "EventsPerLumi": None,
            "RequestNumEvents": None,
            "Seeding": "AutomaticSeeding",
            "PrimaryDataset": None,
            "nowmIO": {},
            "Multicore": opt.nThreads,
            "EventStreams": self.numberOfStreams,
            "KeepOutput": False
        }
        # Template for a step reading an input dataset.
        self.defaultInput = {
            "TaskName": "DigiHLT",
            "ConfigCacheID": None,
            "GlobalTag": None,
            "InputDataset": None,
            "SplittingAlgo": "LumiBased",
            "LumisPerJob": 10,
            "nowmIO": {},
            "Multicore": opt.nThreads,
            "EventStreams": self.numberOfStreams,
            "KeepOutput": False
        }
        # Template for a step reading the output of a previous task.
        self.defaultTask = {
            "TaskName": None,
            "InputTask": None,
            "InputFromOutputModule": None,
            "ConfigCacheID": None,
            "GlobalTag": None,
            "SplittingAlgo": "LumiBased",
            "LumisPerJob": 10,
            "nowmIO": {},
            "Multicore": opt.nThreads,
            "EventStreams": self.numberOfStreams,
            "KeepOutput": False,
            "RequiresGPU": None,
            "GPUParams": None
        }
        self.defaultGPUParams = {
            "GPUMemoryMB": self.GPUMemoryMB,
            "CUDACapabilities": self.CUDACapabilities,
            "CUDARuntime": self.CUDARuntime
        }
        # Optional GPU constraints are only sent when set.
        if self.GPUName:
            self.defaultGPUParams.update({"GPUName": self.GPUName})
        if self.CUDADriverVersion:
            self.defaultGPUParams.update({"CUDADriverVersion": self.CUDADriverVersion})
        if self.CUDARuntimeVersion:
            self.defaultGPUParams.update({"CUDARuntimeVersion": self.CUDARuntimeVersion})

        # Workflow number -> finished TaskChain request dictionary.
        self.chainDicts = {}

    @staticmethod
    def get_wmsplit():
        """Return a dict of non-default LumisPerJob values keyed by step name.

        The static table is always returned. The pile-up steps of the 2026
        upgrade workflows are added when ``upgradeWorkflowComponents`` can
        be imported. If that import (or its iteration) fails, a message is
        printed and only the static entries are returned, so callers such as
        ``prepare`` can still rely on e.g. ``wmsplit['Hadronizer']``.
        """
        wmsplit = {
            'DIGIHI': 5,
            'RECOHI': 5,
            'HLTD': 5,
            'RECODreHLT': 2,
            'DIGIPU': 4,
            'DIGIPU1': 4,
            'RECOPU1': 1,
            'DIGIUP15_PU50': 1,
            'RECOUP15_PU50': 1,
            'DIGIUP15_PU25': 1,
            'RECOUP15_PU25': 1,
            'DIGIUP15_PU25HS': 1,
            'RECOUP15_PU25HS': 1,
            'DIGIHIMIX': 5,
            'RECOHIMIX': 5,
            'RECODSplit': 1,
            'SingleMuPt10_UP15_ID': 1,
            'DIGIUP15_ID': 1,
            'RECOUP15_ID': 1,
            'TTbar_13_ID': 1,
            'SingleMuPt10FS_ID': 1,
            'TTbarFS_ID': 1,
            'RECODR2_50nsreHLT': 5,
            'RECODR2_25nsreHLT': 5,
            'RECODR2_2016reHLT': 5,
            'RECODR2_50nsreHLT_HIPM': 5,
            'RECODR2_25nsreHLT_HIPM': 5,
            'RECODR2_2016reHLT_HIPM': 1,
            'RECODR2_2016reHLT_skimSingleMu': 1,
            'RECODR2_2016reHLT_skimDoubleEG': 1,
            'RECODR2_2016reHLT_skimMuonEG': 1,
            'RECODR2_2016reHLT_skimJetHT': 1,
            'RECODR2_2016reHLT_skimMET': 1,
            'RECODR2_2016reHLT_skimSinglePh': 1,
            'RECODR2_2016reHLT_skimMuOnia': 1,
            'RECODR2_2016reHLT_skimSingleMu_HIPM': 1,
            'RECODR2_2016reHLT_skimDoubleEG_HIPM': 1,
            'RECODR2_2016reHLT_skimMuonEG_HIPM': 1,
            'RECODR2_2016reHLT_skimJetHT_HIPM': 1,
            'RECODR2_2016reHLT_skimMET_HIPM': 1,
            'RECODR2_2016reHLT_skimSinglePh_HIPM': 1,
            'RECODR2_2016reHLT_skimMuOnia_HIPM': 1,
            'RECODR2_2017reHLT_Prompt': 1,
            'RECODR2_2017reHLT_skimSingleMu_Prompt_Lumi': 1,
            'RECODR2_2017reHLT_skimDoubleEG_Prompt': 1,
            'RECODR2_2017reHLT_skimMET_Prompt': 1,
            'RECODR2_2017reHLT_skimMuOnia_Prompt': 1,
            'RECODR2_2017reHLT_Prompt_L1TEgDQM': 1,
            'RECODR2_2018reHLT_Prompt': 1,
            'RECODR2_2018reHLT_skimSingleMu_Prompt_Lumi': 1,
            'RECODR2_2018reHLT_skimDoubleEG_Prompt': 1,
            'RECODR2_2018reHLT_skimJetHT_Prompt': 1,
            'RECODR2_2018reHLT_skimMET_Prompt': 1,
            'RECODR2_2018reHLT_skimMuOnia_Prompt': 1,
            'RECODR2_2018reHLT_skimEGamma_Prompt_L1TEgDQM': 1,
            'RECODR2_2018reHLT_skimMuonEG_Prompt': 1,
            'RECODR2_2018reHLT_skimCharmonium_Prompt': 1,
            'RECODR2_2018reHLT_skimJetHT_Prompt_HEfail': 1,
            'RECODR2_2018reHLT_skimJetHT_Prompt_BadHcalMitig': 1,
            'RECODR2_2018reHLTAlCaTkCosmics_Prompt': 1,
            'RECODR2_2018reHLT_skimDisplacedJet_Prompt': 1,
            'RECODR2_2018reHLT_ZBPrompt': 1,
            'RECODR2_2018reHLT_Offline': 1,
            'RECODR2_2018reHLT_skimSingleMu_Offline_Lumi': 1,
            'RECODR2_2018reHLT_skimDoubleEG_Offline': 1,
            'RECODR2_2018reHLT_skimJetHT_Offline': 1,
            'RECODR2_2018reHLT_skimMET_Offline': 1,
            'RECODR2_2018reHLT_skimMuOnia_Offline': 1,
            'RECODR2_2018reHLT_skimEGamma_Offline_L1TEgDQM': 1,
            'RECODR2_2018reHLT_skimMuonEG_Offline': 1,
            'RECODR2_2018reHLT_skimCharmonium_Offline': 1,
            'RECODR2_2018reHLT_skimJetHT_Offline_HEfail': 1,
            'RECODR2_2018reHLT_skimJetHT_Offline_BadHcalMitig': 1,
            'RECODR2_2018reHLTAlCaTkCosmics_Offline': 1,
            'RECODR2_2018reHLT_skimDisplacedJet_Offline': 1,
            'RECODR2_2018reHLT_ZBOffline': 1,
            'HLTDR2_50ns': 1,
            'HLTDR2_25ns': 1,
            'HLTDR2_2016': 1,
            'HLTDR2_2017': 1,
            'HLTDR2_2018': 1,
            'HLTDR2_2018_BadHcalMitig': 1,
            'Hadronizer': 1,
            'DIGIUP15': 1,
            'RECOUP15': 1,
            'RECOAODUP15': 5,
            'DBLMINIAODMCUP15NODQM': 5,
            'Digi': 5,
            'Reco': 5,
            'DigiPU': 1,
            'RecoPU': 1,
            'RECOHID11': 1,
            'DIGIUP17': 1,
            'RECOUP17': 1,
            'DIGIUP17_PU25': 1,
            'RECOUP17_PU25': 1,
            'DIGICOS_UP16': 1,
            'RECOCOS_UP16': 1,
            'DIGICOS_UP17': 1,
            'RECOCOS_UP17': 1,
            'DIGICOS_UP18': 1,
            'RECOCOS_UP18': 1,
            'DIGICOS_UP21': 1,
            'RECOCOS_UP21': 1,
            'HYBRIDRepackHI2015VR': 1,
            'HYBRIDZSHI2015': 1,
            'RECOHID15': 1,
            'RECOHID18': 1,
            'HLTDR3_2023': 1,
            'RECONANORUN3_reHLT': 1,
            'HARVESTRUN3': 1,
        }

        # Only this extension can fail; build it separately so a failure
        # does not discard the static table above.
        try:
            from .upgradeWorkflowComponents import upgradeKeys
            upgradeSplit = {}
            for key in upgradeKeys[2026]:
                if 'PU' not in key:
                    continue
                upgradeSplit['DigiTriggerPU_' + key] = 1
                upgradeSplit['RecoGlobalPU_' + key] = 1
        except Exception as ex:
            print('Exception while building a wmsplit dictionary: %s' % (str(ex)))
        else:
            wmsplit.update(upgradeSplit)

        return wmsplit

    @staticmethod
    def _loadStepIO(wfDir, step):
        """Load the JSON description ``<wfDir>/<step>.io`` of one step.

        Returns the decoded object. Returns None, after printing a message,
        when the file cannot be opened or decoded. The file is always closed.
        """
        ioPath = '%s/%s.io' % (wfDir, step)
        try:
            with open(ioPath) as ioFile:
                return json.loads(ioFile.read())
        except Exception:
            print("Failed to find", ioPath, ".The workflows were probably not run on cfg not created")
            return None

    def prepare(self, mReader, directories, mode='init'):
        """Build one TaskChain request dictionary per workflow directory.

        Args:
            mReader: matrix reader whose ``workFlowSteps`` maps keys ``x`` to
                tuples ``s``. Used here:
                  * ``x[0]``: workflow number;
                  * ``s[1]``: '+'-separated workflow name;
                  * ``s[2]``: per-step cmsDriver command strings or
                    input-dataset objects;
                  * ``s[3]``: step names.
            directories: mapping of workflow number to the directory holding
                that workflow's ``<step>.io`` and ``<step>.py`` files.
            mode: not used by this method; kept for interface compatibility.

        Returns:
            0 on success.
            -15 if a step ``.io`` file cannot be read.
            -12 if a from-scratch first step has no ``--relval`` splitting
            information.
            Finished requests are stored in ``self.chainDicts`` keyed by
            workflow number.
        """
        wmsplit = MatrixInjector.get_wmsplit()
        # Per-step AcquisitionEra/ProcessingString dictionaries: this flag is
        # never switched on, so the per-task variant below is always used.
        acqEra = False
        for (n, wfDir) in directories.items():
            chainDict = copy.deepcopy(self.defaultChain)
            print("inspecting", wfDir)
            nextHasDSInput = None
            for (x, s) in mReader.workFlowSteps.items():
                if x[0] != n:
                    continue
                wfName = s[1].split('+')[0]
                splitForThisWf = None
                thisLabel = self.speciallabel

                if any("HARVESTGEN" in stepName for stepName in s[3]):
                    chainDict['TimePerEvent'] = 0.01
                    thisLabel = thisLabel + "_gen"
                if any("DBLMINIAODMCUP15NODQM" in stepName for stepName in s[3]):
                    thisLabel = thisLabel + "_dblMiniAOD"
                processStrPrefix = ''
                setPrimaryDs = None
                nanoedmGT = ''
                # s[2] is aligned with s[3]: index counts every step,
                # including input-dataset pseudo-steps.
                for index, step in enumerate(s[3]):
                    if 'INPUT' in step or (not isinstance(s[2][index], str)):
                        # Not a cmsDriver step: remember the input dataset
                        # description for the next real step.
                        nextHasDSInput = s[2][index]
                        continue

                    command = s[2][index]
                    if index == 0:
                        # First step without an input dataset: generated from scratch.
                        stepIO = self._loadStepIO(wfDir, step)
                        if stepIO is None:
                            return -15
                        task = copy.deepcopy(self.defaultScratch)
                        task['nowmIO'] = stepIO
                        chainDict['nowmTasklist'].append(task)

                        task['PrimaryDataset'] = 'RelVal' + wfName
                        if '--relval' not in command:
                            print('Impossible to create task from scratch without splitting information with --relval')
                            return -12
                        arg = command.split()
                        # Token after the last '--relval': "<nEvents>,<eventsPerJob>".
                        ns = list(map(int, arg[len(arg) - arg[::-1].index('--relval')].split(',')))
                        task['RequestNumEvents'] = ns[0]
                        task['EventsPerJob'] = ns[1]
                        task['EventsPerLumi'] = ns[1]

                        if 'numberEventsInLuminosityBlock' in command:
                            nEventsInLuminosityBlock = re.findall(
                                r'process.source.numberEventsInLuminosityBlock=cms.untracked.uint32\(([ 0-9 ]*)\)',
                                command, re.DOTALL)
                            # The text may be present in another form; only use a real match.
                            if (nEventsInLuminosityBlock
                                    and nEventsInLuminosityBlock[-1].isdigit()
                                    and int(nEventsInLuminosityBlock[-1]) < ns[1]):
                                task['EventsPerLumi'] = int(nEventsInLuminosityBlock[-1])
                        if 0 < self.numberEventsInLuminosityBlock <= ns[1]:
                            task['EventsPerLumi'] = self.numberEventsInLuminosityBlock
                        if 'FASTSIM' in command or '--fast' in command:
                            thisLabel += '_FastSim'
                        if 'lhe' in command:
                            task['LheInputFiles'] = True

                    elif nextHasDSInput:
                        # Step reading the input dataset declared just before it.
                        stepIO = self._loadStepIO(wfDir, step)
                        if stepIO is None:
                            return -15
                        task = copy.deepcopy(self.defaultInput)
                        task['nowmIO'] = stepIO
                        chainDict['nowmTasklist'].append(task)

                        task['InputDataset'] = nextHasDSInput.dataSet
                        if 'DQMHLTonRAWAOD' in step:
                            task['IncludeParents'] = True
                        splitForThisWf = nextHasDSInput.split
                        task['LumisPerJob'] = splitForThisWf
                        if step in wmsplit:
                            task['LumisPerJob'] = wmsplit[step]

                        if len(nextHasDSInput.run):
                            task['RunWhitelist'] = nextHasDSInput.run
                        if len(nextHasDSInput.ls):
                            task['LumiList'] = nextHasDSInput.ls

                        if '--data' in command and nextHasDSInput.label:
                            thisLabel += '_RelVal_%s' % nextHasDSInput.label
                        if 'filter' in task['nowmIO']:
                            print("This has an input DS and a filter sequence: very likely to be the PyQuen sample")
                            processStrPrefix = 'PU_'
                            setPrimaryDs = 'RelVal' + wfName
                        if setPrimaryDs:
                            task['PrimaryDataset'] = setPrimaryDs
                        nextHasDSInput = None
                        if 'GPU' in step and self.RequiresGPU != 'forbidden':
                            task['RequiresGPU'] = self.RequiresGPU
                            task['GPUParams'] = json.dumps(self.defaultGPUParams)

                    else:
                        # Step reading the output of a previous task.
                        stepIO = self._loadStepIO(wfDir, step)
                        if stepIO is None:
                            return -15
                        task = copy.deepcopy(self.defaultTask)
                        task['nowmIO'] = stepIO
                        chainDict['nowmTasklist'].append(task)

                        if splitForThisWf:
                            task['LumisPerJob'] = splitForThisWf
                        if step in wmsplit:
                            task['LumisPerJob'] = wmsplit[step]
                        if 'GPU' in step and self.RequiresGPU != 'forbidden':
                            task['RequiresGPU'] = self.RequiresGPU
                            task['GPUParams'] = json.dumps(self.defaultGPUParams)

                    if 'Hadronizer' in step:
                        task['LumisPerJob'] = wmsplit['Hadronizer']

                    task['TaskName'] = step
                    if setPrimaryDs:
                        task['PrimaryDataset'] = setPrimaryDs
                    task['ConfigCacheID'] = '%s/%s.py' % (wfDir, step)
                    globalTag = task['nowmIO']['GT']
                    task['GlobalTag'] = globalTag
                    # The request-level GT ends up being the one of the last step.
                    chainDict['GlobalTag'] = globalTag
                    if 'NANOEDM' in step:
                        nanoedmGT = globalTag
                    if 'NANOMERGE' in step:
                        chainDict['GlobalTag'] = nanoedmGT
                    if 'pileup' in task['nowmIO']:
                        task['MCPileup'] = task['nowmIO']['pileup']

                    tokens = command.split()
                    # '--pileup ' (with the space) matches the scenario option,
                    # not '--pileup_input'.
                    if '--pileup ' in command:
                        pileupString = tokens[tokens.index('--pileup') + 1]
                        processStrPrefix = 'PU_'
                        if pileupString.find('25ns') > 0:
                            processStrPrefix = 'PU25ns_'
                        elif pileupString.find('50ns') > 0:
                            processStrPrefix = 'PU50ns_'
                        elif 'nopu' in pileupString.lower():
                            processStrPrefix = ''
                    if 'premix_stage2' in command and '--pileup_input' in command:
                        pileupInput = tokens[tokens.index('--pileup_input') + 1]
                        if pileupInput.find('25ns') > 0:
                            processStrPrefix = 'PUpmx25ns_'
                        elif pileupInput.find('50ns') > 0:
                            processStrPrefix = 'PUpmx50ns_'

                    # NANOMERGE steps are labelled with the GT of the NANOEDM step.
                    psGT = nanoedmGT if 'NANOMERGE' in step else task['GlobalTag']
                    processingString = processStrPrefix + psGT.replace('::All', '').replace('-', '_') + thisLabel
                    if acqEra:
                        chainDict['AcquisitionEra'][step] = chainDict['CMSSWVersion']
                        chainDict['ProcessingString'][step] = processingString
                    else:
                        task['AcquisitionEra'] = chainDict['CMSSWVersion']
                        task['ProcessingString'] = processingString

                    if self.batchName:
                        task['Campaign'] = task['AcquisitionEra'] + self.batchName

                    if 'DBLMINIAODMCUP15NODQM' in step:
                        task['ProcessingString'] = task['ProcessingString'] + '_miniAOD'

                    if task['Multicore']:
                        # Memory = offset + memPerCore for every thread beyond the first.
                        task['Memory'] = self.memoryOffset + int(task['Multicore'] - 1) * self.memPerCore

                chainDict['RequestString'] = 'RV' + chainDict['CMSSWVersion'] + wfName
                if processStrPrefix or thisLabel:
                    chainDict['RequestString'] += '_' + processStrPrefix + thisLabel

                self.candidateWFName = self.user + '_' + chainDict['RequestString']
                if len(self.candidateWFName) > MAXWORKFLOWLENGTH:
                    self.longWFName.append(self.candidateWFName)

                chainDict['PrepID'] = chainDict['CMSSWVersion'] + '__' + self.batchTime + '-' + wfName
                if self.batchName:
                    chainDict['PrepID'] = chainDict['CMSSWVersion'] + self.batchName + '-' + wfName
                    if 'HIN' in self.batchName:
                        chainDict['SubRequestType'] = "HIRelVal"

            tasks = chainDict['nowmTasklist']
            # Link every task that reads a 'primary' file to the closest
            # earlier task whose .io output entries contain that file.
            for i_second in reversed(range(len(tasks))):
                t_second = tasks[i_second]
                if 'primary' not in t_second['nowmIO']:
                    continue
                primary = t_second['nowmIO']['primary'][0].replace('file:', '')
                for i_input in reversed(range(0, i_second)):
                    t_input = tasks[i_input]
                    for (om, o) in t_input['nowmIO'].items():
                        if primary not in o:
                            continue
                        # Long GenSim/Hadronizer task names (>50 chars) are
                        # shortened; presumably a request-manager limit (confirm).
                        if len(t_input['TaskName']) > 50:
                            if 'GenSim' in t_input['TaskName']:
                                t_input['TaskName'] = 'GenSimFull'
                            if 'Hadronizer' in t_input['TaskName']:
                                t_input['TaskName'] = 'HadronizerFull'
                        t_second['InputTask'] = t_input['TaskName']
                        t_second['InputFromOutputModule'] = om

                        if t_second['TaskName'].startswith('HARVEST'):
                            # Harvesting settings live in the request, not in a task.
                            chainDict.update(copy.deepcopy(self.defaultHarvest))
                            if "_RD" in t_second['TaskName']:
                                chainDict['DQMHarvestUnit'] = "multiRun"
                            chainDict['DQMConfigCacheID'] = t_second['ConfigCacheID']
                        # Stops scanning this task's outputs only; earlier
                        # tasks are still scanned (and may override the link).
                        break

            if acqEra:
                chainDict['AcquisitionEra'] = list(chainDict['AcquisitionEra'].values())[0]
                chainDict['ProcessingString'] = list(chainDict['ProcessingString'].values())[0]
            else:
                chainDict['AcquisitionEra'] = tasks[0]['AcquisitionEra']
                chainDict['ProcessingString'] = tasks[0]['ProcessingString']

            chainDict['Campaign'] = chainDict['AcquisitionEra'] + self.batchName

            # self.keep may hold task indices and/or task names.
            if self.keep:
                for i in self.keep:
                    if isinstance(i, int) and i < len(tasks):
                        tasks[i]['KeepOutput'] = True
            itask = 0
            for t in tasks:
                # HARVEST tasks are not emitted as TaskN entries.
                if t['TaskName'].startswith('HARVEST'):
                    continue
                if not self.keep or t['TaskName'] in self.keep:
                    t['KeepOutput'] = True
                if t['TaskName'].startswith('HYBRIDRepackHI2015VR'):
                    t['KeepOutput'] = False
                t.pop('nowmIO')
                itask += 1
                chainDict['Task%d' % (itask)] = t

            chainDict['TaskChain'] = itask
            chainDict.pop('nowmTasklist')
            self.chainDicts[n] = chainDict

        return 0

    def uploadConf(self, filePath, label, where):
        """Upload one configuration file to CouchDB and return its cache id.

        In test mode nothing is uploaded: an incrementing fake id is
        returned instead. Otherwise an already-uploaded file (same basename)
        reuses its cached id. New uploads run through a one-worker
        ``multiprocessing.Pool``, which is always closed and joined afterwards.
        """
        labelInCouch = self.label + '_' + label
        cacheName = filePath.split('/')[-1]
        if self.testMode:
            self.count += 1
            print('\tFake upload of', filePath, 'to couch with label', labelInCouch)
            return self.count

        try:
            from modules.wma import upload_to_couch, DATABASE_NAME
        except ImportError:
            print('\n\tUnable to find wmcontrol modules. Please include it in your python path\n')
            print('\n\t QUIT\n')
            sys.exit(-16)

        if cacheName in self.couchCache:
            print("Not re-uploading", filePath, "to", where, "for", label)
            cacheId = self.couchCache[cacheName]
        else:
            print("Loading", filePath, "to", where, "for", label)
            pool = multiprocessing.Pool(1)
            try:
                cacheIds = pool.map(upload_to_couch_oneArg,
                                    [(filePath, labelInCouch, self.user, self.group, where)])
            finally:
                # Without this every upload left a worker process behind.
                pool.close()
                pool.join()
            cacheId = cacheIds[0]
            self.couchCache[cacheName] = cacheId
        return cacheId

    def upload(self):
        """Upload the task and harvesting configs of every prepared request.

        Each ``TaskN['ConfigCacheID']`` and the request-level
        ``DQMConfigCacheID`` hold a config file path on entry; each is
        replaced with the id returned by ``uploadConf``.
        """
        for (n, d) in self.chainDicts.items():
            for it in d:
                if it.startswith("Task") and it != 'TaskChain':
                    couchID = self.uploadConf(d[it]['ConfigCacheID'],
                                              str(n) + d[it]['TaskName'],
                                              d['ConfigCacheUrl'])
                    print(d[it]['ConfigCacheID'], " uploaded to couchDB for", str(n), "with ID", couchID)
                    d[it]['ConfigCacheID'] = couchID
                if it == 'DQMConfigCacheID':
                    couchID = self.uploadConf(d['DQMConfigCacheID'],
                                              str(n) + 'harvesting',
                                              d['ConfigCacheUrl'])
                    print(d['DQMConfigCacheID'], "uploaded to couchDB for", str(n), "with ID", couchID)
                    d['DQMConfigCacheID'] = couchID

    def submit(self):
        """Submit every prepared request, or only print it in test mode.

        Outside test mode a missing wmcontrol installation exits with -17.
        In test mode, workflow names longer than MAXWORKFLOWLENGTH are
        listed at the end.
        """
        try:
            from modules.wma import makeRequest, approveRequest
            from wmcontrol import random_sleep
            print('\n\tFound wmcontrol\n')
        except ImportError:
            print('\n\tUnable to find wmcontrol modules. Please include it in your python path\n')
            if not self.testMode:
                print('\n\t QUIT\n')
                sys.exit(-17)

        import pprint
        for (n, d) in self.chainDicts.items():
            if self.testMode:
                print("Only viewing request", n)
                # pprint.pprint prints itself; wrapping it in print() added a stray "None".
                pprint.pprint(d)
            else:
                print("For eyes before submitting", n)
                pprint.pprint(d)
                print("Submitting", n, "...........")
                makeRequest(self.wmagent, d, encodeDict=True)
                print("...........", n, "submitted")
                random_sleep()
        if self.testMode and self.longWFName:
            print("\n*** WARNING: " + str(len(self.longWFName)) + " workflows have too long names for submission (>" + str(MAXWORKFLOWLENGTH) + " characters) ***")
            print('\n'.join(self.longWFName))