Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:29:04

0001 #!/usr/bin/env python3
0002 #____________________________________________________________
0003 #
0004 #  BeamSpotWorkflow
0005 #
0006 # A very complicated way to automate the beam spot workflow
0007 #
0008 # Francisco Yumiceva, Lorenzo Uplegger
0009 # yumiceva@fnal.gov, uplegger@fnal.gov
0010 #
0011 # Fermilab, 2010
0012 #
0013 #____________________________________________________________
0014 
0015 """
0016    BeamSpotWorkflow.py
0017 
0018    A very complicate script to upload the results into the DB
0019 
0020    usage: %prog -d <data file/directory> -t <tag name>
0021    -c, --cfg = CFGFILE : Use a different configuration file than the default
0022    -l, --lock = LOCK   : Create a lock file to have just one script running 
0023    -o, --overwrite     : Overwrite results files when copying.
0024    -T, --Test          : Upload files to Test dropbox for data validation.   
0025    -u, --upload        : Upload files to offline drop box via scp.
0026    -z, --zlarge        : Enlarge sigmaZ to 10 +/- 0.005 cm.
0027 
0028    Francisco Yumiceva (yumiceva@fnal.gov)
0029    Lorenzo Uplegger   (send an email to Francisco)
0030    Fermilab 2010
0031    
0032 """
0033 from __future__ import print_function
0034 
0035 
0036 from builtins import range
0037 import sys,os
0038 import subprocess, re, time
0039 import datetime
0040 import configparser as ConfigParser
0041 import xmlrpclib
0042 from BeamSpotObj import BeamSpot
0043 from IOVObj import IOV
0044 from CommonMethods import *
0045 
0046 try: # FUTURE: Python 2.6, prior to 2.6 requires simplejson
0047     import json
0048 except:
0049     try:
0050         import simplejson as json
0051     except:
0052         error = "Please set a crab environment in order to get the proper JSON lib"
0053         exit(error)
0054 
0055 #####################################################################################
0056 # General functions
0057 #####################################################################################
def getLastUploadedIOV(tagName,destDB="oracle://cms_orcoff_prod/CMS_COND_31X_BEAMSPOT"):
    """Return the last IOV listed by ``cmscond_list_iov`` for ``tagName``.

    Parameters:
        tagName: name of the conditions tag to query.
        destDB:  connection string passed to ``cmscond_list_iov -c``.

    Returns:
        The last IOV as an int, or 1 when the DB reports that the metadata
        entry for ``tagName`` does not exist (a new tag is then assumed).

    Exits the script (via ``exit``) when the DB command fails for any other
    reason, or when the tag exists but no IOV value can be extracted.
    """
    listIOVCommand = "cmscond_list_iov -c " + destDB + " -P /afs/cern.ch/cms/DB/conddb -t " + tagName
    dbError = subprocess.getstatusoutput( listIOVCommand )
    if dbError[0] != 0 :
        if dbError[1].find("metadata entry \"" + tagName + "\" does not exist") != -1:
            print("Creating a new tag because I got the following error contacting the DB")
            print(dbError[1])
            return 1
        else:
            exit("ERROR: Can\'t connect to db because:\n" + dbError[1])

    # Keep only the first column of the last line that mentions "DB=".
    aCommand = listIOVCommand + " | grep DB= | tail -1 | awk \'{print $1}\'"
    output = subprocess.getstatusoutput( aCommand )

    if output[1] == '':
        exit("ERROR: The tag " + tagName + " exists but I can't get the value of the last IOV")

    # Python 3 has no 'long'; int is arbitrary precision, so lumi-based
    # (64-bit packed) IOVs are still represented exactly.
    return int(output[1])
0080 
0081 ########################################################################
def getListOfFilesToProcess(dataSet,lastRun=-1):
    """Query DBS for the .root files of ``dataSet`` and return the output lines.

    When ``lastRun`` is not -1 the query is restricted to runs greater than
    ``lastRun``. The exit status of the command is ignored; whatever text the
    command produced is split on newlines and returned as a list of strings.
    """
    pieces = ["dbs --search --query \"find file where dataset=" + dataSet]
    if lastRun != -1:
        pieces.append(" and run > " + str(lastRun))
    pieces.append("\" | grep .root")
    _, listing = subprocess.getstatusoutput( "".join(pieces) )
    return listing.split('\n')
0090 
0091 ########################################################################
def getNumberOfFilesToProcessForRun(dataSet,run):
    """Return how many .root files DBS lists for ``run`` in ``dataSet``.

    The count is the number of output lines of the query piped through
    ``grep .root``. A non-zero exit status of that pipeline yields 0.
    """
    command = "dbs --search --query \"find file where dataset=" + dataSet + " and run = " + str(run) + "\" | grep .root"
    status, listing = subprocess.getstatusoutput( command )
    if status == 0:
        return len(listing.split('\n'))
    return 0
