Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:03:39

0001 from __future__ import print_function
0002 import sys
0003 import json
0004 import os
0005 import copy
0006 import multiprocessing
0007 import time
0008 import re
0009 
# Maximum length of a candidate workflow name ('<user>_<RequestString>');
# MatrixInjector.prepare() records names longer than this in self.longWFName.
MAXWORKFLOWLENGTH = 81
0011 
def performInjectionOptionTest(opt):
    """Validate the wmagent-injection options and adjust them in place.

    Exits the process (status -1) when injection is impossible:
    in --show mode, or for wmcontrol 'submit' with ``opt.nProcs == 0``.
    Otherwise, depending on ``opt.wmcontrol``:

    * ``'init'``  -- sets ``opt.nProcs`` to 0 (test mode).
    * ``'test'``  -- sets ``opt.dryRun`` (workflows were already created).
    * ``'force'`` -- prints an expert warning and sets ``opt.dryRun``.
    """
    if opt.show:
        print('Not injecting to wmagent in --show mode. Need to run the worklfows.')
        sys.exit(-1)

    mode = opt.wmcontrol
    if mode == 'init':
        # 'init' means it will run in test mode
        opt.nProcs = 0
    elif mode == 'test':
        # the workflows were created already; only dry-run them
        opt.dryRun = True
    elif mode == 'submit':
        if opt.nProcs == 0:
            print('Not injecting to wmagent in -j 0 mode. Need to run the worklfows.')
            sys.exit(-1)
    elif mode == 'force':
        print("This is an expert setting, you'd better know what you're doing")
        opt.dryRun = True
0028 
def upload_to_couch_oneArg(arguments):
    """Upload one config file to CouchDB, taking all inputs as a single tuple.

    :param arguments: a 5-tuple ``(filePath, labelInCouch, user, group, where)``;
        ``where`` is passed as the ``url`` of ``modules.wma.upload_to_couch``.
    :returns: whatever ``upload_to_couch`` returns (used as the cache id).

    The single-argument signature presumably exists so the function can be
    mapped over a list of tuples (e.g. with multiprocessing) -- confirm
    against the caller.
    """
    from modules.wma import upload_to_couch
    filePath, labelInCouch, user, group, where = arguments
    return upload_to_couch(filePath, labelInCouch, user, group,
                           test_mode=False, url=where)
