File indexing completed on 2024-04-06 11:58:29
0001
0002
0003
0004
0005
0006
0007 from __future__ import print_function
0008 import os,sys,glob
0009 import tarfile
0010 import xml.etree.ElementTree as ET
0011 import imp
0012 import json
0013 import optparse
0014 import subprocess
0015 import logging
0016 import datetime
0017 import uuid
0018 import time
0019 from http.client import HTTPException
0020 from multiprocessing import Process, Queue
0021
0022 from CRABAPI.RawCommand import crabCommand
0023 from CRABClient.UserUtilities import getConsoleLogLevel, setConsoleLogLevel
0024 from CRABClient.ClientUtilities import LOGLEVEL_MUTE
0025 from CRABClient.ClientExceptions import CachefileNotFoundException
0026
0027
0028
0029
0030
class CrabController():
    """Thin wrapper around the CRAB3 client API (``crabCommand``).

    Most CRAB commands are executed in a child process via
    ``callCrabCommand``; ``checkwrite`` and ``checkusername`` call
    ``crabCommand`` directly in the current process.
    """

    def __init__(self, debug=0, logger=None, workingArea=None, voGroup=None, username=None):
        """Create a controller.

        Args:
            debug: verbosity; values > 1 make ``submit`` log the raw CRAB result.
            logger: optional parent logger; its child "CrabController" is used.
            workingArea: directory containing the ``crab_*`` project folders
                (defaults to the current working directory).
            voGroup: VO group passed to ``crab checkwrite`` (defaults to "dcms").
            username: user name; looked up lazily via ``checkusername`` if None.
        """
        setConsoleLogLevel(LOGLEVEL_MUTE)
        self.debug = debug
        self.workingArea = workingArea if workingArea is not None else os.getcwd()
        self.dry_run = False
        self.voGroup = voGroup if voGroup is not None else "dcms"
        self.username = username

        if logger is not None:
            self.logger = logger.getChild("CrabController")
        else:
            self.logger = logging.getLogger("CrabController")
            # Fall back to a log file only when logging is unconfigured. The
            # "CrabController" logger is shared by all instances (and many
            # instances get created), so attach the handler at most once to
            # avoid duplicated log lines and leaked file handles.
            if len(logging.getLogger().handlers) < 1 and not self.logger.handlers:
                ch = logging.FileHandler('crabController.log', mode='a', encoding=None, delay=False)
                ch.setLevel(logging.DEBUG)
                formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
                ch.setFormatter(formatter)
                self.logger.addHandler(ch)

        # Queue through which the child process started in callCrabCommand
        # hands back the crabCommand result.
        self.crab_q = Queue()

    def checkwrite(self, site='T2_DE_RWTH', path='noPath'):
        """Run ``crab checkwrite`` for *site*.

        *path* is passed as ``--lfn`` unless it contains the placeholder
        'noPath'. Returns True if CRAB reports status SUCCESS, else False
        (also False if the command raises).
        """
        if self.username is None:
            self.checkusername()
        try:
            self.logger.info("Checking if user can write to /store/user/%s on site %s with voGroup %s" % (self.username, site, self.voGroup))
            args = ['checkwrite', '--site', site, '--voGroup', self.voGroup]
            if 'noPath' not in path:
                args += ['--lfn', path]
            res = crabCommand(*args)
            if res['status'] == 'SUCCESS':
                self.logger.info("Checkwrite was sucessfully called.")
                return True
            self.logger.error("The crab checkwrite command failed for site: %s" % site)
            return False
        except Exception:
            self.logger.exception('Unable to perform crab checkwrite')
            return False

    def submit(self, name):
        """Submit the task described by CRAB config *name*.

        In dry-run mode ``crab submit --dryrun`` is used instead of
        ``--wait``. Returns the result of callCrabCommand.
        """
        if self.dry_run:
            res = self.callCrabCommand(('submit', '--dryrun', name))
            self.logger.info('Dry-run: You may check the created config and sandbox')
        else:
            res = self.callCrabCommand(('submit', '--wait', name))
            self.logger.info("crab sumbit called for task %s" % name)
        if self.debug > 1:
            self.logger.info(str(res))
        return res

    def resubmit(self, name, joblist=None):
        """Resubmit task *name* located in the working area.

        *joblist* is accepted for API compatibility but is NOT forwarded to
        CRAB: the whole task is resubmitted. Returns {} in dry-run mode,
        otherwise the result of callCrabCommand.
        """
        if self.dry_run:
            self.logger.info('Dry-run: Created config file. ')
            return {}
        cmd = ('resubmit', '--wait', os.path.join(self.workingArea, self._prepareFoldername(name)))
        res = self.callCrabCommand(cmd)
        self.logger.info("crab resumbit called for task %s" % name)
        return res

    def checkusername(self):
        """Return the user name and cache it in ``self.username``.

        The CERNUSERNAME environment variable takes precedence; otherwise
        ``crab checkusername`` is queried. Returns "noHNname" (without
        caching it) when the CRAB result carries no 'username'.
        """
        username = os.environ.get("CERNUSERNAME")
        if username is not None:
            # cache it, otherwise checkwrite would keep logging user "None"
            self.username = username
            return username
        res = crabCommand('checkusername')
        try:
            self.username = res['username']
        except (KeyError, TypeError):
            return "noHNname"
        return self.username

    def status(self, name):
        """Query ``crab status --long`` for task *name*.

        *name* may be a task name, a crab_ folder name or a path to one;
        "crab_" is prepended only if it does not already occur in *name*.

        Returns:
            (status, jobs, taskFailureMsg); jobs defaults to {} and
            taskFailureMsg to None when CRAB does not report them.
            ("NOSTATE", {}, None) in dry-run mode or when the request fails.
        """
        try:
            callname = name if "crab_" in name else "crab_" + name
            cmd = ('status', '--long', callname)
            if self.dry_run:
                # callers unpack three values, so return the NOSTATE triple
                self.logger.info('Dry-run: Created config file. crab command would have been: %s' % (cmd,))
                return "NOSTATE", {}, None
            res = self.callCrabCommand(cmd)
            return res['status'], res.get('jobs', {}), res.get('taskFailureMsg')
        except Exception as e:
            print(e)
            self.logger.error("Can not run crab status request")
            return "NOSTATE", {}, None

    def callCrabCommand(self, crabArgs):
        """Run ``crabCommand(*crabArgs)`` in a child process and return its result.

        The child runs ``crabCommandProcess`` and reports through
        ``self.crab_q``. If the child exits without delivering a result,
        {'status': "UnexpectedError", 'jobs': {}} is returned instead of
        blocking forever.
        """
        import queue
        p = Process(target=crabCommandProcess, args=(self.crab_q, crabArgs))
        p.start()
        while True:
            try:
                res = self.crab_q.get(timeout=5)
                break
            except queue.Empty:
                if p.is_alive():
                    continue
                # Child is gone: allow a result flushed right before exit to
                # arrive, otherwise report an error.
                try:
                    res = self.crab_q.get(timeout=1)
                except queue.Empty:
                    res = {'status': "UnexpectedError", 'jobs': {}}
                break
        # join only after draining the queue to avoid a join/feeder deadlock
        p.join()
        return res

    def getlog(self, name):
        """Run ``crab getlog`` for task *name*.

        Returns (success, failed) from the CRAB result, or ({}, {}) on error.
        """
        foldername = self._prepareFoldername(name)
        try:
            res = self.callCrabCommand(('getlog', foldername))
            return res['success'], res['failed']
        except Exception:
            self.logger.error("Error calling crab getlog for %s" % foldername)
            return {}, {}

    def report(self, name):
        """Run ``crab report`` for task *name*.

        Returns the 'analyzedLumis' entry of the result, or None on error.
        """
        foldername = self._prepareFoldername(name)
        try:
            res = self.callCrabCommand(('report', foldername))
            return res['analyzedLumis']
        except Exception:
            self.logger.error("Error calling crab report for %s" % foldername)
            return None

    def readCrabConfig(self, name):
        """Load a CRAB python config file and return its ``config`` object.

        *name* is either an existing path, or a task name in which case
        'crab_<name>_cfg.py' (relative to the cwd) is loaded. Returns False
        when the file cannot be loaded or defines no ``config``.
        """
        import importlib.machinery
        import importlib.util
        try:
            pset = name if os.path.exists(name) else 'crab_%s_cfg.py' % name
            # Replacement for the removed imp.load_source; an explicit
            # SourceFileLoader also accepts files without a .py suffix.
            loader = importlib.machinery.SourceFileLoader("pycfg", pset)
            spec = importlib.util.spec_from_loader("pycfg", loader)
            module = importlib.util.module_from_spec(spec)
            loader.exec_module(module)
            return module.config
        except Exception:
            return False

    @property
    def crabFolders(self):
        """Names of the ``crab_*`` sub-directories of the working area."""
        return [x for x in os.listdir(self.workingArea)
                if x.startswith('crab_') and os.path.isdir(os.path.join(self.workingArea, x))]

    def _prepareFoldername(self, name):
        """Return *name* with a 'crab_' prefix ensured, surrounding whitespace stripped."""
        crabfolder = name if name.startswith("crab_") else "crab_" + name
        return crabfolder.strip()

    def commandlineOptions(self, parser=None):
        """Add the controller's --dry-run and --workingArea options to *parser*.

        Options whose destinations *parser* already knows are not added
        again. A fresh ``optparse.OptionParser('usage: %prog')`` is created
        when no parser is given (no shared default instance). Returns the
        parser.
        """
        if parser is None:
            parser = optparse.OptionParser('usage: %prog')
        # Parse a dummy argument list to discover which option dests exist.
        (currentoptions, _) = parser.parse_args([" "])
        if not hasattr(currentoptions, 'dry_run'):
            parser.add_option('--dry-run', action='store_true', default=False,
                              help='Do everything except calling CRAB or registering samples to the database.')
        if not hasattr(currentoptions, 'workingArea'):
            parser.add_option('--workingArea', metavar='DIR', default=os.getcwd(),
                              help='The area (full or relative path) where to create the CRAB project directory. '
                                   'If the area doesn\'t exist, CRAB will try to create it using the mkdir command'
                                   ' (without -p option). Defaults to the current working directory.')
        return parser