0100 
0101 ########################################################################
def getListOfRunsAndLumiFromDBS(dataSet,lastRun=-1):
    """Query DBS for the (run, lumi) pairs of one or more datasets.

    Parameters:
        dataSet: comma-separated list of dataset names; each is queried in turn.
        lastRun: when not -1, only runs greater than this value are requested.

    Returns:
        dict mapping run number (int) to the list of lumi numbers (int) found
        in the query output, in the order they were printed.

    Each query is attempted up to three times. The script exits if the last
    attempt returned a non-zero status.
    """
    outputList = []
    for data in dataSet.split(','):
        queryCommand = "dbs --search --query \"find run,lumi where dataset=" + data
        if lastRun != -1:
            queryCommand = queryCommand + " and run > " + str(lastRun)
        queryCommand += "\""
        print(" >> " + queryCommand)
        output = []
        for i in range(0,3):
            output = subprocess.getstatusoutput( queryCommand )
            # Retry when the command failed or its text reports an error.
            if output[0] == 0 and not (output[1].find("ERROR") != -1 or output[1].find("Error") != -1) :
                break
        # NOTE(review): an output that still contains "ERROR" after the last
        # retry but has status 0 is accepted here, as in the original code.
        if output[0] != 0:
            exit("ERROR: I can't contact DBS for the following reason:\n" + output[1])
        outputList.extend(output[1].split('\n'))

    # Lines that do not look like "<run> <lumi>" are silently skipped.
    runLumiPattern = re.compile(r'(\d+)\s+(\d+)')
    runsAndLumis = {}
    for out in outputList:
        regExp = runLumiPattern.search(out)
        if regExp:
            # Python 3 has no 'long'; int is arbitrary precision.
            run  = int(regExp.group(1))
            lumi = int(regExp.group(2))
            runsAndLumis.setdefault(run, []).append(lumi)

    return runsAndLumis
0135 
0136 #####################################################################################
def getListOfRunsAndLumiFromFile(firstRun=-1,fileName=""):
    """Load a run -> lumi-ranges mapping from a JSON file.

    Parameters:
        firstRun: accepted for signature compatibility; not used here.
        fileName: path of a JSON file whose top-level object is keyed by
                  run number (as strings).

    Returns:
        dict with the same values as the JSON object, keyed by int run number.
    """
    # 'with' closes the file even if reading or decoding fails.
    with open(fileName) as jsonFile:
        jsonList = json.loads(jsonFile.read())

    # Python 3 has no 'long'; int handles arbitrarily large run numbers.
    return {int(element): lumis for element, lumis in jsonList.items()}
0147 
0148 ########################################################################
def getListOfRunsAndLumiFromRR(firstRun=-1):
    """Fetch runs and their DCS-good lumi ranges from the Run Registry.

    Two XML-RPC exports are performed against the Run Registry server:
    first the list of runs of group "Collisions10" with run number greater
    than ``firstRun``, then the lumi sections of those runs for which all
    the listed DCS flags are 1.

    Returns:
        dict mapping run number (int) to the value the registry returned for
        that run, or ``[[]]`` for runs absent from the lumi-section export.
        An empty dict is returned when the registry cannot be reached after
        three attempts, or when it reports no runs at all.
    """
    RunReg  ="http://pccmsdqm04.cern.ch/runregistry"
    Group   = "Collisions10"

    # get handler to RR XML-RPC server
    FULLADDRESS=RunReg + "/xmlrpc"
    server = xmlrpclib.ServerProxy(FULLADDRESS)
    sel_runtable="{groupName} ='" + Group + "' and {runNumber} > " + str(firstRun)

    maxAttempts = 3
    for tries in range(maxAttempts):
        try:
            run_data = server.DataExporter.export('RUN'           , 'GLOBAL', 'csv_runs', sel_runtable)
            break
        except Exception:
            # 'except Exception' (not a bare except) so that Ctrl-C and
            # SystemExit are not mistaken for a registry failure.
            print("Something wrong in accessing runregistry, retrying in 2s....", tries, "/", maxAttempts)
            time.sleep(2)
    else:
        # Run registry unaccessible after maxAttempts tries.
        return {}

    # Keep only the CSV rows whose first column is a run number.
    listOfRuns=[]
    for line in run_data.split("\n"):
        run=line.split(',')[0]
        if run.isdigit():
            listOfRuns.append(run)

    # Nothing to select on: avoid indexing an empty list below.
    if not listOfRuns:
        return {}

    # presumably the export lists runs in descending order, so the last entry
    # is the lowest run number and the first the highest — TODO confirm
    firstRun = listOfRuns[-1]
    lastRun  = listOfRuns[0]
    sel_dcstable="{groupName} ='" + Group + "' and {runNumber} >= " + str(firstRun) + " and {runNumber} <= " + str(lastRun) + " and {parDcsBpix} = 1 and {parDcsFpix} = 1 and {parDcsTibtid} = 1 and {parDcsTecM} = 1 and {parDcsTecP} = 1 and {parDcsTob} = 1 and {parDcsEbminus} = 1 and {parDcsEbplus} = 1 and {parDcsEeMinus} = 1 and {parDcsEePlus} = 1 and {parDcsEsMinus} = 1 and {parDcsEsPlus} = 1 and {parDcsHbheA} = 1 and {parDcsHbheB} = 1 and {parDcsHbheC} = 1 and {parDcsH0} = 1 and {parDcsHf} = 1"

    for tries in range(maxAttempts):
        try:
            dcs_data = server.DataExporter.export('RUNLUMISECTION', 'GLOBAL', 'json'    , sel_dcstable)
            break
        except Exception:
            print("I was able to get the list of runs and now I am trying to access the detector status, retrying in 2s....", tries, "/", maxAttempts)
            time.sleep(2)
    else:
        # Run registry unaccessible after maxAttempts tries.
        return {}

    selected_dcs={}
    jsonList=json.loads(dcs_data)

    # Iterate over the run list (not the JSON keys) so that runs without any
    # good lumi section are still reported, with an empty range.
    for element in listOfRuns:
        # Python 3 has no 'long'; int is arbitrary precision.
        if element in jsonList:
            selected_dcs[int(element)]=jsonList[element]
        else:
            print("WARNING: Run " + element + " is a collision10 run with 0 lumis in Run Registry!")
            selected_dcs[int(element)]= [[]]
    return selected_dcs
