File indexing completed on 2025-05-29 03:17:18
0001
0002
0003
0004
0005
0006
0007 import os,sys,glob
0008 import tarfile
0009 import xml.etree.ElementTree as ET
0010 import imp
0011 import json
0012 import optparse
0013 import subprocess
0014 import logging
0015 import datetime
0016 import uuid
0017 import time
0018 from http.client import HTTPException
0019 from multiprocessing import Process, Queue
0020
0021 from CRABAPI.RawCommand import crabCommand
0022 from CRABClient.UserUtilities import getConsoleLogLevel, setConsoleLogLevel
0023 from CRABClient.ClientUtilities import LOGLEVEL_MUTE
0024 from CRABClient.ClientExceptions import CachefileNotFoundException
0025
0026
0027
0028
0029
class CrabController():
    """Wrapper around the CRAB3 client API (``crabCommand``).

    CRAB commands issued through ``callCrabCommand`` run in a separate
    ``multiprocessing.Process``; the result is handed back through
    ``self.crab_q``.
    """

    def __init__(self, debug=0, logger=None, workingArea=None, voGroup=None, username=None):
        """Create a controller.

        Args:
            debug: verbosity; values > 1 make ``submit`` log the CRAB result.
            logger: optional parent logger; messages go to its
                "CrabController" child logger.
            workingArea: directory holding the ``crab_*`` project folders
                (defaults to the current working directory).
            voGroup: VO group passed to ``crab checkwrite`` (default "dcms").
            username: user name; resolved lazily by ``checkusername`` if None.
        """
        # Mute the CRAB client's own console output.
        setConsoleLogLevel(LOGLEVEL_MUTE)
        self.debug = debug
        self.workingArea = workingArea if workingArea is not None else os.getcwd()
        self.dry_run = False
        self.voGroup = voGroup if voGroup is not None else "dcms"
        self.username = username

        if logger is not None:
            self.logger = logger.getChild("CrabController")
        else:
            self.logger = logging.getLogger("CrabController")
            # Without any root handler logging is unconfigured, so log to a
            # file instead. getLogger() returns the same object for every
            # instance (and CrabTask creates many controllers), so attach the
            # handler only once: otherwise each message is written N times and
            # one file handle leaks per instance.
            if len(logging.getLogger().handlers) < 1 and not self.logger.handlers:
                ch = logging.FileHandler('crabController.log', mode='a', encoding=None, delay=False)
                ch.setLevel(logging.DEBUG)
                formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
                ch.setFormatter(formatter)
                self.logger.addHandler(ch)

        # Result channel for crabCommandProcess (see callCrabCommand).
        self.crab_q = Queue()

    def checkwrite(self, site='T2_DE_RWTH', path='noPath'):
        """Run ``crab checkwrite`` for *site*.

        Args:
            site: site name passed via ``--site``.
            path: LFN passed via ``--lfn``; any value containing 'noPath'
                means no ``--lfn`` argument is given.

        Returns:
            True if CRAB reports status 'SUCCESS', otherwise False (also when
            the command raised).
        """
        if self.username is None:
            self.checkusername()
        try:
            self.logger.info("Checking if user can write to /store/user/%s on site %s with voGroup %s" % (self.username, site, self.voGroup))
            args = ['checkwrite', '--site', site, '--voGroup', self.voGroup]
            if 'noPath' not in path:
                args += ['--lfn', path]
            res = crabCommand(*args)
            if res['status'] == 'SUCCESS':
                self.logger.info("Checkwrite was successfully called.")
                return True
            self.logger.error("The crab checkwrite command failed for site: %s" % site)
            return False
        except Exception:
            self.logger.exception('Unable to perform crab checkwrite')
            return False

    def submit(self, name):
        """Submit a task; *name* is passed as-is to ``crab submit``.

        In dry-run mode ``crab submit --dryrun`` is used, otherwise
        ``crab submit --wait``.

        Returns:
            The result of ``callCrabCommand``.
        """
        if self.dry_run:
            res = self.callCrabCommand(('submit', '--dryrun', name))
            self.logger.info('Dry-run: You may check the created config and sandbox')
        else:
            res = self.callCrabCommand(('submit', '--wait', name))
            self.logger.info("crab submit called for task %s" % name)
        if self.debug > 1:
            self.logger.info(str(res))
        return res

    def resubmit(self, name, joblist=None):
        """Run ``crab resubmit --wait`` on the task folder of *name*.

        Args:
            name: task name or ``crab_*`` folder name, resolved inside
                ``self.workingArea``.
            joblist: accepted for API compatibility but currently ignored;
                no job selection is passed to CRAB.

        Returns:
            The result of ``callCrabCommand``, or {} in dry-run mode.
        """
        if self.dry_run:
            self.logger.info('Dry-run: Created config file. ')
            return {}
        cmd = ('resubmit', '--wait', os.path.join(self.workingArea, self._prepareFoldername(name)))
        res = self.callCrabCommand(cmd)
        self.logger.info("crab resubmit called for task %s" % name)
        return res

    def checkusername(self):
        """Return the user name and store it in ``self.username``.

        The CERNUSERNAME environment variable takes precedence; otherwise
        ``crab checkusername`` is queried.

        Returns:
            The user name, or "noHNname" if it could not be determined.
        """
        username = os.environ.get("CERNUSERNAME")
        if username is None:
            try:
                res = crabCommand('checkusername')
                username = res['username']
            except Exception:
                self.logger.error("Unable to determine the username with crab checkusername")
                return "noHNname"
        self.username = username
        return username

    def status(self, name):
        """Query ``crab status --long`` for a task.

        Args:
            name: task name, folder name or path. "crab_" is prepended unless
                the string already contains "crab_" anywhere, so a full path
                to a ``crab_*`` folder is passed through unchanged.

        Returns:
            ``(status, jobs, taskFailureMsg)``; jobs defaults to {} and
            taskFailureMsg to None when CRAB does not report them.
            ``("NOSTATE", {}, None)`` on error and in dry-run mode.
        """
        try:
            callname = name if "crab_" in name else "crab_" + name
            cmd = ('status', '--long', callname)
            if self.dry_run:
                # cmd is a tuple, so wrap it to format it as a single value.
                self.logger.info('Dry-run: Created config file. crab command would have been: %s' % (cmd,))
                return "NOSTATE", {}, None
            res = self.callCrabCommand(cmd)
            return res['status'], res.get('jobs', {}), res.get('taskFailureMsg')
        except Exception as e:
            print(e)
            self.logger.error("Can not run crab status request")
            return "NOSTATE", {}, None

    def callCrabCommand(self, crabArgs):
        """Run ``crabCommand(*crabArgs)`` in a child process and return its result.

        The result arrives through ``self.crab_q`` (put by crabCommandProcess).
        The queue is read before ``join()``: joining a process that still has
        queued data to flush can deadlock.
        """
        print("Will run the callCrabCommand:", crabArgs)
        p = Process(target=crabCommandProcess, args=(self.crab_q, crabArgs))
        p.start()
        res = self.crab_q.get()
        p.join()
        print("The Crab command Process has finished.")
        return res

    def getlog(self, name):
        """Run ``crab getlog`` for task *name*.

        Returns:
            ``(res['success'], res['failed'])`` from the CRAB result, or
            ``({}, {})`` if the call failed or those keys are missing.
        """
        foldername = self._prepareFoldername(name)
        try:
            res = self.callCrabCommand(('getlog', foldername))
            return res['success'], res['failed']
        except Exception:
            self.logger.error("Error calling crab getlog for %s" % foldername)
            return {}, {}

    def report(self, name):
        """Run ``crab report`` for task *name*.

        Returns:
            ``res['analyzedLumis']`` from the CRAB result, or None on error.
        """
        foldername = self._prepareFoldername(name)
        try:
            res = self.callCrabCommand(('report', foldername))
            return res['analyzedLumis']
        except Exception:
            self.logger.error("Error calling crab report for %s" % foldername)
            return None

    def readCrabConfig(self, name):
        """Execute a CRAB python config file and return its ``config`` object.

        Args:
            name: path to a config file; if no such path exists it is treated
                as a task name and ``crab_<name>_cfg.py`` is used.

        Returns:
            The file's module-level ``config`` object, or False if the file
            cannot be read or executed or defines no ``config``.
        """
        import importlib.machinery
        import importlib.util
        try:
            pset = name if os.path.exists(name) else 'crab_%s_cfg.py' % name
            # importlib replaces the deprecated imp.load_source (imp is gone
            # in Python 3.12). Each file runs in a fresh module that is not
            # registered in sys.modules, so a `config` defined by a previously
            # loaded file cannot leak into this result. An explicit
            # SourceFileLoader accepts any file extension.
            loader = importlib.machinery.SourceFileLoader("pycfg", pset)
            spec = importlib.util.spec_from_file_location("pycfg", pset, loader=loader)
            module = importlib.util.module_from_spec(spec)
            loader.exec_module(module)
            return module.config
        except Exception:
            return False

    @property
    def crabFolders(self):
        """Names of all ``crab_*`` sub-directories of ``self.workingArea``."""
        return [entry for entry in os.listdir(self.workingArea)
                if entry.startswith('crab_') and os.path.isdir(os.path.join(self.workingArea, entry))]

    def _prepareFoldername(self, name):
        """Return *name* prefixed with "crab_" (unless it already starts with it), stripped."""
        crabfolder = name if name.startswith("crab_") else "crab_%s" % name
        return crabfolder.strip()

    def commandlineOptions(self, parser=None):
        """Add the controller's options (--dry-run, --workingArea) to *parser*.

        Options whose destination already exists on *parser* are skipped, so
        calling this repeatedly with the same parser is safe.

        Args:
            parser: an ``optparse.OptionParser``; a new parser with usage
                'usage: %prog' is created when omitted.

        Returns:
            The parser.
        """
        if parser is None:
            # Build the default per call: a parser instance as the default
            # argument value would be shared (and mutated) by every call.
            parser = optparse.OptionParser('usage: %prog')
        # Parse a dummy argument list to see which option destinations exist.
        (currentoptions, _args) = parser.parse_args([" "])

        if not hasattr(currentoptions, 'dry_run'):
            parser.add_option('--dry-run', action='store_true', default=False,
                              help='Do everything except calling CRAB or registering samples to the database.')
        if not hasattr(currentoptions, 'workingArea'):
            parser.add_option('--workingArea', metavar='DIR', default=os.getcwd(),
                              help='The area (full or relative path) where to create the CRAB project directory. '
                                   'If the area doesn\'t exist, CRAB will try to create it using the mkdir command'
                                   ' (without -p option). Defaults to the current working directory.')
        return parser
