File indexing completed on 2024-11-25 02:29:08
0001
0002
0003
0004
0005
0006
0007 import os,sys,glob
0008 import tarfile
0009 import xml.etree.ElementTree as ET
0010 import imp
0011 import json
0012 import optparse
0013 import subprocess
0014 import logging
0015 import datetime
0016 import uuid
0017 import time
0018 from http.client import HTTPException
0019 from multiprocessing import Process, Queue
0020
0021 from CRABAPI.RawCommand import crabCommand
0022 from CRABClient.UserUtilities import getConsoleLogLevel, setConsoleLogLevel
0023 from CRABClient.ClientUtilities import LOGLEVEL_MUTE
0024 from CRABClient.ClientExceptions import CachefileNotFoundException
0025
0026
0027
0028
0029
class CrabController():
    """Convenience wrapper around the CRAB3 client API (``crabCommand``).

    ``submit``, ``resubmit``, ``status``, ``getlog`` and ``report`` execute the
    CRAB command in a child process via ``callCrabCommand``; ``checkwrite``
    and ``checkusername`` call ``crabCommand`` directly. When ``dry_run`` is
    True, ``submit`` passes ``--dryrun`` to crab, while ``resubmit`` and
    ``status`` skip the CRAB call entirely.
    """

    def __init__(self, debug=0, logger=None, workingArea=None, voGroup=None, username=None):
        """Create a controller.

        Args:
            debug: verbosity; values > 1 make ``submit`` log the raw result.
            logger: optional parent logger; its child "CrabController" is used.
            workingArea: directory holding the ``crab_*`` project folders
                (defaults to the current working directory).
            voGroup: VO group passed to ``crab checkwrite`` (default "dcms").
            username: user name; if None it is looked up by ``checkwrite``
                through ``checkusername``.
        """
        setConsoleLogLevel(LOGLEVEL_MUTE)
        self.debug = debug
        self.workingArea = workingArea if workingArea is not None else os.getcwd()
        self.dry_run = False
        self.voGroup = voGroup if voGroup is not None else "dcms"
        self.username = username

        if logger is not None:
            self.logger = logger.getChild("CrabController")
        else:
            self.logger = logging.getLogger("CrabController")
            # Fall back to a log file only when logging is not configured at
            # all. getLogger() hands every instance the same logger object and
            # many controllers are created, so skip this when a handler is
            # already attached - otherwise each instance adds another
            # FileHandler (duplicated lines, leaked file handles).
            if len(logging.getLogger().handlers) < 1 and not self.logger.handlers:
                ch = logging.FileHandler('crabController.log', mode='a', encoding=None, delay=False)
                ch.setLevel(logging.DEBUG)
                formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
                ch.setFormatter(formatter)
                self.logger.addHandler(ch)

        # Result channel for commands run in a child process (callCrabCommand).
        self.crab_q = Queue()

    def checkwrite(self, site='T2_DE_RWTH', path='noPath'):
        """Run ``crab checkwrite`` for *site* (and optionally the LFN *path*).

        Returns:
            True if crab reports status 'SUCCESS', False otherwise or on error.
        """
        if self.username is None:
            self.checkusername()
        try:
            self.logger.info("Checking if user can write to /store/user/%s on site %s with voGroup %s" % (self.username, site, self.voGroup))
            if 'noPath' not in path:
                res = crabCommand('checkwrite', '--site', site, '--voGroup', self.voGroup, '--lfn', path)
            else:
                res = crabCommand('checkwrite', '--site', site, '--voGroup', self.voGroup)
            if res['status'] == 'SUCCESS':
                self.logger.info("Checkwrite was sucessfully called.")
                return True
            self.logger.error("The crab checkwrite command failed for site: %s" % site)
            return False
        except Exception:
            self.logger.exception('Unable to perform crab checkwrite')
            return False

    def submit(self, name):
        """Run ``crab submit --wait`` (or ``--dryrun`` in dry-run mode) for *name*.

        Returns:
            The result object produced by ``callCrabCommand``.
        """
        if self.dry_run:
            res = self.callCrabCommand(('submit', '--dryrun', name))
            self.logger.info('Dry-run: You may check the created config and sandbox')
        else:
            res = self.callCrabCommand(('submit', '--wait', name))
            self.logger.info("crab sumbit called for task %s" % name)
        if self.debug > 1:
            self.logger.info(str(res))
        return res

    def resubmit(self, name, joblist=None):
        """Run ``crab resubmit --wait`` on the project folder of task *name*.

        *joblist* is accepted for API compatibility but is not forwarded to
        crab. Returns ``{}`` in dry-run mode, otherwise the result of
        ``callCrabCommand``.
        """
        if self.dry_run:
            self.logger.info('Dry-run: Created config file. ')
            return {}
        cmd = ('resubmit', '--wait', os.path.join(self.workingArea, self._prepareFoldername(name)))
        res = self.callCrabCommand(cmd)
        self.logger.info("crab resumbit called for task %s" % name)
        return res

    def checkusername(self):
        """Return the user name and cache it in ``self.username``.

        Uses the ``CERNUSERNAME`` environment variable if set, otherwise asks
        ``crab checkusername``. Returns "noHNname" if crab's answer has no
        'username' entry.
        """
        username = os.environ.get("CERNUSERNAME")
        if username is not None:
            # Cache it: checkwrite relies on self.username being filled in.
            self.username = username
            return username
        res = crabCommand('checkusername')
        try:
            self.username = res['username']
            return res['username']
        except (KeyError, TypeError):
            return "noHNname"

    def status(self, name):
        """Query ``crab status --long`` for task *name*.

        *name* is prefixed with "crab_" unless it already contains it.

        Returns:
            tuple (status, jobs, taskFailureMsg). jobs defaults to ``{}`` and
            taskFailureMsg to None when crab does not report them.
            ("NOSTATE", {}, None) is returned on error and in dry-run mode.
        """
        callname = name if "crab_" in name else "crab_" + name
        cmd = ('status', '--long', callname)
        if self.dry_run:
            self.logger.info('Dry-run: Created config file. crab command would have been: %s' % ' '.join(cmd))
            return "NOSTATE", {}, None
        try:
            res = self.callCrabCommand(cmd)
            return res['status'], res.get('jobs', {}), res.get('taskFailureMsg')
        except Exception as e:
            print(e)
            self.logger.error("Can not run crab status request")
            return "NOSTATE", {}, None

    def callCrabCommand(self, crabArgs):
        """Run ``crabCommand(*crabArgs)`` in a child process and return its result.

        Blocks until the child has put its result on ``self.crab_q``.
        """
        p = Process(target=crabCommandProcess, args=(self.crab_q, crabArgs))
        p.start()
        res = self.crab_q.get()
        p.join()
        return res

    def getlog(self, name):
        """Run ``crab getlog`` for task *name*.

        Returns:
            (res['success'], res['failed']) from crab, or ({}, {}) on error.
        """
        foldername = self._prepareFoldername(name)
        try:
            res = self.callCrabCommand(('getlog', '%s' % foldername))
            return res['success'], res['failed']
        except Exception:
            self.logger.error("Error calling crab getlog for %s" % foldername)
            return {}, {}

    def report(self, name):
        """Run ``crab report`` for task *name*.

        Returns:
            The 'analyzedLumis' entry of crab's result, or None on error.
        """
        foldername = self._prepareFoldername(name)
        try:
            res = self.callCrabCommand(('report', '%s' % foldername))
            return res['analyzedLumis']
        except Exception:
            self.logger.error("Error calling crab report for %s" % foldername)
            return None

    def readCrabConfig(self, name):
        """Load a CRAB python config and return its ``config`` object.

        *name* is used as a path if it exists, otherwise ``crab_<name>_cfg.py``
        is tried. Returns False if the file cannot be loaded or defines no
        ``config``.
        """
        # importlib replaces imp.load_source (imp was removed in Python 3.12).
        import importlib.machinery
        import importlib.util
        try:
            pset = name if os.path.exists(name) else 'crab_%s_cfg.py' % name
            # SourceFileLoader accepts any file suffix, like imp.load_source did.
            loader = importlib.machinery.SourceFileLoader("pycfg", pset)
            spec = importlib.util.spec_from_loader("pycfg", loader)
            cfo = importlib.util.module_from_spec(spec)
            loader.exec_module(cfo)
            return cfo.config
        except Exception:
            return False

    @property
    def crabFolders(self):
        """Names of the ``crab_*`` directories inside ``self.workingArea``."""
        return [x for x in os.listdir(self.workingArea)
                if x.startswith('crab_') and os.path.isdir(os.path.join(self.workingArea, x))]

    def _prepareFoldername(self, name):
        """Return *name* with a "crab_" prefix (added if missing), stripped."""
        if name.startswith("crab_"):
            crabfolder = '%s' % name
        else:
            crabfolder = "crab_%s " % name
        return crabfolder.strip()

    def commandlineOptions(self, parser=None):
        """Add ``--dry-run`` and ``--workingArea`` to *parser* unless already present.

        Args:
            parser: an ``optparse.OptionParser``; a fresh one with usage
                'usage: %prog' is created when None (a shared default
                instance would leak state between calls).

        Returns:
            The parser that was extended.
        """
        if parser is None:
            parser = optparse.OptionParser('usage: %prog')
        # Parse a dummy argument list just to see which options already exist.
        (currentoptions, args) = parser.parse_args([" "])

        if not hasattr(currentoptions, 'dry_run'):
            parser.add_option('--dry-run', action='store_true', default=False,
                              help='Do everything except calling CRAB or registering samples to the database.')
        if not hasattr(currentoptions, 'workingArea'):
            parser.add_option('--workingArea', metavar='DIR', default=os.getcwd(), help='The area (full or relative path) where to create the CRAB project directory. '
                              'If the area doesn\'t exist, CRAB will try to create it using the mkdir command'
                              ' (without -p option). Defaults to the current working directory.')
        return parser