0306
0307
0308
0309
0310
0311
0312
0313
0314
def crabCommandProcess(q, crabCommandArgs, maxRetries=5, retryDelay=5):
    """Run ``crabCommand(*crabCommandArgs)`` and put exactly one result on *q*.

    Target of the child process started by CrabController.callCrabCommand.
    The value put on *q* is:

    * the crabCommand return value on success;
    * {'status': "CachefileNotFound", 'jobs': {}} if CRAB raises
      CachefileNotFoundException;
    * {'status': "UnexpectedError", 'jobs': {}} when HTTP errors persist
      after *maxRetries* retries (1 + maxRetries attempts in total), or on
      any other exception.

    HTTP errors are retried after sleeping *retryDelay* seconds.
    """
    attempt = 0
    while True:
        attempt += 1
        try:
            res = crabCommand(*crabCommandArgs)
            break
        except HTTPException as e:
            print("crab error ---------------")
            print(e)
            print("end error ---------------")
            if attempt > maxRetries:
                res = {'status': "UnexpectedError", 'jobs': {}}
                break
            print("will try again!")
            time.sleep(retryDelay)
        except CachefileNotFoundException as e:
            print("crab error ---------------")
            print(e)
            print("end error ---------------")
            print(crabCommandArgs)
            res = {'status': "CachefileNotFound", 'jobs': {}}
            break
        except Exception as e:
            # Any other error used to kill the child without a result, leaving
            # the parent blocked on the queue forever.
            print("crab error ---------------")
            print(e)
            print("end error ---------------")
            print(crabCommandArgs)
            res = {'status': "UnexpectedError", 'jobs': {}}
            break
    q.put(res)