0309
0310
0311
0312
0313
0314
0315
0316
0317
def crabCommandProcess(q, crabCommandArgs, maxAttempts=6, retryDelay=5):
    """Run ``crabCommand(*crabCommandArgs)`` and put exactly one result on *q*.

    This is the target of the child process started by
    ``CrabController.callCrabCommand``, which blocks on ``q.get()``. A result
    is therefore put on *q* in every case; an exception escaping this
    function would end the child without a result and block the caller
    forever.

    Args:
        q: queue receiving the result.
        crabCommandArgs: positional arguments for ``crabCommand``.
        maxAttempts: total attempts when an HTTPException is raised
            (default 6).
        retryDelay: seconds to sleep after each failed HTTP attempt.

    Puts on *q*:
        the ``crabCommand`` result on success;
        ``{'status': "CachefileNotFound", 'jobs': {}}`` on
        CachefileNotFoundException;
        ``{'status': "UnexpectedError", 'jobs': {}}`` if every attempt failed
        or any other exception was raised.
    """
    res = {'status': "UnexpectedError", 'jobs': {}}
    try:
        # Allow for transient server glitches: HTTP errors are retried.
        for _attempt in range(maxAttempts):
            try:
                res = crabCommand(*crabCommandArgs)
                break
            except HTTPException as e:
                print("crab error ---------------")
                print(e)
                print("end error ---------------")
                print("will try again!")
                time.sleep(retryDelay)
            except CachefileNotFoundException as e:
                print("crab error ---------------")
                print(e)
                print("end error ---------------")
                print(crabCommandArgs)
                res = {'status': "CachefileNotFound", 'jobs': {}}
                break
            except Exception as e:
                # Not retried; reported instead of killing the process silently.
                print("crab error ---------------")
                print(e)
                print("end error ---------------")
                print(crabCommandArgs)
                res = {'status': "UnexpectedError", 'jobs': {}}
                break
    finally:
        # Unblock the parent even if a BaseException (e.g. KeyboardInterrupt)
        # escapes the loop.
        q.put(res)
