import configparser as ConfigParser
import argparse
import shelve
import sys
import os
import subprocess
import threading
import shutil
import time
import re
from helpers import *

shelve_name = "dump.shelve" # contains all the measurement objects
history_file = "history.log"
clock_interval = 20 # in seconds
delete_logs_after_finish = False # set to True to delete the log and submit-script files once a measurement finishes
use_caf = False

def save(name, obj):
    # serialize file access, which could otherwise be problematic with multiple threads running
    global lock
    lock.acquire()
    try:
        sh = shelve.open(shelve_name)
        sh[name] = obj
        sh.close()
    finally:
        lock.release()

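# Usage sketch: the measurement loop below persists its state with calls like
#   save("measurements", measurements)
# so that an interrupted run can later be picked up again via --resume dump.shelve
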
class Dataset:
    def __init__(self, config, name):
        dsDict = dict(config.items("dataset:{}".format(name)))
        self.name = name
        self.baseDirectory = dsDict["baseDirectory"].replace("$CMSSW_BASE", os.environ['CMSSW_BASE'])
        
        self.fileList = []
        names = dsDict["fileNames"].split(" ")
        for rawName in names:
            parsedNames = replaceAllRanges(rawName)
            for fileName in parsedNames:
                self.fileList.append(self.baseDirectory+"/"+fileName)
        self.nFiles = len(self.fileList)
        
        self.maxEvents = -1
        if "maxEvents" in dsDict:
            self.maxEvents = int(dsDict["maxEvents"])
        
        self.sampleType = "data1"
        if "isMC" in dsDict and dsDict["isMC"] == "True":
            self.sampleType = "MC"
        
        self.isCosmics = False
        if "isCosmics" in dsDict:
            self.isCosmics = (dsDict["isCosmics"] == "True")
        
        self.conditions, self.validConditions = loadConditions(dsDict)
        
        # check if any of the sources used for conditions is invalid
        if not self.validConditions:
            print("Invalid conditions defined for dataset {}".format(self.name))
        
        # check that all specified files exist
        self.existingFiles, missingFiles = allFilesExist(self)
        
        if not self.existingFiles:
            for fileName in missingFiles:
                print("Invalid file name {} defined for dataset {}".format(fileName, self.name))

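# Illustrative sketch of a [dataset:...] config section consumed above. The key
# names follow the dsDict lookups in __init__; the exact range syntax accepted by
# replaceAllRanges() lives in helpers.py, so the values shown are assumptions:
#
#   [dataset:myData]
#   baseDirectory = $CMSSW_BASE/src/Alignment/APEEstimation/data
#   fileNames = reco_1.root reco_2.root
#   maxEvents = 100000
#   isMC = False
#   isCosmics = False
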
class Alignment:
    def __init__(self, config, name):
        alDict = dict(config.items("alignment:{}".format(name)))
        self.name = name
        
        self.globalTag = "None"
        if "globalTag" in alDict:
            self.globalTag = alDict["globalTag"]
        self.baselineDir = "Design"
        if "baselineDir" in alDict:
            self.baselineDir = alDict["baselineDir"]
        self.isDesign = False
        if "isDesign" in alDict:
            self.isDesign = (alDict["isDesign"] == "True")
        
        # load the conditions defined for this alignment; they are passed on to apeEstimator_cfg.py
        self.conditions, self.validConditions = loadConditions(alDict)
        
        # check if any of the sources used for conditions is invalid
        if not self.validConditions:
            print("Invalid conditions defined for alignment {}".format(self.name))
        

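# Illustrative sketch of an [alignment:...] config section consumed above. The key
# names follow the alDict lookups in __init__; "condition ..." entries are parsed
# by loadConditions() in helpers.py, so their exact syntax is an assumption here:
#
#   [alignment:myAlignment]
#   globalTag = auto:run2_data
#   baselineDir = Design
#   isDesign = False
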
class ApeMeasurement:
    name = "workingArea"
    curIteration = 0
    firstIteration = 0
    maxIterations = 15
    maxEvents = None
    dataset = None
    alignment = None
    runningJobs = None
    failedJobs = None
    startTime = ""
    finishTime = ""
    
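    # Illustrative sketch of an [ape:...] config section consumed by __init__ below;
    # "dataset" and "alignment" must name sections defined as above, and any other
    # key overwrites the class-level defaults. The values shown are assumptions:
    #
    #   [ape:myMeasurement]
    #   dataset = myData
    #   alignment = myAlignment
    #   firstIteration = 0
    #   maxIterations = 15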
    def __init__(self, name, config, settings):
        self.name = name
        self.status_ = STATE_ITERATION_START
        self.runningJobs = []
        self.failedJobs = []
        self.startTime = subprocess.check_output(["date"]).decode().strip()
        
        # copy settings into attributes, overwriting the defaults above;
        # "condition ..." entries are handled separately by loadConditions below
        for key, value in settings.items():
            if not key.startswith("condition "):
                setattr(self, key, value)
        
        # replace names with actual Dataset and Alignment objects
        self.dataset = Dataset(config, settings["dataset"])
        self.alignment = Alignment(config, settings["alignment"])
        
        # if not defined here, take the setting from the Dataset
        if "maxEvents" not in settings:
            self.maxEvents = self.dataset.maxEvents
            
        self.firstIteration = int(self.firstIteration)
        self.maxIterations = int(self.maxIterations)
        self.curIteration = self.firstIteration
        self.maxEvents = int(self.maxEvents)
        if self.alignment.isDesign:
            self.maxIterations = 0
        
        self.conditions, self.validConditions = loadConditions(settings)
        
        # see if the sanity checks passed
        if not self.alignment.validConditions or not self.dataset.validConditions or not self.dataset.existingFiles or not self.validConditions:
            self.setStatus(STATE_INVALID_CONDITIONS, True)
            return
        
        if unitTest:
            return
        
        if self.alignment.isDesign and self.dataset.sampleType != "MC":
            # For now, this won't immediately shut down the program
            print("APE Measurement {} is scheduled to do an APE baseline measurement with a dataset that is not marked as isMC=True. Is this intended?".format(self.name))
        ensurePathExists('{}/hists/{}'.format(base, self.name))
        if not self.alignment.isDesign:
            ensurePathExists('{}/hists/{}/apeObjects'.format(base, self.name))
        
    def status(self):
        return status_map[self.status_]
    
    def printStatus(self):
        print("APE Measurement {} in iteration {} is now in status {}".format(self.name, self.curIteration, self.status()))
    
    def setStatus(self, status, terminal=False):
        if self.status_ != status:
            self.status_ = status
            self.printStatus()
        if terminal:
            self.finishTime = subprocess.check_output(["date"]).decode().strip()
    
    # submit jobs for track refit and hit categorization
    def submitJobs(self):
        toSubmit = []
        
        allConditions = self.alignment.conditions+self.dataset.conditions+self.conditions
        allConditions = list({v['record']:v for v in allConditions}.values()) # removes duplicate definitions of records
        
        ensurePathExists("{}/test/autoSubmitter/workingArea".format(base))
        
        # if conditions are defined, create a file to load them from
        rawFileName = "None"
        conditionsFileName = "None"
        if len(allConditions) > 0:
            conditionsFileName = "{base}/python/conditions/conditions_{name}_iter{iterNo}_cff.py".format(base=base, name=self.name, iterNo=self.curIteration)
            rawFileName = "conditions_{name}_iter{iterNo}_cff".format(name=self.name, iterNo=self.curIteration)
            with open(conditionsFileName, "w") as fi:
                from autoSubmitterTemplates import conditionsFileHeader
                fi.write(conditionsFileHeader)
                from autoSubmitterTemplates import conditionsTemplate
                for condition in allConditions:
                    fi.write(conditionsTemplate.format(record=condition["record"], connect=condition["connect"], tag=condition["tag"]))
                
        alignmentNameToUse = "fromConditions"
        
        lastIter = (self.curIteration == self.maxIterations) and not self.alignment.isDesign
        
        inputCommands = "sample={sample} fileNumber={fileNo} iterNumber={iterNo} lastIter={lastIter} alignRcd={alignRcd} maxEvents={maxEvents} globalTag={globalTag} measurementName={name} conditions={conditions} cosmics={cosmics}".format(
            sample=self.dataset.sampleType,
            fileNo="$1",
            iterNo=self.curIteration,
            lastIter=lastIter,
            alignRcd=alignmentNameToUse,
            maxEvents=self.maxEvents,
            globalTag=self.alignment.globalTag,
            name=self.name,
            conditions=rawFileName,
            cosmics=self.dataset.isCosmics)
        
        from autoSubmitterTemplates import condorJobTemplate
        jobFileContent = condorJobTemplate.format(base=base, inputFile="$2", inputCommands=inputCommands)
        jobFileName = "{}/test/autoSubmitter/workingArea/batchscript_{}_iter{}.tcsh".format(base, self.name, self.curIteration)
        with open(jobFileName, "w") as jobFile:
            jobFile.write(jobFileContent)
        
        # create one set of batch-job arguments per input file
        arguments = ""
        from autoSubmitterTemplates import condorArgumentTemplate
        for i in range(self.dataset.nFiles):
            inputFile = self.dataset.fileList[i]
            fileNumber = i+1
            arguments += condorArgumentTemplate.format(fileNumber=fileNumber, inputFile=inputFile)
            
        # build condor submit script
        date = subprocess.check_output(["date", "+%m_%d_%H_%M_%S"]).decode().strip()
        sub = "{}/test/autoSubmitter/workingArea/job_{}_iter{}".format(base, self.name, self.curIteration)
        
        errorFileTemp = sub+"_error_{}.txt"
        errorFile = errorFileTemp.format("$(ProcId)")
        outputFile = sub+"_output_$(ProcId).txt"
        logFileTemp = sub+"_condor_{}.log"
        logFile = logFileTemp.format("$(ProcId)")
        jobFile = sub+".tcsh"
        jobName = "{}_{}".format(self.name, self.curIteration)
        for i in range(self.dataset.nFiles):
            # truncate the condor log file if it existed before
            with open(logFileTemp.format(i), "w") as fi:
                pass
        
        # create submit file
        from autoSubmitterTemplates import condorSubTemplate
        from autoSubmitterTemplates import condorSubTemplateCAF
        if use_caf:
            submitFileContent = condorSubTemplateCAF.format(jobFile=jobFileName, outputFile=outputFile, errorFile=errorFile, logFile=logFile, arguments=arguments, jobName=jobName)
        else:
            submitFileContent = condorSubTemplate.format(jobFile=jobFileName, outputFile=outputFile, errorFile=errorFile, logFile=logFile, arguments=arguments, jobName=jobName)
        submitFileName = "{}/test/autoSubmitter/workingArea/submit_{}_jobs_iter{}.sub".format(base, self.name, self.curIteration)
        with open(submitFileName, "w") as submitFile:
            submitFile.write(submitFileContent)
        
        # submit batch
        from autoSubmitterTemplates import submitCondorTemplate
        subOut = subprocess.check_output(submitCondorTemplate.format(subFile=submitFileName), shell=True).decode().strip()
    
        if len(subOut) == 0:
            print("condor_submit gave no output: either this environment does not provide HTCondor or the ssh session timed out (ongoing for longer than 24h?), exiting")
            sys.exit()
            
        cluster = subOut.split(" ")[-1][:-1]
        for i in range(self.dataset.nFiles):
            # this list contains the condor log files from which to read when a job terminates, to detect errors
            self.runningJobs.append((logFileTemp.format(i), errorFileTemp.format(i), "{}.{}".format(cluster, i)))
        
        self.setStatus(STATE_BJOBS_WAITING)
    
    def checkJobs(self):
        stillRunningJobs = []
        # check all still running jobs
        for logName, errName, jobId in self.runningJobs:
            # read the condor logs instead of doing condor_q or similar, as it is much faster
            if not os.path.isfile(logName):
                print("{} does not exist even though it should, marking job as failed".format(logName))
                self.failedJobs.append( (logName, errName) )
                continue # go on with the next job instead of silently dropping the remaining ones
            with open(logName, "r") as logFile:
                log = logFile.read()
            if not "submitted" in log:
                print("{} was apparently not submitted, did you empty the log file or is condor not working?".format(jobId))
                self.failedJobs.append( (logName, errName) )
                continue # do not also count this job as aborted or still running
                
            if "Job was aborted" in log:
                print("Job {} of measurement {} in iteration {} was aborted".format(jobId, self.name, self.curIteration))
                self.failedJobs.append( (logName, errName) )
            elif "Job terminated" in log:
                if "Normal termination (return value 0)" in log:
                    foundErr = False
                    with open(errName, "r") as err:
                        for line in err:
                            if "Fatal Exception" in line.strip():
                                foundErr = True
                                break
                    if not foundErr:
                        print("Job {} of measurement {} in iteration {} finished successfully".format(jobId, self.name, self.curIteration))
                    else:
                        # fatal error in stderr
                        print("Job {} of measurement {} in iteration {} has a fatal error, check stderr".format(jobId, self.name, self.curIteration))
                        self.failedJobs.append( (logName, errName) )
                else:
                    # nonzero return value
                    print("Job {} of measurement {} in iteration {} failed, check stderr".format(jobId, self.name, self.curIteration))
                    self.failedJobs.append( (logName, errName) )
            else:
                stillRunningJobs.append( (logName, errName, jobId) )
        self.runningJobs = stillRunningJobs
        
        # at least one job failed
        if len(self.failedJobs) > 0:
            self.setStatus(STATE_BJOBS_FAILED, True)
        elif len(self.runningJobs) == 0:
            self.setStatus(STATE_BJOBS_DONE)
            print("All condor jobs of APE measurement {} in iteration {} are done".format(self.name, self.curIteration))
            
            # remove files
            if delete_logs_after_finish:
                submitFile = "{}/test/autoSubmitter/workingArea/submit_{}_jobs_iter{}.sub".format(base, self.name, self.curIteration)
                jobFile = "{}/test/autoSubmitter/workingArea/batchscript_{}_iter{}.tcsh".format(base, self.name, self.curIteration)
                os.remove(submitFile)
                os.remove(jobFile)
            
                for i in range(self.dataset.nFiles):
                    sub = "{}/test/autoSubmitter/workingArea/job_{}_iter{}".format(base, self.name, self.curIteration)
                    errorFile = sub+"_error_{}.txt".format(i)
                    outputFile = sub+"_output_{}.txt".format(i)
                    logFile = sub+"_condor_{}.log".format(i)
                    os.remove(errorFile)
                    os.remove(outputFile)
                    os.remove(logFile)
    
    # merges the files from the jobs
    def mergeFiles(self):
        self.setStatus(STATE_MERGE_WAITING)
        if self.alignment.isDesign:
            folderName = '{}/hists/{}/baseline'.format(base, self.name)
        else:
            folderName = '{}/hists/{}/iter{}'.format(base, self.name, self.curIteration)
        
        # (re)move results from previous measurements before creating the folder
        if os.path.isdir(folderName):
            if os.path.isdir(folderName+"_old"):
                shutil.rmtree("{}_old".format(folderName))
            os.rename(folderName, folderName+"_old")
        os.makedirs(folderName)
        
        # copy the previous iteration's tree so its structure can be retrieved by ApeEstimatorSummary.cc
        # without rebuilding it; the isDesign check is redundant here, as design measurements always end after iteration 0
        if self.curIteration > 0 and not self.alignment.isDesign:
            shutil.copyfile('{}/hists/{}/iter{}/allData_iterationApe.root'.format(base, self.name, self.curIteration-1), folderName+"/allData_iterationApe.root")
        fileNames = ['{}/hists/{}/{}{}.root'.format(base, self.name, self.dataset.sampleType, str(i)) for i in range(1, self.dataset.nFiles+1)]
        fileString = " ".join(fileNames)
        
        from autoSubmitterTemplates import mergeTemplate
        merge_result = subprocess.call(mergeTemplate.format(path=folderName, inputFiles=fileString), shell=True) # returns the exit code (0 if no error occurred)
        for name in fileNames:
            os.remove(name)
            
        if rootFileValid("{}/allData.root".format(folderName)) and merge_result == 0:
            self.setStatus(STATE_MERGE_DONE)
        else:
            self.setStatus(STATE_MERGE_FAILED, True)
    
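    # Resulting layout, derived from the paths above: each iteration's merged output
    # lands in {base}/hists/{name}/iter{N}/allData.root (or .../baseline/ for design
    # measurements), next to the allData_iterationApe.root copied from iter{N-1}.
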
    # calculates the APE
    def calculateApe(self):
        self.status_ = STATE_SUMMARY_WAITING
        from autoSubmitterTemplates import summaryTemplate
        if self.alignment.isDesign:
            # use the measurement name as the baseline folder name in this case
            inputCommands = "iterNumber={} setBaseline={} measurementName={} baselineName={}".format(self.curIteration, self.alignment.isDesign, self.name, self.name)
        else:
            inputCommands = "iterNumber={} setBaseline={} measurementName={} baselineName={}".format(self.curIteration, self.alignment.isDesign, self.name, self.alignment.baselineDir)
        
        summary_result = subprocess.call(summaryTemplate.format(inputCommands=inputCommands), shell=True) # returns the exit code (0 if no error occurred)
        if summary_result == 0:
            self.setStatus(STATE_SUMMARY_DONE)
        else:
            self.setStatus(STATE_SUMMARY_FAILED, True)
    
    # saves the APE to a .db file so it can be read out in the next iteration
    def writeApeToDb(self):
        self.setStatus(STATE_LOCAL_WAITING)
        from autoSubmitterTemplates import localSettingTemplate
        inputCommands = "iterNumber={} setBaseline={} measurementName={}".format(self.curIteration, self.alignment.isDesign, self.name)

        local_setting_result = subprocess.call(localSettingTemplate.format(inputCommands=inputCommands), shell=True) # returns the exit code (0 if no error occurred)
        if local_setting_result == 0:
            self.setStatus(STATE_LOCAL_DONE)
        else:
            self.setStatus(STATE_LOCAL_FAILED, True)
        
    def finishIteration(self):
        print("APE Measurement {} just finished iteration {}".format(self.name, self.curIteration))
        if self.curIteration < self.maxIterations:
            self.curIteration += 1
            self.setStatus(STATE_ITERATION_START)
        else:
            self.setStatus(STATE_FINISHED, True)
            print("APE Measurement {}, which was started at {}, was finished after {} iterations, at {}".format(self.name, self.startTime, self.curIteration, self.finishTime))
            
    def kill(self):
        from autoSubmitterTemplates import killJobTemplate
        for log, err, jobId in self.runningJobs:
            subprocess.call(killJobTemplate.format(jobId=jobId), shell=True)
        self.runningJobs = []
        self.setStatus(STATE_NONE)
        
    def purge(self):
        self.kill()
        folderName = '{}/hists/{}'.format(base, self.name)
        shutil.rmtree(folderName)
        # remove log files as well?
        
    def runIteration(self):
        global threadcounter
        global measurements
        threadcounter.acquire()
        try:
            if self.status_ == STATE_ITERATION_START:
                # start batch jobs
                print("APE Measurement {} just started iteration {}".format(self.name, self.curIteration))

                try:
                    self.submitJobs()
                    save("measurements", measurements)
                except Exception as e:
                    # this is needed in case the scheduler goes down
                    print("Error submitting jobs for APE measurement {}".format(self.name))
                    print(e)
                    return
                    
            if self.status_ == STATE_BJOBS_WAITING:
                # check if the batch jobs are finished
                self.checkJobs()
                save("measurements", measurements)
            if self.status_ == STATE_BJOBS_DONE:
                # merge files
                self.mergeFiles()
                save("measurements", measurements)
            if self.status_ == STATE_MERGE_DONE:
                # start summary
                self.calculateApe()
                save("measurements", measurements)
            if self.status_ == STATE_SUMMARY_DONE:
                # start local setting (only if not a baseline measurement)
                if self.alignment.isDesign:
                    self.setStatus(STATE_LOCAL_DONE)
                else:
                    self.writeApeToDb()
                save("measurements", measurements)
            if self.status_ == STATE_LOCAL_DONE:
                self.finishIteration()
                save("measurements", measurements)
                # go to the next iteration or finish the measurement
            
            if self.status_ == STATE_BJOBS_FAILED or \
                self.status_ == STATE_MERGE_FAILED or \
                self.status_ == STATE_SUMMARY_FAILED or \
                self.status_ == STATE_LOCAL_FAILED or \
                self.status_ == STATE_INVALID_CONDITIONS or \
                self.status_ == STATE_FINISHED:
                    with open(history_file, "a") as fi:
                        fi.write("APE measurement {name}, which was started at {start}, finished at {end} with state {state} in iteration {iteration}\n".format(name=self.name, start=self.startTime, end=self.finishTime, state=self.status(), iteration=self.curIteration))
                    if self.status_ == STATE_FINISHED:
                        global finished_measurements
                        finished_measurements[self.name] = self
                        save("finished", finished_measurements)
                    else:
                        global failed_measurements
                        failed_measurements[self.name] = self
                        
                        self.setStatus(STATE_NONE)
                        save("failed", failed_measurements)
                    save("measurements", measurements)
            if self.status_ == STATE_ITERATION_START: # this ensures that jobs do not go into idle if many measurements are run simultaneously
                # start batch jobs
                print("APE Measurement {} just started iteration {}".format(self.name, self.curIteration))
                self.submitJobs()
                save("measurements", measurements)
        finally:
            threadcounter.release()

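# Overview of the per-iteration state flow driven by runIteration() above:
# ITERATION_START -> BJOBS_WAITING -> BJOBS_DONE -> MERGE_DONE -> SUMMARY_DONE
# -> LOCAL_DONE -> next iteration, or FINISHED after maxIterations; any *_FAILED
# state or INVALID_CONDITIONS is terminal and gets logged to history.log.
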
def main():
    parser = argparse.ArgumentParser(description="Automatically run APE measurements")
    parser.add_argument("-c", "--config", action="append", dest="configs", default=[],
                        help="Config file that has the list of measurements")
    parser.add_argument("-k", "--kill", action="append", dest="kill", default=[],
                        help="List of measurement names to kill (=remove from list and kill all batch jobs)")
    parser.add_argument("-p", "--purge", action="append", dest="purge", default=[],
                        help="List of measurement names to purge (=kill and remove folder)")
    parser.add_argument("-r", "--resume", action="append", dest="resume", default=[],
                        help="Resume interrupted APE measurements which are stored in shelves (specify shelves)")
    parser.add_argument("-d", "--dump", action="store", dest="dump", default=None,
                        help="Specify in which .shelve file to store the measurements")
    parser.add_argument("-n", "--ncores", action="store", dest="ncores", default=1, type=int,
                        help="Number of threads running in parallel")
    parser.add_argument("-C", "--caf", action="store_true", dest="caf", default=False,
                        help="Use the CAF queue for condor jobs")
    parser.add_argument("-u", "--unitTest", action="store_true", dest="unitTest", default=False,
                        help="If set, the program exits as soon as a measurement fails, using that measurement's status (i.e., where it failed) as the exit code")
    args = parser.parse_args()
    
    global base
    global clock_interval
    global shelve_name
    global threadcounter
    global lock
    global use_caf
    global unitTest
    
    use_caf = args.caf
    unitTest = args.unitTest
    
    threadcounter = threading.BoundedSemaphore(args.ncores)
    lock = threading.Lock()
    
    if args.dump is not None: # choose a different file than the default
        shelve_name = args.dump
    elif args.resume != []:
        shelve_name = args.resume[0]
    try:
        base = os.environ['CMSSW_BASE']+"/src/Alignment/APEEstimation"
    except KeyError:
        print("No CMSSW environment was set, exiting")
        sys.exit(1)

    killTargets = []
    purgeTargets = []
    for toConvert in args.kill:
        killTargets += replaceAllRanges(toConvert)
        
    for toConvert in args.purge:
        purgeTargets += replaceAllRanges(toConvert)
    
    global measurements
    measurements = []
    global finished_measurements
    finished_measurements = {}
    global failed_measurements
    failed_measurements = {}
    
    if args.resume != []:
        for resumeFile in args.resume:
            try:
                sh = shelve.open(resumeFile)
                resumed = sh["measurements"]
                
                resumed_failed = sh["failed"]
                resumed_finished = sh["finished"]
                sh.close()
                
                for res in resumed:
                    measurements.append(res)
                    print("Measurement {} in state {} in iteration {} was resumed".format(res.name, res.status(), res.curIteration))
                    # Killing and purging is done here, because it doesn't make
                    # sense to kill or purge a measurement that was just started
                    for to_kill in killTargets:
                        if res.name == to_kill:
                            res.kill()
                    for to_purge in purgeTargets:
                        if res.name == to_purge:
                            res.purge()
                
                failed_measurements.update(resumed_failed)
                finished_measurements.update(resumed_finished)
                
            except IOError:
                print("Could not resume because {} could not be opened, exiting".format(resumeFile))
                sys.exit(2)
            
    # read in from the config files
    if args.configs != []:
        config = ConfigParser.RawConfigParser()
        config.optionxform = str
        config.read(args.configs)
        
        # read measurement names
        meas = [str(x.split("ape:")[1]) for x in list(config.keys()) if x.startswith("ape:")]

        for name in meas:
            if name in [x.name for x in measurements]:
                print("Error: APE Measurement with name {} already exists, skipping".format(name))
                continue
            settings = dict(config.items("ape:{}".format(name)))
            
            measurement = ApeMeasurement(name, config, settings)
            
            if measurement.status_ >= STATE_ITERATION_START:
                measurements.append(measurement)
                print("APE Measurement {} was started".format(measurement.name))
    
    if unitTest:
        # the exit status is 0 if successful, 101 if wrongly configured
        sys.exit(measurement.status_)
    
    initializeModuleLoading()
    enableCAF(use_caf)
    
    
    while True:
        # remove finished and failed measurements
        measurements = [measurement for measurement in measurements if not (measurement.status_ == STATE_NONE or measurement.status_ == STATE_FINISHED)]
        save("measurements", measurements)
        save("failed", failed_measurements)
        save("finished", finished_measurements)
        
        list_threads = []
        for measurement in measurements:
            t = threading.Thread(target=measurement.runIteration)
            list_threads.append(t)
            t.start()
        
        # wait for the iterations to finish
        for t in list_threads:
            t.join()
        
        if len(measurements) == 0:
            print("No APE measurements are active, exiting")
            sys.exit(0)
        
        try: # so that interrupting does not give an error message and just ends the program
            time_remaining = clock_interval
            while time_remaining > 0:
                print("Sleeping for {} seconds, you can safely [CTRL+C] now".format(time_remaining))
                time.sleep(1)
                time_remaining -= 1
                sys.stdout.write("\033[F")
                sys.stdout.write("\033[K")
            print("")
            sys.stdout.write("\033[F")
            sys.stdout.write("\033[K")
        except KeyboardInterrupt:
            sys.exit(0)

if __name__ == "__main__":
    main()
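
# Typical invocations (a sketch; the flags are defined in main() above, the file names are assumed):
#   python autoSubmitter.py -c measurements.ini -n 4        # start measurements from a config file
#   python autoSubmitter.py -r dump.shelve                  # resume interrupted measurements
#   python autoSubmitter.py -r dump.shelve -k myMeasurement # resume, then kill one measurement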