0341
class CertInfo:
    """VO information parsed from ``voms-proxy-info --fqan``.

    Attributes ``vo``, ``voGroup`` and ``voRole`` are text strings; all
    three are "" when the command fails or its output cannot be parsed.
    """

    def __init__(self):
        # universal_newlines: parse text, not bytes (bytes broke the "=" split
        # below and leaked b'...' values to callers).
        p = subprocess.Popen("voms-proxy-info --fqan",
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE,
                             shell=True,
                             universal_newlines=True)
        stdout, stderr = p.communicate()
        print(stdout)
        self.vo = ""
        self.voGroup = ""
        self.voRole = ""
        if p.returncode != 0:
            return
        lines = stdout.split("\n")
        splitline = lines[0].split("/")
        # Use the second line when the first one does not look like an FQAN.
        if len(splitline) < 4 and len(lines) > 1:
            splitline = lines[1].split("/")
        if len(splitline) < 3:
            return
        self.vo = splitline[1]
        self.voGroup = splitline[2]
        # For an FQAN without a group ("/<vo>/Role=NULL/..."), the third field
        # is the role: record it and clear the bogus group when it is NULL.
        # For "/<vo>/<group>/..." there is no "=" and voRole stays "".
        try:
            self.voRole = splitline[2].split("=")[1]
            if "NULL" in self.voRole:
                self.voGroup = ""
        except IndexError:
            self.voRole = ""