0344
class CertInfo:
    """VO information of the current proxy, parsed from ``voms-proxy-info --fqan``.

    Attributes (all str; "" when unavailable):
        vo: second '/'-separated field of the selected FQAN line.
        voGroup: third field, cleared when that field is a "Role=...NULL..." entry.
        voRole: value after '=' in the third field, or "" if it has no '='.
    """

    def __init__(self):
        """Run ``voms-proxy-info --fqan`` and parse its output."""
        try:
            p = subprocess.Popen(["voms-proxy-info", "--fqan"],
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE)
            stdout, _stderr = p.communicate()
            ok = p.returncode == 0
        except OSError:
            # voms-proxy-info is not available.
            stdout, ok = b"", False

        self.vo = ""
        self.voGroup = ""
        self.voRole = ""
        if ok:
            # Decode so all attributes are str (not bytes).
            self._parseFqan(stdout.decode("utf-8", errors="replace"))

    def _parseFqan(self, text):
        """Set vo/voGroup/voRole from the text output of ``voms-proxy-info --fqan``.

        The first line is used unless it has fewer than four '/'-separated
        fields and a second line exists, in which case the second line is used.
        Examples: "/cms/dcms/Role=NULL/Capability=NULL" -> ("cms", "dcms", "");
        "/cms/Role=NULL/Capability=NULL" -> ("cms", "", "NULL").
        """
        self.vo = ""
        self.voGroup = ""
        self.voRole = ""
        lines = text.split("\n")
        fields = lines[0].split("/")
        if len(fields) < 4 and len(lines) > 1:
            fields = lines[1].split("/")
        if len(fields) < 3:
            # Not an FQAN line we can interpret; keep everything empty.
            return
        self.vo = fields[1]
        self.voGroup = fields[2]
        if "=" in fields[2]:
            # The third field is a "Role=..." entry rather than a group name.
            self.voRole = fields[2].split("=")[1]
            if "NULL" in self.voRole:
                self.voGroup = ""
