0001 from __future__ import print_function
0002 import configparser as ConfigParser
0003 import argparse
0004 import shelve
0005 import sys
0006 import os
0007 import subprocess
0008 import threading
0009 import shutil
0010 import time
0011 import re
0012 from helpers import *
0013 
0014 shelve_name = "dump.shelve" # contains all the measurement objects
0015 history_file = "history.log"
0016 clock_interval = 20 # in seconds
0017 delete_logs_after_finish = False  # set to True to delete log and submit-script files once a measurement finishes
0018 use_caf = False
0019 
0020 def save(name, obj):
0021     # the lock prevents potentially problematic concurrent access to the shelve file when multiple threads are running
0022     global lock
0023     lock.acquire()
0024     try:
0025         sh = shelve.open(shelve_name)
0026         sh[name] = obj
0027         sh.close()
0028     finally:
0029         lock.release()
0030 
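# The shelve written by save() can be inspected from an interactive Python session, e.g.
# to look at the state of stored measurements. A minimal sketch, using the key names this
# script actually writes ("measurements", "failed", "finished"):
#
#   import shelve
#   sh = shelve.open("dump.shelve")
#   print(sh["measurements"])   # list of ApeMeasurement objects still being processed
#   print(sh["failed"])         # dict of measurements that ended in a failed state
#   print(sh["finished"])       # dict of measurements that finished successfully
#   sh.close()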
0031 class Dataset:
0032     name = ""
0033     nFiles = 0
0034     maxEvents = -1
0035     baseDirectory = ""
0036     sampleType = "data1"
0037     fileList = []
0038     conditions = []
0039     
0040     def __init__(self, config, name):
0041         dsDict = dict(config.items("dataset:{}".format(name)))
0042         self.name = name
0043         self.baseDirectory = dsDict["baseDirectory"]
0044         
0045         self.fileList = []
0046         names = dsDict["fileNames"].split(" ")
0047         for name in names:
0048             parsedNames = replaceAllRanges(name)
0049             for fileName in parsedNames:
0050                 self.fileList.append(self.baseDirectory+"/"+fileName)
0051         self.nFiles = len(self.fileList)
0052 
0053         if "maxEvents" in dsDict:
0054             self.maxEvents = int(dsDict["maxEvents"])
0055         if "isMC" in dsDict:
0056             if dsDict["isMC"] == "True":
0057                 self.sampleType = "MC"
0058             else:
0059                 self.sampleType ="data1"
0060                 
0061         if "isCosmics" in dsDict:
0062             self.isCosmics = (dsDict["isCosmics"] == "True")
0063         else:
0064             self.isCosmics = False
0065         
0066         self.conditions, dummy, self.validConditions = loadConditions(dsDict)      
0067         
0068         # check if any of the sources used for conditions is invalid
0069         if not self.validConditions:
0070             print("Invalid conditions defined for dataset {}".format(self.name))
0071         
0072         # check if all files specified exist
0073         self.existingFiles, missingFiles = allFilesExist(self)
0074         
0075         if not self.existingFiles:
0076             for fileName in missingFiles:
0077                 print("Invalid file name {} defined for dataset {}".format(fileName, self.name))
0078         
0079 
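# A dataset is described by its own config-file section. A hypothetical minimal example
# (option names as read above; fileNames is a space-separated list whose entries may
# contain ranges that replaceAllRanges from helpers expands):
#
#   [dataset:myData]
#   baseDirectory=/eos/cms/store/user/someone/SomeAlCaRecoSample
#   fileNames=reco_1.root reco_2.root
#   maxEvents=-1
#   isMC=False
#   isCosmics=False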
0080 class Alignment:        
0081     name = ""
0082     alignmentName = None
0083     baselineDir = "Design"
0084     globalTag = "None"
0085     isDesign = False
0086     hasAlignmentCondition = False
0087     conditions = []
0088     
0089     def __init__(self, config, name):
0090         alDict = dict(config.items("alignment:{}".format(name)))
0091         self.name = name
0092         if "alignmentName" in alDict:
0093             self.alignmentName = alDict["alignmentName"]
0094         if "globalTag" in alDict:
0095             self.globalTag = alDict["globalTag"]
0096         if "baselineDir" in alDict:
0097             self.baselineDir= alDict["baselineDir"]
0098         if "isDesign" in alDict:
0099             self.isDesign= (alDict["isDesign"] == "True")
0100         
0101         # If self.hasAlignmentCondition is True, apeEstimator_cfg.py will not load a separate alignment object via alignmentName
0102         self.conditions, self.hasAlignmentCondition, self.validConditions = loadConditions(alDict) 
0103         
0104         # check if any of the sources used for conditions is invalid
0105         if not self.validConditions:
0106             print("Invalid conditions defined for alignment {}".format(self.name))
0107         
0108         
0109         # check if at least one of the two ways to define the alignment was used
0110         if self.alignmentName is None and not self.hasAlignmentCondition:
0111             print("Error: No alignment object name or record was defined for alignment {}".format(self.name))
0112             sys.exit()
0113         
0114 
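# An alignment is described analogously. A hypothetical example; either alignmentName or
# an alignment condition parsed by loadConditions has to be given, otherwise the
# constructor above exits:
#
#   [alignment:myAlignment]
#   globalTag=MyGlobalTag_v1
#   alignmentName=myAlignmentObject
#   baselineDir=Design
#   isDesign=False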
0115 class ApeMeasurement:
0116     name = "workingArea"
0117     curIteration = 0
0118     firstIteration = 0
0119     maxIterations = 15
0120     maxEvents = None
0121     status = STATE_NONE
0122     dataset = None
0123     alignment = None
0124     runningJobs  = None
0125     failedJobs      = None
0126     startTime = ""
0127     finishTime = ""
0128     
0129     def __init__(self, name, config, settings):
0130         self.name = name        
0131         self.status = STATE_ITERATION_START
0132         self.runningJobs = []
0133         self.failedJobs = []
0134         self.startTime = subprocess.check_output(["date"]).decode().strip() # decode so the timestamp is a str, not bytes, in Python 3
0135         
0136         # load conditions from dictionary, overwrite defaults if defined
0137         for key, value in settings.items():
0138             if not key.startswith("condition "):
0139                 setattr(self, key, value)
0140         
0141         # Replace names with actual Dataset and Alignment objects
0142         # In principle, one could preload all these once so they are not
0143         # redefined for each measurement, but right now this does not 
0144         # seem necessary
0145         self.dataset = Dataset(config, settings["dataset"])
0146         self.alignment = Alignment(config, settings["alignment"])
0147         
0148         # If not defined here, replace by setting from Dataset
0149         if not "maxEvents" in settings:
0150             self.maxEvents = self.dataset.maxEvents
0151             
0152         self.firstIteration=int(self.firstIteration)
0153         self.maxIterations=int(self.maxIterations)
0154         self.curIteration = self.firstIteration
0155         self.maxEvents = int(self.maxEvents)
0156         if self.alignment.isDesign:
0157             self.maxIterations = 0
0158         
0159         self.conditions, dummy, self.validConditions = loadConditions(settings) 
0160         
0161         # see if sanity checks passed
0162         if not self.alignment.validConditions or not self.dataset.validConditions or not self.dataset.existingFiles or not self.validConditions:
0163             self.status = STATE_INVALID_CONDITIONS
0164             self.print_status()
0165             self.finishTime = subprocess.check_output(["date"]).decode().strip()
0166             return
0167         
0168             
0169         if self.alignment.isDesign and self.dataset.sampleType != "MC":
0170             # For now, this won't immediately shut down the program
0171             print("APE Measurement {} is scheduled to do an APE baseline measurement with a dataset that is not marked as isMC=True. Is this intended?".format(self.name))
0172         ensurePathExists('{}/hists/{}'.format(base, self.name))
0173         if not self.alignment.isDesign:
0174             ensurePathExists('{}/hists/{}/apeObjects'.format(base, self.name))
0175         
0176             
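    # A measurement section ("ape:<name>") ties a dataset and an alignment together. Any
    # attribute of this class (e.g. firstIteration, maxIterations, maxEvents) can be
    # overridden there; keys starting with "condition " are not set as attributes but are
    # passed to loadConditions. A hypothetical example:
    #
    #   [ape:myMeasurement]
    #   dataset=myData
    #   alignment=myAlignment
    #   maxIterations=15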
0177     def get_status(self):
0178         return status_map[self.status]
0179     
0180     def print_status(self):
0181         print("APE Measurement {} in iteration {} is now in status {}".format(self.name, self.curIteration, self.get_status()))
0182     
0183     # submit jobs for track refit and hit categorization
0184     def submit_jobs(self):
0185         toSubmit = []
0186         
0187         allConditions = self.alignment.conditions+self.dataset.conditions+self.conditions
0188         allConditions = list({v['record']:v for v in allConditions}.values()) # remove duplicate records: if the same record is defined more than once,
0189                                                                               # the entry defined last (measurement settings > dataset > alignment) is kept
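        # For example, with allConditions = [{"record": "A", "tag": "t1"}, {"record": "B", "tag": "t2"}, {"record": "A", "tag": "t3"}]
        # the comprehension above keeps one entry per record, the last definition winning:
        # [{"record": "A", "tag": "t3"}, {"record": "B", "tag": "t2"}]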
0190         
0191         ensurePathExists("{}/test/autoSubmitter/workingArea".format(base))
0192         
0193         # If any conditions are defined, create a cff file from which to load them
0194         rawFileName = "None"
0195         conditionsFileName = "None"
0196         if len(allConditions) > 0:
0197             conditionsFileName = "{base}/python/conditions/conditions_{name}_iter{iterNo}_cff.py".format(base=base,name=self.name, iterNo=self.curIteration)
0198             rawFileName = "conditions_{name}_iter{iterNo}_cff".format(name=self.name, iterNo=self.curIteration)
0199             with open(conditionsFileName, "w") as fi:
0200                 from autoSubmitterTemplates import conditionsFileHeader
0201                 fi.write(conditionsFileHeader)
0202                 from autoSubmitterTemplates import conditionsTemplate
0203                 for condition in allConditions:
0204                     fi.write(conditionsTemplate.format(record=condition["record"], connect=condition["connect"], tag=condition["tag"]))
0205                 
0206                 
0207         alignmentNameToUse = self.alignment.alignmentName
0208         if self.alignment.hasAlignmentCondition:
0209             alignmentNameToUse = "fromConditions"
0210         
0211         lastIter = (self.curIteration==self.maxIterations) and not self.alignment.isDesign
0212         
0213         inputCommands = "sample={sample} fileNumber={fileNo} iterNumber={iterNo} lastIter={lastIter} alignRcd={alignRcd} maxEvents={maxEvents} globalTag={globalTag} measurementName={name} conditions={conditions} cosmics={cosmics}".format(sample=self.dataset.sampleType,fileNo="$1",iterNo=self.curIteration,lastIter=lastIter,alignRcd=alignmentNameToUse, maxEvents=self.maxEvents, globalTag=self.alignment.globalTag, name=self.name, conditions=rawFileName,cosmics=self.dataset.isCosmics)
0214         
0215         from autoSubmitterTemplates import condorJobTemplate
0216         jobFileContent = condorJobTemplate.format(base=base, inputFile="$2", inputCommands=inputCommands)
0217         jobFileName = "{}/test/autoSubmitter/workingArea/batchscript_{}_iter{}.tcsh".format(base, self.name,self.curIteration)
0218         with open(jobFileName, "w") as jobFile:
0219             jobFile.write(jobFileContent)
0220         
0221         # build the condor argument list, one entry per input file
0222         arguments = ""
0223         from autoSubmitterTemplates import condorArgumentTemplate
0224         for i in range(self.dataset.nFiles):
0225             inputFile = self.dataset.fileList[i]
0226             fileNumber = i+1
0227             arguments += condorArgumentTemplate.format(fileNumber=fileNumber, inputFile=inputFile)
0228             
0229         # build condor submit script
0230         date = subprocess.check_output(["date", "+%m_%d_%H_%M_%S"]).strip()
0231         sub = "{}/test/autoSubmitter/workingArea/job_{}_iter{}".format(base, self.name, self.curIteration)
0232         
0233         errorFileTemp  = sub+"_error_{}.txt"
0234         errorFile  = errorFileTemp.format("$(ProcId)")
0235         outputFile = sub+"_output_$(ProcId).txt"
0236         logFileTemp= sub+"_condor_{}.log"
0237         logFile    = logFileTemp.format("$(ProcId)")
0238         jobFile    = sub+".tcsh"   
0239         jobName    = "{}_{}".format(self.name, self.curIteration)
0240         for i in range(self.dataset.nFiles):
0241             # make file empty if it existed before
0242             with open(logFileTemp.format(i), "w") as fi:
0243                 pass
0244         
0245         # create submit file
0246         from autoSubmitterTemplates import condorSubTemplate
0247         from autoSubmitterTemplates import condorSubTemplateCAF
0248         if use_caf:
0249             submitFileContent = condorSubTemplateCAF.format(jobFile=jobFileName, outputFile=outputFile, errorFile=errorFile, logFile=logFile, arguments=arguments, jobName=jobName)
0250         else:
0251             submitFileContent = condorSubTemplate.format(jobFile=jobFileName, outputFile=outputFile, errorFile=errorFile, logFile=logFile, arguments=arguments, jobName=jobName)
0252         submitFileName = "{}/test/autoSubmitter/workingArea/submit_{}_jobs_iter{}.sub".format(base, self.name, self.curIteration)
0253         with open(submitFileName, "w") as submitFile:
0254             submitFile.write(submitFileContent)
0255         
0256         # submit batch
0257         from autoSubmitterTemplates import submitCondorTemplate
0258         subOut = subprocess.check_output(submitCondorTemplate.format(subFile=submitFileName), shell=True).decode().strip() # decode: check_output returns bytes in Python 3 and subOut is split as a string below
0259     
0260         if len(subOut) == 0:
0261             print("The condor submit command returned no output; either this environment does not provide HTCondor or the ssh session has timed out (running for longer than 24h?), exiting")
0262             sys.exit()
0263                 
0264         cluster = subOut.split(" ")[-1][:-1]
0265         for i in range(self.dataset.nFiles):
0266             # list contains condor log files from which to read when job is terminated to detect errors
0267             self.runningJobs.append((logFileTemp.format(i), errorFileTemp.format(i), "{}.{}".format(cluster, i)))
0268         
0269         
0270         self.status = STATE_BJOBS_WAITING
0271         self.print_status()
0272     
0273     def check_jobs(self):
0274         lastStatus = self.status
0275         stillRunningJobs = []
0276         # check all still running jobs
0277         for logName, errName, jobId in self.runningJobs:
0278             # read condor logs instead of doing condor_q or similar, as it is much faster
0279             if not os.path.isfile(logName):
0280                 print("{} does not exist even though it should, marking job as failed".format(logName))
0281                 self.failedJobs.append( (logName, errName) ) 
0282                 break
0283             with open(logName, "r") as logFile:
0284                 log = logFile.read()
0285             if not "submitted" in log:
0286                 print("{} was apparently not submitted, did you empty the log file or is condor not working?".format(jobId))
0287                 self.failedJobs.append( (logName, errName) ) 
0288                 
0289             if "Job was aborted" in log:
0290                 print("Job {} of measurement {} in iteration {} was aborted".format(jobId, self.name, self.curIteration))
0291                 self.failedJobs.append( (logName, errName) ) 
0292             elif "Job terminated" in log:
0293                 if "Normal termination (return value 0)" in log:
0294                     foundErr = False
0295                     with open(errName, "r") as err:
0296                         for line in err:
0297                             if "Fatal Exception" in line.strip():
0298                                 foundErr = True
0299                                 break
0300                     if not foundErr:
0301                         print("Job {} of measurement {} in iteration {} finished successfully".format(jobId, self.name, self.curIteration))
0302                     else:
0303                         # Fatal error in stderr
0304                         print("Job {} of measurement {} in iteration {} has a fatal error, check stderr".format(jobId, self.name, self.curIteration))
0305                         self.failedJobs.append( (logName, errName) ) 
0306                 else:
0307                     # nonzero return value
0308                     print("Job {} of measurement {} in iteration {} failed, check stderr".format(jobId, self.name, self.curIteration))
0309                     self.failedJobs.append( (logName, errName) ) 
0310             else:
0311                 stillRunningJobs.append( (logName, errName, jobId) )
0312         self.runningJobs = stillRunningJobs
0313         
0314         # at least one job failed
0315         if len(self.failedJobs) > 0:
0316             self.status = STATE_BJOBS_FAILED
0317             self.finishTime = subprocess.check_output(["date"]).decode().strip()
0318         elif len(self.runningJobs) == 0:
0319             self.status = STATE_BJOBS_DONE
0320             print("All condor jobs of APE measurement {} in iteration {} are done".format(self.name, self.curIteration))
0321             
0322             # remove files
0323             if delete_logs_after_finish:
0324                 submitFile = "{}/test/autoSubmitter/workingArea/submit_{}_jobs_iter{}.sub".format(base, self.name, self.curIteration)
0325                 jobFile = "{}/test/autoSubmitter/workingArea/batchscript_{}_iter{}.tcsh".format(base, self.name,self.curIteration)
0326                 os.remove(submitFile)
0327                 os.remove(jobFile)
0328             
0329                 for i in range(self.dataset.nFiles):
0330                     sub = "{}/test/autoSubmitter/workingArea/job_{}_iter{}".format(base, self.name, self.curIteration)
0331                     errorFile  = sub+"_error_{}.txt".format(i)
0332                     outputFile = sub+"_output_{}.txt".format(i)
0333                     logFile    = sub+"_condor_{}.log".format(i) 
0334                     os.remove(errorFile)
0335                     os.remove(outputFile)
0336                     os.remove(logFile)
0337                     
0338         if lastStatus != self.status:
0339             self.print_status() 
0340     
0341     # merges files from jobs
0342     def do_merge(self):
0343         self.status = STATE_MERGE_WAITING
0344         if self.alignment.isDesign:
0345             folderName = '{}/hists/{}/baseline'.format(base, self.name)
0346         else:
0347             folderName = '{}/hists/{}/iter{}'.format(base, self.name, self.curIteration)
0348         
0349         # (re)move results from previous measurements before creating folder
0350         if os.path.isdir(folderName):
0351             if os.path.isdir(folderName+"_old"):
0352                 shutil.rmtree("{}_old".format(folderName))
0353             os.rename(folderName, folderName+"_old")
0354         os.makedirs(folderName)
0355         
0356         # This is so that the structure of the tree can be retrieved by ApeEstimatorSummary.cc and the tree does not have to be rebuilt
0357         if self.curIteration > 0 and not self.alignment.isDesign: # the isDesign check is redundant in practice, since baseline measurements always end after iteration 0
0358             shutil.copyfile('{}/hists/{}/iter{}/allData_iterationApe.root'.format(base, self.name, self.curIteration-1),folderName+"/allData_iterationApe.root")
0359         fileNames = ['{}/hists/{}/{}{}.root'.format(base, self.name, self.dataset.sampleType, str(i)) for i in range(1, self.dataset.nFiles+1)]
0360         fileString = " ".join(fileNames)
0361         
0362         from autoSubmitterTemplates import mergeTemplate
0363         merge_result = subprocess.call(mergeTemplate.format(path=folderName, inputFiles=fileString), shell=True) # returns exit code (0 if no error occurred)
0364         for name in fileNames:
0365             os.remove(name)
0366             
0367         if rootFileValid("{}/allData.root".format(folderName)) and merge_result == 0:
0368             self.status = STATE_MERGE_DONE
0369         else:
0370             self.status = STATE_MERGE_FAILED
0371             self.finishTime = subprocess.check_output(["date"]).decode().strip()
0372         self.print_status()
0373     
0374     # calculates APE
0375     def do_summary(self):
0376         self.status = STATE_SUMMARY_WAITING        
0377         from autoSubmitterTemplates import summaryTemplate
0378         if self.alignment.isDesign:
0379             #use measurement name as baseline folder name in this case
0380             inputCommands = "iterNumber={} setBaseline={} measurementName={} baselineName={}".format(self.curIteration,self.alignment.isDesign,self.name, self.name)
0381         else:
0382             inputCommands = "iterNumber={} setBaseline={} measurementName={} baselineName={}".format(self.curIteration,self.alignment.isDesign,self.name, self.alignment.baselineDir)
0383         
0384         summary_result = subprocess.call(summaryTemplate.format(inputCommands=inputCommands), shell=True) # returns exit code (0 if no error occurred)
0385         if summary_result == 0:
0386             self.status = STATE_SUMMARY_DONE
0387         else:
0388             self.status = STATE_SUMMARY_FAILED
0389             self.finishTime = subprocess.check_output(["date"]).decode().strip()
0390         self.print_status()
0391     
0392     # saves APE to .db file so it can be read out next iteration
0393     def do_local_setting(self):
0394         self.status = STATE_LOCAL_WAITING       
0395         from autoSubmitterTemplates import localSettingTemplate
0396         inputCommands = "iterNumber={} setBaseline={} measurementName={}".format(self.curIteration,self.alignment.isDesign,self.name)
0397 
0398         local_setting_result = subprocess.call(localSettingTemplate.format(inputCommands=inputCommands), shell=True) # returns exit code (0 if no error occurred)
0399         if local_setting_result == 0:
0400             self.status = STATE_LOCAL_DONE
0401         else:
0402             self.status = STATE_LOCAL_FAILED
0403             self.finishTime = subprocess.check_output(["date"]).decode().strip()
0404         self.print_status()
0405         
0406     def finish_iteration(self):
0407         print("APE Measurement {} just finished iteration {}".format(self.name, self.curIteration))
0408         if self.curIteration < self.maxIterations:
0409             self.curIteration += 1
0410             self.status = STATE_ITERATION_START
0411         else:
0412             self.status = STATE_FINISHED
0413             self.finishTime = subprocess.check_output(["date"]).decode().strip()
0414             print("APE Measurement {}, which was started at {}, finished after {} iterations at {}".format(self.name, self.startTime, self.curIteration, self.finishTime))
0415             
0416     def kill(self):
0417         from autoSubmitterTemplates import killJobTemplate
0418         for log, err, jobId in self.runningJobs:
0419             subprocess.call(killJobTemplate.format(jobId=jobId), shell=True)
0420         self.runningJobs = []
0421         self.status = STATE_NONE
0422         
0423     def purge(self):
0424         self.kill()
0425         folderName = '{}/hists/{}'.format(base, self.name)
0426         shutil.rmtree(folderName)
0427         # remove log-files as well?
0428         
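    # run_iteration advances a measurement through its state machine:
    #   ITERATION_START -> submit_jobs -> BJOBS_WAITING -> check_jobs -> BJOBS_DONE
    #   -> do_merge -> MERGE_DONE -> do_summary -> SUMMARY_DONE -> do_local_setting
    #   -> LOCAL_DONE -> finish_iteration -> next ITERATION_START or FINISHED
    # Any *_FAILED state, INVALID_CONDITIONS and FINISHED are logged to the history file;
    # failed measurements are moved to failed_measurements, finished ones to finished_measurements.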
0429     def run_iteration(self):
0430         global threadcounter
0431         global measurements
0432         threadcounter.acquire()
0433         try:
0434             if self.status == STATE_ITERATION_START:
0435                 # start bjobs
0436                 print("APE Measurement {} just started iteration {}".format(self.name, self.curIteration))
0437 
0438 
0439                 try:
0440                     self.submit_jobs()
0441                     save("measurements", measurements)
0442                 except Exception as e:
0443                     # this is needed in case the scheduler goes down
0444                     print("Error submitting jobs for APE measurement {}".format(self.name))
0445                     print(e)
0446                     return
0447                     
0448             if self.status == STATE_BJOBS_WAITING:
0449                 # check if bjobs are finished
0450                 self.check_jobs()
0451                 save("measurements", measurements)
0452             if self.status == STATE_BJOBS_DONE:
0453                 # merge files
0454                 self.do_merge()
0455                 save("measurements", measurements)
0456             if self.status == STATE_MERGE_DONE:
0457                 # start summary
0458                 self.do_summary()
0459                 save("measurements", measurements)
0460             if self.status == STATE_SUMMARY_DONE:
0461                 # start local setting (only if not a baseline measurement)
0462                 if self.alignment.isDesign:
0463                     self.status = STATE_LOCAL_DONE
0464                 else:
0465                     self.do_local_setting()
0466                 save("measurements", measurements)
0467             if self.status == STATE_LOCAL_DONE:
0468                 self.finish_iteration()
0469                 save("measurements", measurements)
0470                 # go to next iteration or finish measurement
0471             if self.status == STATE_BJOBS_FAILED or \
0472                 self.status == STATE_MERGE_FAILED or \
0473                 self.status == STATE_SUMMARY_FAILED or \
0474                 self.status == STATE_LOCAL_FAILED or \
0475                 self.status == STATE_INVALID_CONDITIONS or \
0476                 self.status == STATE_FINISHED:
0477                     with open(history_file, "a") as fi:
0478                         fi.write("APE measurement {name} which was started at {start} finished at {end} with state {state} in iteration {iteration}\n".format(name=self.name, start=self.startTime, end=self.finishTime, state=self.get_status(), iteration=self.curIteration))
0479                     if self.status == STATE_FINISHED:
0480                         global finished_measurements
0481                         finished_measurements[self.name] = self
0482                         save("finished", finished_measurements)
0483                     else:
0484                         global failed_measurements
0485                         failed_measurements[self.name] = self
0486                         self.status = STATE_NONE
0487                         save("failed", failed_measurements)
0488                     save("measurements", measurements)
0489             if self.status == STATE_ITERATION_START: # this ensures that jobs do not go into idle if many measurements are done simultaneously
0490                 # start bjobs
0491                 print("APE Measurement {} just started iteration {}".format(self.name, self.curIteration))
0492                 self.submit_jobs()
0493                 save("measurements", measurements)   
0494         finally:
0495             threadcounter.release()
0496 
0497 
0498 def main():    
0499     parser = argparse.ArgumentParser(description="Automatically run APE measurements")
0500     parser.add_argument("-c", "--config", action="append", dest="configs", default=[],
0501                           help="Config file that has list of measurements")
0502     parser.add_argument("-k", "--kill", action="append", dest="kill", default=[],
0503                           help="List of measurement names to kill (=remove from list and kill all bjobs)")
0504     parser.add_argument("-p", "--purge", action="append", dest="purge", default=[],
0505                           help="List of measurement names to purge (=kill and remove folder)")
0506     parser.add_argument("-r", "--resume", action="append", dest="resume", default=[],
0507                           help="Resume interrupted APE measurements which are stored in shelves (specify shelves)")
0508     parser.add_argument("-d", "--dump", action="store", dest="dump", default=None,
0509                           help='Specify in which .shelve file to store the measurements')
0510     parser.add_argument("-n", "--ncores", action="store", dest="ncores", default=1, type=int,
0511                           help='Number of threads running in parallel')
0512     parser.add_argument("-C", "--caf",action="store_true", dest="caf", default=False,
0513                                               help="Use CAF queue for condor jobs")
0514     args = parser.parse_args()
0515     
0516     global base
0517     global clock_interval
0518     global shelve_name
0519     global threadcounter
0520     global lock
0521     global use_caf
0522     
0523     use_caf = args.caf
0524     enableCAF(use_caf)
0525     
0526     
0527     threadcounter = threading.BoundedSemaphore(args.ncores)
0528     lock = threading.Lock()
0529     
0530     if args.dump is not None: # choose different file than default
0531         shelve_name = args.dump
0532     elif args.resume != []:
0533         shelve_name = args.resume[0]
0534     try:
0535         base = os.environ['CMSSW_BASE']+"/src/Alignment/APEEstimation"
0536     except KeyError:
0537         print("No CMSSW environment was set, exiting")
0538         sys.exit()
0539     
0540     
0541     killTargets = []
0542     purgeTargets = []
0543     for toConvert in args.kill:
0544         killTargets += replaceAllRanges(toConvert)
0545         
0546     for toConvert in args.purge:
0547         purgeTargets += replaceAllRanges(toConvert)
0548     
0549     global measurements
0550     measurements = []
0551     global finished_measurements
0552     finished_measurements = {}
0553     global failed_measurements
0554     failed_measurements = {}
0555     
0556     if args.resume != []:
0557         for resumeFile in args.resume:
0558             try:
0559                 sh = shelve.open(resumeFile)
0560                 resumed = sh["measurements"]
0561                 
0562                 resumed_failed = sh["failed"]
0563                 resumed_finished = sh["finished"]
0564                 sh.close()
0565                 
0566                 for res in resumed:
0567                     measurements.append(res)
0568                     print("Measurement {} in state {} in iteration {} was resumed".format(res.name, res.get_status(), res.curIteration))
0569                     # Killing and purging is done here, because it doesn't make 
0570                     # sense to kill or purge a measurement that was just started
0571                     for to_kill in args.kill:
0572                         if res.name == to_kill:
0573                             res.kill()
0574                     for to_purge in args.purge:
0575                         if res.name == to_purge:
0576                             res.purge()
0577                 
0578                 failed_measurements.update(resumed_failed)
0579                 finished_measurements.update(resumed_finished)
0580                 
0581             except IOError:
0582                 print("Could not resume because {} could not be opened, exiting".format(resumeFile))
0583                 sys.exit()
0584             
0585     # read out from config file
0586     if args.configs != []:
0587         config = ConfigParser.RawConfigParser()
0588         config.optionxform = str 
0589         config.read(args.configs)
0590         
0591         # read measurement names
0592         meas = [str(x.split("ape:")[1]) for x in list(config.keys()) if x.startswith("ape:")]
0593 
0594         for name in meas:
0595             if name in [x.name for x in measurements]:
0596                 print("Error: APE Measurement with name {} already exists, skipping".format(name))
0597                 continue
0598             settings = dict(config.items("ape:{}".format(name)))
0599             
0600             measurement = ApeMeasurement(name, config, settings)
0601             
0602             if measurement.status >= STATE_ITERATION_START and measurement.status <= STATE_FINISHED:
0603                 measurements.append(measurement)
0604                 print("APE Measurement {} was started".format(measurement.name))
0605         
0606         
0607     
0608     while True:
0609         # remove finished and failed measurements
0610         measurements = [measurement for measurement in measurements if not (measurement.status==STATE_NONE or measurement.status == STATE_FINISHED)]
0611         save("measurements", measurements)
0612         save("failed", failed_measurements)
0613         save("finished", finished_measurements)
0614         
0615         list_threads = []
0616         for measurement in measurements:
0617             t = threading.Thread(target=measurement.run_iteration)
0618             list_threads.append(t)
0619             t.start()
0620         
0621         # wait for iterations to finish
0622         for t in list_threads:
0623             t.join()
0624      
0625         if len(measurements) == 0:
0626             print("No APE measurements are active, exiting")
0627             break        
0628         
0629         try: # so that interrupting does not give an error message and just ends the program
0630             time_remaining = clock_interval
0631             while time_remaining > 0:
0632                 print("Sleeping for {} seconds, you can safely [CTRL+C] now".format(time_remaining))
0633                 time.sleep(1)
0634                 time_remaining -= 1
0635                 sys.stdout.write("\033[F")
0636                 sys.stdout.write("\033[K")
0637             print("")
0638             sys.stdout.write("\033[F")
0639             sys.stdout.write("\033[K")
0640         except KeyboardInterrupt:
0641             sys.exit(0)
0642         
0643             
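# Example invocations (file names are hypothetical, and it is assumed this script is run
# as autoSubmitter.py from within a CMSSW environment):
#
#   python autoSubmitter.py -c myConfig.ini -n 4      # start measurements defined in a config, 4 threads
#   python autoSubmitter.py -r dump.shelve            # resume measurements stored in a shelve file
#   python autoSubmitter.py -r dump.shelve -k myMeas  # resume, then kill the measurement "myMeas"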
0644 if __name__ == "__main__":
0645     main()