0305
0306
0307
0308
0309
0310
0311
0312
0313
def crabCommandProcess(q, crabCommandArgs, max_retries=5, retry_delay=5):
    """Run ``crabCommand(*crabCommandArgs)`` and put exactly one result on *q*.

    Used as the ``multiprocessing.Process`` target of
    ``CrabController.callCrabCommand``, whose parent blocks on ``q.get()``.
    A result is therefore always put on the queue, even on failure;
    otherwise the parent would wait forever.

    Args:
        q: queue that receives the result.
        crabCommandArgs: positional arguments for ``crabCommand``.
        max_retries: how many times an ``HTTPException`` is retried.
        retry_delay: seconds to wait between retries.

    Queued result:
        crabCommand's return value on success. Otherwise
        ``{'status': "CachefileNotFound", 'jobs': {}}`` for a missing cache
        file, or ``{'status': "UnexpectedError", 'jobs': {}}`` when retries
        are exhausted or any other exception occurs.
    """
    attempt = 0
    while True:
        attempt += 1
        try:
            res = crabCommand(*crabCommandArgs)
            break
        except HTTPException as e:
            print("crab error ---------------")
            print(e)
            print("end error ---------------")
            if attempt > max_retries:
                res = {'status': "UnexpectedError", 'jobs': {}}
                break
            print("will try again!")
            time.sleep(retry_delay)
        except CachefileNotFoundException as e:
            print("crab error ---------------")
            print(e)
            print("end error ---------------")
            print(crabCommandArgs)
            res = {'status': "CachefileNotFound", 'jobs': {}}
            break
        except Exception as e:
            # Any other failure must still produce a result, see docstring.
            print("crab error ---------------")
            print(e)
            print("end error ---------------")
            print(crabCommandArgs)
            res = {'status': "UnexpectedError", 'jobs': {}}
            break
    q.put(res)
