Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2021-09-25 01:44:59

0001 from __future__ import print_function
0002 import sys
0003 import json
0004 import os
0005 import copy
0006 import multiprocessing
0007 import time
0008 import re
0009 
# Length limit checked against each candidate workflow name
# (user + '_' + RequestString) in MatrixInjector.prepare; names longer than
# this are collected in MatrixInjector.longWFName.
MAXWORKFLOWLENGTH = 81
0011 
def performInjectionOptionTest(opt):
    """Check the injection-related options and adjust ``opt`` in place.

    Exits the process with status -1 when ``opt.show`` is set, or when
    ``opt.wmcontrol`` is 'submit' while ``opt.nProcs`` is 0.  Otherwise:

    * 'init'  -> ``opt.nProcs`` is forced to 0
    * 'test'  -> ``opt.dryRun`` is switched on
    * 'force' -> a warning is printed and ``opt.dryRun`` is switched on

    Returns None.
    """
    if opt.show:
        print('Not injecting to wmagent in --show mode. Need to run the worklfows.')
        sys.exit(-1)

    control = opt.wmcontrol
    if control == 'init':
        # 'init' implies test mode: no processes are used
        opt.nProcs = 0
    elif control == 'test':
        # the workflows already exist, so only a dry run is performed
        opt.dryRun = True
    elif control == 'submit':
        if opt.nProcs == 0:
            print('Not injecting to wmagent in -j 0 mode. Need to run the worklfows.')
            sys.exit(-1)
    elif control == 'force':
        print("This is an expert setting, you'd better know what you're doing")
        opt.dryRun = True
0028 
def upload_to_couch_oneArg(arguments):
    """Call ``upload_to_couch`` with its inputs packed into one sequence.

    ``arguments`` must unpack into exactly five items:
    ``(filePath, labelInCouch, user, group, where)``.  ``where`` is handed
    over as the ``url`` keyword and ``test_mode`` is always False.

    Returns the value produced by ``upload_to_couch`` (the cache id).
    """
    from modules.wma import upload_to_couch

    config_path, couch_label, owner, owner_group, couch_url = arguments
    return upload_to_couch(
        config_path,
        couch_label,
        owner,
        owner_group,
        test_mode=False,
        url=couch_url,
    )