0217 
0218 ########################################################################
def getLastClosedRun(DBSListOfFiles):
    """Return the second-highest run number found in a list of DBS file names.

    The highest run is treated as possibly still open, so the one before it
    is reported as the last closed run. Returns -1 when fewer than two
    distinct run numbers are found.
    """
    # A set removes duplicates in one pass instead of list.count() per file.
    runs = sorted({getRunNumberFromDBSName(fileName) for fileName in DBSListOfFiles})

    if len(runs) <= 1: #No closed run
        return -1
    # Python 3 has no 'long'; int is arbitrary precision.
    return int(runs[-2])
0231 
0232 ########################################################################
def getRunNumberFromFileName(fileName):
    """Extract the run number from a result file name.

    The name is searched for ``<non-digits>_<digits>_<digits>_``; the second
    group of digits is returned as an int. Returns -1 when the name does not
    contain that pattern.
    """
    # Raw string avoids invalid-escape warnings for \D and \d.
    regExp = re.search(r'(\D+)_(\d+)_(\d+)_',fileName)
    if not regExp:
        return -1
    # Python 3 has no 'long'; int is arbitrary precision.
    return int(regExp.group(3))
0239 
0240 ########################################################################
def getRunNumberFromDBSName(fileName):
    """Extract the run number from a DBS logical file name.

    The name is searched for ``<non-digits>/<digits>/<digits>/<digits>/<non-digits>``;
    the second and third digit groups are concatenated and returned as an
    int. Returns -1 when the name does not contain that pattern.
    """
    # Raw string avoids invalid-escape warnings for \D and \d.
    regExp = re.search(r'(\D+)/(\d+)/(\d+)/(\d+)/(\D+)',fileName)
    if not regExp:
        return -1
    # Python 3 has no 'long'; int is arbitrary precision.
    return int(regExp.group(3)+regExp.group(4))
0246 
0247 ########################################################################
def getNewRunList(fromDir,lastUploadedIOV):
    """List the .txt files in ``fromDir`` whose run number exceeds ``lastUploadedIOV``.

    The run number of each file is taken from its name with
    getRunNumberFromFileName; the listing order returned by ``ls`` is kept.
    """
    return [
        candidate
        for candidate in ls(fromDir,".txt")
        if getRunNumberFromFileName(candidate) > lastUploadedIOV
    ]