0367
0368
0369
0370
class CrabTask:
    """Client-side view of one CRAB task and its per-state job counters.

    The task is identified by *taskname*, or by a CRAB config file
    (*crab_config*) whose ``General.requestName`` becomes the task name.
    """

    def __init__(self,
                 taskname="",
                 crab_config="",
                 crabController=None,
                 initUpdate=True,
                 debuglevel="ERROR",
                 datasetpath="",
                 localDir="",
                 outlfn="",):
        """Create the task and, if *initUpdate*, query its status once.

        Args:
            taskname: task name; takes precedence over *crab_config*.
            crab_config: path to a CRAB config used when *taskname* is empty.
            crabController: accepted but currently unused; NOTE(review): every
                method creates its own CrabController.
            initUpdate: call update() right away.
            debuglevel: level name for the 'crabTask' logger.
            datasetpath: fallback when the config has no Data.inputDataset.
            localDir, outlfn: stored as-is.

        Raises:
            ValueError: neither *taskname* nor *crab_config* is given.
            IOError: *crab_config* does not exist.
        """
        self._crabConfig = None
        self._crabFolder = None

        if taskname:
            self.name = taskname
        else:
            if not crab_config:
                raise ValueError("Either taskname or crab_config needs to be set")
            if not os.path.exists(crab_config):
                raise IOError("File %s not found" % crab_config)
            # crabConfig loads self.name, so temporarily point it at the file
            self.name = crab_config
            self.name = self.crabConfig.General.requestName
        self.uuid = uuid.uuid4()

        self.log = logging.getLogger('crabTask')
        self.log.setLevel(logging.getLevelName(debuglevel))
        self.jobs = {}
        self.localDir = localDir
        self.outlfn = outlfn
        self.isUpdating = False
        self.taskId = -1

        self.nJobs = 0
        self.state = "NOSTATE"
        self.maxjobnumber = 0
        self.nUnsubmitted = 0
        self.nIdle = 0
        self.nRunning = 0
        self.nTransferring = 0
        self.nCooloff = 0
        self.nFailed = 0
        self.nFinished = 0
        self.nComplete = 0
        self.failureReason = None
        self.lastUpdate = datetime.datetime.now().strftime("%Y-%m-%d_%H.%M.%S")

        self._isData = None
        self.resubmitCount = 0

        self.debug = False

        self.finalFiles = []
        self.totalEvents = 0

        self._datasetpath_default = datasetpath

        if initUpdate:
            # update() also recomputes the job statistics
            self.update()

    @property
    def isData(self):
        """True if the config defines Data.lumiMask or the name starts with 'Data_' (cached)."""
        if self._isData is None:
            try:
                self.crabConfig.Data.lumiMask
                self._isData = True
            except Exception:
                self._isData = self.name.startswith("Data_")
        return self._isData

    @property
    def crabConfig(self):
        """CRAB config for this task, loaded lazily (False if it cannot be read)."""
        if self._crabConfig is None:
            crab = CrabController()
            self._crabConfig = crab.readCrabConfig(self.name)
        return self._crabConfig

    @property
    def datasetpath(self):
        """Data.inputDataset from the config, or the constructor's fallback."""
        try:
            return self.crabConfig.Data.inputDataset
        except Exception:
            return self._datasetpath_default

    @property
    def crabFolder(self):
        """Existing crab_ project folder of this task (cached), or "" if none found.

        Looks in the config's General.workArea first, then in the cwd.
        """
        if self._crabFolder is not None:
            return self._crabFolder
        crab = CrabController()
        folder = crab._prepareFoldername(self.name)
        candidates = (os.path.join(self.crabConfig.General.workArea, folder),
                      os.path.join(os.getcwd(), folder))
        for candidate in candidates:
            if os.path.exists(candidate):
                self._crabFolder = candidate
                return candidate
        self.log.error("Unable to find folder for Task")
        return ""

    def resubmit_failed(self):
        """Ask CRAB to resubmit the task, passing the last JobIds entry of each failed job."""
        controller = CrabController()
        failedJobIds = [job['JobIds'][-1] for job in self.jobs.values() if job['State'] == 'failed']
        controller.resubmit(self.name, joblist=failedJobIds)
        self.lastUpdate = datetime.datetime.now().strftime("%Y-%m-%d_%H.%M.%S")

    @property
    def crab_folder(self):
        """<General.workArea>/crab_<General.requestName> from the config."""
        return os.path.join(self.crabConfig.General.workArea,
                            "crab_" + self.crabConfig.General.requestName)

    def update(self):
        """Refresh state, jobs and failureReason from ``crab status``.

        A FAILED state is re-queried once after 2 s. Job counters are then
        recomputed; on NOSTATE, handleNoState() is tried while fewer than 3
        recovery attempts have been made.
        """
        self.log.debug("Start update for task %s" % self.name)
        self.isUpdating = True
        try:
            controller = CrabController()
            self.state = "UPDATING"
            self.log.debug("Try to get status for task")
            self.state, self.jobs, self.failureReason = controller.status(self.crab_folder)
            self.log.debug("Found state: %s" % self.state)
            if self.state == "FAILED":
                # try once more before accepting the failure
                time.sleep(2)
                self.state, self.jobs, self.failureReason = controller.status(self.crab_folder)
            self.nJobs = len(self.jobs)
            self.updateJobStats()
            if self.state == "NOSTATE":
                self.log.debug("Trying to resubmit because of NOSTATE")
                if self.resubmitCount < 3:
                    self.handleNoState()
        finally:
            # never leave the task flagged as updating after an error
            self.isUpdating = False
        self.lastUpdate = datetime.datetime.now().strftime("%Y-%m-%d_%H.%M.%S")

    def handleNoState(self):
        """Try to recover a task without state; increments resubmitCount.

        * Grid-scheduler failure: move ./crab_<name> to ./bak_crab_<name> and
          submit crab_<name>_cfg.py again (state "SHEDERR").
        * Any other failure reason: ``crab resubmit`` (state "ERRHANDLE").
        """
        crab = CrabController()
        folder = crab._prepareFoldername(self.name)
        if self.failureReason is not None and "The CRAB3 server backend could not resubmit your task because the Grid scheduler answered with an error." in self.failureReason:
            # argument list instead of a shell string: no quoting issues
            subprocess.Popen(['mv', folder, 'bak_%s' % folder], stdout=subprocess.PIPE).communicate()
            self.state = "SHEDERR"
            configName = '%s_cfg.py' % folder
            crab.submit(configName)
        elif self.failureReason is not None:
            self.state = "ERRHANDLE"
            crab.resubmit(self.name)
        self.resubmitCount += 1

    def test_print(self):
        """Return the task's uuid."""
        return self.uuid

    def updateJobStats(self, dCacheFileList=None):
        """Recount self.jobs per state into the n<State> attributes and nComplete.

        A job counts for every known state name contained in its 'State'
        string. If *dCacheFileList* is given, a finished job also counts as
        complete when some entry contains '<name>_<jobkey>'.
        """
        stateDict = dict.fromkeys(('unsubmitted', 'idle', 'running', 'transferring',
                                   'cooloff', 'failed', 'finished'), 0)
        nComplete = 0
        for key, job in self.jobs.items():
            for statekey in stateDict:
                if statekey not in job['State']:
                    continue
                stateDict[statekey] += 1
                if dCacheFileList is not None and statekey == 'finished':
                    # NOTE(review): plain substring match, so '<name>_1' also
                    # matches '<name>_10...' files; confirm the output naming.
                    outputFilename = "%s_%s" % (self.name, key)
                    if any(outputFilename in s for s in dCacheFileList):
                        nComplete += 1
        for state, count in stateDict.items():
            setattr(self, "n" + state.capitalize(), count)
        self.nComplete = nComplete

    def readLogArch(self, logArchName):
        """Return {'readEvents': N} from the job report inside a CRAB log archive.

        The job number is the part of the file name between the first '_'
        and the following '.'; 'FrameworkJobReport-<job>.xml' is read from
        the archive and N is the EventsRead of its first InputFile. N stays
        0 if the report is missing or cannot be parsed.
        """
        JobNumber = logArchName.split("/")[-1].split("_")[1].split(".")[0]
        log = {'readEvents': 0}
        with tarfile.open(logArchName, "r") as tar:
            try:
                JobXmlFile = tar.extractfile('FrameworkJobReport-%s.xml' % JobNumber)
                root = ET.fromstring(JobXmlFile.read())
                inputFile = root.find('InputFile')
                eventsRead = inputFile.find('EventsRead') if inputFile is not None else None
                if eventsRead is not None:
                    log['readEvents'] = int(eventsRead.text)
            except Exception:
                print("Can not parse / read %s" % logArchName)
        return log
0616
0617
0618
0619
class TaskStats:
    """Summed job counters over a list of CrabTask-like objects."""

    # Per-task counters that get summed, in accumulation order.
    _COUNTER_NAMES = ('nUnsubmitted', 'nIdle', 'nRunning', 'nTransferring',
                      'nCooloff', 'nFailed', 'nFinished', 'nComplete')

    def __init__(self, tasklist=None):
        """Start at zero, or with the totals of *tasklist* when one is given."""
        if tasklist is None:
            self.clearStats()
        else:
            self.updateStats(tasklist)

    def updateStats(self, tasklist):
        """Recompute all totals from *tasklist*.

        nTasks counts every task; tasks whose ``isUpdating`` flag is set do
        not contribute to the job counters.
        """
        self.clearStats()
        self.nTasks = len(tasklist)
        for task in (t for t in tasklist if not t.isUpdating):
            for counter in self._COUNTER_NAMES:
                setattr(self, counter, getattr(self, counter) + getattr(task, counter))

    def clearStats(self):
        """Reset nTasks and every job counter to zero."""
        self.nTasks = 0
        for counter in self._COUNTER_NAMES:
            setattr(self, counter, 0)