0340
class CertInfo:
    """VO information of the current grid proxy.

    Runs ``voms-proxy-info --fqan`` and parses its output. After construction
    the attributes ``vo``, ``voGroup`` and ``voRole`` are ``str``. All three
    are empty when the command cannot be run, exits non-zero, or prints
    nothing usable.
    """

    def __init__(self, command=("voms-proxy-info", "--fqan")):
        """Run *command* and parse the FQAN it prints.

        Args:
            command: program and arguments to execute (no shell is involved).
                Defaults to ``voms-proxy-info --fqan``.

        The first output line is used; if it has fewer than four
        "/"-separated fields, the second line is used instead. ``vo`` is
        field 1 and ``voGroup`` is field 2. If field 2 is a ``key=value``
        entry (e.g. ``Role=NULL``), its value becomes ``voRole``; when that
        value contains "NULL", ``voGroup`` is cleared.
        """
        self.vo = ""
        self.voGroup = ""
        self.voRole = ""
        try:
            proc = subprocess.run(list(command),
                                  stdout=subprocess.PIPE,
                                  stderr=subprocess.PIPE)
        except OSError:
            # Command missing / not executable: no proxy information.
            return
        # Work on str: splitting bytes with the str separator "=" raised
        # TypeError, so the role was never parsed and vo/voGroup were bytes.
        stdout = proc.stdout.decode("utf-8", errors="replace")
        print(stdout)
        if proc.returncode != 0:
            return
        lines = stdout.splitlines()
        if not lines:
            return
        fields = lines[0].split("/")
        if len(fields) < 4 and len(lines) > 1 and lines[1]:
            fields = lines[1].split("/")
        if len(fields) < 3:
            # Not enough fields for vo/group: treat as "no information".
            return
        self.vo = fields[1]
        self.voGroup = fields[2]
        keyValue = fields[2].split("=")
        if len(keyValue) > 1:
            self.voRole = keyValue[1]
            if "NULL" in self.voRole:
                self.voGroup = ""