0257 
0258 ########################################################################
def selectFilesToProcess(listOfRunsAndLumiFromDBS,listOfRunsAndLumiFromRR,newRunList,runListDir,dataSet,mailList,dbsTolerance,dbsTolerancePercent,rrTolerance,missingFilesTolerance,missingLumisTimeout):
    """Select the result files whose runs are complete enough to be uploaded.

    Parameters:
        listOfRunsAndLumiFromDBS: dict run -> list of lumis known to DBS.
        listOfRunsAndLumiFromRR:  dict run -> list of [first, last] lumi ranges
                                  (an empty list stands for "no lumis").
        newRunList:   names of the result files to consider.
        runListDir:   directory prefix prepended to each file name.
        dataSet:      comma-separated dataset names used to count files in DBS.
        mailList:     recipients passed to sendEmail on errors.
        dbsTolerance: maximum number of unprocessed lumis tolerated.
        dbsTolerancePercent: maximum percentage of unprocessed lumis tolerated.
        rrTolerance:  a missing run with at most this many lumis is ignored
                      once its timeout has expired.
        missingFilesTolerance: maximum number of missing files that selects
                      the DBS_MISMATCH timeout instead of DBS_VERY_BIG_MISMATCH.
        missingLumisTimeout: duration handed to timeoutManager.

    Returns:
        list of file names, in Run Registry run order, for all runs up to (but
        excluding) the first run that cannot be processed yet.

    Runs are walked in increasing order; as soon as one run fails a check
    the files collected so far are returned. timeoutManager is treated
    here as returning 1 for an expired timeout and -1 for a timeout that was
    just set, with any other value meaning "in progress", as the messages below
    indicate. Calling it with only a name resets that timeout.
    """
    runsAndLumisProcessed = {}
    runsAndFiles = {}
    for fileName in newRunList:
        # 'with' closes the result file even when exit() is called below.
        with open(runListDir+fileName) as resultFile:
            for line in resultFile:
                if line.find("Runnumber") != -1:
                    # Python 3 has no 'long'; int is arbitrary precision.
                    run = int(line.replace('\n','').split(' ')[1])
                elif line.find("LumiRange") != -1:
                    lumiLine = line.replace('\n','').split(' ')
                    begLumi = int(lumiLine[1])
                    endLumi = int(lumiLine[3])
                    if begLumi != endLumi:
                        error = "The lumi range is greater than 1 for run " + str(run) + " " + line + " in file: " + runListDir + fileName
                        exit(error)
                    else:
                        if not run in runsAndLumisProcessed:
                            runsAndLumisProcessed[run] = []
                        if begLumi in runsAndLumisProcessed[run]:
                            print("Lumi " + str(begLumi) + " in event " + str(run) + " already exist. This MUST not happen but right now I will ignore this lumi!")
                        else:
                            runsAndLumisProcessed[run].append(begLumi)
        # NOTE(review): 'run' is the last Runnumber seen in this file; a file
        # without any Runnumber line reuses the previous file's run (or fails
        # on the very first file).
        if not run in runsAndFiles:
            runsAndFiles[run] = []
        runsAndFiles[run].append(fileName)

    # dict.keys() is a view in Python 3 and has no sort(); sorted() returns
    # the mutable, ordered list that the code below needs.
    rrKeys = sorted(listOfRunsAndLumiFromRR.keys())
    dbsKeys = sorted(listOfRunsAndLumiFromDBS.keys())
    #I remove the last entry from DBS since I am not sure it is an already closed run!
    lastUnclosedRun = dbsKeys.pop()
    procKeys = sorted(runsAndLumisProcessed.keys())

    filesToProcess = []
    for run in rrKeys:
        # Expand the Run Registry [first, last] ranges into single lumis.
        RRList = []
        for lumiRange in listOfRunsAndLumiFromRR[run]:
            if lumiRange != []:
                for l in range(lumiRange[0],lumiRange[1]+1):
                    RRList.append(int(l))
        if run in procKeys and run < lastUnclosedRun:
            if not run in dbsKeys and run != lastUnclosedRun:
                error = "Impossible but run " + str(run) + " has been processed and it is also in the run registry but it is not in DBS!"
                exit(error)
            print("Working on run " + str(run))
            # Use the first dataset that reports any file for this run.
            nFiles = 0
            for data in dataSet.split(','):
                nFiles = getNumberOfFilesToProcessForRun(data,run)
                if nFiles != 0:
                    break
            if len(runsAndFiles[run]) < nFiles:
                print("I haven't processed all files yet : " + str(len(runsAndFiles[run])) + " out of " + str(nFiles) + " for run: " + str(run))
                if nFiles - len(runsAndFiles[run]) <= missingFilesTolerance:
                    timeoutManager("DBS_VERY_BIG_MISMATCH_Run"+str(run)) # resetting this timeout
                    timeoutType = timeoutManager("DBS_MISMATCH_Run"+str(run),missingLumisTimeout)
                    if timeoutType == 1:
                        print("WARNING: I previously set a timeout that expired...I'll continue with the script even if I didn't process all the lumis!")
                    else:
                        if timeoutType == -1:
                            print("WARNING: Setting the DBS_MISMATCH_Run" + str(run) + " timeout because I haven't processed all files!")
                        else:
                            print("WARNING: Timeout DBS_MISMATCH_Run" + str(run) + " is in progress.")
                        return filesToProcess
                else:
                    timeoutType = timeoutManager("DBS_VERY_BIG_MISMATCH_Run"+str(run),missingLumisTimeout)
                    if timeoutType == 1:
                        error = "ERROR: I previously set a timeout that expired...I can't continue with the script because there are too many (" + str(nFiles - len(runsAndFiles[run])) + " files missing) and for too long " + str(missingLumisTimeout/3600) + " hours! I will process anyway the runs before this one (" + str(run) + ")"
                        sendEmail(mailList,error)
                        return filesToProcess
                    else:
                        if timeoutType == -1:
                            print("WARNING: Setting the DBS_VERY_BIG_MISMATCH_Run" + str(run) + " timeout because I haven't processed all files!")
                        else:
                            print("WARNING: Timeout DBS_VERY_BIG_MISMATCH_Run" + str(run) + " is in progress.")
                        return filesToProcess

            else:
                # Everything is there: clear both timeouts for this run.
                timeoutManager("DBS_VERY_BIG_MISMATCH_Run"+str(run))
                timeoutManager("DBS_MISMATCH_Run"+str(run))
                print("I have processed " + str(len(runsAndFiles[run])) + " out of " + str(nFiles) + " files that are in DBS. So I should have all the lumis!")
            errors          = []
            badProcessed    = []
            badDBSProcessed = []
            badDBS          = []
            badRRProcessed  = []
            badRR           = []
            #It is important for runsAndLumisProcessed[run] to be the first because the comparision is not ==
            badDBSProcessed,badDBS = compareLumiLists(runsAndLumisProcessed[run],listOfRunsAndLumiFromDBS[run],errors)
            for i in range(0,len(errors)):
                errors[i] = errors[i].replace("listA","the processed lumis")
                errors[i] = errors[i].replace("listB","DBS")
            if len(badDBS) != 0:
                print("This is weird because I processed more lumis than the ones that are in DBS!")
            if len(badDBSProcessed) != 0 and run in rrKeys:
                lastError = len(errors)
                #It is important for runsAndLumisProcessed[run] to be the first because the comparision is not ==
                badRRProcessed,badRR = compareLumiLists(runsAndLumisProcessed[run],RRList,errors)
                for i in range(0,len(errors)):
                    errors[i] = errors[i].replace("listA","the processed lumis")
                    errors[i] = errors[i].replace("listB","Run Registry")

                if len(badRRProcessed) != 0:
                    print("I have not processed some of the lumis that are in the run registry for run: " + str(run))
                    # Only lumis missing with respect to BOTH DBS and the Run
                    # Registry count as really unprocessed.
                    for lumi in badDBSProcessed:
                        if lumi in badRRProcessed:
                            badProcessed.append(lumi)
                    lenA = len(badProcessed)
                    lenB = len(RRList)
                    if 100.*lenA/lenB <= dbsTolerancePercent:
                        print("WARNING: I didn't process " + str(100.*lenA/lenB) + "% of the lumis but I am within the " + str(dbsTolerancePercent) + "% set in the configuration. Which corrispond to " + str(lenA) + " out of " + str(lenB) + " lumis")
                        badProcessed = []
                    elif lenA <= dbsTolerance:
                        print("WARNING: I didn't process " + str(lenA) + " lumis but I am within the " + str(dbsTolerance) + " lumis set in the configuration. Which corrispond to " + str(lenA) + " out of " + str(lenB) + " lumis")
                        badProcessed = []
                    else:
                        error = "ERROR: For run " + str(run) + " I didn't process " + str(100.*lenA/lenB) + "% of the lumis and I am not within the " + str(dbsTolerancePercent) + "% set in the configuration. The number of lumis that I didn't process (" + str(lenA) + " out of " + str(lenB) + ") is greater also than the " + str(dbsTolerance) + " lumis that I can tolerate. I can't process runs >= " + str(run) + " but I'll process the runs before!"
                        sendEmail(mailList,error)
                        print(error)
                        return filesToProcess
                elif len(errors) != 0:
                    print("The number of lumi sections processed didn't match the one in DBS but they cover all the ones in the Run Registry, so it is ok!")

            #If I get here it means that I passed or the DBS or the RR test
            if len(badProcessed) == 0:
                for selectedFile in runsAndFiles[run]:
                    filesToProcess.append(selectedFile)
            else:
                print("This should never happen because if I have errors I return or exit! Run: " + str(run))
        else:
            error = "Run " + str(run) + " is in the run registry but it has not been processed yet!"
            print(error)
            timeoutType = timeoutManager("MISSING_RUNREGRUN_Run"+str(run),missingLumisTimeout)
            if timeoutType == 1:
                if len(RRList) <= rrTolerance:
                    error = "WARNING: I previously set the MISSING_RUNREGRUN_Run" + str(run) + " timeout that expired...I am missing run " + str(run) + " but it only had " + str(len(RRList)) + " <= " + str(rrTolerance) + " lumis. So I will continue and ignore it... "
                    print(error)
                else:
                    error = "ERROR: I previously set the MISSING_RUNREGRUN_Run" + str(run) + " timeout that expired...I am missing run " + str(run) + " which has " + str(len(RRList)) + " > " + str(rrTolerance) + " lumis. I can't continue but I'll process the runs before this one"
                    sendEmail(mailList,error)
                    return filesToProcess
            else:
                if timeoutType == -1:
                    print("WARNING: Setting the MISSING_RUNREGRUN_Run" + str(run) + " timeout because I haven't processed a run!")
                else:
                    print("WARNING: Timeout MISSING_RUNREGRUN_Run" + str(run) + " is in progress.")
                return filesToProcess

    return filesToProcess