0371
0372
0373
class CrabTask:
    """State of a single CRAB task, refreshed through CrabController.

    The job counters nUnsubmitted, nIdle, nRunning, nTransferring, nCooloff,
    nFailed, nFinished and nComplete are derived from ``self.jobs`` by
    ``updateJobStats``.
    """

    def __init__(self,
                 taskname="",
                 crab_config="",
                 crabController=None,
                 initUpdate=True,
                 debuglevel="ERROR",
                 datasetpath="",
                 localDir="",
                 outlfn="",):
        """Create a task and optionally query its current status.

        Args:
            taskname: task (request) name; if empty, *crab_config* is required.
            crab_config: path to a CRAB config file whose
                General.requestName becomes the task name.
            crabController: accepted for compatibility; not used.
            initUpdate: if True, call ``update()`` immediately.
            debuglevel: level name for the 'crabTask' logger.
            datasetpath: fallback for ``datasetpath`` if the config has none.
            localDir, outlfn: stored unchanged.

        Raises:
            ValueError: neither taskname nor crab_config was given.
            IOError: crab_config does not exist.
        """
        self._crabConfig = None
        self._crabFolder = None

        if taskname:
            self.name = taskname
        else:
            if not crab_config:
                raise ValueError("Either taskname or crab_config needs to be set")
            if not os.path.exists(crab_config):
                raise IOError("File %s not found" % crab_config)
            # crabConfig loads the file named by self.name, so point it at
            # the config file first, then switch to the request name.
            self.name = crab_config
            self.name = self.crabConfig.General.requestName

        self.uuid = uuid.uuid4()

        self.log = logging.getLogger('crabTask')
        self.log.setLevel(logging.getLevelName(debuglevel))
        self.jobs = {}
        self.localDir = localDir
        self.outlfn = outlfn
        self.isUpdating = False
        self.taskId = -1

        self.nJobs = 0
        self.state = "NOSTATE"
        self.maxjobnumber = 0
        self.nUnsubmitted = 0
        self.nIdle = 0
        self.nRunning = 0
        self.nTransferring = 0
        self.nCooloff = 0
        self.nFailed = 0
        self.nFinished = 0
        self.nComplete = 0
        self.failureReason = None
        self.lastUpdate = datetime.datetime.now().strftime("%Y-%m-%d_%H.%M.%S")

        self._isData = None
        self.resubmitCount = 0

        self.debug = False

        self.finalFiles = []
        self.totalEvents = 0

        self._datasetpath_default = datasetpath

        if initUpdate:
            self.update()
            self.updateJobStats()

    @property
    def isData(self):
        """True if the config defines Data.lumiMask or the name starts with "Data_" (cached)."""
        if self._isData is None:
            try:
                self.crabConfig.Data.lumiMask  # raises if no lumi mask is configured
                self._isData = True
            except Exception:
                self._isData = self.name.startswith("Data_")
        return self._isData

    @property
    def crabConfig(self):
        """The task's CRAB config, loaded lazily via CrabController.readCrabConfig.

        readCrabConfig returns False on failure; that value is cached as well.
        """
        if self._crabConfig is None:
            crab = CrabController()
            self._crabConfig = crab.readCrabConfig(self.name)
        return self._crabConfig

    @property
    def datasetpath(self):
        """Data.inputDataset from the config, or the constructor's fallback value."""
        try:
            return self.crabConfig.Data.inputDataset
        except Exception:
            return self._datasetpath_default

    @property
    def crabFolder(self):
        """Path of the task's ``crab_*`` folder, or "" if it cannot be found.

        The config's General.workArea is searched first, then the current
        working directory. A found path is cached.
        """
        if self._crabFolder is not None:
            return self._crabFolder
        crab = CrabController()
        foldername = crab._prepareFoldername(self.name)
        candidates = []
        try:
            candidates.append(os.path.join(self.crabConfig.General.workArea, foldername))
        except AttributeError:
            # No readable config / no workArea: only the cwd is searched.
            pass
        candidates.append(os.path.join(os.getcwd(), foldername))
        for path in candidates:
            if os.path.exists(path):
                self._crabFolder = path
                return path
        self.log.error("Unable to find folder for Task")
        return ""

    def resubmit_failed(self):
        """Resubmit the task, passing the last JobId of each 'failed' job as joblist."""
        failedJobIds = [job['JobIds'][-1] for job in self.jobs.values() if job['State'] == 'failed']
        controller = CrabController()
        controller.resubmit(self.name, joblist=failedJobIds)
        self.lastUpdate = datetime.datetime.now().strftime("%Y-%m-%d_%H.%M.%S")

    @property
    def crab_folder(self):
        """<General.workArea>/crab_<General.requestName> from the config."""
        return os.path.join(self.crabConfig.General.workArea,
                            "crab_" + self.crabConfig.General.requestName)

    def update(self):
        """Refresh state, jobs and failureReason from ``crab status``.

        A "FAILED" state is queried once more after 2 s. A "NOSTATE" result
        triggers handleNoState while resubmitCount is below 3. isUpdating is
        reset even if an exception escapes.
        """
        self.log.debug("Start update for task %s" % self.name)
        self.isUpdating = True
        try:
            controller = CrabController()
            self.state = "UPDATING"

            self.log.debug("Try to get status for task")
            self.state, self.jobs, self.failureReason = controller.status(self.crab_folder)
            self.log.debug("Found state: %s" % self.state)
            if self.state == "FAILED":
                # Try once more; the first answer may be transient.
                time.sleep(2)
                self.state, self.jobs, self.failureReason = controller.status(self.crab_folder)
            self.nJobs = len(self.jobs)
            self.updateJobStats()
            if self.state == "NOSTATE":
                self.log.debug("Trying to resubmit because of NOSTATE")
                if self.resubmitCount < 3:
                    self.handleNoState()
        finally:
            # A task stuck with isUpdating=True would be ignored by TaskStats.
            self.isUpdating = False
        self.lastUpdate = datetime.datetime.now().strftime("%Y-%m-%d_%H.%M.%S")

    def handleNoState(self):
        """Try to recover a task whose status could not be determined.

        If failureReason contains CRAB's scheduler-rejection message, the local
        ``crab_*`` folder is renamed to ``bak_crab_*`` and the task is submitted
        again from ``crab_<name>_cfg.py`` (state "SHEDERR"). For any other
        failure reason the task is resubmitted (state "ERRHANDLE").
        resubmitCount is incremented on every call.
        """
        crab = CrabController()
        foldername = crab._prepareFoldername(self.name)
        schedulerError = ("The CRAB3 server backend could not resubmit your task "
                          "because the Grid scheduler answered with an error.")
        if self.failureReason is not None and schedulerError in self.failureReason:
            # Move the old project folder aside so a fresh submission can
            # create it again.
            p = subprocess.Popen(['mv', foldername, 'bak_%s' % foldername], stdout=subprocess.PIPE)
            p.communicate()
            self.state = "SHEDERR"
            configName = '%s_cfg.py' % foldername
            crab.submit(configName)
        elif self.failureReason is not None:
            self.state = "ERRHANDLE"
            crab.resubmit(self.name)
        self.resubmitCount += 1

    def test_print(self):
        """Return the task's uuid."""
        return self.uuid

    def updateJobStats(self, dCacheFileList=None):
        """Recount the job states in ``self.jobs`` into the n<State> attributes.

        A job is counted for every state name contained in its 'State' string.
        If *dCacheFileList* is given, nComplete counts jobs whose state contains
        'finished' and for which some list entry contains "<task name>_<job key>";
        otherwise nComplete is 0.
        """
        stateDict = dict.fromkeys(('unsubmitted', 'idle', 'running', 'transferring',
                                   'cooloff', 'failed', 'finished'), 0)
        nComplete = 0

        for key, job in self.jobs.items():
            jobState = job['State']
            for statekey in stateDict:
                if statekey in jobState:
                    stateDict[statekey] += 1
            # Test the job's own state, not a leftover loop variable.
            if dCacheFileList is not None and 'finished' in jobState:
                outputFilename = "%s_%s" % (self.name, key)
                if any(outputFilename in s for s in dCacheFileList):
                    nComplete += 1

        for state, count in stateDict.items():
            setattr(self, "n" + state.capitalize(), count)
        self.nComplete = nComplete

    def readLogArch(self, logArchName):
        """Read the number of processed events from a job's log tar archive.

        The job number is the text between the first "_" and the next "." of
        the archive's base name (e.g. "cmsRun_3.log.tar.gz" -> "3"); the file
        FrameworkJobReport-<job>.xml inside the archive is parsed.

        Returns:
            {'readEvents': n} with n from the first EventsRead child of the
            first InputFile element, or {'readEvents': 0} if it cannot be read.
        """
        jobNumber = logArchName.split("/")[-1].split("_")[1].split(".")[0]
        log = {'readEvents': 0}
        with tarfile.open(logArchName, "r") as tar:
            try:
                jobXmlFile = tar.extractfile('FrameworkJobReport-%s.xml' % jobNumber)
                root = ET.fromstring(jobXmlFile.read())
                inputFile = root.find('InputFile')
                if inputFile is not None:
                    eventsRead = inputFile.find('EventsRead')
                    if eventsRead is not None:
                        log.update({'readEvents': int(eventsRead.text)})
            except Exception:
                print("Can not parse / read %s" % logArchName)
        return log
0620
0621
0622
0623
class TaskStats:
    """Summed job counters over a list of CrabTask objects.

    Every task counts towards ``nTasks``; only tasks whose ``isUpdating``
    flag is false contribute to the job counters.
    """

    # Per-task job counters that are summed into this object.
    _COUNTER_NAMES = ('nUnsubmitted', 'nIdle', 'nRunning', 'nTransferring',
                      'nCooloff', 'nFailed', 'nFinished', 'nComplete')

    def __init__(self, tasklist=None):
        """Start at zero, or from the statistics of *tasklist* when given."""
        if tasklist is None:
            self.clearStats()
            return
        self.updateStats(tasklist)

    def updateStats(self, tasklist):
        """Reset every counter, then accumulate them over *tasklist*."""
        self.clearStats()
        self.nTasks = len(tasklist)
        settledTasks = [task for task in tasklist if not task.isUpdating]
        for counter in self._COUNTER_NAMES:
            total = getattr(self, counter)
            for task in settledTasks:
                total += getattr(task, counter)
            setattr(self, counter, total)

    def clearStats(self):
        """Set nTasks and all job counters back to 0."""
        self.nTasks = 0
        for counter in self._COUNTER_NAMES:
            setattr(self, counter, 0)