File indexing completed on 2021-09-25 01:44:59
0001 from __future__ import print_function
0002 import sys
0003 import json
0004 import os
0005 import copy
0006 import multiprocessing
0007 import time
0008 import re
0009
# Longest "<user>_<RequestString>" request name accepted for submission.
# MatrixInjector.prepare collects longer candidates in self.longWFName, and
# MatrixInjector.submit lists them in a warning (test mode only).
MAXWORKFLOWLENGTH = 81
0011
def performInjectionOptionTest(opt):
    """Validate ``opt`` for WMAgent injection, adjusting it in place.

    Behaviour by ``opt.wmcontrol``:

    * ``'init'``   -> sets ``opt.nProcs = 0``
    * ``'test'``   -> sets ``opt.dryRun = True``
    * ``'submit'`` -> exits with status -1 when ``opt.nProcs == 0``
    * ``'force'``  -> prints an expert-mode warning, sets ``opt.dryRun = True``

    Exits with status -1 whenever ``opt.show`` is set.
    """
    if opt.show:
        print('Not injecting to wmagent in --show mode. Need to run the workflows.')
        sys.exit(-1)
    # The modes are mutually exclusive values of the same option.
    if opt.wmcontrol == 'init':
        opt.nProcs = 0
    elif opt.wmcontrol == 'test':
        opt.dryRun = True
    elif opt.wmcontrol == 'submit' and opt.nProcs == 0:
        print('Not injecting to wmagent in -j 0 mode. Need to run the workflows.')
        sys.exit(-1)
    elif opt.wmcontrol == 'force':
        print("This is an expert setting, you'd better know what you're doing")
        opt.dryRun = True
0028
def upload_to_couch_oneArg(arguments):
    """Forward one packed argument tuple to ``modules.wma.upload_to_couch``.

    ``MatrixInjector.uploadConf`` drives this through
    ``multiprocessing.Pool.map``, which hands each call a single argument,
    hence the packing.

    Parameters
    ----------
    arguments : tuple
        ``(filePath, labelInCouch, user, group, where)``; ``where`` is passed
        on as the ``url`` keyword.

    Returns
    -------
    The value returned by ``upload_to_couch``; callers store it as the
    config cache id.
    """
    from modules.wma import upload_to_couch
    config_path, couch_label, requestor, group_name, couch_url = arguments
    return upload_to_couch(config_path, couch_label, requestor, group_name,
                           test_mode=False, url=couch_url)