0435 ########################################################################
def compareLumiLists(listA,listB,errors=None,tolerance=0):
    """Report the lumis that appear in only one of two lists.

    Parameters:
        listA, listB: lists of lumi numbers. Both are sorted IN PLACE.
        errors:    optional list that receives one message per problem found;
                   a fresh list is used when omitted.
        tolerance: percentage of ``len(listB)`` by which ``listA`` may be
                   shorter before a size-mismatch error is recorded.

    Returns:
        (badA, badB) where badA holds the lumis of listB missing from listA
        and badB holds the lumis of listA missing from listB, both in
        ascending order.

    The size check is one-sided (listA shorter than listB), which is why
    callers must pass the processed lumis as listA.
    """
    # A mutable default ([]) would be shared by every call that omits it.
    if errors is None:
        errors = []
    lenA = len(listA)
    lenB = len(listB)
    if lenA < lenB-(lenB*float(tolerance)/100):
        errors.append("ERROR: The number of lumi sections is different: listA(" + str(lenA) + ")!=(" + str(lenB) + ")listB")
    listA.sort()
    listB.sort()

    # Sets give O(1) membership tests; the lists still drive the iteration so
    # the order (and any duplicates) of the reported lumis is unchanged.
    lumisInA = set(listA)
    lumisInB = set(listB)
    badA = []
    badB = []
    for lumi in listA:
        if lumi not in lumisInB:
            errors.append("Lumi (" + str(lumi) + ") is in listA but not in listB")
            badB.append(lumi)
    for lumi in listB:
        if lumi not in lumisInA:
            errors.append("Lumi (" + str(lumi) + ") is in listB but not in listA")
            badA.append(lumi)

    return badA,badB
0473 
0474 ########################################################################
def removeUncompleteRuns(newRunList,dataSet):
    """Print, for every run in ``newRunList``, whether all its DBS files were processed.

    Files are grouped by the run number taken from their names, and each
    group size is compared with the file count DBS reports for that run in
    ``dataSet``. Nothing is removed and nothing is returned; the function
    only reports.
    """
    filesPerRun = {}
    for name in newRunList:
        runNumber = getRunNumberFromFileName(name)
        filesPerRun[runNumber] = filesPerRun.get(runNumber, 0) + 1

    for runNumber, nProcessed in filesPerRun.items():
        nExpected = getNumberOfFilesToProcessForRun(dataSet,runNumber)
        if nProcessed >= nExpected:
            print("All files have been processed for run: " + str(runNumber) + " (" + str(nProcessed) + " out of " + str(nExpected) + ")")
        else:
            print("I haven't processed all files yet : " + str(nProcessed) + " out of " + str(nExpected) + " for run: " + str(runNumber))
