Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-12-01 23:40:07

0001 import sys
0002 import json
0003 import os
0004 import copy
0005 import multiprocessing
0006 import time
0007 import re
0008 
0009 MAXWORKFLOWLENGTH = 81
0010 
0011 def performInjectionOptionTest(opt):
0012     if opt.show:
0013         print('Not injecting to wmagent in --show mode. Need to run the worklfows.')
0014         sys.exit(-1)
0015     if opt.wmcontrol=='init':
0016         #init means it'll be in test mode
0017         opt.nProcs=0
0018     if opt.wmcontrol=='test':
0019         #means the wf were created already, and we just dryRun it.
0020         opt.dryRun=True
0021     if opt.wmcontrol=='submit' and opt.nProcs==0:
0022         print('Not injecting to wmagent in -j 0 mode. Need to run the worklfows.')
0023         sys.exit(-1)
0024     if opt.wmcontrol=='force':
0025         print("This is an expert setting, you'd better know what you're doing")
0026         opt.dryRun=True
0027 
0028 def upload_to_couch_oneArg(arguments):
0029     from modules.wma import upload_to_couch
0030     (filePath,labelInCouch,user,group,where) = arguments
0031     cacheId=upload_to_couch(filePath,
0032                             labelInCouch,
0033                             user,
0034                             group,
0035                             test_mode=False,
0036                             url=where)
0037     return cacheId
0038 
0039 
0040 class MatrixInjector(object):
0041 
0042     def __init__(self,opt,mode='init',options=''):
0043         self.count=1040
0044 
0045         self.dqmgui=None
0046         self.wmagent=None
0047         for k in options.split(','):
0048             if k.startswith('dqm:'):
0049                 self.dqmgui=k.split(':',1)[-1]
0050             elif k.startswith('wma:'):
0051                 self.wmagent=k.split(':',1)[-1]
0052 
0053         self.testMode=((mode!='submit') and (mode!='force'))
0054         self.version =1
0055         self.keep = opt.keep
0056         self.memoryOffset = opt.memoryOffset
0057         self.memPerCore = opt.memPerCore
0058         self.numberEventsInLuminosityBlock = opt.numberEventsInLuminosityBlock
0059         self.numberOfStreams = 0
0060         if(opt.nStreams>0):
0061             self.numberOfStreams = opt.nStreams
0062         self.batchName = ''
0063         self.batchTime = str(int(time.time()))
0064         if(opt.batchName):
0065             self.batchName = '__'+opt.batchName+'-'+self.batchTime
0066 
0067         ####################################
0068         # Checking and setting up GPU attributes
0069         ####################################
0070         # Mendatory
0071         self.RequiresGPU = opt.gpu
0072         self.GPUMemoryMB = opt.GPUMemoryMB
0073         self.CUDACapabilities = opt.CUDACapabilities
0074         self.CUDARuntime = opt.CUDARuntime
0075         # optional
0076         self.GPUName = opt.GPUName
0077         self.CUDADriverVersion = opt.CUDADriverVersion
0078         self.CUDARuntimeVersion = opt.CUDARuntimeVersion
0079 
0080         # WMagent url
0081         if not self.wmagent:
0082             # Overwrite with env variable
0083             self.wmagent = os.getenv('WMAGENT_REQMGR')
0084 
0085         if not self.wmagent:
0086             # Default values
0087             if not opt.testbed:
0088                 self.wmagent = 'cmsweb.cern.ch'
0089             else:
0090                 self.wmagent = 'cmsweb-testbed.cern.ch'
0091 
0092         # DBSReader url
0093         if opt.dbsUrl is not None:
0094             self.DbsUrl = opt.dbsUrl
0095         elif os.getenv('CMS_DBSREADER_URL') is not None:
0096             self.DbsUrl = os.getenv('CMS_DBSREADER_URL')
0097         else:
0098             # Default values
0099             if not opt.testbed:
0100                 self.DbsUrl = "https://cmsweb-prod.cern.ch/dbs/prod/global/DBSReader"
0101             else:
0102                 self.DbsUrl = "https://cmsweb-testbed.cern.ch/dbs/int/global/DBSReader"
0103 
0104         if not self.dqmgui:
0105             self.dqmgui="https://cmsweb.cern.ch/dqm/relval"
0106         #couch stuff
0107         self.couch = 'https://'+self.wmagent+'/couchdb'
0108 #        self.couchDB = 'reqmgr_config_cache'
0109         self.couchCache={} # so that we do not upload like crazy, and recyle cfgs
0110         self.user = os.getenv('USER')
0111         self.group = 'ppd'
0112         self.label = 'RelValSet_'+os.getenv('CMSSW_VERSION').replace('-','')+'_v'+str(self.version)
0113         self.speciallabel=''
0114         if opt.label:
0115             self.speciallabel= '_'+opt.label
0116         self.longWFName = []
0117 
0118         if not os.getenv('WMCORE_ROOT'):
0119             print('\n\twmclient is not setup properly. Will not be able to upload or submit requests.\n')
0120             if not self.testMode:
0121                 print('\n\t QUIT\n')
0122                 sys.exit(-18)
0123         else:
0124             print('\n\tFound wmclient\n')
0125             
0126         self.defaultChain={
0127             "RequestType" :    "TaskChain",                    #this is how we handle relvals
0128             "SubRequestType" : "RelVal",                       #this is how we handle relvals, now that TaskChain is also used for central MC production
0129             "RequestPriority": 500000,
0130             "Requestor": self.user,                           #Person responsible
0131             "Group": self.group,                              #group for the request
0132             "CMSSWVersion": os.getenv('CMSSW_VERSION'),       #CMSSW Version (used for all tasks in chain)
0133             "Campaign": os.getenv('CMSSW_VERSION'),           # = AcquisitionEra, will be reset later to the one of first task, will both be the CMSSW_VERSION
0134             "ScramArch": os.getenv('SCRAM_ARCH'),             #Scram Arch (used for all tasks in chain)
0135             "ProcessingVersion": self.version,                #Processing Version (used for all tasks in chain)
0136             "GlobalTag": None,                                #Global Tag (overridden per task)
0137             "ConfigCacheUrl": self.couch,                     #URL of CouchDB containing Config Cache
0138             "DbsUrl": self.DbsUrl,
0139             #- Will contain all configs for all Tasks
0140             #"SiteWhitelist" : ["T2_CH_CERN", "T1_US_FNAL"],   #Site whitelist
0141             "TaskChain" : None,                                  #Define number of tasks in chain.
0142             "nowmTasklist" : [],  #a list of tasks as we put them in
0143             "Multicore" : 1,   # do not set multicore for the whole chain
0144             "Memory" : 3000,
0145             "SizePerEvent" : 1234,
0146             "TimePerEvent" : 10,
0147             "PrepID": os.getenv('CMSSW_VERSION')
0148             }
0149 
0150         self.defaultHarvest={
0151             "EnableHarvesting" : "True",
0152             "DQMUploadUrl" : self.dqmgui,
0153             "DQMConfigCacheID" : None,
0154             "Multicore" : 1              # hardcode Multicore to be 1 for Harvest
0155             }
0156         
0157         self.defaultScratch={
0158             "TaskName" : None,                            #Task Name
0159             "ConfigCacheID" : None,                   #Generator Config id
0160             "GlobalTag": None,
0161             "SplittingAlgo"  : "EventBased",             #Splitting Algorithm
0162             "EventsPerJob" : None,                       #Size of jobs in terms of splitting algorithm
0163             "EventsPerLumi" : None,
0164             "RequestNumEvents" : None,                      #Total number of events to generate
0165             "Seeding" : "AutomaticSeeding",                          #Random seeding method
0166             "PrimaryDataset" : None,                          #Primary Dataset to be created
0167             "nowmIO": {},
0168             "Multicore" : opt.nThreads,                  # this is the per-taskchain Multicore; it's the default assigned to a task if it has no value specified 
0169             "EventStreams": self.numberOfStreams,
0170             "KeepOutput" : False
0171             }
0172         self.defaultInput={
0173             "TaskName" : "DigiHLT",                                      #Task Name
0174             "ConfigCacheID" : None,                                      #Processing Config id
0175             "GlobalTag": None,
0176             "InputDataset" : None,                                       #Input Dataset to be processed
0177             "SplittingAlgo"  : "LumiBased",                        #Splitting Algorithm
0178             "LumisPerJob" : 10,               #Size of jobs in terms of splitting algorithm
0179             "nowmIO": {},
0180             "Multicore" : opt.nThreads,                       # this is the per-taskchain Multicore; it's the default assigned to a task if it has no value specified 
0181             "EventStreams": self.numberOfStreams,
0182             "KeepOutput" : False
0183             }
0184         self.defaultTask={
0185             "TaskName" : None,                                 #Task Name
0186             "InputTask" : None,                                #Input Task Name (Task Name field of a previous Task entry)
0187             "InputFromOutputModule" : None,                    #OutputModule name in the input task that will provide files to process
0188             "ConfigCacheID" : None,                            #Processing Config id
0189             "GlobalTag": None,
0190             "SplittingAlgo"  : "LumiBased",                        #Splitting Algorithm
0191             "LumisPerJob" : 10,               #Size of jobs in terms of splitting algorithm
0192             "nowmIO": {},
0193             "Multicore" : opt.nThreads,                       # this is the per-taskchain Multicore; it's the default assigned to a task if it has no value specified 
0194             "EventStreams": self.numberOfStreams,
0195             "KeepOutput" : False,
0196             "RequiresGPU" : None,
0197             "GPUParams": None
0198             }
0199         self.defaultGPUParams={
0200             "GPUMemoryMB": self.GPUMemoryMB,
0201             "CUDACapabilities": self.CUDACapabilities,
0202             "CUDARuntime": self.CUDARuntime
0203             }
0204         if self.GPUName: self.defaultGPUParams.update({"GPUName": self.GPUName})
0205         if self.CUDADriverVersion: self.defaultGPUParams.update({"CUDADriverVersion": self.CUDADriverVersion})
0206         if self.CUDARuntimeVersion: self.defaultGPUParams.update({"CUDARuntimeVersion": self.CUDARuntimeVersion})
0207 
0208         self.chainDicts={}
0209 
0210     @staticmethod
0211     def get_wmsplit():
0212         """
0213         Return a "wmsplit" dictionary that contain non-default LumisPerJob values
0214         """
0215         wmsplit = {}
0216         try:
0217             wmsplit['DIGIHI'] = 5
0218             wmsplit['RECOHI'] = 5
0219             wmsplit['HLTD'] = 5
0220             wmsplit['RECODreHLT'] = 2
0221             wmsplit['DIGIPU'] = 4
0222             wmsplit['DIGIPU1'] = 4
0223             wmsplit['RECOPU1'] = 1
0224             wmsplit['DIGIUP15_PU50'] = 1
0225             wmsplit['RECOUP15_PU50'] = 1
0226             wmsplit['DIGIUP15_PU25'] = 1
0227             wmsplit['RECOUP15_PU25'] = 1
0228             wmsplit['DIGIUP15_PU25HS'] = 1
0229             wmsplit['RECOUP15_PU25HS'] = 1
0230             wmsplit['DIGIHIMIX'] = 5
0231             wmsplit['RECOHIMIX'] = 5
0232             wmsplit['RECODSplit'] = 1
0233             wmsplit['SingleMuPt10_UP15_ID'] = 1
0234             wmsplit['DIGIUP15_ID'] = 1
0235             wmsplit['RECOUP15_ID'] = 1
0236             wmsplit['TTbar_13_ID'] = 1
0237             wmsplit['SingleMuPt10FS_ID'] = 1
0238             wmsplit['TTbarFS_ID'] = 1
0239             wmsplit['RECODR2_50nsreHLT'] = 5
0240             wmsplit['RECODR2_25nsreHLT'] = 5
0241             wmsplit['RECODR2_2016reHLT'] = 5
0242             wmsplit['RECODR2_50nsreHLT_HIPM'] = 5
0243             wmsplit['RECODR2_25nsreHLT_HIPM'] = 5
0244             wmsplit['RECODR2_2016reHLT_HIPM'] = 1
0245             wmsplit['RECODR2_2016reHLT_skimSingleMu'] = 1
0246             wmsplit['RECODR2_2016reHLT_skimDoubleEG'] = 1
0247             wmsplit['RECODR2_2016reHLT_skimMuonEG'] = 1
0248             wmsplit['RECODR2_2016reHLT_skimJetHT'] = 1
0249             wmsplit['RECODR2_2016reHLT_skimMET'] = 1
0250             wmsplit['RECODR2_2016reHLT_skimSinglePh'] = 1
0251             wmsplit['RECODR2_2016reHLT_skimMuOnia'] = 1
0252             wmsplit['RECODR2_2016reHLT_skimSingleMu_HIPM'] = 1
0253             wmsplit['RECODR2_2016reHLT_skimDoubleEG_HIPM'] = 1
0254             wmsplit['RECODR2_2016reHLT_skimMuonEG_HIPM'] = 1
0255             wmsplit['RECODR2_2016reHLT_skimJetHT_HIPM'] = 1
0256             wmsplit['RECODR2_2016reHLT_skimMET_HIPM'] = 1
0257             wmsplit['RECODR2_2016reHLT_skimSinglePh_HIPM'] = 1
0258             wmsplit['RECODR2_2016reHLT_skimMuOnia_HIPM'] = 1
0259             wmsplit['RECODR2_2017reHLT_Prompt'] = 1
0260             wmsplit['RECODR2_2017reHLT_skimSingleMu_Prompt_Lumi'] = 1
0261             wmsplit['RECODR2_2017reHLT_skimDoubleEG_Prompt'] = 1
0262             wmsplit['RECODR2_2017reHLT_skimMET_Prompt'] = 1
0263             wmsplit['RECODR2_2017reHLT_skimMuOnia_Prompt'] = 1
0264             wmsplit['RECODR2_2017reHLT_Prompt_L1TEgDQM'] = 1
0265             wmsplit['RECODR2_2018reHLT_Prompt'] = 1
0266             wmsplit['RECODR2_2018reHLT_skimSingleMu_Prompt_Lumi'] = 1
0267             wmsplit['RECODR2_2018reHLT_skimDoubleEG_Prompt'] = 1
0268             wmsplit['RECODR2_2018reHLT_skimJetHT_Prompt'] = 1
0269             wmsplit['RECODR2_2018reHLT_skimMET_Prompt'] = 1
0270             wmsplit['RECODR2_2018reHLT_skimMuOnia_Prompt'] = 1
0271             wmsplit['RECODR2_2018reHLT_skimEGamma_Prompt_L1TEgDQM'] = 1
0272             wmsplit['RECODR2_2018reHLT_skimMuonEG_Prompt'] = 1
0273             wmsplit['RECODR2_2018reHLT_skimCharmonium_Prompt'] = 1
0274             wmsplit['RECODR2_2018reHLT_skimJetHT_Prompt_HEfail'] = 1
0275             wmsplit['RECODR2_2018reHLT_skimJetHT_Prompt_BadHcalMitig'] = 1
0276             wmsplit['RECODR2_2018reHLTAlCaTkCosmics_Prompt'] = 1
0277             wmsplit['RECODR2_2018reHLT_skimDisplacedJet_Prompt'] = 1
0278             wmsplit['RECODR2_2018reHLT_ZBPrompt'] = 1
0279             wmsplit['RECODR2_2018reHLT_Offline'] = 1
0280             wmsplit['RECODR2_2018reHLT_skimSingleMu_Offline_Lumi'] = 1
0281             wmsplit['RECODR2_2018reHLT_skimDoubleEG_Offline'] = 1
0282             wmsplit['RECODR2_2018reHLT_skimJetHT_Offline'] = 1
0283             wmsplit['RECODR2_2018reHLT_skimMET_Offline'] = 1
0284             wmsplit['RECODR2_2018reHLT_skimMuOnia_Offline'] = 1
0285             wmsplit['RECODR2_2018reHLT_skimEGamma_Offline_L1TEgDQM'] = 1
0286             wmsplit['RECODR2_2018reHLT_skimMuonEG_Offline'] = 1
0287             wmsplit['RECODR2_2018reHLT_skimCharmonium_Offline'] = 1
0288             wmsplit['RECODR2_2018reHLT_skimJetHT_Offline_HEfail'] = 1
0289             wmsplit['RECODR2_2018reHLT_skimJetHT_Offline_BadHcalMitig'] = 1
0290             wmsplit['RECODR2_2018reHLTAlCaTkCosmics_Offline'] = 1
0291             wmsplit['RECODR2_2018reHLT_skimDisplacedJet_Offline'] = 1
0292             wmsplit['RECODR2_2018reHLT_ZBOffline'] = 1
0293             wmsplit['HLTDR2_50ns'] = 1
0294             wmsplit['HLTDR2_25ns'] = 1
0295             wmsplit['HLTDR2_2016'] = 1
0296             wmsplit['HLTDR2_2017'] = 1
0297             wmsplit['HLTDR2_2018'] = 1
0298             wmsplit['HLTDR2_2018_BadHcalMitig'] = 1
0299             wmsplit['Hadronizer'] = 1
0300             wmsplit['DIGIUP15'] = 1
0301             wmsplit['RECOUP15'] = 1
0302             wmsplit['RECOAODUP15'] = 5
0303             wmsplit['DBLMINIAODMCUP15NODQM'] = 5
0304             wmsplit['Digi'] = 5
0305             wmsplit['Reco'] = 5
0306             wmsplit['DigiPU'] = 1
0307             wmsplit['RecoPU'] = 1
0308             wmsplit['RECOHID11'] = 1
0309             wmsplit['DIGIUP17'] = 1
0310             wmsplit['RECOUP17'] = 1
0311             wmsplit['DIGIUP17_PU25'] = 1
0312             wmsplit['RECOUP17_PU25'] = 1
0313             wmsplit['DIGICOS_UP16'] = 1
0314             wmsplit['RECOCOS_UP16'] = 1
0315             wmsplit['DIGICOS_UP17'] = 1
0316             wmsplit['RECOCOS_UP17'] = 1
0317             wmsplit['DIGICOS_UP18'] = 1
0318             wmsplit['RECOCOS_UP18'] = 1
0319             wmsplit['DIGICOS_UP21'] = 1
0320             wmsplit['RECOCOS_UP21'] = 1
0321             wmsplit['HYBRIDRepackHI2015VR'] = 1
0322             wmsplit['HYBRIDZSHI2015'] = 1
0323             wmsplit['RECOHID15'] = 1
0324             wmsplit['RECOHID18'] = 1
0325             wmsplit['HLTDR3_2023'] = 1
0326             wmsplit['RECONANORUN3_reHLT'] = 1
0327             wmsplit['HARVESTRUN3'] = 1
0328             # automate for phase 2
0329             from .upgradeWorkflowComponents import upgradeKeys
0330             for key in upgradeKeys['Run4']:
0331                 if 'PU' not in key:
0332                     continue
0333 
0334                 wmsplit['DigiTriggerPU_' + key] = 1
0335                 wmsplit['RecoGlobalPU_' + key] = 1
0336 
0337         except Exception as ex:
0338             print('Exception while building a wmsplit dictionary: %s' % (str(ex)))
0339             return {}
0340 
0341         return wmsplit
0342 
0343     def prepare(self, mReader, directories, mode='init'):
0344         wmsplit = MatrixInjector.get_wmsplit()
0345         acqEra=False
0346         for (n,dir) in directories.items():
0347             chainDict=copy.deepcopy(self.defaultChain)
0348             print("inspecting",dir)
0349             nextHasDSInput=None
0350             for (x,s) in mReader.workFlowSteps.items():
0351                 #x has the format (num, prefix)
0352                 #s has the format (num, name, commands, stepList)
0353                 if x[0]==n:
0354                     #print "found",n,s[3]
0355                     #chainDict['RequestString']='RV'+chainDict['CMSSWVersion']+s[1].split('+')[0]
0356                     index=0
0357                     splitForThisWf=None
0358                     thisLabel=self.speciallabel
0359                     #if 'HARVESTGEN' in s[3]:
0360                     if len( [step for step in s[3] if "HARVESTGEN" in step] )>0:
0361                         chainDict['TimePerEvent']=0.01
0362                         thisLabel=thisLabel+"_gen"
0363                     # for double miniAOD test
0364                     if len( [step for step in s[3] if "DBLMINIAODMCUP15NODQM" in step] )>0:
0365                         thisLabel=thisLabel+"_dblMiniAOD"
0366                     processStrPrefix=''
0367                     setPrimaryDs=None
0368                     nanoedmGT=''
0369                     for step in s[3]:
0370                         
0371                         if 'INPUT' in step or (not isinstance(s[2][index],str)):
0372                             nextHasDSInput=s[2][index]
0373 
0374                         else:
0375 
0376                             if (index==0):
0377                                 #first step and not input -> gen part
0378                                 chainDict['nowmTasklist'].append(copy.deepcopy(self.defaultScratch))
0379                                 try:
0380                                     chainDict['nowmTasklist'][-1]['nowmIO']=json.loads(open('%s/%s.io'%(dir,step)).read())
0381                                 except:
0382                                     print("Failed to find",'%s/%s.io'%(dir,step),".The workflows were probably not run on cfg not created")
0383                                     return -15
0384 
0385                                 chainDict['nowmTasklist'][-1]['PrimaryDataset']='RelVal'+s[1].split('+')[0]
0386                                 if not '--relval' in s[2][index]:
0387                                     print('Impossible to create task from scratch without splitting information with --relval')
0388                                     return -12
0389                                 else:
0390                                     arg=s[2][index].split()
0391                                     ns=list(map(int,arg[len(arg) - arg[-1::-1].index('--relval')].split(',')))
0392                                     chainDict['nowmTasklist'][-1]['RequestNumEvents'] = ns[0]
0393                                     chainDict['nowmTasklist'][-1]['EventsPerJob'] = ns[1]
0394                                     chainDict['nowmTasklist'][-1]['EventsPerLumi'] = ns[1]
0395                                     #overwrite EventsPerLumi if numberEventsInLuminosityBlock is set in cmsDriver
0396                                     if 'numberEventsInLuminosityBlock' in s[2][index]:
0397                                         nEventsInLuminosityBlock = re.findall('process.source.numberEventsInLuminosityBlock=cms.untracked.uint32\\(([ 0-9 ]*)\\)', s[2][index],re.DOTALL)
0398                                         if nEventsInLuminosityBlock[-1].isdigit() and int(nEventsInLuminosityBlock[-1]) < ns[1]:
0399                                             chainDict['nowmTasklist'][-1]['EventsPerLumi'] = int(nEventsInLuminosityBlock[-1])
0400                                     if(self.numberEventsInLuminosityBlock > 0 and self.numberEventsInLuminosityBlock <= ns[1]):
0401                                         chainDict['nowmTasklist'][-1]['EventsPerLumi'] = self.numberEventsInLuminosityBlock
0402                                 if 'FASTSIM' in s[2][index] or '--fast' in s[2][index]:
0403                                     thisLabel+='_FastSim'
0404                                 if 'lhe' in s[2][index] in s[2][index]:
0405                                     chainDict['nowmTasklist'][-1]['LheInputFiles'] =True
0406 
0407                             elif nextHasDSInput:
0408                                 chainDict['nowmTasklist'].append(copy.deepcopy(self.defaultInput))
0409                                 try:
0410                                     chainDict['nowmTasklist'][-1]['nowmIO']=json.loads(open('%s/%s.io'%(dir,step)).read())
0411                                 except:
0412                                     print("Failed to find",'%s/%s.io'%(dir,step),".The workflows were probably not run on cfg not created")
0413                                     return -15
0414                                 chainDict['nowmTasklist'][-1]['InputDataset']=nextHasDSInput.dataSet
0415                                 if ('DQMHLTonRAWAOD' in step) :
0416                                     chainDict['nowmTasklist'][-1]['IncludeParents']=True
0417                                 splitForThisWf=nextHasDSInput.split
0418                                 chainDict['nowmTasklist'][-1]['LumisPerJob']=splitForThisWf
0419                                 if step in wmsplit:
0420                                     chainDict['nowmTasklist'][-1]['LumisPerJob']=wmsplit[step]
0421                                 # get the run numbers or #events
0422                                 if len(nextHasDSInput.run):
0423                                     chainDict['nowmTasklist'][-1]['RunWhitelist']=nextHasDSInput.run
0424                                 if len(nextHasDSInput.ls):
0425                                     chainDict['nowmTasklist'][-1]['LumiList']=nextHasDSInput.ls
0426                                 #print "what is s",s[2][index]
0427                                 if '--data' in s[2][index] and nextHasDSInput.label:
0428                                     thisLabel+='_RelVal_%s'%nextHasDSInput.label
0429                                 if 'filter' in chainDict['nowmTasklist'][-1]['nowmIO']:
0430                                     print("This has an input DS and a filter sequence: very likely to be the PyQuen sample")
0431                                     processStrPrefix='PU_'
0432                                     setPrimaryDs = 'RelVal'+s[1].split('+')[0]
0433                                     if setPrimaryDs:
0434                                         chainDict['nowmTasklist'][-1]['PrimaryDataset']=setPrimaryDs
0435                                 nextHasDSInput=None
0436                                 if 'GPU' in step and self.RequiresGPU != 'forbidden':
0437                                     chainDict['nowmTasklist'][-1]['RequiresGPU'] = self.RequiresGPU
0438                                     chainDict['nowmTasklist'][-1]['GPUParams']=json.dumps(self.defaultGPUParams)
0439                             else:
0440                                 #not first step and no inputDS
0441                                 chainDict['nowmTasklist'].append(copy.deepcopy(self.defaultTask))
0442                                 try:
0443                                     chainDict['nowmTasklist'][-1]['nowmIO']=json.loads(open('%s/%s.io'%(dir,step)).read())
0444                                 except:
0445                                     print("Failed to find",'%s/%s.io'%(dir,step),".The workflows were probably not run on cfg not created")
0446                                     return -15
0447                                 if splitForThisWf:
0448                                     chainDict['nowmTasklist'][-1]['LumisPerJob']=splitForThisWf
0449                                 if step in wmsplit:
0450                                     chainDict['nowmTasklist'][-1]['LumisPerJob']=wmsplit[step]
0451                                 if 'GPU' in step and self.RequiresGPU != 'forbidden':
0452                                     chainDict['nowmTasklist'][-1]['RequiresGPU'] = self.RequiresGPU
0453                                     chainDict['nowmTasklist'][-1]['GPUParams']=json.dumps(self.defaultGPUParams)
0454 
0455                             # change LumisPerJob for Hadronizer steps. 
0456                             if 'Hadronizer' in step: 
0457                                 chainDict['nowmTasklist'][-1]['LumisPerJob']=wmsplit['Hadronizer']
0458 
0459                             #print step
0460                             chainDict['nowmTasklist'][-1]['TaskName']=step
0461                             if setPrimaryDs:
0462                                 chainDict['nowmTasklist'][-1]['PrimaryDataset']=setPrimaryDs
0463                             chainDict['nowmTasklist'][-1]['ConfigCacheID']='%s/%s.py'%(dir,step)
0464                             chainDict['nowmTasklist'][-1]['GlobalTag']=chainDict['nowmTasklist'][-1]['nowmIO']['GT'] # copy to the proper parameter name
0465                             chainDict['GlobalTag']=chainDict['nowmTasklist'][-1]['nowmIO']['GT'] #set in general to the last one of the chain
0466                             if 'NANOEDM' in step :
0467                                 nanoedmGT = chainDict['nowmTasklist'][-1]['nowmIO']['GT']
0468                             if 'NANOMERGE' in step :
0469                                 chainDict['GlobalTag'] = nanoedmGT
0470                             if 'pileup' in chainDict['nowmTasklist'][-1]['nowmIO']:
0471                                 chainDict['nowmTasklist'][-1]['MCPileup']=chainDict['nowmTasklist'][-1]['nowmIO']['pileup']
0472                             if '--pileup ' in s[2][index]:      # catch --pileup (scenarion) and not --pileup_ (dataset to be mixed) => works also making PRE-MIXed dataset
0473                                 pileupString = s[2][index].split()[s[2][index].split().index('--pileup')+1]
0474                                 processStrPrefix='PU_'          # take care of pu overlay done with GEN-SIM mixing
0475                                 if pileupString.find('25ns')  > 0 :
0476                                     processStrPrefix='PU25ns_'
0477                                 elif pileupString.find('50ns')  > 0 :
0478                                     processStrPrefix='PU50ns_'
0479                                 elif 'nopu' in pileupString.lower():
0480                                     processStrPrefix=''
0481                             if 'premix_stage2' in s[2][index] and '--pileup_input' in s[2][index]: # take care of pu overlay done with DIGI mixing of premixed events
0482                                 if s[2][index].split()[ s[2][index].split().index('--pileup_input')+1  ].find('25ns')  > 0 :
0483                                     processStrPrefix='PUpmx25ns_'
0484                                 elif s[2][index].split()[ s[2][index].split().index('--pileup_input')+1  ].find('50ns')  > 0 :
0485                                     processStrPrefix='PUpmx50ns_'
0486 
0487                             if acqEra:
0488                                 #chainDict['AcquisitionEra'][step]=(chainDict['CMSSWVersion']+'-PU_'+chainDict['nowmTasklist'][-1]['GlobalTag']).replace('::All','')+thisLabel
0489                                 chainDict['AcquisitionEra'][step]=chainDict['CMSSWVersion']
0490                                 chainDict['ProcessingString'][step]=processStrPrefix+chainDict['nowmTasklist'][-1]['GlobalTag'].replace('::All','').replace('-','_')+thisLabel
0491                                 if 'NANOMERGE' in step :
0492                                     chainDict['ProcessingString'][step]=processStrPrefix+nanoedmGT.replace('::All','').replace('-','_')+thisLabel
0493                             else:
0494                                 #chainDict['nowmTasklist'][-1]['AcquisitionEra']=(chainDict['CMSSWVersion']+'-PU_'+chainDict['nowmTasklist'][-1]['GlobalTag']).replace('::All','')+thisLabel
0495                                 chainDict['nowmTasklist'][-1]['AcquisitionEra']=chainDict['CMSSWVersion']
0496                                 chainDict['nowmTasklist'][-1]['ProcessingString']=processStrPrefix+chainDict['nowmTasklist'][-1]['GlobalTag'].replace('::All','').replace('-','_')+thisLabel
0497                                 if 'NANOMERGE' in step :
0498                                     chainDict['nowmTasklist'][-1]['ProcessingString']=processStrPrefix+nanoedmGT.replace('::All','').replace('-','_')+thisLabel
0499 
0500                             if (self.batchName):
0501                                 chainDict['nowmTasklist'][-1]['Campaign'] = chainDict['nowmTasklist'][-1]['AcquisitionEra']+self.batchName
0502 
0503                             # specify different ProcessingString for double miniAOD dataset
0504                             if ('DBLMINIAODMCUP15NODQM' in step): 
0505                                 chainDict['nowmTasklist'][-1]['ProcessingString']=chainDict['nowmTasklist'][-1]['ProcessingString']+'_miniAOD' 
0506 
0507                             if( chainDict['nowmTasklist'][-1]['Multicore'] ):
0508                                 # the scaling factor of 1.2GB / thread is empirical and measured on a SECOND round of tests with PU samples
0509                                 # the number of threads is NO LONGER assumed to be the same for all tasks
0510                                 # https://hypernews.cern.ch/HyperNews/CMS/get/edmFramework/3509/1/1/1.html
0511                                 # now change to 1.5GB / additional thread according to discussion:
0512                                 # https://hypernews.cern.ch/HyperNews/CMS/get/relval/4817/1/1.html
0513 #                                chainDict['nowmTasklist'][-1]['Memory'] = 3000 + int( chainDict['nowmTasklist'][-1]['Multicore']  -1 )*1500
0514                                 chainDict['nowmTasklist'][-1]['Memory'] = self.memoryOffset + int( chainDict['nowmTasklist'][-1]['Multicore']  -1 ) * self.memPerCore
0515 
0516                         index+=1
0517                     #end of loop through steps
0518                     chainDict['RequestString']='RV'+chainDict['CMSSWVersion']+s[1].split('+')[0]
0519                     if processStrPrefix or thisLabel:
0520                         chainDict['RequestString']+='_'+processStrPrefix+thisLabel
0521                     #check candidate WF name
0522                     self.candidateWFName = self.user+'_'+chainDict['RequestString']
0523                     if (len(self.candidateWFName)>MAXWORKFLOWLENGTH):
0524                         self.longWFName.append(self.candidateWFName)
0525 
0526 ### PrepID
0527                     chainDict['PrepID'] = chainDict['CMSSWVersion']+'__'+self.batchTime+'-'+s[1].split('+')[0]
0528                     if(self.batchName):
0529                         chainDict['PrepID'] = chainDict['CMSSWVersion']+self.batchName+'-'+s[1].split('+')[0]
0530                         if( 'HIN' in self.batchName ):
0531                             chainDict['SubRequestType'] = "HIRelVal"
0532                         
0533             #wrap up for this one
0534             import pprint
0535             #print 'wrapping up'
0536             #pprint.pprint(chainDict)
0537             #loop on the task list
0538             for i_second in reversed(range(len(chainDict['nowmTasklist']))):
0539                 t_second=chainDict['nowmTasklist'][i_second]
0540                 #print "t_second taskname", t_second['TaskName']
0541                 if 'primary' in t_second['nowmIO']:
0542                     #print t_second['nowmIO']['primary']
0543                     primary=t_second['nowmIO']['primary'][0].replace('file:','')
0544                     for i_input in reversed(range(0,i_second)):
0545                         t_input=chainDict['nowmTasklist'][i_input]
0546                         for (om,o) in t_input['nowmIO'].items():
0547                             if primary in o:
0548                                 #print "found",primary,"procuced by",om,"of",t_input['TaskName']
0549                                 #ad-hoc fix due to restriction in TaskName of 50 characters
0550                                 if (len(t_input['TaskName'])>50):
0551                                     if (t_input['TaskName'].find('GenSim') != -1):
0552                                         t_input['TaskName'] = 'GenSimFull'
0553                                     if (t_input['TaskName'].find('Hadronizer') != -1):
0554                                         t_input['TaskName'] = 'HadronizerFull'
0555                                 t_second['InputTask'] = t_input['TaskName']
0556                                 t_second['InputFromOutputModule'] = om
0557                                 #print 't_second',pprint.pformat(t_second)
0558                                 if t_second['TaskName'].startswith('HARVEST'):
0559                                     chainDict.update(copy.deepcopy(self.defaultHarvest))
0560                                     if "_RD" in t_second['TaskName']:
0561                                         chainDict['DQMHarvestUnit'] = "multiRun"
0562                                     chainDict['DQMConfigCacheID']=t_second['ConfigCacheID']
0563                                     ## the info are not in the task specific dict but in the general dict
0564                                     #t_input.update(copy.deepcopy(self.defaultHarvest))
0565                                     #t_input['DQMConfigCacheID']=t_second['ConfigCacheID']
0566                                 break
0567 
0568             # agreed changes for wm injection:
0569             # - Campaign: *optional* string during creation. It will default to AcqEra value if possible.  
0570             #             Otherwise it will be empty.
0571             # - AcquisitionEra: *mandatory* string at request level during creation. *optional* string
0572             #                   at task level during creation. "optional" during assignment.
0573             # - ProcessingString: *mandatory* string at request level during creation. *optional* string
0574             #                     at task level during creation. "optional" during assignment.
0575             # - ProcessingVersion: *optional* during creation (default 1). *optional* during assignment.
0576             # 
0577             # Which requires following changes here:
0578             #  - reset Global AcuisitionEra, ProcessingString to be the one in the first task
0579             #  - and also Campaign to be always the same as the AcquisitionEra
0580 
0581             if acqEra:
0582                 chainDict['AcquisitionEra'] = chainDict['AcquisitionEra'].values()[0] 
0583                 chainDict['ProcessingString'] = chainDict['ProcessingString'].values()[0]
0584             else:
0585                 chainDict['AcquisitionEra'] = chainDict['nowmTasklist'][0]['AcquisitionEra']
0586                 chainDict['ProcessingString'] = chainDict['nowmTasklist'][0]['ProcessingString']
0587                 
0588 #####  batch name appended to Campaign name
0589 #            chainDict['Campaign'] = chainDict['AcquisitionEra']
0590             chainDict['Campaign'] = chainDict['AcquisitionEra']+self.batchName
0591                
0592             ## clean things up now
0593             itask=0
0594             if self.keep:
0595                 for i in self.keep:
0596                     if isinstance(i, int) and i < len(chainDict['nowmTasklist']):
0597                         chainDict['nowmTasklist'][i]['KeepOutput']=True
0598             for (i,t) in enumerate(chainDict['nowmTasklist']):
0599                 if t['TaskName'].startswith('HARVEST'):
0600                     continue
0601                 if not self.keep:
0602                     t['KeepOutput']=True
0603                 elif t['TaskName'] in self.keep:
0604                     t['KeepOutput']=True
0605                 if t['TaskName'].startswith('HYBRIDRepackHI2015VR'):
0606                     t['KeepOutput']=False
0607                 t.pop('nowmIO')
0608                 itask+=1
0609                 chainDict['Task%d'%(itask)]=t
0610 
0611 
0612             ## 
0613 
0614 
0615             ## provide the number of tasks
0616             chainDict['TaskChain']=itask#len(chainDict['nowmTasklist'])
0617             
0618             chainDict.pop('nowmTasklist')
0619             self.chainDicts[n]=chainDict
0620 
0621             
0622         return 0
0623 
0624     def uploadConf(self,filePath,label,where):
0625         labelInCouch=self.label+'_'+label
0626         cacheName=filePath.split('/')[-1]
0627         if self.testMode:
0628             self.count+=1
0629             print('\tFake upload of',filePath,'to couch with label',labelInCouch)
0630             return self.count
0631         else:
0632             try:
0633                 from modules.wma import upload_to_couch,DATABASE_NAME
0634             except:
0635                 print('\n\tUnable to find wmcontrol modules. Please include it in your python path\n')
0636                 print('\n\t QUIT\n')
0637                 sys.exit(-16)
0638 
0639             if cacheName in self.couchCache:
0640                 print("Not re-uploading",filePath,"to",where,"for",label)
0641                 cacheId=self.couchCache[cacheName]
0642             else:
0643                 print("Loading",filePath,"to",where,"for",label)
0644                 ## totally fork the upload to couch to prevent cross loading of process configurations
0645                 pool = multiprocessing.Pool(1)
0646                 cacheIds = pool.map( upload_to_couch_oneArg, [(filePath,labelInCouch,self.user,self.group,where)] )
0647                 cacheId = cacheIds[0]
0648                 self.couchCache[cacheName]=cacheId
0649             return cacheId
0650     
0651     def upload(self):
0652         for (n,d) in self.chainDicts.items():
0653             for it in d:
0654                 if it.startswith("Task") and it!='TaskChain':
0655                     #upload
0656                     couchID=self.uploadConf(d[it]['ConfigCacheID'],
0657                                             str(n)+d[it]['TaskName'],
0658                                             d['ConfigCacheUrl']
0659                                             )
0660                     print(d[it]['ConfigCacheID']," uploaded to couchDB for",str(n),"with ID",couchID)
0661                     d[it]['ConfigCacheID']=couchID
0662                 if it =='DQMConfigCacheID':
0663                     couchID=self.uploadConf(d['DQMConfigCacheID'],
0664                                             str(n)+'harvesting',
0665                                             d['ConfigCacheUrl']
0666                                             )
0667                     print(d['DQMConfigCacheID'],"uploaded to couchDB for",str(n),"with ID",couchID)
0668                     d['DQMConfigCacheID']=couchID
0669                         
0670             
0671     def submit(self):
0672         try:
0673             from modules.wma import makeRequest,approveRequest
0674             from wmcontrol import random_sleep
0675             print('\n\tFound wmcontrol\n')
0676         except:
0677             print('\n\tUnable to find wmcontrol modules. Please include it in your python path\n')
0678             if not self.testMode:
0679                 print('\n\t QUIT\n')
0680                 sys.exit(-17)
0681 
0682         import pprint
0683         for (n,d) in self.chainDicts.items():
0684             if self.testMode:
0685                 print("Only viewing request",n)
0686                 print(pprint.pprint(d))
0687             else:
0688                 #submit to wmagent each dict
0689                 print("For eyes before submitting",n)
0690                 print(pprint.pprint(d))
0691                 print("Submitting",n,"...........")
0692                 workFlow=makeRequest(self.wmagent,d,encodeDict=True)
0693                 print("...........",n,"submitted")
0694                 random_sleep()
0695         if self.testMode and len(self.longWFName)>0:
0696             print("\n*** WARNING: "+str(len(self.longWFName))+" workflows have too long names for submission (>"+str(MAXWORKFLOWLENGTH)+ "characters) ***")
0697             print('\n'.join(self.longWFName))