0039 
0040 
0041 class MatrixInjector(object):
0042 
0043     def __init__(self,opt,mode='init',options=''):
0044         self.count=1040
0045 
0046         self.dqmgui=None
0047         self.wmagent=None
0048         for k in options.split(','):
0049             if k.startswith('dqm:'):
0050                 self.dqmgui=k.split(':',1)[-1]
0051             elif k.startswith('wma:'):
0052                 self.wmagent=k.split(':',1)[-1]
0053 
0054         self.testMode=((mode!='submit') and (mode!='force'))
0055         self.version =1
0056         self.keep = opt.keep
0057         self.memoryOffset = opt.memoryOffset
0058         self.memPerCore = opt.memPerCore
0059         self.numberEventsInLuminosityBlock = opt.numberEventsInLuminosityBlock
0060         self.numberOfStreams = 0
0061         if(opt.nStreams>0):
0062             self.numberOfStreams = opt.nStreams
0063         self.batchName = ''
0064         self.batchTime = str(int(time.time()))
0065         if(opt.batchName):
0066             self.batchName = '__'+opt.batchName+'-'+self.batchTime
0067 
0068         ####################################
0069         # Checking and setting up GPU attributes
0070         ####################################
0071         # Mendatory
0072         self.RequiresGPU = opt.gpu
0073         self.GPUMemoryMB = opt.GPUMemoryMB
0074         self.CUDACapabilities = opt.CUDACapabilities
0075         self.CUDARuntime = opt.CUDARuntime
0076         # optional
0077         self.GPUName = opt.GPUName
0078         self.CUDADriverVersion = opt.CUDADriverVersion
0079         self.CUDARuntimeVersion = opt.CUDARuntimeVersion
0080 
0081         # WMagent url
0082         if not self.wmagent:
0083             # Overwrite with env variable
0084             self.wmagent = os.getenv('WMAGENT_REQMGR')
0085 
0086         if not self.wmagent:
0087             # Default values
0088             if not opt.testbed:
0089                 self.wmagent = 'cmsweb.cern.ch'
0090             else:
0091                 self.wmagent = 'cmsweb-testbed.cern.ch'
0092 
0093         # DBSReader url
0094         if opt.dbsUrl is not None:
0095             self.DbsUrl = opt.dbsUrl
0096         elif os.getenv('CMS_DBSREADER_URL') is not None:
0097             self.DbsUrl = os.getenv('CMS_DBSREADER_URL')
0098         else:
0099             # Default values
0100             if not opt.testbed:
0101                 self.DbsUrl = "https://cmsweb-prod.cern.ch/dbs/prod/global/DBSReader"
0102             else:
0103                 self.DbsUrl = "https://cmsweb-testbed.cern.ch/dbs/int/global/DBSReader"
0104 
0105         if not self.dqmgui:
0106             self.dqmgui="https://cmsweb.cern.ch/dqm/relval"
0107         #couch stuff
0108         self.couch = 'https://'+self.wmagent+'/couchdb'
0109 #        self.couchDB = 'reqmgr_config_cache'
0110         self.couchCache={} # so that we do not upload like crazy, and recyle cfgs
0111         self.user = os.getenv('USER')
0112         self.group = 'ppd'
0113         self.label = 'RelValSet_'+os.getenv('CMSSW_VERSION').replace('-','')+'_v'+str(self.version)
0114         self.speciallabel=''
0115         if opt.label:
0116             self.speciallabel= '_'+opt.label
0117         self.longWFName = []
0118 
0119         if not os.getenv('WMCORE_ROOT'):
0120             print('\n\twmclient is not setup properly. Will not be able to upload or submit requests.\n')
0121             if not self.testMode:
0122                 print('\n\t QUIT\n')
0123                 sys.exit(-18)
0124         else:
0125             print('\n\tFound wmclient\n')
0126             
0127         self.defaultChain={
0128             "RequestType" :    "TaskChain",                    #this is how we handle relvals
0129             "SubRequestType" : "RelVal",                       #this is how we handle relvals, now that TaskChain is also used for central MC production
0130             "RequestPriority": 500000,
0131             "Requestor": self.user,                           #Person responsible
0132             "Group": self.group,                              #group for the request
0133             "CMSSWVersion": os.getenv('CMSSW_VERSION'),       #CMSSW Version (used for all tasks in chain)
0134             "Campaign": os.getenv('CMSSW_VERSION'),           # = AcquisitionEra, will be reset later to the one of first task, will both be the CMSSW_VERSION
0135             "ScramArch": os.getenv('SCRAM_ARCH'),             #Scram Arch (used for all tasks in chain)
0136             "ProcessingVersion": self.version,                #Processing Version (used for all tasks in chain)
0137             "GlobalTag": None,                                #Global Tag (overridden per task)
0138             "ConfigCacheUrl": self.couch,                     #URL of CouchDB containing Config Cache
0139             "DbsUrl": self.DbsUrl,
0140             #- Will contain all configs for all Tasks
0141             #"SiteWhitelist" : ["T2_CH_CERN", "T1_US_FNAL"],   #Site whitelist
0142             "TaskChain" : None,                                  #Define number of tasks in chain.
0143             "nowmTasklist" : [],  #a list of tasks as we put them in
0144             "Multicore" : 1,   # do not set multicore for the whole chain
0145             "Memory" : 3000,
0146             "SizePerEvent" : 1234,
0147             "TimePerEvent" : 10,
0148             "PrepID": os.getenv('CMSSW_VERSION')
0149             }
0150 
0151         self.defaultHarvest={
0152             "EnableHarvesting" : "True",
0153             "DQMUploadUrl" : self.dqmgui,
0154             "DQMConfigCacheID" : None,
0155             "Multicore" : 1              # hardcode Multicore to be 1 for Harvest
0156             }
0157         
0158         self.defaultScratch={
0159             "TaskName" : None,                            #Task Name
0160             "ConfigCacheID" : None,                   #Generator Config id
0161             "GlobalTag": None,
0162             "SplittingAlgo"  : "EventBased",             #Splitting Algorithm
0163             "EventsPerJob" : None,                       #Size of jobs in terms of splitting algorithm
0164             "EventsPerLumi" : None,
0165             "RequestNumEvents" : None,                      #Total number of events to generate
0166             "Seeding" : "AutomaticSeeding",                          #Random seeding method
0167             "PrimaryDataset" : None,                          #Primary Dataset to be created
0168             "nowmIO": {},
0169             "Multicore" : opt.nThreads,                  # this is the per-taskchain Multicore; it's the default assigned to a task if it has no value specified 
0170             "EventStreams": self.numberOfStreams,
0171             "KeepOutput" : False
0172             }
0173         self.defaultInput={
0174             "TaskName" : "DigiHLT",                                      #Task Name
0175             "ConfigCacheID" : None,                                      #Processing Config id
0176             "GlobalTag": None,
0177             "InputDataset" : None,                                       #Input Dataset to be processed
0178             "SplittingAlgo"  : "LumiBased",                        #Splitting Algorithm
0179             "LumisPerJob" : 10,               #Size of jobs in terms of splitting algorithm
0180             "nowmIO": {},
0181             "Multicore" : opt.nThreads,                       # this is the per-taskchain Multicore; it's the default assigned to a task if it has no value specified 
0182             "EventStreams": self.numberOfStreams,
0183             "KeepOutput" : False
0184             }
0185         self.defaultTask={
0186             "TaskName" : None,                                 #Task Name
0187             "InputTask" : None,                                #Input Task Name (Task Name field of a previous Task entry)
0188             "InputFromOutputModule" : None,                    #OutputModule name in the input task that will provide files to process
0189             "ConfigCacheID" : None,                            #Processing Config id
0190             "GlobalTag": None,
0191             "SplittingAlgo"  : "LumiBased",                        #Splitting Algorithm
0192             "LumisPerJob" : 10,               #Size of jobs in terms of splitting algorithm
0193             "nowmIO": {},
0194             "Multicore" : opt.nThreads,                       # this is the per-taskchain Multicore; it's the default assigned to a task if it has no value specified 
0195             "EventStreams": self.numberOfStreams,
0196             "KeepOutput" : False,
0197             "RequiresGPU" : None,
0198             "GPUParams": None
0199             }
0200         self.defaultGPUParams={
0201             "GPUMemoryMB": self.GPUMemoryMB,
0202             "CUDACapabilities": self.CUDACapabilities,
0203             "CUDARuntime": self.CUDARuntime
0204             }
0205         if self.GPUName: self.defaultGPUParams.update({"GPUName": self.GPUName})
0206         if self.CUDADriverVersion: self.defaultGPUParams.update({"CUDADriverVersion": self.CUDADriverVersion})
0207         if self.CUDARuntimeVersion: self.defaultGPUParams.update({"CUDARuntimeVersion": self.CUDARuntimeVersion})
0208 
0209         self.chainDicts={}
0210 
0211     @staticmethod
0212     def get_wmsplit():
0213         """
0214         Return a "wmsplit" dictionary that contain non-default LumisPerJob values
0215         """
0216         wmsplit = {}
0217         try:
0218             wmsplit['DIGIHI'] = 5
0219             wmsplit['RECOHI'] = 5
0220             wmsplit['HLTD'] = 5
0221             wmsplit['RECODreHLT'] = 2
0222             wmsplit['DIGIPU'] = 4
0223             wmsplit['DIGIPU1'] = 4
0224             wmsplit['RECOPU1'] = 1
0225             wmsplit['DIGIUP15_PU50'] = 1
0226             wmsplit['RECOUP15_PU50'] = 1
0227             wmsplit['DIGIUP15_PU25'] = 1
0228             wmsplit['RECOUP15_PU25'] = 1
0229             wmsplit['DIGIUP15_PU25HS'] = 1
0230             wmsplit['RECOUP15_PU25HS'] = 1
0231             wmsplit['DIGIHIMIX'] = 5
0232             wmsplit['RECOHIMIX'] = 5
0233             wmsplit['RECODSplit'] = 1
0234             wmsplit['SingleMuPt10_UP15_ID'] = 1
0235             wmsplit['DIGIUP15_ID'] = 1
0236             wmsplit['RECOUP15_ID'] = 1
0237             wmsplit['TTbar_13_ID'] = 1
0238             wmsplit['SingleMuPt10FS_ID'] = 1
0239             wmsplit['TTbarFS_ID'] = 1
0240             wmsplit['RECODR2_50nsreHLT'] = 5
0241             wmsplit['RECODR2_25nsreHLT'] = 5
0242             wmsplit['RECODR2_2016reHLT'] = 5
0243             wmsplit['RECODR2_50nsreHLT_HIPM'] = 5
0244             wmsplit['RECODR2_25nsreHLT_HIPM'] = 5
0245             wmsplit['RECODR2_2016reHLT_HIPM'] = 1
0246             wmsplit['RECODR2_2016reHLT_skimSingleMu'] = 1
0247             wmsplit['RECODR2_2016reHLT_skimDoubleEG'] = 1
0248             wmsplit['RECODR2_2016reHLT_skimMuonEG'] = 1
0249             wmsplit['RECODR2_2016reHLT_skimJetHT'] = 1
0250             wmsplit['RECODR2_2016reHLT_skimMET'] = 1
0251             wmsplit['RECODR2_2016reHLT_skimSinglePh'] = 1
0252             wmsplit['RECODR2_2016reHLT_skimMuOnia'] = 1
0253             wmsplit['RECODR2_2016reHLT_skimSingleMu_HIPM'] = 1
0254             wmsplit['RECODR2_2016reHLT_skimDoubleEG_HIPM'] = 1
0255             wmsplit['RECODR2_2016reHLT_skimMuonEG_HIPM'] = 1
0256             wmsplit['RECODR2_2016reHLT_skimJetHT_HIPM'] = 1
0257             wmsplit['RECODR2_2016reHLT_skimMET_HIPM'] = 1
0258             wmsplit['RECODR2_2016reHLT_skimSinglePh_HIPM'] = 1
0259             wmsplit['RECODR2_2016reHLT_skimMuOnia_HIPM'] = 1
0260             wmsplit['RECODR2_2017reHLT_Prompt'] = 1
0261             wmsplit['RECODR2_2017reHLT_skimSingleMu_Prompt_Lumi'] = 1
0262             wmsplit['RECODR2_2017reHLT_skimDoubleEG_Prompt'] = 1
0263             wmsplit['RECODR2_2017reHLT_skimMET_Prompt'] = 1
0264             wmsplit['RECODR2_2017reHLT_skimMuOnia_Prompt'] = 1
0265             wmsplit['RECODR2_2017reHLT_Prompt_L1TEgDQM'] = 1
0266             wmsplit['RECODR2_2018reHLT_Prompt'] = 1
0267             wmsplit['RECODR2_2018reHLT_skimSingleMu_Prompt_Lumi'] = 1
0268             wmsplit['RECODR2_2018reHLT_skimDoubleEG_Prompt'] = 1
0269             wmsplit['RECODR2_2018reHLT_skimJetHT_Prompt'] = 1
0270             wmsplit['RECODR2_2018reHLT_skimMET_Prompt'] = 1
0271             wmsplit['RECODR2_2018reHLT_skimMuOnia_Prompt'] = 1
0272             wmsplit['RECODR2_2018reHLT_skimEGamma_Prompt_L1TEgDQM'] = 1
0273             wmsplit['RECODR2_2018reHLT_skimMuonEG_Prompt'] = 1
0274             wmsplit['RECODR2_2018reHLT_skimCharmonium_Prompt'] = 1
0275             wmsplit['RECODR2_2018reHLT_skimJetHT_Prompt_HEfail'] = 1
0276             wmsplit['RECODR2_2018reHLT_skimJetHT_Prompt_BadHcalMitig'] = 1
0277             wmsplit['RECODR2_2018reHLTAlCaTkCosmics_Prompt'] = 1
0278             wmsplit['RECODR2_2018reHLT_skimDisplacedJet_Prompt'] = 1
0279             wmsplit['RECODR2_2018reHLT_ZBPrompt'] = 1
0280             wmsplit['RECODR2_2018reHLT_Offline'] = 1
0281             wmsplit['RECODR2_2018reHLT_skimSingleMu_Offline_Lumi'] = 1
0282             wmsplit['RECODR2_2018reHLT_skimDoubleEG_Offline'] = 1
0283             wmsplit['RECODR2_2018reHLT_skimJetHT_Offline'] = 1
0284             wmsplit['RECODR2_2018reHLT_skimMET_Offline'] = 1
0285             wmsplit['RECODR2_2018reHLT_skimMuOnia_Offline'] = 1
0286             wmsplit['RECODR2_2018reHLT_skimEGamma_Offline_L1TEgDQM'] = 1
0287             wmsplit['RECODR2_2018reHLT_skimMuonEG_Offline'] = 1
0288             wmsplit['RECODR2_2018reHLT_skimCharmonium_Offline'] = 1
0289             wmsplit['RECODR2_2018reHLT_skimJetHT_Offline_HEfail'] = 1
0290             wmsplit['RECODR2_2018reHLT_skimJetHT_Offline_BadHcalMitig'] = 1
0291             wmsplit['RECODR2_2018reHLTAlCaTkCosmics_Offline'] = 1
0292             wmsplit['RECODR2_2018reHLT_skimDisplacedJet_Offline'] = 1
0293             wmsplit['RECODR2_2018reHLT_ZBOffline'] = 1
0294             wmsplit['HLTDR2_50ns'] = 1
0295             wmsplit['HLTDR2_25ns'] = 1
0296             wmsplit['HLTDR2_2016'] = 1
0297             wmsplit['HLTDR2_2017'] = 1
0298             wmsplit['HLTDR2_2018'] = 1
0299             wmsplit['HLTDR2_2018_BadHcalMitig'] = 1
0300             wmsplit['Hadronizer'] = 1
0301             wmsplit['DIGIUP15'] = 1
0302             wmsplit['RECOUP15'] = 1
0303             wmsplit['RECOAODUP15'] = 5
0304             wmsplit['DBLMINIAODMCUP15NODQM'] = 5
0305             wmsplit['Digi'] = 5
0306             wmsplit['Reco'] = 5
0307             wmsplit['DigiPU'] = 1
0308             wmsplit['RecoPU'] = 1
0309             wmsplit['RECOHID11'] = 1
0310             wmsplit['DIGIUP17'] = 1
0311             wmsplit['RECOUP17'] = 1
0312             wmsplit['DIGIUP17_PU25'] = 1
0313             wmsplit['RECOUP17_PU25'] = 1
0314             wmsplit['DIGICOS_UP16'] = 1
0315             wmsplit['RECOCOS_UP16'] = 1
0316             wmsplit['DIGICOS_UP17'] = 1
0317             wmsplit['RECOCOS_UP17'] = 1
0318             wmsplit['DIGICOS_UP18'] = 1
0319             wmsplit['RECOCOS_UP18'] = 1
0320             wmsplit['DIGICOS_UP21'] = 1
0321             wmsplit['RECOCOS_UP21'] = 1
0322             wmsplit['HYBRIDRepackHI2015VR'] = 1
0323             wmsplit['HYBRIDZSHI2015'] = 1
0324             wmsplit['RECOHID15'] = 1
0325             wmsplit['RECOHID18'] = 1
0326             wmsplit['HLTDR3_2023'] = 1
0327             wmsplit['RECONANORUN3_reHLT'] = 1
0328             wmsplit['HARVESTRUN3'] = 1
0329             # automate for phase 2
0330             from .upgradeWorkflowComponents import upgradeKeys
0331             for key in upgradeKeys[2026]:
0332                 if 'PU' not in key:
0333                     continue
0334 
0335                 wmsplit['DigiTriggerPU_' + key] = 1
0336                 wmsplit['RecoGlobalPU_' + key] = 1
0337 
0338         except Exception as ex:
0339             print('Exception while building a wmsplit dictionary: %s' % (str(ex)))
0340             return {}
0341 
0342         return wmsplit
0343 
0344     def prepare(self, mReader, directories, mode='init'):
0345         wmsplit = MatrixInjector.get_wmsplit()
0346         acqEra=False
0347         for (n,dir) in directories.items():
0348             chainDict=copy.deepcopy(self.defaultChain)
0349             print("inspecting",dir)
0350             nextHasDSInput=None
0351             for (x,s) in mReader.workFlowSteps.items():
0352                 #x has the format (num, prefix)
0353                 #s has the format (num, name, commands, stepList)
0354                 if x[0]==n:
0355                     #print "found",n,s[3]
0356                     #chainDict['RequestString']='RV'+chainDict['CMSSWVersion']+s[1].split('+')[0]
0357                     index=0
0358                     splitForThisWf=None
0359                     thisLabel=self.speciallabel
0360                     #if 'HARVESTGEN' in s[3]:
0361                     if len( [step for step in s[3] if "HARVESTGEN" in step] )>0:
0362                         chainDict['TimePerEvent']=0.01
0363                         thisLabel=thisLabel+"_gen"
0364                     # for double miniAOD test
0365                     if len( [step for step in s[3] if "DBLMINIAODMCUP15NODQM" in step] )>0:
0366                         thisLabel=thisLabel+"_dblMiniAOD"
0367                     processStrPrefix=''
0368                     setPrimaryDs=None
0369                     nanoedmGT=''
0370                     for step in s[3]:
0371                         
0372                         if 'INPUT' in step or (not isinstance(s[2][index],str)):
0373                             nextHasDSInput=s[2][index]
0374 
0375                         else:
0376 
0377                             if (index==0):
0378                                 #first step and not input -> gen part
0379                                 chainDict['nowmTasklist'].append(copy.deepcopy(self.defaultScratch))
0380                                 try:
0381                                     chainDict['nowmTasklist'][-1]['nowmIO']=json.loads(open('%s/%s.io'%(dir,step)).read())
0382                                 except:
0383                                     print("Failed to find",'%s/%s.io'%(dir,step),".The workflows were probably not run on cfg not created")
0384                                     return -15
0385 
0386                                 chainDict['nowmTasklist'][-1]['PrimaryDataset']='RelVal'+s[1].split('+')[0]
0387                                 if not '--relval' in s[2][index]:
0388                                     print('Impossible to create task from scratch without splitting information with --relval')
0389                                     return -12
0390                                 else:
0391                                     arg=s[2][index].split()
0392                                     ns=list(map(int,arg[len(arg) - arg[-1::-1].index('--relval')].split(',')))
0393                                     chainDict['nowmTasklist'][-1]['RequestNumEvents'] = ns[0]
0394                                     chainDict['nowmTasklist'][-1]['EventsPerJob'] = ns[1]
0395                                     chainDict['nowmTasklist'][-1]['EventsPerLumi'] = ns[1]
0396                                     #overwrite EventsPerLumi if numberEventsInLuminosityBlock is set in cmsDriver
0397                                     if 'numberEventsInLuminosityBlock' in s[2][index]:
0398                                         nEventsInLuminosityBlock = re.findall('process.source.numberEventsInLuminosityBlock=cms.untracked.uint32\(([ 0-9 ]*)\)', s[2][index],re.DOTALL)
0399                                         if nEventsInLuminosityBlock[-1].isdigit() and int(nEventsInLuminosityBlock[-1]) < ns[1]:
0400                                             chainDict['nowmTasklist'][-1]['EventsPerLumi'] = int(nEventsInLuminosityBlock[-1])
0401                                     if(self.numberEventsInLuminosityBlock > 0 and self.numberEventsInLuminosityBlock <= ns[1]):
0402                                         chainDict['nowmTasklist'][-1]['EventsPerLumi'] = self.numberEventsInLuminosityBlock
0403                                 if 'FASTSIM' in s[2][index] or '--fast' in s[2][index]:
0404                                     thisLabel+='_FastSim'
0405                                 if 'lhe' in s[2][index] in s[2][index]:
0406                                     chainDict['nowmTasklist'][-1]['LheInputFiles'] =True
0407 
0408                             elif nextHasDSInput:
0409                                 chainDict['nowmTasklist'].append(copy.deepcopy(self.defaultInput))
0410                                 try:
0411                                     chainDict['nowmTasklist'][-1]['nowmIO']=json.loads(open('%s/%s.io'%(dir,step)).read())
0412                                 except:
0413                                     print("Failed to find",'%s/%s.io'%(dir,step),".The workflows were probably not run on cfg not created")
0414                                     return -15
0415                                 chainDict['nowmTasklist'][-1]['InputDataset']=nextHasDSInput.dataSet
0416                                 if ('DQMHLTonRAWAOD' in step) :
0417                                     chainDict['nowmTasklist'][-1]['IncludeParents']=True
0418                                 splitForThisWf=nextHasDSInput.split
0419                                 chainDict['nowmTasklist'][-1]['LumisPerJob']=splitForThisWf
0420                                 if step in wmsplit:
0421                                     chainDict['nowmTasklist'][-1]['LumisPerJob']=wmsplit[step]
0422                                 # get the run numbers or #events
0423                                 if len(nextHasDSInput.run):
0424                                     chainDict['nowmTasklist'][-1]['RunWhitelist']=nextHasDSInput.run
0425                                 if len(nextHasDSInput.ls):
0426                                     chainDict['nowmTasklist'][-1]['LumiList']=nextHasDSInput.ls
0427                                 #print "what is s",s[2][index]
0428                                 if '--data' in s[2][index] and nextHasDSInput.label:
0429                                     thisLabel+='_RelVal_%s'%nextHasDSInput.label
0430                                 if 'filter' in chainDict['nowmTasklist'][-1]['nowmIO']:
0431                                     print("This has an input DS and a filter sequence: very likely to be the PyQuen sample")
0432                                     processStrPrefix='PU_'
0433                                     setPrimaryDs = 'RelVal'+s[1].split('+')[0]
0434                                     if setPrimaryDs:
0435                                         chainDict['nowmTasklist'][-1]['PrimaryDataset']=setPrimaryDs
0436                                 nextHasDSInput=None
0437                                 if 'GPU' in step and self.RequiresGPU != 'forbidden':
0438                                     chainDict['nowmTasklist'][-1]['RequiresGPU'] = self.RequiresGPU
0439                                     chainDict['nowmTasklist'][-1]['GPUParams']=json.dumps(self.defaultGPUParams)
0440                             else:
0441                                 #not first step and no inputDS
0442                                 chainDict['nowmTasklist'].append(copy.deepcopy(self.defaultTask))
0443                                 try:
0444                                     chainDict['nowmTasklist'][-1]['nowmIO']=json.loads(open('%s/%s.io'%(dir,step)).read())
0445                                 except:
0446                                     print("Failed to find",'%s/%s.io'%(dir,step),".The workflows were probably not run on cfg not created")
0447                                     return -15
0448                                 if splitForThisWf:
0449                                     chainDict['nowmTasklist'][-1]['LumisPerJob']=splitForThisWf
0450                                 if step in wmsplit:
0451                                     chainDict['nowmTasklist'][-1]['LumisPerJob']=wmsplit[step]
0452                                 if 'GPU' in step and self.RequiresGPU != 'forbidden':
0453                                     chainDict['nowmTasklist'][-1]['RequiresGPU'] = self.RequiresGPU
0454                                     chainDict['nowmTasklist'][-1]['GPUParams']=json.dumps(self.defaultGPUParams)
0455 
0456                             # change LumisPerJob for Hadronizer steps. 
0457                             if 'Hadronizer' in step: 
0458                                 chainDict['nowmTasklist'][-1]['LumisPerJob']=wmsplit['Hadronizer']
0459 
0460                             #print step
0461                             chainDict['nowmTasklist'][-1]['TaskName']=step
0462                             if setPrimaryDs:
0463                                 chainDict['nowmTasklist'][-1]['PrimaryDataset']=setPrimaryDs
0464                             chainDict['nowmTasklist'][-1]['ConfigCacheID']='%s/%s.py'%(dir,step)
0465                             chainDict['nowmTasklist'][-1]['GlobalTag']=chainDict['nowmTasklist'][-1]['nowmIO']['GT'] # copy to the proper parameter name
0466                             chainDict['GlobalTag']=chainDict['nowmTasklist'][-1]['nowmIO']['GT'] #set in general to the last one of the chain
0467                             if 'NANOEDM' in step :
0468                                 nanoedmGT = chainDict['nowmTasklist'][-1]['nowmIO']['GT']
0469                             if 'NANOMERGE' in step :
0470                                 chainDict['GlobalTag'] = nanoedmGT
0471                             if 'pileup' in chainDict['nowmTasklist'][-1]['nowmIO']:
0472                                 chainDict['nowmTasklist'][-1]['MCPileup']=chainDict['nowmTasklist'][-1]['nowmIO']['pileup']
0473                             if '--pileup ' in s[2][index]:      # catch --pileup (scenarion) and not --pileup_ (dataset to be mixed) => works also making PRE-MIXed dataset
0474                                 pileupString = s[2][index].split()[s[2][index].split().index('--pileup')+1]
0475                                 processStrPrefix='PU_'          # take care of pu overlay done with GEN-SIM mixing
0476                                 if pileupString.find('25ns')  > 0 :
0477                                     processStrPrefix='PU25ns_'
0478                                 elif pileupString.find('50ns')  > 0 :
0479                                     processStrPrefix='PU50ns_'
0480                                 elif 'nopu' in pileupString.lower():
0481                                     processStrPrefix=''
0482                             if 'premix_stage2' in s[2][index] and '--pileup_input' in s[2][index]: # take care of pu overlay done with DIGI mixing of premixed events
0483                                 if s[2][index].split()[ s[2][index].split().index('--pileup_input')+1  ].find('25ns')  > 0 :
0484                                     processStrPrefix='PUpmx25ns_'
0485                                 elif s[2][index].split()[ s[2][index].split().index('--pileup_input')+1  ].find('50ns')  > 0 :
0486                                     processStrPrefix='PUpmx50ns_'
0487 
0488                             if acqEra:
0489                                 #chainDict['AcquisitionEra'][step]=(chainDict['CMSSWVersion']+'-PU_'+chainDict['nowmTasklist'][-1]['GlobalTag']).replace('::All','')+thisLabel
0490                                 chainDict['AcquisitionEra'][step]=chainDict['CMSSWVersion']
0491                                 chainDict['ProcessingString'][step]=processStrPrefix+chainDict['nowmTasklist'][-1]['GlobalTag'].replace('::All','').replace('-','_')+thisLabel
0492                                 if 'NANOMERGE' in step :
0493                                     chainDict['ProcessingString'][step]=processStrPrefix+nanoedmGT.replace('::All','').replace('-','_')+thisLabel
0494                             else:
0495                                 #chainDict['nowmTasklist'][-1]['AcquisitionEra']=(chainDict['CMSSWVersion']+'-PU_'+chainDict['nowmTasklist'][-1]['GlobalTag']).replace('::All','')+thisLabel
0496                                 chainDict['nowmTasklist'][-1]['AcquisitionEra']=chainDict['CMSSWVersion']
0497                                 chainDict['nowmTasklist'][-1]['ProcessingString']=processStrPrefix+chainDict['nowmTasklist'][-1]['GlobalTag'].replace('::All','').replace('-','_')+thisLabel
0498                                 if 'NANOMERGE' in step :
0499                                     chainDict['nowmTasklist'][-1]['ProcessingString']=processStrPrefix+nanoedmGT.replace('::All','').replace('-','_')+thisLabel
0500 
0501                             if (self.batchName):
0502                                 chainDict['nowmTasklist'][-1]['Campaign'] = chainDict['nowmTasklist'][-1]['AcquisitionEra']+self.batchName
0503 
0504                             # specify different ProcessingString for double miniAOD dataset
0505                             if ('DBLMINIAODMCUP15NODQM' in step): 
0506                                 chainDict['nowmTasklist'][-1]['ProcessingString']=chainDict['nowmTasklist'][-1]['ProcessingString']+'_miniAOD' 
0507 
0508                             if( chainDict['nowmTasklist'][-1]['Multicore'] ):
0509                                 # the scaling factor of 1.2GB / thread is empirical and measured on a SECOND round of tests with PU samples
0510                                 # the number of threads is NO LONGER assumed to be the same for all tasks
0511                                 # https://hypernews.cern.ch/HyperNews/CMS/get/edmFramework/3509/1/1/1.html
0512                                 # now change to 1.5GB / additional thread according to discussion:
0513                                 # https://hypernews.cern.ch/HyperNews/CMS/get/relval/4817/1/1.html
0514 #                                chainDict['nowmTasklist'][-1]['Memory'] = 3000 + int( chainDict['nowmTasklist'][-1]['Multicore']  -1 )*1500
0515                                 chainDict['nowmTasklist'][-1]['Memory'] = self.memoryOffset + int( chainDict['nowmTasklist'][-1]['Multicore']  -1 ) * self.memPerCore
0516 
0517                         index+=1
0518                     #end of loop through steps
0519                     chainDict['RequestString']='RV'+chainDict['CMSSWVersion']+s[1].split('+')[0]
0520                     if processStrPrefix or thisLabel:
0521                         chainDict['RequestString']+='_'+processStrPrefix+thisLabel
0522                     #check candidate WF name
0523                     self.candidateWFName = self.user+'_'+chainDict['RequestString']
0524                     if (len(self.candidateWFName)>MAXWORKFLOWLENGTH):
0525                         self.longWFName.append(self.candidateWFName)
0526 
0527 ### PrepID
0528                     chainDict['PrepID'] = chainDict['CMSSWVersion']+'__'+self.batchTime+'-'+s[1].split('+')[0]
0529                     if(self.batchName):
0530                         chainDict['PrepID'] = chainDict['CMSSWVersion']+self.batchName+'-'+s[1].split('+')[0]
0531                         if( 'HIN' in self.batchName ):
0532                             chainDict['SubRequestType'] = "HIRelVal"
0533                         
0534             #wrap up for this one
0535             import pprint
0536             #print 'wrapping up'
0537             #pprint.pprint(chainDict)
0538             #loop on the task list
0539             for i_second in reversed(range(len(chainDict['nowmTasklist']))):
0540                 t_second=chainDict['nowmTasklist'][i_second]
0541                 #print "t_second taskname", t_second['TaskName']
0542                 if 'primary' in t_second['nowmIO']:
0543                     #print t_second['nowmIO']['primary']
0544                     primary=t_second['nowmIO']['primary'][0].replace('file:','')
0545                     for i_input in reversed(range(0,i_second)):
0546                         t_input=chainDict['nowmTasklist'][i_input]
0547                         for (om,o) in t_input['nowmIO'].items():
0548                             if primary in o:
0549                                 #print "found",primary,"procuced by",om,"of",t_input['TaskName']
0550                                 #ad-hoc fix due to restriction in TaskName of 50 characters
0551                                 if (len(t_input['TaskName'])>50):
0552                                     if (t_input['TaskName'].find('GenSim') != -1):
0553                                         t_input['TaskName'] = 'GenSimFull'
0554                                     if (t_input['TaskName'].find('Hadronizer') != -1):
0555                                         t_input['TaskName'] = 'HadronizerFull'
0556                                 t_second['InputTask'] = t_input['TaskName']
0557                                 t_second['InputFromOutputModule'] = om
0558                                 #print 't_second',pprint.pformat(t_second)
0559                                 if t_second['TaskName'].startswith('HARVEST'):
0560                                     chainDict.update(copy.deepcopy(self.defaultHarvest))
0561                                     if "_RD" in t_second['TaskName']:
0562                                         chainDict['DQMHarvestUnit'] = "multiRun"
0563                                     chainDict['DQMConfigCacheID']=t_second['ConfigCacheID']
0564                                     ## the info are not in the task specific dict but in the general dict
0565                                     #t_input.update(copy.deepcopy(self.defaultHarvest))
0566                                     #t_input['DQMConfigCacheID']=t_second['ConfigCacheID']
0567                                 break
0568 
0569             # agreed changes for wm injection:
0570             # - Campaign: *optional* string during creation. It will default to AcqEra value if possible.  
0571             #             Otherwise it will be empty.
0572             # - AcquisitionEra: *mandatory* string at request level during creation. *optional* string
0573             #                   at task level during creation. "optional" during assignment.
0574             # - ProcessingString: *mandatory* string at request level during creation. *optional* string
0575             #                     at task level during creation. "optional" during assignment.
0576             # - ProcessingVersion: *optional* during creation (default 1). *optional* during assignment.
0577             # 
0578             # Which requires following changes here:
0579             #  - reset Global AcuisitionEra, ProcessingString to be the one in the first task
0580             #  - and also Campaign to be always the same as the AcquisitionEra
0581 
0582             if acqEra:
0583                 chainDict['AcquisitionEra'] = chainDict['AcquisitionEra'].values()[0] 
0584                 chainDict['ProcessingString'] = chainDict['ProcessingString'].values()[0]
0585             else:
0586                 chainDict['AcquisitionEra'] = chainDict['nowmTasklist'][0]['AcquisitionEra']
0587                 chainDict['ProcessingString'] = chainDict['nowmTasklist'][0]['ProcessingString']
0588                 
0589 #####  batch name appended to Campaign name
0590 #            chainDict['Campaign'] = chainDict['AcquisitionEra']
0591             chainDict['Campaign'] = chainDict['AcquisitionEra']+self.batchName
0592                
0593             ## clean things up now
0594             itask=0
0595             if self.keep:
0596                 for i in self.keep:
0597                     if isinstance(i, int) and i < len(chainDict['nowmTasklist']):
0598                         chainDict['nowmTasklist'][i]['KeepOutput']=True
0599             for (i,t) in enumerate(chainDict['nowmTasklist']):
0600                 if t['TaskName'].startswith('HARVEST'):
0601                     continue
0602                 if not self.keep:
0603                     t['KeepOutput']=True
0604                 elif t['TaskName'] in self.keep:
0605                     t['KeepOutput']=True
0606                 if t['TaskName'].startswith('HYBRIDRepackHI2015VR'):
0607                     t['KeepOutput']=False
0608                 t.pop('nowmIO')
0609                 itask+=1
0610                 chainDict['Task%d'%(itask)]=t
0611 
0612 
0613             ## 
0614 
0615 
0616             ## provide the number of tasks
0617             chainDict['TaskChain']=itask#len(chainDict['nowmTasklist'])
0618             
0619             chainDict.pop('nowmTasklist')
0620             self.chainDicts[n]=chainDict
0621 
0622             
0623         return 0
0624 
0625     def uploadConf(self,filePath,label,where):
0626         labelInCouch=self.label+'_'+label
0627         cacheName=filePath.split('/')[-1]
0628         if self.testMode:
0629             self.count+=1
0630             print('\tFake upload of',filePath,'to couch with label',labelInCouch)
0631             return self.count
0632         else:
0633             try:
0634                 from modules.wma import upload_to_couch,DATABASE_NAME
0635             except:
0636                 print('\n\tUnable to find wmcontrol modules. Please include it in your python path\n')
0637                 print('\n\t QUIT\n')
0638                 sys.exit(-16)
0639 
0640             if cacheName in self.couchCache:
0641                 print("Not re-uploading",filePath,"to",where,"for",label)
0642                 cacheId=self.couchCache[cacheName]
0643             else:
0644                 print("Loading",filePath,"to",where,"for",label)
0645                 ## totally fork the upload to couch to prevent cross loading of process configurations
0646                 pool = multiprocessing.Pool(1)
0647                 cacheIds = pool.map( upload_to_couch_oneArg, [(filePath,labelInCouch,self.user,self.group,where)] )
0648                 cacheId = cacheIds[0]
0649                 self.couchCache[cacheName]=cacheId
0650             return cacheId
0651     
0652     def upload(self):
0653         for (n,d) in self.chainDicts.items():
0654             for it in d:
0655                 if it.startswith("Task") and it!='TaskChain':
0656                     #upload
0657                     couchID=self.uploadConf(d[it]['ConfigCacheID'],
0658                                             str(n)+d[it]['TaskName'],
0659                                             d['ConfigCacheUrl']
0660                                             )
0661                     print(d[it]['ConfigCacheID']," uploaded to couchDB for",str(n),"with ID",couchID)
0662                     d[it]['ConfigCacheID']=couchID
0663                 if it =='DQMConfigCacheID':
0664                     couchID=self.uploadConf(d['DQMConfigCacheID'],
0665                                             str(n)+'harvesting',
0666                                             d['ConfigCacheUrl']
0667                                             )
0668                     print(d['DQMConfigCacheID'],"uploaded to couchDB for",str(n),"with ID",couchID)
0669                     d['DQMConfigCacheID']=couchID
0670                         
0671             
0672     def submit(self):
0673         try:
0674             from modules.wma import makeRequest,approveRequest
0675             from wmcontrol import random_sleep
0676             print('\n\tFound wmcontrol\n')
0677         except:
0678             print('\n\tUnable to find wmcontrol modules. Please include it in your python path\n')
0679             if not self.testMode:
0680                 print('\n\t QUIT\n')
0681                 sys.exit(-17)
0682 
0683         import pprint
0684         for (n,d) in self.chainDicts.items():
0685             if self.testMode:
0686                 print("Only viewing request",n)
0687                 print(pprint.pprint(d))
0688             else:
0689                 #submit to wmagent each dict
0690                 print("For eyes before submitting",n)
0691                 print(pprint.pprint(d))
0692                 print("Submitting",n,"...........")
0693                 workFlow=makeRequest(self.wmagent,d,encodeDict=True)
0694                 print("...........",n,"submitted")
0695                 random_sleep()
0696         if self.testMode and len(self.longWFName)>0:
0697             print("\n*** WARNING: "+str(len(self.longWFName))+" workflows have too long names for submission (>"+str(MAXWORKFLOWLENGTH)+ "characters) ***")
0698             print('\n'.join(self.longWFName))