0489 
0490 ########################################################################
def aselectFilesToProcess(listOfFilesToProcess,newRunList):
    """Select the result files of closed runs that were fully processed.

    Parameters:
        listOfFilesToProcess: DBS file names; used to count files per run and
                              to find the last closed run.
        newRunList:           result file names; counted per run as processed.

    Returns:
        list of the names in ``newRunList`` that belong to runs not later
        than the last closed run and whose processed-file count equals the
        DBS file count.

    Exits the script when a processed run is unknown to DBS or when the two
    counts differ.
    """
    selectedFiles = []
    runsToProcess = {}
    processedRuns = {}
    for dbsFile in listOfFilesToProcess:
        run = getRunNumberFromDBSName(dbsFile)
        runsToProcess[run] = runsToProcess.get(run, 0) + 1

    for resultFile in newRunList:
        run = getRunNumberFromFileName(resultFile)
        processedRuns[run] = processedRuns.get(run, 0) + 1

    #WARNING: getLastClosedRun MUST also have a timeout otherwise the last run will not be considered
    lastClosedRun = getLastClosedRun(listOfFilesToProcess)

    for run in sorted(processedRuns.keys()):
        if run <= lastClosedRun :
            # Check DBS membership BEFORE reading runsToProcess[run]; doing it
            # after the print raised KeyError and hid this error message.
            if not run in runsToProcess:
                exit("ERROR: I have a result file for run " + str(run) + " but it doesn't exist in DBS. Impossible but it happened!")
            print("For run " + str(run) + " I have processed " + str(processedRuns[run]) + " files and in DBS there are " + str(runsToProcess[run]) + " files!")
            # NOTE(review): the result is never used and getDBSLumiListForRun
            # is not defined in the visible part of this file — confirm it is
            # still needed.
            lumiList = getDBSLumiListForRun(run)
            if processedRuns[run] == runsToProcess[run]:
                for resultFile in newRunList:
                    if run == getRunNumberFromFileName(resultFile):
                        selectedFiles.append(resultFile)
            else:
                exit("ERROR: For run " + str(run) + " I have processed " + str(processedRuns[run]) + " files but in DBS there are " + str(runsToProcess[run]) + " files!")
    return selectedFiles
