from __future__ import print_function
import configparser as ConfigParser
import argparse
import shelve
import sys
import os
import subprocess
import threading
import shutil
import time
import re
from helpers import *

shelve_name = "dump.shelve" # contains all the measurement objects
history_file = "history.log"
clock_interval = 20 # in seconds
delete_logs_after_finish = False # set to True if the log and submit script files should not be kept
use_caf = False

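# An illustrative sketch of the INI config this script consumes, inferred from
# the Dataset, Alignment and ApeMeasurement constructors below (section and key
# names are taken from the parsing code; the values are made-up examples):
#
#   [dataset:myData]
#   baseDirectory = $CMSSW_BASE/src/Alignment/APEEstimation/hists
#   fileNames = reco1.root reco2.root
#   isMC = False
#   isCosmics = False
#   maxEvents = -1
#
#   [alignment:myAlignment]
#   globalTag = auto:run2_data
#   baselineDir = Design
#   isDesign = False
#
#   [ape:myMeasurement]
#   dataset = myData
#   alignment = myAlignment
#   firstIteration = 0
#   maxIterations = 15
#
# Additional conditions can be supplied via keys starting with "condition ";
# these are parsed by loadConditions from helpers (syntax defined there).
# fileNames entries may contain range expressions, which are expanded by
# replaceAllRanges from helpers.
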
def save(name, obj):
    # when multiple threads are running, this prevents potentially problematic concurrent file access
    global lock
    lock.acquire()
    try:
        sh = shelve.open(shelve_name)
        sh[name] = obj
        sh.close()
    finally:
        lock.release()

class Dataset:
    def __init__(self, config, name):
        dsDict = dict(config.items("dataset:{}".format(name)))
        self.name = name
        self.baseDirectory = dsDict["baseDirectory"].replace("$CMSSW_BASE", os.environ['CMSSW_BASE'])

        self.fileList = []
        names = dsDict["fileNames"].split(" ")
        for rawName in names:
            # file names may contain range expressions, which replaceAllRanges (from helpers) expands
            parsedNames = replaceAllRanges(rawName)
            for fileName in parsedNames:
                self.fileList.append(self.baseDirectory+"/"+fileName)
        self.nFiles = len(self.fileList)

        self.maxEvents = -1
        if "maxEvents" in dsDict:
            self.maxEvents = int(dsDict["maxEvents"])

        self.sampleType = "data1"
        if "isMC" in dsDict and dsDict["isMC"] == "True":
            self.sampleType = "MC"

        self.isCosmics = False
        if "isCosmics" in dsDict:
            self.isCosmics = (dsDict["isCosmics"] == "True")

        self.conditions, self.validConditions = loadConditions(dsDict)

        # check if any of the sources used for conditions is invalid
        if not self.validConditions:
            print("Invalid conditions defined for dataset {}".format(self.name))

        # check if all specified files exist
        self.existingFiles, missingFiles = allFilesExist(self)

        if not self.existingFiles:
            for fileName in missingFiles:
                print("Invalid file name {} defined for dataset {}".format(fileName, self.name))

class Alignment:
    def __init__(self, config, name):
        alDict = dict(config.items("alignment:{}".format(name)))
        self.name = name

        self.globalTag = "None"
        if "globalTag" in alDict:
            self.globalTag = alDict["globalTag"]
        self.baselineDir = "Design"
        if "baselineDir" in alDict:
            self.baselineDir = alDict["baselineDir"]
        self.isDesign = False
        if "isDesign" in alDict:
            self.isDesign = (alDict["isDesign"] == "True")

        # load the conditions defined for this alignment; they are passed on to apeEstimator_cfg.py
        self.conditions, self.validConditions = loadConditions(alDict)

        # check if any of the sources used for conditions is invalid
        if not self.validConditions:
            print("Invalid conditions defined for alignment {}".format(self.name))


class ApeMeasurement:
    name = "workingArea"
    curIteration = 0
    firstIteration = 0
    maxIterations = 15
    maxEvents = None
    dataset = None
    alignment = None
    runningJobs = None
    failedJobs = None
    startTime = ""
    finishTime = ""

    def __init__(self, name, config, settings):
        self.name = name
        self.status_ = STATE_ITERATION_START
        self.runningJobs = []
        self.failedJobs = []
        self.startTime = subprocess.check_output(["date"]).decode().strip()

        # load settings from the dictionary, overwriting the class defaults where defined
        for key, value in settings.items():
            if not key.startswith("condition "):
                setattr(self, key, value)

        # replace names with actual Dataset and Alignment objects
        self.dataset = Dataset(config, settings["dataset"])
        self.alignment = Alignment(config, settings["alignment"])

        # if not defined here, take the setting from the Dataset
        if "maxEvents" not in settings:
            self.maxEvents = self.dataset.maxEvents

        self.firstIteration = int(self.firstIteration)
        self.maxIterations = int(self.maxIterations)
        self.curIteration = self.firstIteration
        self.maxEvents = int(self.maxEvents)
        if self.alignment.isDesign:
            self.maxIterations = 0

        self.conditions, self.validConditions = loadConditions(settings)

        # see if the sanity checks passed
        if not self.alignment.validConditions or not self.dataset.validConditions or not self.dataset.existingFiles or not self.validConditions:
            self.setStatus(STATE_INVALID_CONDITIONS, True)
            return

        if unitTest:
            return

        if self.alignment.isDesign and self.dataset.sampleType != "MC":
            # for now, this does not immediately shut down the program
            print("APE Measurement {} is scheduled to do an APE baseline measurement with a dataset that is not marked as isMC=True. Is this intended?".format(self.name))
        ensurePathExists('{}/hists/{}'.format(base, self.name))
        if not self.alignment.isDesign:
            ensurePathExists('{}/hists/{}/apeObjects'.format(base, self.name))

    def status(self):
        return status_map[self.status_]

    def printStatus(self):
        print("APE Measurement {} in iteration {} is now in status {}".format(self.name, self.curIteration, self.status()))

    def setStatus(self, status, terminal=False):
        if self.status_ != status:
            self.status_ = status
            self.printStatus()
        if terminal:
            self.finishTime = subprocess.check_output(["date"]).decode().strip()

    # submit jobs for track refit and hit categorization
    def submitJobs(self):
        toSubmit = []

        allConditions = self.alignment.conditions+self.dataset.conditions+self.conditions
        allConditions = list({v['record']:v for v in allConditions}.values()) # removes duplicate definitions of records

        ensurePathExists("{}/test/autoSubmitter/workingArea".format(base))

        # if conditions are defined, create a file from which to load them
        rawFileName = "None"
        conditionsFileName = "None"
        if len(allConditions) > 0:
            conditionsFileName = "{base}/python/conditions/conditions_{name}_iter{iterNo}_cff.py".format(base=base, name=self.name, iterNo=self.curIteration)
            rawFileName = "conditions_{name}_iter{iterNo}_cff".format(name=self.name, iterNo=self.curIteration)
            with open(conditionsFileName, "w") as fi:
                from autoSubmitterTemplates import conditionsFileHeader
                fi.write(conditionsFileHeader)
                from autoSubmitterTemplates import conditionsTemplate
                for condition in allConditions:
                    fi.write(conditionsTemplate.format(record=condition["record"], connect=condition["connect"], tag=condition["tag"]))

        alignmentNameToUse = "fromConditions"

        lastIter = (self.curIteration==self.maxIterations) and not self.alignment.isDesign

        inputCommands = "sample={sample} fileNumber={fileNo} iterNumber={iterNo} lastIter={lastIter} alignRcd={alignRcd} maxEvents={maxEvents} globalTag={globalTag} measurementName={name} conditions={conditions} cosmics={cosmics}".format(sample=self.dataset.sampleType, fileNo="$1", iterNo=self.curIteration, lastIter=lastIter, alignRcd=alignmentNameToUse, maxEvents=self.maxEvents, globalTag=self.alignment.globalTag, name=self.name, conditions=rawFileName, cosmics=self.dataset.isCosmics)

        from autoSubmitterTemplates import condorJobTemplate
        jobFileContent = condorJobTemplate.format(base=base, inputFile="$2", inputCommands=inputCommands)
        jobFileName = "{}/test/autoSubmitter/workingArea/batchscript_{}_iter{}.tcsh".format(base, self.name, self.curIteration)
        with open(jobFileName, "w") as jobFile:
            jobFile.write(jobFileContent)

        # create one batch job argument set per input file
        arguments = ""
        from autoSubmitterTemplates import condorArgumentTemplate
        for i in range(self.dataset.nFiles):
            inputFile = self.dataset.fileList[i]
            fileNumber = i+1
            arguments += condorArgumentTemplate.format(fileNumber=fileNumber, inputFile=inputFile)

        # build the condor submit script
        date = subprocess.check_output(["date", "+%m_%d_%H_%M_%S"]).decode().strip()
        sub = "{}/test/autoSubmitter/workingArea/job_{}_iter{}".format(base, self.name, self.curIteration)

        errorFileTemp  = sub+"_error_{}.txt"
        errorFile  = errorFileTemp.format("$(ProcId)")
        outputFile = sub+"_output_$(ProcId).txt"
        logFileTemp= sub+"_condor_{}.log"
        logFile    = logFileTemp.format("$(ProcId)")
        jobFile    = sub+".tcsh"
        jobName    = "{}_{}".format(self.name, self.curIteration)
        for i in range(self.dataset.nFiles):
            # truncate the log file in case it existed before
            with open(logFileTemp.format(i), "w") as fi:
                pass

        # create the submit file
        from autoSubmitterTemplates import condorSubTemplate
        from autoSubmitterTemplates import condorSubTemplateCAF
        if use_caf:
            submitFileContent = condorSubTemplateCAF.format(jobFile=jobFileName, outputFile=outputFile, errorFile=errorFile, logFile=logFile, arguments=arguments, jobName=jobName)
        else:
            submitFileContent = condorSubTemplate.format(jobFile=jobFileName, outputFile=outputFile, errorFile=errorFile, logFile=logFile, arguments=arguments, jobName=jobName)
        submitFileName = "{}/test/autoSubmitter/workingArea/submit_{}_jobs_iter{}.sub".format(base, self.name, self.curIteration)
        with open(submitFileName, "w") as submitFile:
            submitFile.write(submitFileContent)

        # submit the batch jobs
        from autoSubmitterTemplates import submitCondorTemplate
        subOut = subprocess.check_output(submitCondorTemplate.format(subFile=submitFileName), shell=True).decode().strip()

        if len(subOut) == 0:
            print("Running in an environment that does not know the condor_submit command, or the ssh session timed out (ongoing for longer than 24h?), exiting")
            sys.exit()

        cluster = subOut.split(" ")[-1][:-1]
        for i in range(self.dataset.nFiles):
            # this list contains the condor log files from which to read, once a job has terminated, to detect errors
            self.runningJobs.append((logFileTemp.format(i), errorFileTemp.format(i), "{}.{}".format(cluster, i)))

        self.setStatus(STATE_BJOBS_WAITING)

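    # A bookkeeping note summarizing the code above: for each iteration,
    # submitJobs leaves the following files in <base>/test/autoSubmitter/workingArea,
    # where <i> is the condor ProcId of the job:
    #   batchscript_<name>_iter<N>.tcsh                  script executed by each job
    #   submit_<name>_jobs_iter<N>.sub                   condor submit file
    #   job_<name>_iter<N>_error_<i>.txt / _output_<i>.txt / _condor_<i>.log
    # checkJobs below polls the _condor_<i>.log files rather than running condor_q.
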
    def checkJobs(self):
        stillRunningJobs = []
        # check all jobs that are still running
        for logName, errName, jobId in self.runningJobs:
            # read the condor logs instead of doing condor_q or similar, as it is much faster
            if not os.path.isfile(logName):
                print("{} does not exist even though it should, marking job as failed".format(logName))
                self.failedJobs.append( (logName, errName) )
                continue
            with open(logName, "r") as logFile:
                log = logFile.read()
            if "submitted" not in log:
                print("{} was apparently not submitted, did you empty the log file or is condor not working?".format(jobId))
                self.failedJobs.append( (logName, errName) )
                continue

            if "Job was aborted" in log:
                print("Job {} of measurement {} in iteration {} was aborted".format(jobId, self.name, self.curIteration))
                self.failedJobs.append( (logName, errName) )
            elif "Job terminated" in log:
                if "Normal termination (return value 0)" in log:
                    foundErr = False
                    with open(errName, "r") as err:
                        for line in err:
                            if "Fatal Exception" in line.strip():
                                foundErr = True
                                break
                    if not foundErr:
                        print("Job {} of measurement {} in iteration {} finished successfully".format(jobId, self.name, self.curIteration))
                    else:
                        # fatal error in stderr
                        print("Job {} of measurement {} in iteration {} has a fatal error, check stderr".format(jobId, self.name, self.curIteration))
                        self.failedJobs.append( (logName, errName) )
                else:
                    # nonzero return value
                    print("Job {} of measurement {} in iteration {} failed, check stderr".format(jobId, self.name, self.curIteration))
                    self.failedJobs.append( (logName, errName) )
            else:
                stillRunningJobs.append( (logName, errName, jobId) )
        self.runningJobs = stillRunningJobs

        # at least one job failed
        if len(self.failedJobs) > 0:
            self.setStatus(STATE_BJOBS_FAILED, True)
        elif len(self.runningJobs) == 0:
            self.setStatus(STATE_BJOBS_DONE)
            print("All condor jobs of APE measurement {} in iteration {} are done".format(self.name, self.curIteration))

            # remove files
            if delete_logs_after_finish:
                submitFile = "{}/test/autoSubmitter/workingArea/submit_{}_jobs_iter{}.sub".format(base, self.name, self.curIteration)
                jobFile = "{}/test/autoSubmitter/workingArea/batchscript_{}_iter{}.tcsh".format(base, self.name, self.curIteration)
                os.remove(submitFile)
                os.remove(jobFile)

                for i in range(self.dataset.nFiles):
                    sub = "{}/test/autoSubmitter/workingArea/job_{}_iter{}".format(base, self.name, self.curIteration)
                    errorFile  = sub+"_error_{}.txt".format(i)
                    outputFile = sub+"_output_{}.txt".format(i)
                    logFile    = sub+"_condor_{}.log".format(i)
                    os.remove(errorFile)
                    os.remove(outputFile)
                    os.remove(logFile)

    # merges the ROOT files produced by the jobs
    def mergeFiles(self):
        self.setStatus(STATE_MERGE_WAITING)
        if self.alignment.isDesign:
            folderName = '{}/hists/{}/baseline'.format(base, self.name)
        else:
            folderName = '{}/hists/{}/iter{}'.format(base, self.name, self.curIteration)

        # (re)move results from previous measurements before creating the folder
        if os.path.isdir(folderName):
            if os.path.isdir(folderName+"_old"):
                shutil.rmtree("{}_old".format(folderName))
            os.rename(folderName, folderName+"_old")
        os.makedirs(folderName)

        # copy the previous iteration's tree so that its structure can be retrieved by ApeEstimatorSummary.cc and the tree does not have to be rebuilt
        if self.curIteration > 0 and not self.alignment.isDesign: # the isDesign check is redundant (design measurements always end after iteration 0) but kept for clarity
            shutil.copyfile('{}/hists/{}/iter{}/allData_iterationApe.root'.format(base, self.name, self.curIteration-1), folderName+"/allData_iterationApe.root")
        fileNames = ['{}/hists/{}/{}{}.root'.format(base, self.name, self.dataset.sampleType, str(i)) for i in range(1, self.dataset.nFiles+1)]
        fileString = " ".join(fileNames)

        from autoSubmitterTemplates import mergeTemplate
        merge_result = subprocess.call(mergeTemplate.format(path=folderName, inputFiles=fileString), shell=True) # returns the exit code (0 if no error occurred)
        for name in fileNames:
            os.remove(name)

        if rootFileValid("{}/allData.root".format(folderName)) and merge_result == 0:
            self.setStatus(STATE_MERGE_DONE)
        else:
            self.setStatus(STATE_MERGE_FAILED, True)

    # calculates the APE
    def calculateApe(self):
        self.setStatus(STATE_SUMMARY_WAITING)
        from autoSubmitterTemplates import summaryTemplate
        if self.alignment.isDesign:
            # use the measurement name as the baseline folder name in this case
            inputCommands = "iterNumber={} setBaseline={} measurementName={} baselineName={}".format(self.curIteration, self.alignment.isDesign, self.name, self.name)
        else:
            inputCommands = "iterNumber={} setBaseline={} measurementName={} baselineName={}".format(self.curIteration, self.alignment.isDesign, self.name, self.alignment.baselineDir)

        summary_result = subprocess.call(summaryTemplate.format(inputCommands=inputCommands), shell=True) # returns the exit code (0 if no error occurred)
        if summary_result == 0:
            self.setStatus(STATE_SUMMARY_DONE)
        else:
            self.setStatus(STATE_SUMMARY_FAILED, True)

    # saves the APE to a .db file so it can be read out in the next iteration
    def writeApeToDb(self):
        self.setStatus(STATE_LOCAL_WAITING)
        from autoSubmitterTemplates import localSettingTemplate
        inputCommands = "iterNumber={} setBaseline={} measurementName={}".format(self.curIteration, self.alignment.isDesign, self.name)

        local_setting_result = subprocess.call(localSettingTemplate.format(inputCommands=inputCommands), shell=True) # returns the exit code (0 if no error occurred)
        if local_setting_result == 0:
            self.setStatus(STATE_LOCAL_DONE)
        else:
            self.setStatus(STATE_LOCAL_FAILED, True)

    def finishIteration(self):
        print("APE Measurement {} just finished iteration {}".format(self.name, self.curIteration))
        if self.curIteration < self.maxIterations:
            self.curIteration += 1
            self.setStatus(STATE_ITERATION_START)
        else:
            self.setStatus(STATE_FINISHED, True)
            print("APE Measurement {}, which was started at {}, was finished after {} iterations, at {}".format(self.name, self.startTime, self.curIteration, self.finishTime))

    def kill(self):
        from autoSubmitterTemplates import killJobTemplate
        for log, err, jobId in self.runningJobs:
            subprocess.call(killJobTemplate.format(jobId=jobId), shell=True)
        self.runningJobs = []
        self.setStatus(STATE_NONE)

    def purge(self):
        self.kill()
        folderName = '{}/hists/{}'.format(base, self.name)
        shutil.rmtree(folderName)
        # remove log files as well?

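    # One pass of the per-measurement state machine. As driven by the checks
    # below, the usual flow of one iteration is:
    #   STATE_ITERATION_START -> STATE_BJOBS_WAITING -> STATE_BJOBS_DONE
    #   -> STATE_MERGE_DONE -> STATE_SUMMARY_DONE -> STATE_LOCAL_DONE
    #   -> next iteration (STATE_ITERATION_START) or STATE_FINISHED.
    # Any *_FAILED state or STATE_INVALID_CONDITIONS is terminal and moves the
    # measurement into failed_measurements; STATE_FINISHED moves it into
    # finished_measurements.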
    def runIteration(self):
        global threadcounter
        global measurements
        threadcounter.acquire()
        try:
            if self.status_ == STATE_ITERATION_START:
                # start the condor jobs
                print("APE Measurement {} just started iteration {}".format(self.name, self.curIteration))

                try:
                    self.submitJobs()
                    save("measurements", measurements)
                except Exception as e:
                    # this is needed in case the scheduler goes down
                    print("Error submitting jobs for APE measurement {}".format(self.name))
                    print(e)
                    return

            if self.status_ == STATE_BJOBS_WAITING:
                # check whether the condor jobs are finished
                self.checkJobs()
                save("measurements", measurements)
            if self.status_ == STATE_BJOBS_DONE:
                # merge files
                self.mergeFiles()
                save("measurements", measurements)
            if self.status_ == STATE_MERGE_DONE:
                # start the summary
                self.calculateApe()
                save("measurements", measurements)
            if self.status_ == STATE_SUMMARY_DONE:
                # write the local setting (only if this is not a baseline measurement)
                if self.alignment.isDesign:
                    self.setStatus(STATE_LOCAL_DONE)
                else:
                    self.writeApeToDb()
                save("measurements", measurements)
            if self.status_ == STATE_LOCAL_DONE:
                # go to the next iteration or finish the measurement
                self.finishIteration()
                save("measurements", measurements)

            if self.status_ in [STATE_BJOBS_FAILED, STATE_MERGE_FAILED,
                                STATE_SUMMARY_FAILED, STATE_LOCAL_FAILED,
                                STATE_INVALID_CONDITIONS, STATE_FINISHED]:
                with open(history_file, "a") as fi:
                    fi.write("APE measurement {name} which was started at {start} finished at {end} with state {state} in iteration {iteration}\n".format(name=self.name, start=self.startTime, end=self.finishTime, state=self.status(), iteration=self.curIteration))
                if self.status_ == STATE_FINISHED:
                    global finished_measurements
                    finished_measurements[self.name] = self
                    save("finished", finished_measurements)
                else:
                    global failed_measurements
                    failed_measurements[self.name] = self

                    self.setStatus(STATE_NONE)
                    save("failed", failed_measurements)
                save("measurements", measurements)
            if self.status_ == STATE_ITERATION_START: # this ensures that jobs do not go into idle if many measurements are run simultaneously
                # start the condor jobs of the next iteration right away
                print("APE Measurement {} just started iteration {}".format(self.name, self.curIteration))
                self.submitJobs()
                save("measurements", measurements)
        finally:
            threadcounter.release()

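# Illustrative invocations, assuming a CMSSW environment is set up and the
# script is run from test/autoSubmitter (the file names here are made up):
#   python autoSubmitter.py -c config.ini -n 4      # start measurements from a config, 4 threads
#   python autoSubmitter.py -r dump.shelve          # resume interrupted measurements
#   python autoSubmitter.py -r dump.shelve -p name  # resume, then purge measurement "name"
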
def main():
    parser = argparse.ArgumentParser(description="Automatically run APE measurements")
    parser.add_argument("-c", "--config", action="append", dest="configs", default=[],
                          help="Config file that has list of measurements")
    parser.add_argument("-k", "--kill", action="append", dest="kill", default=[],
                          help="List of measurement names to kill (=remove from list and kill all condor jobs)")
    parser.add_argument("-p", "--purge", action="append", dest="purge", default=[],
                          help="List of measurement names to purge (=kill and remove folder)")
    parser.add_argument("-r", "--resume", action="append", dest="resume", default=[],
                          help="Resume interrupted APE measurements which are stored in shelves (specify shelves)")
    parser.add_argument("-d", "--dump", action="store", dest="dump", default=None,
                          help='Specify in which .shelve file to store the measurements')
    parser.add_argument("-n", "--ncores", action="store", dest="ncores", default=1, type=int,
                          help='Number of threads running in parallel')
    parser.add_argument("-C", "--caf", action="store_true", dest="caf", default=False,
                          help="Use the CAF queue for condor jobs")
    parser.add_argument("-u", "--unitTest", action="store_true", dest="unitTest", default=False,
                          help='If this is used, the program exits as soon as a measurement fails, with the status of that measurement (i.e., where it failed) as the exit code')
    args = parser.parse_args()

    global base
    global clock_interval
    global shelve_name
    global threadcounter
    global lock
    global use_caf
    global unitTest

    use_caf = args.caf
    unitTest = args.unitTest

    threadcounter = threading.BoundedSemaphore(args.ncores)
    lock = threading.Lock()

    if args.dump is not None: # choose a different file than the default
        shelve_name = args.dump
    elif args.resume != []:
        shelve_name = args.resume[0]
    try:
        base = os.environ['CMSSW_BASE']+"/src/Alignment/APEEstimation"
    except KeyError:
        print("No CMSSW environment was set, exiting")
        sys.exit(1)

    killTargets = []
    purgeTargets = []
    for toConvert in args.kill:
        killTargets += replaceAllRanges(toConvert)

    for toConvert in args.purge:
        purgeTargets += replaceAllRanges(toConvert)

    global measurements
    measurements = []
    global finished_measurements
    finished_measurements = {}
    global failed_measurements
    failed_measurements = {}

    if args.resume != []:
        for resumeFile in args.resume:
            try:
                sh = shelve.open(resumeFile)
                resumed = sh["measurements"]

                resumed_failed = sh["failed"]
                resumed_finished = sh["finished"]
                sh.close()

                for res in resumed:
                    measurements.append(res)
                    print("Measurement {} in state {} in iteration {} was resumed".format(res.name, res.status(), res.curIteration))
                    # killing and purging are done here because it doesn't make
                    # sense to kill or purge a measurement that was just started
                    for to_kill in killTargets:
                        if res.name == to_kill:
                            res.kill()
                    for to_purge in purgeTargets:
                        if res.name == to_purge:
                            res.purge()

                failed_measurements.update(resumed_failed)
                finished_measurements.update(resumed_finished)

            except IOError:
                print("Could not resume because {} could not be opened, exiting".format(resumeFile))
                sys.exit(2)

    # read from the config files
    if args.configs != []:
        config = ConfigParser.RawConfigParser()
        config.optionxform = str
        config.read(args.configs)

        # read the measurement names
        meas = [str(x.split("ape:")[1]) for x in list(config.keys()) if x.startswith("ape:")]

        for name in meas:
            if name in [x.name for x in measurements]:
                print("Error: APE Measurement with name {} already exists, skipping".format(name))
                continue
            settings = dict(config.items("ape:{}".format(name)))

            measurement = ApeMeasurement(name, config, settings)

            if measurement.status_ >= STATE_ITERATION_START:
                measurements.append(measurement)
                print("APE Measurement {} was started".format(measurement.name))

    if unitTest:
        # the exit status is 0 if successful and 101 if wrongly configured
        sys.exit(measurement.status_)

    initializeModuleLoading()
    enableCAF(use_caf)

    while True:
        # remove finished and failed measurements
        measurements = [measurement for measurement in measurements if not (measurement.status_ == STATE_NONE or measurement.status_ == STATE_FINISHED)]
        save("measurements", measurements)
        save("failed", failed_measurements)
        save("finished", finished_measurements)

        list_threads = []
        for measurement in measurements:
            t = threading.Thread(target=measurement.runIteration)
            list_threads.append(t)
            t.start()

        # wait for the iterations to finish
        for t in list_threads:
            t.join()

        if len(measurements) == 0:
            print("No APE measurements are active, exiting")
            sys.exit(0)

        # catch KeyboardInterrupt so that interrupting does not print an error message and simply ends the program
        try:
            time_remaining = clock_interval
            while time_remaining > 0:
                print("Sleeping for {} seconds, you can safely [CTRL+C] now".format(time_remaining))
                time.sleep(1)
                time_remaining -= 1
                # move the cursor up one line and clear it, so the countdown overwrites itself
                sys.stdout.write("\033[F")
                sys.stdout.write("\033[K")
            print("")
            sys.stdout.write("\033[F")
            sys.stdout.write("\033[K")
        except KeyboardInterrupt:
            sys.exit(0)

if __name__ == "__main__":
    main()