0039
0040
class MatrixInjector(object):
    """Turn locally prepared relval workflows into WMAgent TaskChain requests.

    ``prepare`` builds one request dictionary per workflow directory into
    ``self.chainDicts``. ``upload`` replaces every config path in those
    dictionaries with a couch config-cache id. ``submit`` either prints the
    requests (test mode) or sends them with ``makeRequest``.
    """

    def __init__(self, opt, mode='init', options=''):
        """Collect submission settings from ``opt``, ``options`` and the environment.

        Parameters
        ----------
        opt : object
            Parsed command-line options. Attributes read here: ``keep``,
            ``memoryOffset``, ``memPerCore``, ``numberEventsInLuminosityBlock``,
            ``nStreams``, ``batchName``, ``gpu``, ``GPUMemoryMB``,
            ``CUDACapabilities``, ``CUDARuntime``, ``GPUName``,
            ``CUDADriverVersion``, ``CUDARuntimeVersion``, ``testbed``,
            ``dbsUrl``, ``label`` and ``nThreads``.
        mode : str
            Anything other than ``'submit'`` or ``'force'`` selects test mode
            (fake uploads, requests only printed).
        options : str
            Comma-separated ``dqm:<url>`` / ``wma:<host>`` overrides.

        Exits with status -18 when ``$WMCORE_ROOT`` is unset outside test mode.
        """
        # Starting value for the fake cache ids handed out in test mode.
        self.count = 1040

        self.dqmgui = None
        self.wmagent = None
        for k in options.split(','):
            if k.startswith('dqm:'):
                self.dqmgui = k.split(':', 1)[-1]
            elif k.startswith('wma:'):
                self.wmagent = k.split(':', 1)[-1]

        self.testMode = mode not in ('submit', 'force')
        self.version = 1
        self.keep = opt.keep
        self.memoryOffset = opt.memoryOffset
        self.memPerCore = opt.memPerCore
        self.numberEventsInLuminosityBlock = opt.numberEventsInLuminosityBlock
        self.numberOfStreams = 0
        if opt.nStreams > 0:
            self.numberOfStreams = opt.nStreams
        self.batchName = ''
        self.batchTime = str(int(time.time()))
        if opt.batchName:
            self.batchName = '__' + opt.batchName + '-' + self.batchTime

        # GPU requirements, attached to tasks whose step name contains 'GPU'.
        self.RequiresGPU = opt.gpu
        self.GPUMemoryMB = opt.GPUMemoryMB
        self.CUDACapabilities = opt.CUDACapabilities
        self.CUDARuntime = opt.CUDARuntime
        self.GPUName = opt.GPUName
        self.CUDADriverVersion = opt.CUDADriverVersion
        self.CUDARuntimeVersion = opt.CUDARuntimeVersion

        # Request manager host: 'wma:' option, then $WMAGENT_REQMGR, then default.
        if not self.wmagent:
            self.wmagent = os.getenv('WMAGENT_REQMGR')
        if not self.wmagent:
            if not opt.testbed:
                self.wmagent = 'cmsweb.cern.ch'
            else:
                self.wmagent = 'cmsweb-testbed.cern.ch'

        # DBS reader: --dbsUrl, then $CMS_DBSREADER_URL, then prod/testbed default.
        if opt.dbsUrl is not None:
            self.DbsUrl = opt.dbsUrl
        elif os.getenv('CMS_DBSREADER_URL') is not None:
            self.DbsUrl = os.getenv('CMS_DBSREADER_URL')
        elif not opt.testbed:
            self.DbsUrl = "https://cmsweb-prod.cern.ch/dbs/prod/global/DBSReader"
        else:
            self.DbsUrl = "https://cmsweb-testbed.cern.ch/dbs/int/global/DBSReader"

        if not self.dqmgui:
            self.dqmgui = "https://cmsweb.cern.ch/dqm/relval"

        self.couch = 'https://' + self.wmagent + '/couchdb'

        # Config file name -> cache id, so each file is uploaded only once.
        self.couchCache = {}
        self.user = os.getenv('USER')
        self.group = 'ppd'
        cmsswVersion = os.getenv('CMSSW_VERSION')
        self.label = 'RelValSet_' + cmsswVersion.replace('-', '') + '_v' + str(self.version)
        self.speciallabel = ''
        if opt.label:
            self.speciallabel = '_' + opt.label
        # Request names longer than MAXWORKFLOWLENGTH, reported by submit().
        self.longWFName = []

        if not os.getenv('WMCORE_ROOT'):
            print('\n\twmclient is not setup properly. Will not be able to upload or submit requests.\n')
            if not self.testMode:
                print('\n\t QUIT\n')
                sys.exit(-18)
        else:
            print('\n\tFound wmclient\n')

        # Request-level template. 'nowmTasklist' is a scratch list of task
        # dictionaries, removed before the request is stored.
        self.defaultChain = {
            "RequestType": "TaskChain",
            "SubRequestType": "RelVal",
            "RequestPriority": 500000,
            "Requestor": self.user,
            "Group": self.group,
            "CMSSWVersion": cmsswVersion,
            "Campaign": cmsswVersion,
            "ScramArch": os.getenv('SCRAM_ARCH'),
            "ProcessingVersion": self.version,
            "GlobalTag": None,
            "ConfigCacheUrl": self.couch,
            "DbsUrl": self.DbsUrl,
            "TaskChain": None,
            "nowmTasklist": [],
            "Multicore": 1,
            "Memory": 3000,
            "SizePerEvent": 1234,
            "TimePerEvent": 10,
            "PrepID": cmsswVersion,
        }

        # Merged into the request when a HARVEST task is found.
        self.defaultHarvest = {
            "EnableHarvesting": "True",
            "DQMUploadUrl": self.dqmgui,
            "DQMConfigCacheID": None,
            "Multicore": 1,
        }

        # First step of a workflow generated from scratch (no input dataset).
        self.defaultScratch = {
            "TaskName": None,
            "ConfigCacheID": None,
            "GlobalTag": None,
            "SplittingAlgo": "EventBased",
            "EventsPerJob": None,
            "EventsPerLumi": None,
            "RequestNumEvents": None,
            "Seeding": "AutomaticSeeding",
            "PrimaryDataset": None,
            "nowmIO": {},
            "Multicore": opt.nThreads,
            "EventStreams": self.numberOfStreams,
            "KeepOutput": False,
        }
        # Step reading an existing input dataset.
        self.defaultInput = {
            "TaskName": "DigiHLT",
            "ConfigCacheID": None,
            "GlobalTag": None,
            "InputDataset": None,
            "SplittingAlgo": "LumiBased",
            "LumisPerJob": 10,
            "nowmIO": {},
            "Multicore": opt.nThreads,
            "EventStreams": self.numberOfStreams,
            "KeepOutput": False,
        }
        # Any later step, fed by the output of a previous task.
        self.defaultTask = {
            "TaskName": None,
            "InputTask": None,
            "InputFromOutputModule": None,
            "ConfigCacheID": None,
            "GlobalTag": None,
            "SplittingAlgo": "LumiBased",
            "LumisPerJob": 10,
            "nowmIO": {},
            "Multicore": opt.nThreads,
            "EventStreams": self.numberOfStreams,
            "KeepOutput": False,
            "RequiresGPU": None,
            "GPUParams": None,
        }
        self.defaultGPUParams = {
            "GPUMemoryMB": self.GPUMemoryMB,
            "CUDACapabilities": self.CUDACapabilities,
            "CUDARuntime": self.CUDARuntime,
        }
        # Optional GPU constraints are only sent when set.
        if self.GPUName:
            self.defaultGPUParams.update({"GPUName": self.GPUName})
        if self.CUDADriverVersion:
            self.defaultGPUParams.update({"CUDADriverVersion": self.CUDADriverVersion})
        if self.CUDARuntimeVersion:
            self.defaultGPUParams.update({"CUDARuntimeVersion": self.CUDARuntimeVersion})

        self.chainDicts = {}

    @staticmethod
    def get_wmsplit():
        """
        Return a "wmsplit" dictionary that contain non-default LumisPerJob values

        Keys are step names; values are the LumisPerJob to use for that step.
        The static entries are always returned. The Phase-2 pileup entries
        derived from ``upgradeWorkflowComponents.upgradeKeys[2026]`` are added
        when that module can be imported. On failure a message is printed and
        only the static entries are returned, so callers such as ``prepare``
        (which indexes ``wmsplit['Hadronizer']``) keep working.
        """
        wmsplit = {
            'DIGIHI': 5,
            'RECOHI': 5,
            'HLTD': 5,
            'RECODreHLT': 2,
            'DIGIPU': 4,
            'DIGIPU1': 4,
            'RECOPU1': 1,
            'DIGIUP15_PU50': 1,
            'RECOUP15_PU50': 1,
            'DIGIUP15_PU25': 1,
            'RECOUP15_PU25': 1,
            'DIGIUP15_PU25HS': 1,
            'RECOUP15_PU25HS': 1,
            'DIGIHIMIX': 5,
            'RECOHIMIX': 5,
            'RECODSplit': 1,
            'SingleMuPt10_UP15_ID': 1,
            'DIGIUP15_ID': 1,
            'RECOUP15_ID': 1,
            'TTbar_13_ID': 1,
            'SingleMuPt10FS_ID': 1,
            'TTbarFS_ID': 1,
            'RECODR2_50nsreHLT': 5,
            'RECODR2_25nsreHLT': 5,
            'RECODR2_2016reHLT': 5,
            'RECODR2_50nsreHLT_HIPM': 5,
            'RECODR2_25nsreHLT_HIPM': 5,
            'RECODR2_2016reHLT_HIPM': 1,
            'RECODR2_2016reHLT_skimSingleMu': 1,
            'RECODR2_2016reHLT_skimDoubleEG': 1,
            'RECODR2_2016reHLT_skimMuonEG': 1,
            'RECODR2_2016reHLT_skimJetHT': 1,
            'RECODR2_2016reHLT_skimMET': 1,
            'RECODR2_2016reHLT_skimSinglePh': 1,
            'RECODR2_2016reHLT_skimMuOnia': 1,
            'RECODR2_2016reHLT_skimSingleMu_HIPM': 1,
            'RECODR2_2016reHLT_skimDoubleEG_HIPM': 1,
            'RECODR2_2016reHLT_skimMuonEG_HIPM': 1,
            'RECODR2_2016reHLT_skimJetHT_HIPM': 1,
            'RECODR2_2016reHLT_skimMET_HIPM': 1,
            'RECODR2_2016reHLT_skimSinglePh_HIPM': 1,
            'RECODR2_2016reHLT_skimMuOnia_HIPM': 1,
            'RECODR2_2017reHLT_Prompt': 1,
            'RECODR2_2017reHLT_skimSingleMu_Prompt_Lumi': 1,
            'RECODR2_2017reHLT_skimDoubleEG_Prompt': 1,
            'RECODR2_2017reHLT_skimMET_Prompt': 1,
            'RECODR2_2017reHLT_skimMuOnia_Prompt': 1,
            'RECODR2_2017reHLT_Prompt_L1TEgDQM': 1,
            'RECODR2_2018reHLT_Prompt': 1,
            'RECODR2_2018reHLT_skimSingleMu_Prompt_Lumi': 1,
            'RECODR2_2018reHLT_skimDoubleEG_Prompt': 1,
            'RECODR2_2018reHLT_skimJetHT_Prompt': 1,
            'RECODR2_2018reHLT_skimMET_Prompt': 1,
            'RECODR2_2018reHLT_skimMuOnia_Prompt': 1,
            'RECODR2_2018reHLT_skimEGamma_Prompt_L1TEgDQM': 1,
            'RECODR2_2018reHLT_skimMuonEG_Prompt': 1,
            'RECODR2_2018reHLT_skimCharmonium_Prompt': 1,
            'RECODR2_2018reHLT_skimJetHT_Prompt_HEfail': 1,
            'RECODR2_2018reHLT_skimJetHT_Prompt_BadHcalMitig': 1,
            'RECODR2_2018reHLTAlCaTkCosmics_Prompt': 1,
            'RECODR2_2018reHLT_skimDisplacedJet_Prompt': 1,
            'RECODR2_2018reHLT_ZBPrompt': 1,
            'RECODR2_2018reHLT_Offline': 1,
            'RECODR2_2018reHLT_skimSingleMu_Offline_Lumi': 1,
            'RECODR2_2018reHLT_skimDoubleEG_Offline': 1,
            'RECODR2_2018reHLT_skimJetHT_Offline': 1,
            'RECODR2_2018reHLT_skimMET_Offline': 1,
            'RECODR2_2018reHLT_skimMuOnia_Offline': 1,
            'RECODR2_2018reHLT_skimEGamma_Offline_L1TEgDQM': 1,
            'RECODR2_2018reHLT_skimMuonEG_Offline': 1,
            'RECODR2_2018reHLT_skimCharmonium_Offline': 1,
            'RECODR2_2018reHLT_skimJetHT_Offline_HEfail': 1,
            'RECODR2_2018reHLT_skimJetHT_Offline_BadHcalMitig': 1,
            'RECODR2_2018reHLTAlCaTkCosmics_Offline': 1,
            'RECODR2_2018reHLT_skimDisplacedJet_Offline': 1,
            'RECODR2_2018reHLT_ZBOffline': 1,
            'HLTDR2_50ns': 1,
            'HLTDR2_25ns': 1,
            'HLTDR2_2016': 1,
            'HLTDR2_2017': 1,
            'HLTDR2_2018': 1,
            'HLTDR2_2018_BadHcalMitig': 1,
            'Hadronizer': 1,
            'DIGIUP15': 1,
            'RECOUP15': 1,
            'RECOAODUP15': 5,
            'DBLMINIAODMCUP15NODQM': 5,
            'Digi': 5,
            'Reco': 5,
            'DigiPU': 1,
            'RecoPU': 1,
            'RECOHID11': 1,
            'DIGIUP17': 1,
            'RECOUP17': 1,
            'DIGIUP17_PU25': 1,
            'RECOUP17_PU25': 1,
            'DIGICOS_UP16': 1,
            'RECOCOS_UP16': 1,
            'DIGICOS_UP17': 1,
            'RECOCOS_UP17': 1,
            'DIGICOS_UP18': 1,
            'RECOCOS_UP18': 1,
            'DIGICOS_UP21': 1,
            'RECOCOS_UP21': 1,
            'HYBRIDRepackHI2015VR': 1,
            'HYBRIDZSHI2015': 1,
            'RECOHID15': 1,
            'RECOHID18': 1,
        }

        try:
            from .upgradeWorkflowComponents import upgradeKeys
            for key in upgradeKeys[2026]:
                if 'PU' not in key:
                    continue
                wmsplit['DigiTriggerPU_' + key] = 1
                wmsplit['RecoGlobalPU_' + key] = 1
        except Exception as ex:
            # Keep the static entries rather than discarding everything.
            print('Exception while building a wmsplit dictionary: %s' % (str(ex)))

        return wmsplit

    @staticmethod
    def _load_io(directory, step):
        """Return the decoded ``<directory>/<step>.io`` JSON, or None if unavailable."""
        path = '%s/%s.io' % (directory, step)
        try:
            with open(path) as ioFile:
                return json.load(ioFile)
        except (IOError, OSError, ValueError):
            print("Failed to find", path, ".The workflows were probably not run on cfg not created")
            return None

    def _request_gpu_if_needed(self, task, step):
        """Attach the configured GPU requirements to ``task`` for GPU steps."""
        if 'GPU' in step and self.RequiresGPU != 'forbidden':
            task['RequiresGPU'] = self.RequiresGPU
            task['GPUParams'] = json.dumps(self.defaultGPUParams)

    @staticmethod
    def _pileup_prefix(cmd, prefix):
        """Return the ProcessingString prefix implied by the pileup options of ``cmd``.

        ``prefix`` (the prefix accumulated so far) is returned unchanged when
        ``cmd`` has neither a ``--pileup`` scenario nor a premix
        ``--pileup_input``.
        """
        tokens = cmd.split()
        # The trailing space matches '--pileup <scenario>' but not '--pileup_input'.
        if '--pileup ' in cmd:
            pileupString = tokens[tokens.index('--pileup') + 1]
            prefix = 'PU_'
            if '25ns' in pileupString:
                prefix = 'PU25ns_'
            elif '50ns' in pileupString:
                prefix = 'PU50ns_'
            elif 'nopu' in pileupString.lower():
                prefix = ''
        if 'premix_stage2' in cmd and '--pileup_input' in cmd:
            pileupInput = tokens[tokens.index('--pileup_input') + 1]
            if '25ns' in pileupInput:
                prefix = 'PUpmx25ns_'
            elif '50ns' in pileupInput:
                prefix = 'PUpmx50ns_'
        return prefix

    def _link_input_tasks(self, chainDict):
        """Set InputTask/InputFromOutputModule from each task's 'primary' input file.

        For every task whose ``.io`` data lists a ``primary`` file, the earlier
        tasks are scanned (nearest first), and any ``.io`` entry containing that
        file name marks the producer. Only the first matching entry of each
        earlier task is used. Scanning continues through all earlier tasks, so
        the earliest matching task is the one kept. A HARVEST task also enables
        harvesting in the request-level dictionary.
        """
        tasks = chainDict['nowmTasklist']
        for i_second in reversed(range(len(tasks))):
            t_second = tasks[i_second]
            if 'primary' not in t_second['nowmIO']:
                continue
            primary = t_second['nowmIO']['primary'][0].replace('file:', '')
            for i_input in reversed(range(i_second)):
                t_input = tasks[i_input]
                for (om, o) in t_input['nowmIO'].items():
                    if primary not in o:
                        continue
                    # Task names longer than 50 characters are shortened
                    # (presumably a request-manager length limit).
                    if len(t_input['TaskName']) > 50:
                        if t_input['TaskName'].find('GenSim') != -1:
                            t_input['TaskName'] = 'GenSimFull'
                        if t_input['TaskName'].find('Hadronizer') != -1:
                            t_input['TaskName'] = 'HadronizerFull'
                    t_second['InputTask'] = t_input['TaskName']
                    t_second['InputFromOutputModule'] = om
                    if t_second['TaskName'].startswith('HARVEST'):
                        # Harvesting settings live in the request, not the task.
                        chainDict.update(copy.deepcopy(self.defaultHarvest))
                        if "_RD" in t_second['TaskName']:
                            chainDict['DQMHarvestUnit'] = "multiRun"
                        chainDict['DQMConfigCacheID'] = t_second['ConfigCacheID']
                    break

    def _number_tasks(self, chainDict):
        """Move non-HARVEST tasks into Task1..TaskN keys and set TaskChain."""
        tasks = chainDict['nowmTasklist']
        if self.keep:
            # Integer entries of --keep select tasks by position.
            for i in self.keep:
                if isinstance(i, int) and i < len(tasks):
                    tasks[i]['KeepOutput'] = True
        itask = 0
        for t in tasks:
            if t['TaskName'].startswith('HARVEST'):
                continue
            if not self.keep or t['TaskName'] in self.keep:
                t['KeepOutput'] = True
            if t['TaskName'].startswith('HYBRIDRepackHI2015VR'):
                t['KeepOutput'] = False
            t.pop('nowmIO')
            itask += 1
            chainDict['Task%d' % itask] = t
        chainDict['TaskChain'] = itask
        chainDict.pop('nowmTasklist')

    def prepare(self, mReader, directories, mode='init'):
        """Build one TaskChain request per workflow into ``self.chainDicts``.

        Parameters
        ----------
        mReader : object
            Provides ``workFlowSteps``, a mapping whose keys ``x`` have the
            workflow number as ``x[0]``. Each value ``s`` has ``s[1]`` as a
            '+'-separated name (first part used for naming), ``s[2]`` as the
            per-step commands and ``s[3]`` as the step names. A command is a
            cmsDriver string, or an input-dataset object exposing ``dataSet``,
            ``split``, ``run``, ``ls`` and ``label``.
        directories : dict
            Workflow number -> directory holding ``<step>.io`` / ``<step>.py``.
        mode : str
            Not used here; kept for interface compatibility.

        Returns
        -------
        int
            0 on success; -15 if a ``.io`` file is missing or unreadable; -12
            if a from-scratch first step has no ``--relval`` splitting info.
        """
        wmsplit = MatrixInjector.get_wmsplit()
        for (n, wfDir) in directories.items():
            chainDict = copy.deepcopy(self.defaultChain)
            tasks = chainDict['nowmTasklist']
            print("inspecting", wfDir)
            nextHasDSInput = None
            for (x, s) in mReader.workFlowSteps.items():
                if x[0] != n:
                    continue
                wfName = s[1].split('+')[0]
                splitForThisWf = None
                thisLabel = self.speciallabel

                if any("HARVESTGEN" in step for step in s[3]):
                    chainDict['TimePerEvent'] = 0.01
                    thisLabel = thisLabel + "_gen"
                if any("DBLMINIAODMCUP15NODQM" in step for step in s[3]):
                    thisLabel = thisLabel + "_dblMiniAOD"
                processStrPrefix = ''
                setPrimaryDs = None
                nanoedmGT = ''
                for index, step in enumerate(s[3]):
                    cmd = s[2][index]
                    if 'INPUT' in step or not isinstance(cmd, str):
                        # Input-dataset description, consumed by the next step.
                        nextHasDSInput = cmd
                        continue

                    if index == 0:
                        # First step without an input dataset: generate from scratch.
                        task = copy.deepcopy(self.defaultScratch)
                        tasks.append(task)
                        ioInfo = self._load_io(wfDir, step)
                        if ioInfo is None:
                            return -15
                        task['nowmIO'] = ioInfo
                        task['PrimaryDataset'] = 'RelVal' + wfName
                        if '--relval' not in cmd:
                            print('Impossible to create task from scratch without splitting information with --relval')
                            return -12
                        arg = cmd.split()
                        # Value following the last '--relval': "<nEvents>,<eventsPerJob>".
                        ns = list(map(int, arg[len(arg) - arg[::-1].index('--relval')].split(',')))
                        task['RequestNumEvents'] = ns[0]
                        task['EventsPerJob'] = ns[1]
                        task['EventsPerLumi'] = ns[1]
                        if 'numberEventsInLuminosityBlock' in cmd:
                            found = re.findall(r'process.source.numberEventsInLuminosityBlock=cms.untracked.uint32\(([ 0-9 ]*)\)', cmd, re.DOTALL)
                            if found and found[-1].isdigit() and int(found[-1]) < ns[1]:
                                task['EventsPerLumi'] = int(found[-1])
                        if 0 < self.numberEventsInLuminosityBlock <= ns[1]:
                            task['EventsPerLumi'] = self.numberEventsInLuminosityBlock
                        if 'FASTSIM' in cmd or '--fast' in cmd:
                            thisLabel += '_FastSim'
                        if 'lhe' in cmd:
                            task['LheInputFiles'] = True

                    elif nextHasDSInput:
                        # Step reading the dataset described by the previous INPUT entry.
                        task = copy.deepcopy(self.defaultInput)
                        tasks.append(task)
                        ioInfo = self._load_io(wfDir, step)
                        if ioInfo is None:
                            return -15
                        task['nowmIO'] = ioInfo
                        task['InputDataset'] = nextHasDSInput.dataSet
                        if 'DQMHLTonRAWAOD' in step:
                            task['IncludeParents'] = True
                        splitForThisWf = nextHasDSInput.split
                        task['LumisPerJob'] = splitForThisWf
                        if step in wmsplit:
                            task['LumisPerJob'] = wmsplit[step]
                        if len(nextHasDSInput.run):
                            task['RunWhitelist'] = nextHasDSInput.run
                        if len(nextHasDSInput.ls):
                            task['LumiList'] = nextHasDSInput.ls
                        if '--data' in cmd and nextHasDSInput.label:
                            thisLabel += '_RelVal_%s' % nextHasDSInput.label
                        if 'filter' in ioInfo:
                            print("This has an input DS and a filter sequence: very likely to be the PyQuen sample")
                            processStrPrefix = 'PU_'
                            setPrimaryDs = 'RelVal' + wfName
                            task['PrimaryDataset'] = setPrimaryDs
                        nextHasDSInput = None
                        self._request_gpu_if_needed(task, step)

                    else:
                        # Later step fed by a previous task.
                        task = copy.deepcopy(self.defaultTask)
                        tasks.append(task)
                        ioInfo = self._load_io(wfDir, step)
                        if ioInfo is None:
                            return -15
                        task['nowmIO'] = ioInfo
                        if splitForThisWf:
                            task['LumisPerJob'] = splitForThisWf
                        if step in wmsplit:
                            task['LumisPerJob'] = wmsplit[step]
                        self._request_gpu_if_needed(task, step)

                    if 'Hadronizer' in step:
                        task['LumisPerJob'] = wmsplit['Hadronizer']

                    task['TaskName'] = step
                    if setPrimaryDs:
                        task['PrimaryDataset'] = setPrimaryDs
                    task['ConfigCacheID'] = '%s/%s.py' % (wfDir, step)
                    globalTag = task['nowmIO']['GT']
                    task['GlobalTag'] = globalTag
                    # The request-level GlobalTag ends up as the last task's GT
                    # (NANOMERGE uses the GT of the preceding NANOEDM step).
                    chainDict['GlobalTag'] = globalTag
                    if 'NANOEDM' in step:
                        nanoedmGT = globalTag
                    if 'NANOMERGE' in step:
                        chainDict['GlobalTag'] = nanoedmGT
                    if 'pileup' in task['nowmIO']:
                        task['MCPileup'] = task['nowmIO']['pileup']
                    processStrPrefix = self._pileup_prefix(cmd, processStrPrefix)

                    task['AcquisitionEra'] = chainDict['CMSSWVersion']
                    procGT = nanoedmGT if 'NANOMERGE' in step else task['GlobalTag']
                    task['ProcessingString'] = processStrPrefix + procGT.replace('::All', '').replace('-', '_') + thisLabel

                    if self.batchName:
                        task['Campaign'] = task['AcquisitionEra'] + self.batchName

                    if 'DBLMINIAODMCUP15NODQM' in step:
                        task['ProcessingString'] = task['ProcessingString'] + '_miniAOD'

                    if task['Multicore']:
                        # Memory grows linearly with the number of extra threads.
                        task['Memory'] = self.memoryOffset + int(task['Multicore'] - 1) * self.memPerCore

                chainDict['RequestString'] = 'RV' + chainDict['CMSSWVersion'] + wfName
                if processStrPrefix or thisLabel:
                    chainDict['RequestString'] += '_' + processStrPrefix + thisLabel

                self.candidateWFName = self.user + '_' + chainDict['RequestString']
                if len(self.candidateWFName) > MAXWORKFLOWLENGTH:
                    self.longWFName.append(self.candidateWFName)

                chainDict['PrepID'] = chainDict['CMSSWVersion'] + '__' + self.batchTime + '-' + wfName
                if self.batchName:
                    chainDict['PrepID'] = chainDict['CMSSWVersion'] + self.batchName + '-' + wfName
                    if 'HIN' in self.batchName:
                        chainDict['SubRequestType'] = "HIRelVal"

            self._link_input_tasks(chainDict)

            # Request-level era / processing string come from the first task.
            chainDict['AcquisitionEra'] = tasks[0]['AcquisitionEra']
            chainDict['ProcessingString'] = tasks[0]['ProcessingString']
            chainDict['Campaign'] = chainDict['AcquisitionEra'] + self.batchName

            self._number_tasks(chainDict)
            self.chainDicts[n] = chainDict

        return 0

    def uploadConf(self, filePath, label, where):
        """Upload ``filePath`` to the couch config cache at ``where``; return its id.

        In test mode nothing is uploaded and an increasing fake id is returned.
        Files are cached by base name, so a file already uploaded in this
        session is not sent again. Exits with status -16 if the wmcontrol
        modules cannot be imported outside test mode.
        """
        labelInCouch = self.label + '_' + label
        cacheName = filePath.split('/')[-1]
        if self.testMode:
            self.count += 1
            print('\tFake upload of', filePath, 'to couch with label', labelInCouch)
            return self.count

        try:
            # Only checks availability; the upload itself runs in a worker process.
            from modules.wma import upload_to_couch, DATABASE_NAME  # noqa: F401
        except Exception:
            print('\n\tUnable to find wmcontrol modules. Please include it in your python path\n')
            print('\n\t QUIT\n')
            sys.exit(-16)

        if cacheName in self.couchCache:
            print("Not re-uploading", filePath, "to", where, "for", label)
            cacheId = self.couchCache[cacheName]
        else:
            print("Loading", filePath, "to", where, "for", label)
            pool = multiprocessing.Pool(1)
            try:
                cacheIds = pool.map(upload_to_couch_oneArg,
                                    [(filePath, labelInCouch, self.user, self.group, where)])
            finally:
                # Release the worker process instead of leaking one pool per upload.
                pool.close()
                pool.join()
            cacheId = cacheIds[0]
            self.couchCache[cacheName] = cacheId
        return cacheId

    def upload(self):
        """Replace config file paths in every prepared request by couch cache ids."""
        for (n, d) in self.chainDicts.items():
            for it in d:
                if it.startswith("Task") and it != 'TaskChain':
                    couchID = self.uploadConf(d[it]['ConfigCacheID'],
                                              str(n) + d[it]['TaskName'],
                                              d['ConfigCacheUrl'])
                    print(d[it]['ConfigCacheID'], " uploaded to couchDB for", str(n), "with ID", couchID)
                    d[it]['ConfigCacheID'] = couchID
                if it == 'DQMConfigCacheID':
                    couchID = self.uploadConf(d['DQMConfigCacheID'],
                                              str(n) + 'harvesting',
                                              d['ConfigCacheUrl'])
                    print(d['DQMConfigCacheID'], "uploaded to couchDB for", str(n), "with ID", couchID)
                    d['DQMConfigCacheID'] = couchID

    def submit(self):
        """Print every prepared request; outside test mode also submit it.

        Exits with status -17 if the wmcontrol modules cannot be imported
        outside test mode. In test mode, request names longer than
        MAXWORKFLOWLENGTH are listed at the end.
        """
        try:
            from modules.wma import makeRequest, approveRequest  # noqa: F401
            from wmcontrol import random_sleep
            print('\n\tFound wmcontrol\n')
        except Exception:
            print('\n\tUnable to find wmcontrol modules. Please include it in your python path\n')
            if not self.testMode:
                print('\n\t QUIT\n')
                sys.exit(-17)

        import pprint
        for (n, d) in self.chainDicts.items():
            if self.testMode:
                print("Only viewing request", n)
                pprint.pprint(d)
            else:
                print("For eyes before submitting", n)
                pprint.pprint(d)
                print("Submitting", n, "...........")
                makeRequest(self.wmagent, d, encodeDict=True)
                print("...........", n, "submitted")
                random_sleep()
        if self.testMode and self.longWFName:
            print("\n*** WARNING: " + str(len(self.longWFName)) + " workflows have too long names for submission (>" + str(MAXWORKFLOWLENGTH) + " characters) ***")
            print('\n'.join(self.longWFName))
0695 print('\n'.join(self.longWFName))