0530 
0531 ########################################################################
0532 def main():
0533     ######### COMMAND LINE OPTIONS ##############
0534     option,args = parse(__doc__)
0535 
0536     ######### Check if there is already a megascript running ########
0537     if option.lock:
0538         setLockName('.' + option.lock)
0539         if checkLock():
0540             print("There is already a megascript runnning...exiting")
0541             return
0542         else:
0543             lock()
0544 
0545 
0546     destDB = 'oracle://cms_orcon_prod/CMS_COND_31X_BEAMSPOT'
0547     if option.Test:
0548         destDB = 'oracle://cms_orcoff_prep/CMS_COND_BEAMSPOT'
0549 
0550     ######### CONFIGURATION FILE ################
0551     cfgFile = "BeamSpotWorkflow.cfg"    
0552     if option.cfg:
0553         cfgFile = option.cfg
0554     configurationFile = os.getenv("CMSSW_BASE") + "/src/RecoVertex/BeamSpotProducer/scripts/" + cfgFile
0555     configuration     = ConfigParser.ConfigParser()
0556     print('Reading configuration from ', configurationFile)
0557     configuration.read(configurationFile)
0558 
0559     sourceDir             = configuration.get('Common','SOURCE_DIR')
0560     archiveDir            = configuration.get('Common','ARCHIVE_DIR')
0561     workingDir            = configuration.get('Common','WORKING_DIR')
0562     databaseTag           = configuration.get('Common','DBTAG')
0563     dataSet               = configuration.get('Common','DATASET')
0564     fileIOVBase           = configuration.get('Common','FILE_IOV_BASE')
0565     dbIOVBase             = configuration.get('Common','DB_IOV_BASE')
0566     dbsTolerance          = float(configuration.get('Common','DBS_TOLERANCE'))
0567     dbsTolerancePercent   = float(configuration.get('Common','DBS_TOLERANCE_PERCENT'))
0568     rrTolerance           = float(configuration.get('Common','RR_TOLERANCE'))
0569     missingFilesTolerance = float(configuration.get('Common','MISSING_FILES_TOLERANCE'))
0570     missingLumisTimeout   = float(configuration.get('Common','MISSING_LUMIS_TIMEOUT'))
0571     jsonFileName          = configuration.get('Common','JSON_FILE')
0572     mailList              = configuration.get('Common','EMAIL')
0573 
0574     ######### DIRECTORIES SETUP #################
0575     if sourceDir[len(sourceDir)-1] != '/':
0576         sourceDir = sourceDir + '/'
0577     if not dirExists(sourceDir):
0578         error = "ERROR: The source directory " + sourceDir + " doesn't exist!"
0579         sendEmail(mailList,error)
0580         exit(error)
0581 
0582     if archiveDir[len(archiveDir)-1] != '/':
0583         archiveDir = archiveDir + '/'
0584     if not os.path.isdir(archiveDir):
0585         os.mkdir(archiveDir)
0586 
0587     if workingDir[len(workingDir)-1] != '/':
0588         workingDir = workingDir + '/'
0589     if not os.path.isdir(workingDir):
0590         os.mkdir(workingDir)
0591     else:
0592         os.system("rm -f "+ workingDir + "*") 
0593 
0594 
0595     print("Getting last IOV for tag: " + databaseTag)
0596     lastUploadedIOV = 1
0597     if destDB == "oracle://cms_orcon_prod/CMS_COND_31X_BEAMSPOT": 
0598         lastUploadedIOV = getLastUploadedIOV(databaseTag)
0599     else:
0600         lastUploadedIOV = getLastUploadedIOV(databaseTag,destDB)
0601 
0602     #lastUploadedIOV = 133885
0603     #lastUploadedIOV = 575216380019329
0604     if dbIOVBase == "lumiid":
0605         lastUploadedIOV = unpackLumiid(lastUploadedIOV)["run"]
0606 
0607     ######### Get list of files processed after the last IOV  
0608     print("Getting list of files processed after IOV " + str(lastUploadedIOV))
0609     newProcessedRunList      = getNewRunList(sourceDir,lastUploadedIOV)
0610     if len(newProcessedRunList) == 0:
0611         exit("There are no new runs after " + str(lastUploadedIOV))
0612 
0613     ######### Copy files to archive directory
0614     print("Copying files to archive directory")
0615     copiedFiles = []
0616     for i in range(3):
0617         copiedFiles = cp(sourceDir,archiveDir,newProcessedRunList)    
0618         if len(copiedFiles) == len(newProcessedRunList):
0619             break;
0620     if len(copiedFiles) != len(newProcessedRunList):
0621         error = "ERROR: I can't copy more than " + str(len(copiedFiles)) + " files out of " + str(len(newProcessedRunList)) 
0622         sendEmail(mailList,error)
0623         exit(error)
0624 
0625 
0626     ######### Get from DBS the list of files after last IOV    
0627     #listOfFilesToProcess = getListOfFilesToProcess(dataSet,lastUploadedIOV) 
0628     print("Getting list of files from DBS")
0629     listOfRunsAndLumiFromDBS = getListOfRunsAndLumiFromDBS(dataSet,lastUploadedIOV)
0630     if len(listOfRunsAndLumiFromDBS) == 0:
0631         exit("There are no files in DBS to process") 
0632     print("Getting list of files from RR")
0633     listOfRunsAndLumiFromRR  = getListOfRunsAndLumiFromRR(lastUploadedIOV) 
0634     if(not listOfRunsAndLumiFromRR):
0635         print("Looks like I can't get anything from the run registry so I'll get the data from the json file " + jsonFileName)
0636         listOfRunsAndLumiFromRR  = getListOfRunsAndLumiFromFile(lastUploadedIOV,jsonFileName) 
0637     ######### Get list of files to process for DB
0638     #selectedFilesToProcess = selectFilesToProcess(listOfFilesToProcess,copiedFiles)
0639     #completeProcessedRuns = removeUncompleteRuns(copiedFiles,dataSet)
0640     #print copiedFiles
0641     #print completeProcessedRuns
0642     #exit("complete")
0643     print("Getting list of files to process")
0644     selectedFilesToProcess = selectFilesToProcess(listOfRunsAndLumiFromDBS,listOfRunsAndLumiFromRR,copiedFiles,archiveDir,dataSet,mailList,dbsTolerance,dbsTolerancePercent,rrTolerance,missingFilesTolerance,missingLumisTimeout)
0645     if len(selectedFilesToProcess) == 0:
0646         exit("There are no files to process")
0647 
0648     #print selectedFilesToProcess
0649     ######### Copy files to working directory
0650     print("Copying files from archive to working directory")
0651     copiedFiles = []
0652     for i in range(3):
0653         copiedFiles = cp(archiveDir,workingDir,selectedFilesToProcess)    
0654         if len(copiedFiles) == len(selectedFilesToProcess):
0655             break;
0656         else:
0657             subprocess.getstatusoutput("rm -rf " + workingDir)
0658     if len(copiedFiles) != len(selectedFilesToProcess):
0659         error = "ERROR: I can't copy more than " + str(len(copiedFiles)) + " files out of " + str(len(selectedFilesToProcess)) + " from " + archiveDir + " to " + workingDir 
0660         sendEmail(mailList,error)
0661         exit(error)
0662 
0663     print("Sorting and cleaning beamlist")
0664     beamSpotObjList = []
0665     for fileName in copiedFiles:
0666         readBeamSpotFile(workingDir+fileName,beamSpotObjList,fileIOVBase)
0667 
0668     sortAndCleanBeamList(beamSpotObjList,fileIOVBase)
0669 
0670     if len(beamSpotObjList) == 0:
0671         error = "WARNING: None of the processed and copied payloads has a valid fit so there are no results. This shouldn't happen since we are filtering using the run register, so there should be at least one good run."
0672         exit(error)
0673 
0674     payloadFileName = "PayloadFile.txt"
0675 
0676     runBased = False
0677     if dbIOVBase == "runnumber":
0678         runBased = True
0679 
0680     payloadList = createWeightedPayloads(workingDir+payloadFileName,beamSpotObjList,runBased)
0681     if len(payloadList) == 0:
0682         error = "WARNING: I wasn't able to create any payload even if I have some BeamSpot objects."
0683         exit(error)
0684 
0685 
0686     tmpPayloadFileName = workingDir + "SingleTmpPayloadFile.txt"
0687     tmpSqliteFileName  = workingDir + "SingleTmpSqliteFile.db"
0688 
0689     writeDBTemplate = os.getenv("CMSSW_BASE") + "/src/RecoVertex/BeamSpotProducer/test/write2DB_template.py"
0690     readDBTemplate  = os.getenv("CMSSW_BASE") + "/src/RecoVertex/BeamSpotProducer/test/readDB_template.py"
0691     payloadNumber = -1
0692     iovSinceFirst = '0';
0693     iovTillLast   = '0';
0694 
0695     #Creating the final name for the combined sqlite file
0696     uuid = subprocess.getstatusoutput('uuidgen -t')[1]
0697     final_sqlite_file_name = databaseTag + '@' + uuid
0698     sqlite_file     = workingDir + final_sqlite_file_name + ".db"
0699     metadata_file   = workingDir + final_sqlite_file_name + ".txt"
0700 
0701     for payload in payloadList:
0702         payloadNumber += 1
0703         if option.zlarge:
0704             payload.sigmaZ = 10
0705             payload.sigmaZerr = 2.5e-05
0706         tmpFile = file(tmpPayloadFileName,'w')
0707         dumpValues(payload,tmpFile)
0708         tmpFile.close()
0709         if not writeSqliteFile(tmpSqliteFileName,databaseTag,dbIOVBase,tmpPayloadFileName,writeDBTemplate,workingDir):
0710             error = "An error occurred while writing the sqlite file: " + tmpSqliteFileName
0711             exit(error)
0712         readSqliteFile(tmpSqliteFileName,databaseTag,readDBTemplate,workingDir)
0713 
0714         ##############################################################
0715         #WARNING I am not sure if I am packing the right values
0716         if dbIOVBase == "runnumber":
0717             iov_since = str(payload.Run)
0718             iov_till  = iov_since
0719         elif dbIOVBase == "lumiid":
0720             iov_since = str( pack(int(payload.Run), int(payload.IOVfirst)) )
0721             iov_till  = str( pack(int(payload.Run), int(payload.IOVlast)) )
0722         elif dbIOVBase == "timestamp":
0723             error = "ERROR: IOV " + dbIOVBase + " still not implemented."
0724             exit(error)
0725         else:
0726             error = "ERROR: IOV " + dbIOVBase + " unrecognized!"
0727             exit(error)
0728 
0729         if payloadNumber == 0:
0730             iovSinceFirst = iov_since
0731         if payloadNumber == len(payloadList)-1:
0732             iovTillLast   = iov_till
0733 
0734         appendSqliteFile(final_sqlite_file_name + ".db", tmpSqliteFileName, databaseTag, iov_since, iov_till ,workingDir)
0735         os.system("rm -f " + tmpPayloadFileName + " " + tmpSqliteFileName)
0736 
0737 
0738     #### CREATE payload for merged output
0739 
0740     print(" create MERGED payload card for dropbox ...")
0741 
0742     dfile = open(metadata_file,'w')
0743 
0744     dfile.write('destDB '  + destDB        +'\n')
0745     dfile.write('tag '     + databaseTag   +'\n')
0746     dfile.write('inputtag'                 +'\n')
0747     dfile.write('since '   + iovSinceFirst +'\n')
0748     #dfile.write('till '    + iov_till      +'\n')
0749     dfile.write('Timetype '+ dbIOVBase     +'\n')
0750 
0751     ###################################################
0752     # WARNING tagType forced to offline
0753     print("WARNING TAG TYPE forced to be just offline")
0754     tagType = "offline"
0755     checkType = tagType
0756     if tagType == "express":
0757         checkType = "hlt"
0758     dfile.write('IOVCheck ' + checkType + '\n')
0759     dfile.write('usertext Beam spot position\n')
0760 
0761     dfile.close()
0762 
0763 
0764 
0765     if option.upload:
0766         print(" scp files to offline Drop Box")
0767         dropbox = "/DropBox"
0768         if option.Test:
0769             dropbox = "/DropBox_test"
0770         print("UPLOADING TO TEST DB")
0771         uploadSqliteFile(workingDir, final_sqlite_file_name, dropbox)
0772 
0773     archive_sqlite_file_name = "Payloads_" + iovSinceFirst + "_" + iovTillLast + "_" + final_sqlite_file_name
0774     archive_results_file_name = "Payloads_" + iovSinceFirst + "_" + iovTillLast + "_" + databaseTag + ".txt"
0775     if not os.path.isdir(archiveDir + 'payloads'):
0776         os.mkdir(archiveDir + 'payloads')
0777     subprocess.getstatusoutput('mv ' + sqlite_file   + ' ' + archiveDir + 'payloads/' + archive_sqlite_file_name + '.db')
0778     subprocess.getstatusoutput('mv ' + metadata_file + ' ' + archiveDir + 'payloads/' + archive_sqlite_file_name + '.txt')
0779     subprocess.getstatusoutput('cp ' + workingDir + payloadFileName + ' ' + archiveDir + 'payloads/' + archive_results_file_name)
0780 
0781     print(archiveDir + "payloads/" + archive_sqlite_file_name + '.db')
0782     print(archiveDir + "payloads/" + archive_sqlite_file_name + '.txt')
0783 
0784     rmLock()
0785 
0786 if __name__ == '__main__':  # entry-point guard: run the workflow only when executed as a script, not when imported
0787     main()