0039 
0040 
0041 class MatrixInjector(object):
0042 
0043     def __init__(self,opt,mode='init',options=''):
0044         self.count=1040
0045 
0046         self.dqmgui=None
0047         self.wmagent=None
0048         for k in options.split(','):
0049             if k.startswith('dqm:'):
0050                 self.dqmgui=k.split(':',1)[-1]
0051             elif k.startswith('wma:'):
0052                 self.wmagent=k.split(':',1)[-1]
0053 
0054         self.testMode=((mode!='submit') and (mode!='force'))
0055         self.version =1
0056         self.keep = opt.keep
0057         self.memoryOffset = opt.memoryOffset
0058         self.memPerCore = opt.memPerCore
0059         self.numberEventsInLuminosityBlock = opt.numberEventsInLuminosityBlock
0060         self.numberOfStreams = 0
0061         if(opt.nStreams>0):
0062             self.numberOfStreams = opt.nStreams
0063         self.batchName = ''
0064         self.batchTime = str(int(time.time()))
0065         if(opt.batchName):
0066             self.batchName = '__'+opt.batchName+'-'+self.batchTime
0067 
0068         ####################################
0069         # Checking and setting up GPU attributes
0070         ####################################
0071         # Mendatory
0072         self.RequiresGPU = opt.gpu
0073         self.GPUMemoryMB = opt.GPUMemoryMB
0074         self.CUDACapabilities = opt.CUDACapabilities
0075         self.CUDARuntime = opt.CUDARuntime
0076         # optional
0077         self.GPUName = opt.GPUName
0078         self.CUDADriverVersion = opt.CUDADriverVersion
0079         self.CUDARuntimeVersion = opt.CUDARuntimeVersion
0080 
0081         # WMagent url
0082         if not self.wmagent:
0083             # Overwrite with env variable
0084             self.wmagent = os.getenv('WMAGENT_REQMGR')
0085 
0086         if not self.wmagent:
0087             # Default values
0088             if not opt.testbed:
0089                 self.wmagent = 'cmsweb.cern.ch'
0090             else:
0091                 self.wmagent = 'cmsweb-testbed.cern.ch'
0092 
0093         # DBSReader url
0094         if opt.dbsUrl is not None:
0095             self.DbsUrl = opt.dbsUrl
0096         elif os.getenv('CMS_DBSREADER_URL') is not None:
0097             self.DbsUrl = os.getenv('CMS_DBSREADER_URL')
0098         else:
0099             # Default values
0100             if not opt.testbed:
0101                 self.DbsUrl = "https://cmsweb-prod.cern.ch/dbs/prod/global/DBSReader"
0102             else:
0103                 self.DbsUrl = "https://cmsweb-testbed.cern.ch/dbs/int/global/DBSReader"
0104 
0105         if not self.dqmgui:
0106             self.dqmgui="https://cmsweb.cern.ch/dqm/relval"
0107         #couch stuff
0108         self.couch = 'https://'+self.wmagent+'/couchdb'
0109 #        self.couchDB = 'reqmgr_config_cache'
0110         self.couchCache={} # so that we do not upload like crazy, and recyle cfgs
0111         self.user = os.getenv('USER')
0112         self.group = 'ppd'
0113         self.label = 'RelValSet_'+os.getenv('CMSSW_VERSION').replace('-','')+'_v'+str(self.version)
0114         self.speciallabel=''
0115         if opt.label:
0116             self.speciallabel= '_'+opt.label
0117         self.longWFName = []
0118 
0119         if not os.getenv('WMCORE_ROOT'):
0120             print('\n\twmclient is not setup properly. Will not be able to upload or submit requests.\n')
0121             if not self.testMode:
0122                 print('\n\t QUIT\n')
0123                 sys.exit(-18)
0124         else:
0125             print('\n\tFound wmclient\n')
0126             
0127         self.defaultChain={
0128             "RequestType" :    "TaskChain",                    #this is how we handle relvals
0129             "SubRequestType" : "RelVal",                       #this is how we handle relvals, now that TaskChain is also used for central MC production
0130             "RequestPriority": 500000,
0131             "Requestor": self.user,                           #Person responsible
0132             "Group": self.group,                              #group for the request
0133             "CMSSWVersion": os.getenv('CMSSW_VERSION'),       #CMSSW Version (used for all tasks in chain)
0134             "Campaign": os.getenv('CMSSW_VERSION'),           # = AcquisitionEra, will be reset later to the one of first task, will both be the CMSSW_VERSION
0135             "ScramArch": os.getenv('SCRAM_ARCH'),             #Scram Arch (used for all tasks in chain)
0136             "ProcessingVersion": self.version,                #Processing Version (used for all tasks in chain)
0137             "GlobalTag": None,                                #Global Tag (overridden per task)
0138             "ConfigCacheUrl": self.couch,                     #URL of CouchDB containing Config Cache
0139             "DbsUrl": self.DbsUrl,
0140             #- Will contain all configs for all Tasks
0141             #"SiteWhitelist" : ["T2_CH_CERN", "T1_US_FNAL"],   #Site whitelist
0142             "TaskChain" : None,                                  #Define number of tasks in chain.
0143             "nowmTasklist" : [],  #a list of tasks as we put them in
0144             "Multicore" : 1,   # do not set multicore for the whole chain
0145             "Memory" : 3000,
0146             "SizePerEvent" : 1234,
0147             "TimePerEvent" : 10,
0148             "PrepID": os.getenv('CMSSW_VERSION')
0149             }
0150 
0151         self.defaultHarvest={
0152             "EnableHarvesting" : "True",
0153             "DQMUploadUrl" : self.dqmgui,
0154             "DQMConfigCacheID" : None,
0155             "Multicore" : 1              # hardcode Multicore to be 1 for Harvest
0156             }
0157         
0158         self.defaultScratch={
0159             "TaskName" : None,                            #Task Name
0160             "ConfigCacheID" : None,                   #Generator Config id
0161             "GlobalTag": None,
0162             "SplittingAlgo"  : "EventBased",             #Splitting Algorithm
0163             "EventsPerJob" : None,                       #Size of jobs in terms of splitting algorithm
0164             "EventsPerLumi" : None,
0165             "RequestNumEvents" : None,                      #Total number of events to generate
0166             "Seeding" : "AutomaticSeeding",                          #Random seeding method
0167             "PrimaryDataset" : None,                          #Primary Dataset to be created
0168             "nowmIO": {},
0169             "Multicore" : opt.nThreads,                  # this is the per-taskchain Multicore; it's the default assigned to a task if it has no value specified 
0170             "EventStreams": self.numberOfStreams,
0171             "KeepOutput" : False
0172             }
0173         self.defaultInput={
0174             "TaskName" : "DigiHLT",                                      #Task Name
0175             "ConfigCacheID" : None,                                      #Processing Config id
0176             "GlobalTag": None,
0177             "InputDataset" : None,                                       #Input Dataset to be processed
0178             "SplittingAlgo"  : "LumiBased",                        #Splitting Algorithm
0179             "LumisPerJob" : 10,               #Size of jobs in terms of splitting algorithm
0180             "nowmIO": {},
0181             "Multicore" : opt.nThreads,                       # this is the per-taskchain Multicore; it's the default assigned to a task if it has no value specified 
0182             "EventStreams": self.numberOfStreams,
0183             "KeepOutput" : False
0184             }
0185         self.defaultTask={
0186             "TaskName" : None,                                 #Task Name
0187             "InputTask" : None,                                #Input Task Name (Task Name field of a previous Task entry)
0188             "InputFromOutputModule" : None,                    #OutputModule name in the input task that will provide files to process
0189             "ConfigCacheID" : None,                            #Processing Config id
0190             "GlobalTag": None,
0191             "SplittingAlgo"  : "LumiBased",                        #Splitting Algorithm
0192             "LumisPerJob" : 10,               #Size of jobs in terms of splitting algorithm
0193             "nowmIO": {},
0194             "Multicore" : opt.nThreads,                       # this is the per-taskchain Multicore; it's the default assigned to a task if it has no value specified 
0195             "EventStreams": self.numberOfStreams,
0196             "KeepOutput" : False,
0197             "RequiresGPU" : None,
0198             "GPUParams": None
0199             }
0200         self.defaultGPUParams={
0201             "GPUMemoryMB": self.GPUMemoryMB,
0202             "CUDACapabilities": self.CUDACapabilities,
0203             "CUDARuntime": self.CUDARuntime
0204             }
0205         if self.GPUName: self.defaultGPUParams.update({"GPUName": self.GPUName})
0206         if self.CUDADriverVersion: self.defaultGPUParams.update({"CUDADriverVersion": self.CUDADriverVersion})
0207         if self.CUDARuntimeVersion: self.defaultGPUParams.update({"CUDARuntimeVersion": self.CUDARuntimeVersion})
0208 
0209         self.chainDicts={}
0210 
0211     @staticmethod
0212     def get_wmsplit():
0213         """
0214         Return a "wmsplit" dictionary that contain non-default LumisPerJob values
0215         """
0216         wmsplit = {}
0217         try:
0218             wmsplit['DIGIHI'] = 5
0219             wmsplit['RECOHI'] = 5
0220             wmsplit['HLTD'] = 5
0221             wmsplit['RECODreHLT'] = 2
0222             wmsplit['DIGIPU'] = 4
0223             wmsplit['DIGIPU1'] = 4
0224             wmsplit['RECOPU1'] = 1
0225             wmsplit['DIGIUP15_PU50'] = 1
0226             wmsplit['RECOUP15_PU50'] = 1
0227             wmsplit['DIGIUP15_PU25'] = 1
0228             wmsplit['RECOUP15_PU25'] = 1
0229             wmsplit['DIGIUP15_PU25HS'] = 1
0230             wmsplit['RECOUP15_PU25HS'] = 1
0231             wmsplit['DIGIHIMIX'] = 5
0232             wmsplit['RECOHIMIX'] = 5
0233             wmsplit['RECODSplit'] = 1
0234             wmsplit['SingleMuPt10_UP15_ID'] = 1
0235             wmsplit['DIGIUP15_ID'] = 1
0236             wmsplit['RECOUP15_ID'] = 1
0237             wmsplit['TTbar_13_ID'] = 1
0238             wmsplit['SingleMuPt10FS_ID'] = 1
0239             wmsplit['TTbarFS_ID'] = 1
0240             wmsplit['RECODR2_50nsreHLT'] = 5
0241             wmsplit['RECODR2_25nsreHLT'] = 5
0242             wmsplit['RECODR2_2016reHLT'] = 5
0243             wmsplit['RECODR2_50nsreHLT_HIPM'] = 5
0244             wmsplit['RECODR2_25nsreHLT_HIPM'] = 5
0245             wmsplit['RECODR2_2016reHLT_HIPM'] = 1
0246             wmsplit['RECODR2_2016reHLT_skimSingleMu'] = 1
0247             wmsplit['RECODR2_2016reHLT_skimDoubleEG'] = 1
0248             wmsplit['RECODR2_2016reHLT_skimMuonEG'] = 1
0249             wmsplit['RECODR2_2016reHLT_skimJetHT'] = 1
0250             wmsplit['RECODR2_2016reHLT_skimMET'] = 1
0251             wmsplit['RECODR2_2016reHLT_skimSinglePh'] = 1
0252             wmsplit['RECODR2_2016reHLT_skimMuOnia'] = 1
0253             wmsplit['RECODR2_2016reHLT_skimSingleMu_HIPM'] = 1
0254             wmsplit['RECODR2_2016reHLT_skimDoubleEG_HIPM'] = 1
0255             wmsplit['RECODR2_2016reHLT_skimMuonEG_HIPM'] = 1
0256             wmsplit['RECODR2_2016reHLT_skimJetHT_HIPM'] = 1
0257             wmsplit['RECODR2_2016reHLT_skimMET_HIPM'] = 1
0258             wmsplit['RECODR2_2016reHLT_skimSinglePh_HIPM'] = 1
0259             wmsplit['RECODR2_2016reHLT_skimMuOnia_HIPM'] = 1
0260             wmsplit['RECODR2_2017reHLT_Prompt'] = 1
0261             wmsplit['RECODR2_2017reHLT_skimSingleMu_Prompt_Lumi'] = 1
0262             wmsplit['RECODR2_2017reHLT_skimDoubleEG_Prompt'] = 1
0263             wmsplit['RECODR2_2017reHLT_skimMET_Prompt'] = 1
0264             wmsplit['RECODR2_2017reHLT_skimMuOnia_Prompt'] = 1
0265             wmsplit['RECODR2_2017reHLT_Prompt_L1TEgDQM'] = 1
0266             wmsplit['RECODR2_2018reHLT_Prompt'] = 1
0267             wmsplit['RECODR2_2018reHLT_skimSingleMu_Prompt_Lumi'] = 1
0268             wmsplit['RECODR2_2018reHLT_skimDoubleEG_Prompt'] = 1
0269             wmsplit['RECODR2_2018reHLT_skimJetHT_Prompt'] = 1
0270             wmsplit['RECODR2_2018reHLT_skimMET_Prompt'] = 1
0271             wmsplit['RECODR2_2018reHLT_skimMuOnia_Prompt'] = 1
0272             wmsplit['RECODR2_2018reHLT_skimEGamma_Prompt_L1TEgDQM'] = 1
0273             wmsplit['RECODR2_2018reHLT_skimMuonEG_Prompt'] = 1
0274             wmsplit['RECODR2_2018reHLT_skimCharmonium_Prompt'] = 1
0275             wmsplit['RECODR2_2018reHLT_skimJetHT_Prompt_HEfail'] = 1
0276             wmsplit['RECODR2_2018reHLT_skimJetHT_Prompt_BadHcalMitig'] = 1
0277             wmsplit['RECODR2_2018reHLTAlCaTkCosmics_Prompt'] = 1
0278             wmsplit['RECODR2_2018reHLT_skimDisplacedJet_Prompt'] = 1
0279             wmsplit['RECODR2_2018reHLT_ZBPrompt'] = 1
0280             wmsplit['RECODR2_2018reHLT_Offline'] = 1
0281             wmsplit['RECODR2_2018reHLT_skimSingleMu_Offline_Lumi'] = 1
0282             wmsplit['RECODR2_2018reHLT_skimDoubleEG_Offline'] = 1
0283             wmsplit['RECODR2_2018reHLT_skimJetHT_Offline'] = 1
0284             wmsplit['RECODR2_2018reHLT_skimMET_Offline'] = 1
0285             wmsplit['RECODR2_2018reHLT_skimMuOnia_Offline'] = 1
0286             wmsplit['RECODR2_2018reHLT_skimEGamma_Offline_L1TEgDQM'] = 1
0287             wmsplit['RECODR2_2018reHLT_skimMuonEG_Offline'] = 1
0288             wmsplit['RECODR2_2018reHLT_skimCharmonium_Offline'] = 1
0289             wmsplit['RECODR2_2018reHLT_skimJetHT_Offline_HEfail'] = 1
0290             wmsplit['RECODR2_2018reHLT_skimJetHT_Offline_BadHcalMitig'] = 1
0291             wmsplit['RECODR2_2018reHLTAlCaTkCosmics_Offline'] = 1
0292             wmsplit['RECODR2_2018reHLT_skimDisplacedJet_Offline'] = 1
0293             wmsplit['RECODR2_2018reHLT_ZBOffline'] = 1
0294             wmsplit['HLTDR2_50ns'] = 1
0295             wmsplit['HLTDR2_25ns'] = 1
0296             wmsplit['HLTDR2_2016'] = 1
0297             wmsplit['HLTDR2_2017'] = 1
0298             wmsplit['HLTDR2_2018'] = 1
0299             wmsplit['HLTDR2_2018_BadHcalMitig'] = 1
0300             wmsplit['Hadronizer'] = 1
0301             wmsplit['DIGIUP15'] = 1
0302             wmsplit['RECOUP15'] = 1
0303             wmsplit['RECOAODUP15'] = 5
0304             wmsplit['DBLMINIAODMCUP15NODQM'] = 5
0305             wmsplit['Digi'] = 5
0306             wmsplit['Reco'] = 5
0307             wmsplit['DigiPU'] = 1
0308             wmsplit['RecoPU'] = 1
0309             wmsplit['RECOHID11'] = 1
0310             wmsplit['DIGIUP17'] = 1
0311             wmsplit['RECOUP17'] = 1
0312             wmsplit['DIGIUP17_PU25'] = 1
0313             wmsplit['RECOUP17_PU25'] = 1
0314             wmsplit['DIGICOS_UP16'] = 1
0315             wmsplit['RECOCOS_UP16'] = 1
0316             wmsplit['DIGICOS_UP17'] = 1
0317             wmsplit['RECOCOS_UP17'] = 1
0318             wmsplit['DIGICOS_UP18'] = 1
0319             wmsplit['RECOCOS_UP18'] = 1
0320             wmsplit['DIGICOS_UP21'] = 1
0321             wmsplit['RECOCOS_UP21'] = 1
0322             wmsplit['HYBRIDRepackHI2015VR'] = 1
0323             wmsplit['HYBRIDZSHI2015'] = 1
0324             wmsplit['RECOHID15'] = 1
0325             wmsplit['RECOHID18'] = 1
0326             # automate for phase 2
0327             from .upgradeWorkflowComponents import upgradeKeys
0328             for key in upgradeKeys[2026]:
0329                 if 'PU' not in key:
0330                     continue
0331 
0332                 wmsplit['DigiTriggerPU_' + key] = 1
0333                 wmsplit['RecoGlobalPU_' + key] = 1
0334 
0335         except Exception as ex:
0336             print('Exception while building a wmsplit dictionary: %s' % (str(ex)))
0337             return {}
0338 
0339         return wmsplit
0340 
0341     def prepare(self, mReader, directories, mode='init'):
0342         wmsplit = MatrixInjector.get_wmsplit()
0343         acqEra=False
0344         for (n,dir) in directories.items():
0345             chainDict=copy.deepcopy(self.defaultChain)
0346             print("inspecting",dir)
0347             nextHasDSInput=None
0348             for (x,s) in mReader.workFlowSteps.items():
0349                 #x has the format (num, prefix)
0350                 #s has the format (num, name, commands, stepList)
0351                 if x[0]==n:
0352                     #print "found",n,s[3]
0353                     #chainDict['RequestString']='RV'+chainDict['CMSSWVersion']+s[1].split('+')[0]
0354                     index=0
0355                     splitForThisWf=None
0356                     thisLabel=self.speciallabel
0357                     #if 'HARVESTGEN' in s[3]:
0358                     if len( [step for step in s[3] if "HARVESTGEN" in step] )>0:
0359                         chainDict['TimePerEvent']=0.01
0360                         thisLabel=thisLabel+"_gen"
0361                     # for double miniAOD test
0362                     if len( [step for step in s[3] if "DBLMINIAODMCUP15NODQM" in step] )>0:
0363                         thisLabel=thisLabel+"_dblMiniAOD"
0364                     processStrPrefix=''
0365                     setPrimaryDs=None
0366                     nanoedmGT=''
0367                     for step in s[3]:
0368                         
0369                         if 'INPUT' in step or (not isinstance(s[2][index],str)):
0370                             nextHasDSInput=s[2][index]
0371 
0372                         else:
0373 
0374                             if (index==0):
0375                                 #first step and not input -> gen part
0376                                 chainDict['nowmTasklist'].append(copy.deepcopy(self.defaultScratch))
0377                                 try:
0378                                     chainDict['nowmTasklist'][-1]['nowmIO']=json.loads(open('%s/%s.io'%(dir,step)).read())
0379                                 except:
0380                                     print("Failed to find",'%s/%s.io'%(dir,step),".The workflows were probably not run on cfg not created")
0381                                     return -15
0382 
0383                                 chainDict['nowmTasklist'][-1]['PrimaryDataset']='RelVal'+s[1].split('+')[0]
0384                                 if not '--relval' in s[2][index]:
0385                                     print('Impossible to create task from scratch without splitting information with --relval')
0386                                     return -12
0387                                 else:
0388                                     arg=s[2][index].split()
0389                                     ns=list(map(int,arg[len(arg) - arg[-1::-1].index('--relval')].split(',')))
0390                                     chainDict['nowmTasklist'][-1]['RequestNumEvents'] = ns[0]
0391                                     chainDict['nowmTasklist'][-1]['EventsPerJob'] = ns[1]
0392                                     chainDict['nowmTasklist'][-1]['EventsPerLumi'] = ns[1]
0393                                     #overwrite EventsPerLumi if numberEventsInLuminosityBlock is set in cmsDriver
0394                                     if 'numberEventsInLuminosityBlock' in s[2][index]:
0395                                         nEventsInLuminosityBlock = re.findall('process.source.numberEventsInLuminosityBlock=cms.untracked.uint32\(([ 0-9 ]*)\)', s[2][index],re.DOTALL)
0396                                         if nEventsInLuminosityBlock[-1].isdigit() and int(nEventsInLuminosityBlock[-1]) < ns[1]:
0397                                             chainDict['nowmTasklist'][-1]['EventsPerLumi'] = int(nEventsInLuminosityBlock[-1])
0398                                     if(self.numberEventsInLuminosityBlock > 0 and self.numberEventsInLuminosityBlock <= ns[1]):
0399                                         chainDict['nowmTasklist'][-1]['EventsPerLumi'] = self.numberEventsInLuminosityBlock
0400                                 if 'FASTSIM' in s[2][index] or '--fast' in s[2][index]:
0401                                     thisLabel+='_FastSim'
0402                                 if 'lhe' in s[2][index] in s[2][index]:
0403                                     chainDict['nowmTasklist'][-1]['LheInputFiles'] =True
0404 
0405                             elif nextHasDSInput:
0406                                 chainDict['nowmTasklist'].append(copy.deepcopy(self.defaultInput))
0407                                 try:
0408                                     chainDict['nowmTasklist'][-1]['nowmIO']=json.loads(open('%s/%s.io'%(dir,step)).read())
0409                                 except:
0410                                     print("Failed to find",'%s/%s.io'%(dir,step),".The workflows were probably not run on cfg not created")
0411                                     return -15
0412                                 chainDict['nowmTasklist'][-1]['InputDataset']=nextHasDSInput.dataSet
0413                                 if ('DQMHLTonRAWAOD' in step) :
0414                                     chainDict['nowmTasklist'][-1]['IncludeParents']=True
0415                                 splitForThisWf=nextHasDSInput.split
0416                                 chainDict['nowmTasklist'][-1]['LumisPerJob']=splitForThisWf
0417                                 if step in wmsplit:
0418                                     chainDict['nowmTasklist'][-1]['LumisPerJob']=wmsplit[step]
0419                                 # get the run numbers or #events
0420                                 if len(nextHasDSInput.run):
0421                                     chainDict['nowmTasklist'][-1]['RunWhitelist']=nextHasDSInput.run
0422                                 if len(nextHasDSInput.ls):
0423                                     chainDict['nowmTasklist'][-1]['LumiList']=nextHasDSInput.ls
0424                                 #print "what is s",s[2][index]
0425                                 if '--data' in s[2][index] and nextHasDSInput.label:
0426                                     thisLabel+='_RelVal_%s'%nextHasDSInput.label
0427                                 if 'filter' in chainDict['nowmTasklist'][-1]['nowmIO']:
0428                                     print("This has an input DS and a filter sequence: very likely to be the PyQuen sample")
0429                                     processStrPrefix='PU_'
0430                                     setPrimaryDs = 'RelVal'+s[1].split('+')[0]
0431                                     if setPrimaryDs:
0432                                         chainDict['nowmTasklist'][-1]['PrimaryDataset']=setPrimaryDs
0433                                 nextHasDSInput=None
0434                                 if 'GPU' in step and self.RequiresGPU != 'forbidden':
0435                                     chainDict['nowmTasklist'][-1]['RequiresGPU'] = self.RequiresGPU
0436                                     chainDict['nowmTasklist'][-1]['GPUParams']=json.dumps(self.defaultGPUParams)
0437                             else:
0438                                 #not first step and no inputDS
0439                                 chainDict['nowmTasklist'].append(copy.deepcopy(self.defaultTask))
0440                                 try:
0441                                     chainDict['nowmTasklist'][-1]['nowmIO']=json.loads(open('%s/%s.io'%(dir,step)).read())
0442                                 except:
0443                                     print("Failed to find",'%s/%s.io'%(dir,step),".The workflows were probably not run on cfg not created")
0444                                     return -15
0445                                 if splitForThisWf:
0446                                     chainDict['nowmTasklist'][-1]['LumisPerJob']=splitForThisWf
0447                                 if step in wmsplit:
0448                                     chainDict['nowmTasklist'][-1]['LumisPerJob']=wmsplit[step]
0449                                 if 'GPU' in step and self.RequiresGPU != 'forbidden':
0450                                     chainDict['nowmTasklist'][-1]['RequiresGPU'] = self.RequiresGPU
0451                                     chainDict['nowmTasklist'][-1]['GPUParams']=json.dumps(self.defaultGPUParams)
0452 
0453                             # change LumisPerJob for Hadronizer steps. 
0454                             if 'Hadronizer' in step: 
0455                                 chainDict['nowmTasklist'][-1]['LumisPerJob']=wmsplit['Hadronizer']
0456 
0457                             #print step
0458                             chainDict['nowmTasklist'][-1]['TaskName']=step
0459                             if setPrimaryDs:
0460                                 chainDict['nowmTasklist'][-1]['PrimaryDataset']=setPrimaryDs
0461                             chainDict['nowmTasklist'][-1]['ConfigCacheID']='%s/%s.py'%(dir,step)
0462                             chainDict['nowmTasklist'][-1]['GlobalTag']=chainDict['nowmTasklist'][-1]['nowmIO']['GT'] # copy to the proper parameter name
0463                             chainDict['GlobalTag']=chainDict['nowmTasklist'][-1]['nowmIO']['GT'] #set in general to the last one of the chain
0464                             if 'NANOEDM' in step :
0465                                 nanoedmGT = chainDict['nowmTasklist'][-1]['nowmIO']['GT']
0466                             if 'NANOMERGE' in step :
0467                                 chainDict['GlobalTag'] = nanoedmGT
0468                             if 'pileup' in chainDict['nowmTasklist'][-1]['nowmIO']:
0469                                 chainDict['nowmTasklist'][-1]['MCPileup']=chainDict['nowmTasklist'][-1]['nowmIO']['pileup']
0470                             if '--pileup ' in s[2][index]:      # catch --pileup (scenarion) and not --pileup_ (dataset to be mixed) => works also making PRE-MIXed dataset
0471                                 pileupString = s[2][index].split()[s[2][index].split().index('--pileup')+1]
0472                                 processStrPrefix='PU_'          # take care of pu overlay done with GEN-SIM mixing
0473                                 if pileupString.find('25ns')  > 0 :
0474                                     processStrPrefix='PU25ns_'
0475                                 elif pileupString.find('50ns')  > 0 :
0476                                     processStrPrefix='PU50ns_'
0477                                 elif 'nopu' in pileupString.lower():
0478                                     processStrPrefix=''
0479                             if 'premix_stage2' in s[2][index] and '--pileup_input' in s[2][index]: # take care of pu overlay done with DIGI mixing of premixed events
0480                                 if s[2][index].split()[ s[2][index].split().index('--pileup_input')+1  ].find('25ns')  > 0 :
0481                                     processStrPrefix='PUpmx25ns_'
0482                                 elif s[2][index].split()[ s[2][index].split().index('--pileup_input')+1  ].find('50ns')  > 0 :
0483                                     processStrPrefix='PUpmx50ns_'
0484 
0485                             if acqEra:
0486                                 #chainDict['AcquisitionEra'][step]=(chainDict['CMSSWVersion']+'-PU_'+chainDict['nowmTasklist'][-1]['GlobalTag']).replace('::All','')+thisLabel
0487                                 chainDict['AcquisitionEra'][step]=chainDict['CMSSWVersion']
0488                                 chainDict['ProcessingString'][step]=processStrPrefix+chainDict['nowmTasklist'][-1]['GlobalTag'].replace('::All','').replace('-','_')+thisLabel
0489                                 if 'NANOMERGE' in step :
0490                                     chainDict['ProcessingString'][step]=processStrPrefix+nanoedmGT.replace('::All','').replace('-','_')+thisLabel
0491                             else:
0492                                 #chainDict['nowmTasklist'][-1]['AcquisitionEra']=(chainDict['CMSSWVersion']+'-PU_'+chainDict['nowmTasklist'][-1]['GlobalTag']).replace('::All','')+thisLabel
0493                                 chainDict['nowmTasklist'][-1]['AcquisitionEra']=chainDict['CMSSWVersion']
0494                                 chainDict['nowmTasklist'][-1]['ProcessingString']=processStrPrefix+chainDict['nowmTasklist'][-1]['GlobalTag'].replace('::All','').replace('-','_')+thisLabel
0495                                 if 'NANOMERGE' in step :
0496                                     chainDict['nowmTasklist'][-1]['ProcessingString']=processStrPrefix+nanoedmGT.replace('::All','').replace('-','_')+thisLabel
0497 
0498                             if (self.batchName):
0499                                 chainDict['nowmTasklist'][-1]['Campaign'] = chainDict['nowmTasklist'][-1]['AcquisitionEra']+self.batchName
0500 
0501                             # specify different ProcessingString for double miniAOD dataset
0502                             if ('DBLMINIAODMCUP15NODQM' in step): 
0503                                 chainDict['nowmTasklist'][-1]['ProcessingString']=chainDict['nowmTasklist'][-1]['ProcessingString']+'_miniAOD' 
0504 
0505                             if( chainDict['nowmTasklist'][-1]['Multicore'] ):
0506                                 # the scaling factor of 1.2GB / thread is empirical and measured on a SECOND round of tests with PU samples
0507                                 # the number of threads is NO LONGER assumed to be the same for all tasks
0508                                 # https://hypernews.cern.ch/HyperNews/CMS/get/edmFramework/3509/1/1/1.html
0509                                 # now change to 1.5GB / additional thread according to discussion:
0510                                 # https://hypernews.cern.ch/HyperNews/CMS/get/relval/4817/1/1.html
0511 #                                chainDict['nowmTasklist'][-1]['Memory'] = 3000 + int( chainDict['nowmTasklist'][-1]['Multicore']  -1 )*1500
0512                                 chainDict['nowmTasklist'][-1]['Memory'] = self.memoryOffset + int( chainDict['nowmTasklist'][-1]['Multicore']  -1 ) * self.memPerCore
0513 
0514                         index+=1
0515                     #end of loop through steps
0516                     chainDict['RequestString']='RV'+chainDict['CMSSWVersion']+s[1].split('+')[0]
0517                     if processStrPrefix or thisLabel:
0518                         chainDict['RequestString']+='_'+processStrPrefix+thisLabel
0519                     #check candidate WF name
0520                     self.candidateWFName = self.user+'_'+chainDict['RequestString']
0521                     if (len(self.candidateWFName)>MAXWORKFLOWLENGTH):
0522                         self.longWFName.append(self.candidateWFName)
0523 
0524 ### PrepID
0525                     chainDict['PrepID'] = chainDict['CMSSWVersion']+'__'+self.batchTime+'-'+s[1].split('+')[0]
0526                     if(self.batchName):
0527                         chainDict['PrepID'] = chainDict['CMSSWVersion']+self.batchName+'-'+s[1].split('+')[0]
0528                         if( 'HIN' in self.batchName ):
0529                             chainDict['SubRequestType'] = "HIRelVal"
0530                         
0531             #wrap up for this one
0532             import pprint
0533             #print 'wrapping up'
0534             #pprint.pprint(chainDict)
0535             #loop on the task list
0536             for i_second in reversed(range(len(chainDict['nowmTasklist']))):
0537                 t_second=chainDict['nowmTasklist'][i_second]
0538                 #print "t_second taskname", t_second['TaskName']
0539                 if 'primary' in t_second['nowmIO']:
0540                     #print t_second['nowmIO']['primary']
0541                     primary=t_second['nowmIO']['primary'][0].replace('file:','')
0542                     for i_input in reversed(range(0,i_second)):
0543                         t_input=chainDict['nowmTasklist'][i_input]
0544                         for (om,o) in t_input['nowmIO'].items():
0545                             if primary in o:
0546                                 #print "found",primary,"procuced by",om,"of",t_input['TaskName']
0547                                 #ad-hoc fix due to restriction in TaskName of 50 characters
0548                                 if (len(t_input['TaskName'])>50):
0549                                     if (t_input['TaskName'].find('GenSim') != -1):
0550                                         t_input['TaskName'] = 'GenSimFull'
0551                                     if (t_input['TaskName'].find('Hadronizer') != -1):
0552                                         t_input['TaskName'] = 'HadronizerFull'
0553                                 t_second['InputTask'] = t_input['TaskName']
0554                                 t_second['InputFromOutputModule'] = om
0555                                 #print 't_second',pprint.pformat(t_second)
0556                                 if t_second['TaskName'].startswith('HARVEST'):
0557                                     chainDict.update(copy.deepcopy(self.defaultHarvest))
0558                                     if "_RD" in t_second['TaskName']:
0559                                         chainDict['DQMHarvestUnit'] = "multiRun"
0560                                     chainDict['DQMConfigCacheID']=t_second['ConfigCacheID']
0561                                     ## the info are not in the task specific dict but in the general dict
0562                                     #t_input.update(copy.deepcopy(self.defaultHarvest))
0563                                     #t_input['DQMConfigCacheID']=t_second['ConfigCacheID']
0564                                 break
0565 
0566             # agreed changes for wm injection:
0567             # - Campaign: *optional* string during creation. It will default to AcqEra value if possible.  
0568             #             Otherwise it will be empty.
0569             # - AcquisitionEra: *mandatory* string at request level during creation. *optional* string
0570             #                   at task level during creation. "optional" during assignment.
0571             # - ProcessingString: *mandatory* string at request level during creation. *optional* string
0572             #                     at task level during creation. "optional" during assignment.
0573             # - ProcessingVersion: *optional* during creation (default 1). *optional* during assignment.
0574             # 
0575             # Which requires following changes here:
0576             #  - reset Global AcquisitionEra, ProcessingString to be the one in the first task
0577             #  - and also Campaign to be always the same as the AcquisitionEra
0578 
0579             if acqEra:
0580                 chainDict['AcquisitionEra'] = chainDict['AcquisitionEra'].values()[0] 
0581                 chainDict['ProcessingString'] = chainDict['ProcessingString'].values()[0]
0582             else:
0583                 chainDict['AcquisitionEra'] = chainDict['nowmTasklist'][0]['AcquisitionEra']
0584                 chainDict['ProcessingString'] = chainDict['nowmTasklist'][0]['ProcessingString']
0585                 
0586 #####  batch name appended to Campaign name
0587 #            chainDict['Campaign'] = chainDict['AcquisitionEra']
0588             chainDict['Campaign'] = chainDict['AcquisitionEra']+self.batchName
0589                
0590             ## clean things up now
0591             itask=0
0592             if self.keep:
0593                 for i in self.keep:
0594                     if isinstance(i, int) and i < len(chainDict['nowmTasklist']):
0595                         chainDict['nowmTasklist'][i]['KeepOutput']=True
0596             for (i,t) in enumerate(chainDict['nowmTasklist']):
0597                 if t['TaskName'].startswith('HARVEST'):
0598                     continue
0599                 if not self.keep:
0600                     t['KeepOutput']=True
0601                 elif t['TaskName'] in self.keep:
0602                     t['KeepOutput']=True
0603                 if t['TaskName'].startswith('HYBRIDRepackHI2015VR'):
0604                     t['KeepOutput']=False
0605                 t.pop('nowmIO')
0606                 itask+=1
0607                 chainDict['Task%d'%(itask)]=t
0608 
0609 
0610             ## 
0611 
0612 
0613             ## provide the number of tasks
0614             chainDict['TaskChain']=itask#len(chainDict['nowmTasklist'])
0615             
0616             chainDict.pop('nowmTasklist')
0617             self.chainDicts[n]=chainDict
0618 
0619             
0620         return 0
0621 
0622     def uploadConf(self,filePath,label,where):
0623         labelInCouch=self.label+'_'+label
0624         cacheName=filePath.split('/')[-1]
0625         if self.testMode:
0626             self.count+=1
0627             print('\tFake upload of',filePath,'to couch with label',labelInCouch)
0628             return self.count
0629         else:
0630             try:
0631                 from modules.wma import upload_to_couch,DATABASE_NAME
0632             except:
0633                 print('\n\tUnable to find wmcontrol modules. Please include it in your python path\n')
0634                 print('\n\t QUIT\n')
0635                 sys.exit(-16)
0636 
0637             if cacheName in self.couchCache:
0638                 print("Not re-uploading",filePath,"to",where,"for",label)
0639                 cacheId=self.couchCache[cacheName]
0640             else:
0641                 print("Loading",filePath,"to",where,"for",label)
0642                 ## totally fork the upload to couch to prevent cross loading of process configurations
0643                 pool = multiprocessing.Pool(1)
0644                 cacheIds = pool.map( upload_to_couch_oneArg, [(filePath,labelInCouch,self.user,self.group,where)] )
0645                 cacheId = cacheIds[0]
0646                 self.couchCache[cacheName]=cacheId
0647             return cacheId
0648     
0649     def upload(self):
0650         for (n,d) in self.chainDicts.items():
0651             for it in d:
0652                 if it.startswith("Task") and it!='TaskChain':
0653                     #upload
0654                     couchID=self.uploadConf(d[it]['ConfigCacheID'],
0655                                             str(n)+d[it]['TaskName'],
0656                                             d['ConfigCacheUrl']
0657                                             )
0658                     print(d[it]['ConfigCacheID']," uploaded to couchDB for",str(n),"with ID",couchID)
0659                     d[it]['ConfigCacheID']=couchID
0660                 if it =='DQMConfigCacheID':
0661                     couchID=self.uploadConf(d['DQMConfigCacheID'],
0662                                             str(n)+'harvesting',
0663                                             d['ConfigCacheUrl']
0664                                             )
0665                     print(d['DQMConfigCacheID'],"uploaded to couchDB for",str(n),"with ID",couchID)
0666                     d['DQMConfigCacheID']=couchID
0667                         
0668             
0669     def submit(self):
0670         try:
0671             from modules.wma import makeRequest,approveRequest
0672             from wmcontrol import random_sleep
0673             print('\n\tFound wmcontrol\n')
0674         except:
0675             print('\n\tUnable to find wmcontrol modules. Please include it in your python path\n')
0676             if not self.testMode:
0677                 print('\n\t QUIT\n')
0678                 sys.exit(-17)
0679 
0680         import pprint
0681         for (n,d) in self.chainDicts.items():
0682             if self.testMode:
0683                 print("Only viewing request",n)
0684                 print(pprint.pprint(d))
0685             else:
0686                 #submit to wmagent each dict
0687                 print("For eyes before submitting",n)
0688                 print(pprint.pprint(d))
0689                 print("Submitting",n,"...........")
0690                 workFlow=makeRequest(self.wmagent,d,encodeDict=True)
0691                 print("...........",n,"submitted")
0692                 random_sleep()
0693         if self.testMode and len(self.longWFName)>0:
0694             print("\n*** WARNING: "+str(len(self.longWFName))+" workflows have too long names for submission (>"+str(MAXWORKFLOWLENGTH)+ "characters) ***")
0695             print('\n'.join(self.longWFName))