0366
0367
0368
0369
class CrabTask:
    """Client-side bookkeeping for a single CRAB task.

    Holds the task name, its lazily loaded CRAB config, the last known state
    and per-state job counters (``nIdle``, ``nRunning``, ..., ``nComplete``).
    Each interaction with CRAB creates its own ``CrabController``.
    """

    def __init__(self,
                 taskname="",
                 crab_config="",
                 crabController=None,
                 initUpdate=True,
                 debuglevel="ERROR",
                 datasetpath="",
                 localDir="",
                 outlfn="",):
        """Create the task.

        Args:
            taskname: task/request name. If empty, *crab_config* is required.
            crab_config: path to a CRAB config; the task name is then taken
                from its ``General.requestName``.
            crabController: accepted but currently unused.
            initUpdate: query CRAB for the task status immediately.
            debuglevel: level name for the ``crabTask`` logger.
            datasetpath: fallback for ``datasetpath`` if the config has none.
            localDir, outlfn: stored as attributes of the same name.

        Raises:
            ValueError: neither *taskname* nor *crab_config* is given.
            IOError: *crab_config* does not exist.
        """
        self._crabConfig = None
        self._crabFolder = None

        if taskname:
            self.name = taskname
        else:
            if not crab_config:
                raise ValueError("Either taskname or crab_config needs to be set")
            if not os.path.exists(crab_config):
                raise IOError("File %s not found" % crab_config)
            # crabConfig loads the file named by self.name: point it at the
            # config first, then switch to the request name found inside.
            self.name = crab_config
            self.name = self.crabConfig.General.requestName
        self.uuid = uuid.uuid4()

        self.log = logging.getLogger('crabTask')
        self.log.setLevel(logging.getLevelName(debuglevel))
        self.jobs = {}
        self.localDir = localDir
        self.outlfn = outlfn
        self.isUpdating = False
        self.taskId = -1

        # Task state and job counters (filled by update / updateJobStats).
        self.nJobs = 0
        self.state = "NOSTATE"
        self.maxjobnumber = 0
        self.nUnsubmitted = 0
        self.nIdle = 0
        self.nRunning = 0
        self.nTransferring = 0
        self.nCooloff = 0
        self.nFailed = 0
        self.nFinished = 0
        self.nComplete = 0
        self.failureReason = None
        self.lastUpdate = datetime.datetime.now().strftime("%Y-%m-%d_%H.%M.%S")

        self._isData = None
        self.resubmitCount = 0

        self.debug = False

        self.finalFiles = []
        self.totalEvents = 0

        self._datasetpath_default = datasetpath

        if initUpdate:
            self.update()
            self.updateJobStats()

    @property
    def isData(self):
        """True if the config defines ``Data.lumiMask`` or the name starts with "Data_"."""
        if self._isData is None:
            try:
                self.crabConfig.Data.lumiMask
                self._isData = True
            except Exception:
                self._isData = self.name.startswith("Data_")
        return self._isData

    @property
    def crabConfig(self):
        """CRAB config object for ``self.name`` (loaded once; False if loading failed)."""
        if self._crabConfig is None:
            crab = CrabController()
            self._crabConfig = crab.readCrabConfig(self.name)
        return self._crabConfig

    @property
    def datasetpath(self):
        """``Data.inputDataset`` from the config, else the constructor's fallback."""
        try:
            return self.crabConfig.Data.inputDataset
        except Exception:
            pass
        return self._datasetpath_default

    @property
    def crabFolder(self):
        """Path of the task's CRAB project folder, or "" if it cannot be found.

        Looks in the config's ``General.workArea`` first, then in the current
        working directory. A found path is cached.
        """
        if self._crabFolder is not None:
            return self._crabFolder
        crab = CrabController()
        foldername = crab._prepareFoldername(self.name)
        candidate = os.path.join(self.crabConfig.General.workArea, foldername)
        if os.path.exists(candidate):
            self._crabFolder = candidate
            return self._crabFolder
        # os.getcwd(): os.path has no cwd(), the old call raised AttributeError.
        alternative_path = os.path.join(os.getcwd(), foldername)
        if os.path.exists(alternative_path):
            self._crabFolder = alternative_path
            return self._crabFolder
        self.log.error("Unable to find folder for Task")
        return ""

    def resubmit_failed(self):
        """Request resubmission of this task's failed jobs.

        The last JobId of every job in state 'failed' is passed as
        ``joblist``. NOTE: CrabController.resubmit currently does not forward
        ``joblist`` to crab.
        """
        controller = CrabController()
        failedJobIds = [job['JobIds'][-1] for job in self.jobs.values()
                        if job['State'] == 'failed']
        controller.resubmit(self.name, joblist=failedJobIds)
        self.lastUpdate = datetime.datetime.now().strftime("%Y-%m-%d_%H.%M.%S")

    @property
    def crab_folder(self):
        """``<General.workArea>/crab_<General.requestName>`` from the config."""
        return os.path.join(self.crabConfig.General.workArea,
                            "crab_" + self.crabConfig.General.requestName)

    def update(self):
        """Refresh state, jobs and failure reason from ``crab status``.

        A "FAILED" state is queried once more after 2 s. On "NOSTATE",
        ``handleNoState`` is tried while fewer than 3 attempts were made.
        ``isUpdating`` is True while this runs and is reset even on error.
        """
        self.log.debug("Start update for task %s" % self.name)
        self.isUpdating = True
        try:
            controller = CrabController()
            self.state = "UPDATING"

            self.log.debug("Try to get status for task")
            self.state, self.jobs, self.failureReason = controller.status(self.crab_folder)
            self.log.debug("Found state: %s" % self.state)
            if self.state == "FAILED":
                # try once more after a short pause
                time.sleep(2)
                self.state, self.jobs, self.failureReason = controller.status(self.crab_folder)
            self.nJobs = len(self.jobs)
            self.updateJobStats()
            if self.state == "NOSTATE":
                self.log.debug("Trying to resubmit because of NOSTATE")
                if self.resubmitCount < 3:
                    self.handleNoState()
            self.lastUpdate = datetime.datetime.now().strftime("%Y-%m-%d_%H.%M.%S")
        finally:
            # Otherwise TaskStats would skip this task forever after an error.
            self.isUpdating = False

    def handleNoState(self):
        """Try to recover a task whose status could not be determined.

        On the "Grid scheduler answered with an error" failure, the project
        folder is moved to ``bak_<folder>`` and the task is resubmitted from
        ``<folder>_cfg.py`` (state "SHEDERR"). On any other known failure
        reason it is resubmitted via ``crab resubmit`` (state "ERRHANDLE").
        Every call increments ``resubmitCount``.
        """
        import shutil
        crab = CrabController()
        reason = self.failureReason
        if reason is not None and "The CRAB3 server backend could not resubmit your task because the Grid scheduler answered with an error." in reason:
            folder = crab._prepareFoldername(self.name)
            try:
                # same effect as the former `mv <folder> bak_<folder>`,
                # without going through a shell
                shutil.move(folder, "bak_%s" % folder)
            except OSError as err:
                self.log.error("Unable to move %s to bak_%s: %s" % (folder, folder, err))
            self.state = "SHEDERR"
            configName = '%s_cfg.py' % folder
            crab.submit(configName)
        elif reason is not None:
            self.state = "ERRHANDLE"
            crab.resubmit(self.name)
        # NOTE(review): counted on every call so update() stops after 3 tries.
        self.resubmitCount += 1

    def test_print(self):
        """Return the task's uuid."""
        return self.uuid

    def updateJobStats(self, dCacheFileList=None):
        """Recount the jobs in ``self.jobs`` per state.

        Sets ``nUnsubmitted``, ``nIdle``, ``nRunning``, ``nTransferring``,
        ``nCooloff``, ``nFailed`` and ``nFinished``. A job counts for every
        state name contained in its 'State' string.

        Args:
            dCacheFileList: optional list of file paths. If given,
                ``nComplete`` counts finished jobs whose output name
                ``<task name>_<job key>`` occurs in one of the paths;
                otherwise ``nComplete`` is 0.
        """
        jobKeys = sorted(self.jobs.keys())
        try:
            for key in jobKeys:
                int(key)
        except (TypeError, ValueError):
            print("error parsing job numers to int")

        stateDict = {'unsubmitted': 0, 'idle': 0, 'running': 0, 'transferring': 0,
                     'cooloff': 0, 'failed': 0, 'finished': 0}
        nComplete = 0

        for key in jobKeys:
            jobState = self.jobs[key]['State']
            for statekey in stateDict:
                if statekey in jobState:
                    stateDict[statekey] += 1
            if dCacheFileList is not None and 'finished' in jobState:
                outputFilename = "%s_%s" % (self.name, key)
                if any(outputFilename in s for s in dCacheFileList):
                    nComplete += 1

        for state, count in stateDict.items():
            setattr(self, "n" + state.capitalize(), count)
        self.nComplete = nComplete

    def readLogArch(self, logArchName):
        """Read the number of processed events from a job's log archive.

        The job number is taken from the archive's basename: the text between
        the first "_" and the next "." (e.g. ``7`` for ``cmsRun_7.log.tar.gz``).
        The ``EventsRead`` value of the first ``InputFile`` element in
        ``FrameworkJobReport-<job>.xml`` is used.

        Returns:
            dict ``{'readEvents': n}``; n stays 0 if the report is missing or
            cannot be parsed.
        """
        JobNumber = logArchName.split("/")[-1].split("_")[1].split(".")[0]
        log = {'readEvents': 0}
        with tarfile.open(logArchName, "r") as tar:
            try:
                JobXmlFile = tar.extractfile('FrameworkJobReport-%s.xml' % JobNumber)
                root = ET.fromstring(JobXmlFile.read())
                for child in root:
                    if child.tag == 'InputFile':
                        for subchild in child:
                            if subchild.tag == 'EventsRead':
                                log.update({'readEvents': int(subchild.text)})
                                break
                        # only the first InputFile is considered
                        break
            except Exception:
                print("Can not parse / read %s" % logArchName)
        return log
0615
0616
0617
0618
class TaskStats:
    """Summed job counters over a list of CrabTask-like objects."""

    # Per-task counters that updateStats adds up, in accumulation order.
    _COUNTER_NAMES = ("nUnsubmitted", "nIdle", "nRunning", "nTransferring",
                      "nCooloff", "nFailed", "nFinished", "nComplete")

    def __init__(self, tasklist=None):
        """Start from zero, or aggregate *tasklist* immediately if given."""
        if tasklist is None:
            self.clearStats()
            return
        self.updateStats(tasklist)

    def updateStats(self, tasklist):
        """Reset and re-sum the counters of all tasks in *tasklist*.

        ``nTasks`` counts every task; tasks whose ``isUpdating`` flag is set
        contribute nothing to the other counters.
        """
        self.clearStats()
        self.nTasks = len(tasklist)
        settled = (task for task in tasklist if not task.isUpdating)
        for task in settled:
            for counter in self._COUNTER_NAMES:
                setattr(self, counter, getattr(self, counter) + getattr(task, counter))

    def clearStats(self):
        """Set ``nTasks`` and every job counter to 0."""
        self.nTasks = 0
        for counter in self._COUNTER_NAMES:
            setattr(self, counter, 0)