File indexing completed on 2023-03-17 10:42:22
0001
0002
0003
0004
0005
0006
0007 from __future__ import print_function
0008 import os,sys,glob
0009 import tarfile
0010 import xml.etree.ElementTree as ET
0011 import imp
0012 import json
0013 import optparse
0014 import subprocess
0015 import logging
0016 import datetime
0017 import uuid
0018 import time
0019 from httplib import HTTPException
0020 from multiprocessing import Process, Queue
0021
0022 from CRABAPI.RawCommand import crabCommand
0023 from CRABClient.UserUtilities import getConsoleLogLevel, setConsoleLogLevel
0024 from CRABClient.ClientUtilities import LOGLEVEL_MUTE
0025 from CRABClient.ClientExceptions import CachefileNotFoundException
0026
0027
0028
0029
0030
class CrabController():
    """Thin wrapper around the CRAB3 python API (``crabCommand``).

    Holds the working area, VO group and user name used for CRAB calls.
    Most commands are executed in a child process via ``callCrabCommand``.
    """

    def __init__(self, debug=0, logger=None, workingArea=None, voGroup=None, username=None):
        """
        :param debug: verbosity; values > 1 make ``submit`` log the CRAB result.
        :param logger: optional parent logger; its "CrabController" child is used.
        :param workingArea: directory containing the crab_* project folders
            (default: the current working directory).
        :param voGroup: VO group passed to ``crab checkwrite`` (default: "dcms").
        :param username: grid user name; looked up lazily by ``checkwrite``
            (via ``checkusername``) when None.
        """
        # Silence the CRAB client's own console output.
        setConsoleLogLevel(LOGLEVEL_MUTE)
        self.debug = debug
        self.workingArea = workingArea if workingArea is not None else os.getcwd()
        self.dry_run = False
        self.voGroup = voGroup if voGroup is not None else "dcms"
        self.username = username

        if logger is not None:
            self.logger = logger.getChild("CrabController")
        else:
            self.logger = logging.getLogger("CrabController")
            # Only add a file handler when logging is unconfigured. getLogger()
            # returns the same logger object for every instance, so also skip
            # it if a handler is already attached; otherwise every new
            # CrabController adds one more handler and each message is written
            # to crabController.log once per instance ever created.
            if not logging.getLogger().handlers and not self.logger.handlers:
                ch = logging.FileHandler('crabController.log', mode='a', encoding=None, delay=False)
                ch.setLevel(logging.DEBUG)
                formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
                ch.setFormatter(formatter)
                self.logger.addHandler(ch)

        # Receives the results put by crabCommandProcess in the child process.
        self.crab_q = Queue()

    def checkwrite(self, site='T2_DE_RWTH', path='noPath'):
        """Run ``crab checkwrite`` for *site*.

        :param site: site name passed as ``--site``.
        :param path: LFN passed as ``--lfn`` unless it contains 'noPath'.
        :returns: True if CRAB answered with status 'SUCCESS', else False.
        """
        if self.username is None:
            self.checkusername()
        try:
            self.logger.info("Checking if user can write to /store/user/%s on site %s with voGroup %s" % (self.username, site, self.voGroup))
            args = ['checkwrite', '--site', site, '--voGroup', self.voGroup]
            if 'noPath' not in path:
                args += ['--lfn', path]
            res = crabCommand(*args)
            if res['status'] == 'SUCCESS':
                self.logger.info("Checkwrite was sucessfully called.")
                return True
            self.logger.error("The crab checkwrite command failed for site: %s" % site)
            return False
        except Exception:
            # logger.exception keeps the traceback that the bare except used to drop.
            self.logger.exception('Unable to perform crab checkwrite')
            return False

    def submit(self, name):
        """Submit the CRAB config *name* (``--dryrun`` in dry-run mode, else ``--wait``).

        :returns: the result of the crab command.
        """
        if self.dry_run:
            res = self.callCrabCommand(('submit', '--dryrun', name))
            self.logger.info('Dry-run: You may check the created config and sandbox')
        else:
            res = self.callCrabCommand(('submit', '--wait', name))
            self.logger.info("crab sumbit called for task %s" % name)
            if self.debug > 1:
                self.logger.info(str(res))
        return res

    def resubmit(self, name, joblist=None):
        """Run ``crab resubmit --wait`` on the task folder of *name* in the working area.

        :param joblist: accepted for backward compatibility but ignored; only
            the task folder is passed to crab.
        :returns: the crab result, or {} in dry-run mode.
        """
        if self.dry_run:
            self.logger.info('Dry-run: Created config file. ')
            return {}
        cmd = ('resubmit', '--wait', os.path.join(self.workingArea, self._prepareFoldername(name)))
        res = self.callCrabCommand(cmd)
        self.logger.info("crab resumbit called for task %s" % name)
        return res

    def checkusername(self):
        """Return the grid user name and cache it in ``self.username``.

        The CERNUSERNAME environment variable takes precedence; otherwise
        ``crab checkusername`` is asked.
        :returns: the user name, or "noHNname" if CRAB's answer has no 'username'.
        """
        username = os.environ.get("CERNUSERNAME")
        if username is not None:
            # Cache it as well, so checkwrite does not keep logging "None".
            self.username = username
            return username
        res = crabCommand('checkusername')
        try:
            self.username = res['username']
            return res['username']
        except (KeyError, TypeError):
            return "noHNname"

    def status(self, name):
        """Run ``crab status --long`` for task *name* ("crab_" is prefixed if missing).

        :returns: (status, jobs, taskFailureMsg). jobs defaults to {} and
            taskFailureMsg to None when absent. ("NOSTATE", {}, None) is
            returned in dry-run mode or if the request fails.
        """
        try:
            callname = name if "crab_" in name else "crab_" + name
            cmd = ('status', '--long', callname)
            if self.dry_run:
                # (cmd,) because cmd itself is a tuple and would be unpacked by %.
                self.logger.info('Dry-run: Created config file. crab command would have been: %s' % (cmd,))
                return "NOSTATE", {}, None
            res = self.callCrabCommand(cmd)
            return res['status'], res.get('jobs', {}), res.get('taskFailureMsg')
        except Exception as e:
            print(e)
            self.logger.error("Can not run crab status request")
            return "NOSTATE", {}, None

    def callCrabCommand(self, crabArgs):
        """Run ``crabCommand(*crabArgs)`` in a child process and return its result.

        :param crabArgs: tuple of arguments for crabCommand.
        """
        p = Process(target=crabCommandProcess, args=(self.crab_q, crabArgs))
        p.start()
        # Fetch the result before joining: a child still feeding a large result
        # into the queue cannot exit, so join-first could deadlock.
        res = self.crab_q.get()
        p.join()
        return res

    def getlog(self, name):
        """Run ``crab getlog`` for task *name*.

        :returns: (res['success'], res['failed']), or ({}, {}) on any error.
        """
        foldername = self._prepareFoldername(name)
        try:
            res = self.callCrabCommand(('getlog', '%s' % foldername))
            return res['success'], res['failed']
        except Exception:
            self.logger.error("Error calling crab getlog for %s" % foldername)
            return {}, {}

    def report(self, name):
        """Run ``crab report`` for task *name*.

        :returns: res['analyzedLumis'], or None on any error.
        """
        foldername = self._prepareFoldername(name)
        try:
            res = self.callCrabCommand(('report', '%s' % foldername))
            return res['analyzedLumis']
        except Exception:
            self.logger.error("Error calling crab report for %s" % foldername)
            return None

    def readCrabConfig(self, name):
        """Load a CRAB python config and return its ``config`` object.

        :param name: a path to a config file, or a task name resolved to
            'crab_<name>_cfg.py' in the current directory.
        :returns: the config object, or False if it cannot be loaded.
        """
        try:
            pset = name if os.path.exists(name) else 'crab_%s_cfg.py' % name
            with open(pset, 'r') as cfgfile:
                cfo = imp.load_source("pycfg", pset, cfgfile)
                config = cfo.config
                del cfo
            return config
        except Exception:
            return False

    @property
    def crabFolders(self):
        """Names of the crab_* directories directly inside the working area."""
        return [x for x in os.listdir(self.workingArea)
                if x.startswith('crab_') and os.path.isdir(os.path.join(self.workingArea, x))]

    def _prepareFoldername(self, name):
        """Return *name* with a "crab_" prefix (added if missing), whitespace-stripped."""
        crabfolder = name if name.startswith("crab_") else "crab_%s" % name
        return crabfolder.strip()

    def commandlineOptions(self, parser=None):
        """Add the --dry-run and --workingArea options to *parser* unless present.

        :param parser: an ``optparse.OptionParser``. A fresh one with usage
            'usage: %prog' is created per call when omitted, instead of one
            default instance shared by every call.
        :returns: the parser.
        """
        if parser is None:
            parser = optparse.OptionParser('usage: %prog')
        # Parse a dummy command line to learn which option destinations exist.
        (currentoptions, args) = parser.parse_args([" "])

        if not hasattr(currentoptions, 'dry_run'):
            parser.add_option('--dry-run', action='store_true', default=False,
                              help='Do everything except calling CRAB or registering samples to the database.')
        if not hasattr(currentoptions, 'workingArea'):
            parser.add_option('--workingArea', metavar='DIR', default=os.getcwd(), help='The area (full or relative path) where to create the CRAB project directory. '
                              'If the area doesn\'t exist, CRAB will try to create it using the mkdir command'
                              ' (without -p option). Defaults to the current working directory.')
        return parser
0307
0308
0309
0310
0311
0312
0313
0314
0315
def crabCommandProcess(q, crabCommandArgs):
    """Run ``crabCommand(*crabCommandArgs)`` and put exactly one result on *q*.

    Used as the ``Process`` target of ``CrabController.callCrabCommand``,
    which blocks on ``q.get()``. Every path must therefore put something on
    the queue; otherwise the parent waits forever.

    - HTTPException: retried after 5 s, at most 6 attempts in total. After
      that the result is {'status': "UnexpectedError", 'jobs': {}}.
    - CachefileNotFoundException: {'status': "CachefileNotFound", 'jobs': {}}.
    - Any other exception: {'status': "UnexpectedError", 'jobs': {}}.
    """
    maxAttempts = 6
    # Result used if every retry fails.
    res = {'status': "UnexpectedError", 'jobs': {}}
    for attempt in range(1, maxAttempts + 1):
        try:
            res = crabCommand(*crabCommandArgs)
            break
        except HTTPException as e:
            print("crab error ---------------")
            print(e)
            print("end error ---------------")
            if attempt < maxAttempts:
                print("will try again!")
                time.sleep(5)
        except CachefileNotFoundException as e:
            print("crab error ---------------")
            print(e)
            print("end error ---------------")
            print(crabCommandArgs)
            res = {'status': "CachefileNotFound", 'jobs': {}}
            break
        except Exception as e:
            # Without this the child dies before q.put and the parent hangs.
            print("crab error ---------------")
            print(e)
            print("end error ---------------")
            print(crabCommandArgs)
            res = {'status': "UnexpectedError", 'jobs': {}}
            break
    q.put(res)
0342
class CertInfo:
    """VO information parsed from the output of ``voms-proxy-info --fqan``.

    Attributes ``vo``, ``voGroup`` and ``voRole`` are all "" when the
    command exits with a non-zero status.
    """

    def __init__(self):
        proc = subprocess.Popen("voms-proxy-info --fqan",
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                shell=True)
        out, _err = proc.communicate()
        print(out)
        if proc.returncode == 0:
            # NOTE(review): assumes Python 2, where stdout is a str; under
            # Python 3 it is bytes and split("\n") would fail.
            fqanLines = out.split("\n")
            fields = fqanLines[0].split("/")
            # Fall back to the second line when the first has fewer than 4 fields.
            if len(fields) < 4:
                fields = fqanLines[1].split("/")
            self.vo = fields[1]
            self.voGroup = fields[2]
            try:
                # Field 2 may itself be a "Key=Value" entry (presumably
                # "Role=..." when the FQAN carries no group).
                self.voRole = fields[2].split("=")[1]
            except IndexError:
                self.voRole = ""
            else:
                if "NULL" in self.voRole:
                    self.voGroup = ""
        else:
            self.vo = self.voGroup = self.voRole = ""
0368
0369
0370
0371
class CrabTask:
    """State and bookkeeping for a single CRAB task.

    The task is identified by *taskname*, or by a CRAB config file whose
    ``General.requestName`` becomes the task name.
    """

    def __init__(self,
                 taskname="",
                 crab_config="",
                 crabController=None,
                 initUpdate=True,
                 debuglevel="ERROR",
                 datasetpath="",
                 localDir="",
                 outlfn="",):
        """
        :param taskname: task name; takes precedence over *crab_config*.
        :param crab_config: path to a CRAB config, used when *taskname* is empty.
        :param crabController: unused; kept for backward compatibility.
        :param initUpdate: if True, query the task status immediately.
        :param debuglevel: level (name such as "ERROR", or int) for the
            'crabTask' logger.
        :param datasetpath: fallback returned by ``datasetpath`` if the config has none.
        :param localDir: stored as ``self.localDir``.
        :param outlfn: stored as ``self.outlfn``.
        :raises ValueError: if neither *taskname* nor *crab_config* is given.
        :raises IOError: if *crab_config* does not exist.
        """
        self._crabConfig = None
        self._crabFolder = None

        if taskname:
            self.name = taskname
        else:
            if not crab_config:
                raise ValueError("Either taskname or crab_config needs to be set")
            if not os.path.exists(crab_config):
                raise IOError("File %s not found" % crab_config)
            # crabConfig loads the file named by self.name, so point it at the
            # config first, then switch to the request name.
            self.name = crab_config
            self.name = self.crabConfig.General.requestName
        self.uuid = uuid.uuid4()

        self.log = logging.getLogger('crabTask')
        # Logger.setLevel accepts level names directly; the former
        # logging._levelNames lookup is a Python 2 private table.
        self.log.setLevel(debuglevel)
        self.jobs = {}
        self.localDir = localDir
        self.outlfn = outlfn
        self.isUpdating = False
        self.taskId = -1

        # Job counters, filled by updateJobStats.
        self.nJobs = 0
        self.state = "NOSTATE"
        self.maxjobnumber = 0
        self.nUnsubmitted = 0
        self.nIdle = 0
        self.nRunning = 0
        self.nTransferring = 0
        self.nCooloff = 0
        self.nFailed = 0
        self.nFinished = 0
        self.nComplete = 0
        self.failureReason = None
        self.lastUpdate = datetime.datetime.now().strftime("%Y-%m-%d_%H.%M.%S")

        self._isData = None
        self.resubmitCount = 0

        self.debug = False

        self.finalFiles = []
        self.totalEvents = 0

        self._datasetpath_default = datasetpath

        if initUpdate:
            self.update()
            self.updateJobStats()

    @property
    def isData(self):
        """True if the task runs on data (cached after the first evaluation).

        Data means the config defines ``Data.lumiMask``, or, failing that,
        the task name starts with "Data_".
        """
        if self._isData is None:
            try:
                self.crabConfig.Data.lumiMask
                self._isData = True
            except Exception:
                self._isData = self.name.startswith("Data_")
        return self._isData

    @property
    def crabConfig(self):
        """The task's CRAB config, loaded once via CrabController.readCrabConfig."""
        if self._crabConfig is None:
            crab = CrabController()
            self._crabConfig = crab.readCrabConfig(self.name)
        return self._crabConfig

    @property
    def datasetpath(self):
        """``Data.inputDataset`` of the config, else the constructor's *datasetpath*."""
        try:
            return self.crabConfig.Data.inputDataset
        except Exception:
            pass
        return self._datasetpath_default

    @property
    def crabFolder(self):
        """Path of the task's crab_* folder, or "" if it cannot be found.

        The config's ``General.workArea`` is searched first, then the current
        working directory. A found path is cached.
        """
        if self._crabFolder is not None:
            return self._crabFolder
        crab = CrabController()
        foldername = crab._prepareFoldername(self.name)
        # os.getcwd(): the former os.path.cwd() does not exist.
        for base in (self.crabConfig.General.workArea, os.getcwd()):
            candidate = os.path.join(base, foldername)
            if os.path.exists(candidate):
                self._crabFolder = candidate
                return self._crabFolder
        self.log.error("Unable to find folder for Task")
        return ""

    def resubmit_failed(self):
        """Ask CRAB to resubmit the task's failed jobs and update ``lastUpdate``."""
        controller = CrabController()
        # NOTE(review): CrabController.resubmit currently ignores joblist.
        failedJobIds = [job['JobIds'][-1] for job in self.jobs.values()
                        if job['State'] == 'failed']
        controller.resubmit(self.name, joblist=failedJobIds)
        self.lastUpdate = datetime.datetime.now().strftime("%Y-%m-%d_%H.%M.%S")

    @property
    def crab_folder(self):
        """<General.workArea>/crab_<General.requestName> from the config."""
        return os.path.join(self.crabConfig.General.workArea,
                            "crab_" + self.crabConfig.General.requestName)

    def update(self):
        """Refresh state, jobs and failure reason from ``crab status``.

        A "FAILED" answer is queried once more after 2 s. On "NOSTATE",
        handleNoState is called while fewer than 3 attempts have been made.
        """
        self.log.debug("Start update for task %s" % self.name)
        self.isUpdating = True
        try:
            controller = CrabController()
            self.state = "UPDATING"
            self.log.debug("Try to get status for task")
            self.state, self.jobs, self.failureReason = controller.status(self.crab_folder)
            self.log.debug("Found state: %s" % self.state)
            if self.state == "FAILED":
                time.sleep(2)
                self.state, self.jobs, self.failureReason = controller.status(self.crab_folder)
            self.nJobs = len(self.jobs)
            self.updateJobStats()
            if self.state == "NOSTATE":
                self.log.debug("Trying to resubmit because of NOSTATE")
                if self.resubmitCount < 3:
                    self.handleNoState()
        finally:
            # Always clear the flag; TaskStats.updateStats skips updating tasks.
            self.isUpdating = False
        self.lastUpdate = datetime.datetime.now().strftime("%Y-%m-%d_%H.%M.%S")

    def handleNoState(self):
        """Try to recover a task without a state, based on ``self.failureReason``.

        On the Grid-scheduler error, the crab_<name> folder in the current
        directory is renamed to bak_crab_<name> and crab_<name>_cfg.py is
        submitted again (state "SHEDERR"). Any other failure reason triggers
        a resubmit (state "ERRHANDLE"). ``resubmitCount`` is incremented in
        every case.
        """
        crab = CrabController()
        reason = self.failureReason
        foldername = crab._prepareFoldername(self.name)
        if reason is not None and "The CRAB3 server backend could not resubmit your task because the Grid scheduler answered with an error." in reason:
            # argv list instead of a shell string: no shell interpretation of the name.
            p = subprocess.Popen(['mv', foldername, 'bak_%s' % foldername], stdout=subprocess.PIPE)
            p.communicate()
            self.state = "SHEDERR"
            configName = '%s_cfg.py' % foldername
            crab.submit(configName)
        elif reason is not None:
            self.state = "ERRHANDLE"
            crab.resubmit(self.name)
        self.resubmitCount += 1

    def test_print(self):
        """Return the task's uuid."""
        return self.uuid

    def updateJobStats(self, dCacheFileList=None):
        """Recount jobs per state from ``self.jobs`` into the n<State> attributes.

        A job counts for every state keyword contained in its 'State' string.

        :param dCacheFileList: optional list of output file names. A job whose
            state contains 'finished' counts toward ``nComplete`` if some name
            contains "<taskname>_<jobkey>".
        """
        stateDict = {'unsubmitted': 0, 'idle': 0, 'running': 0, 'transferring': 0,
                     'cooloff': 0, 'failed': 0, 'finished': 0}
        nComplete = 0
        for key, job in self.jobs.items():
            jobState = job['State']
            for statekey in stateDict:
                if statekey in jobState:
                    stateDict[statekey] += 1
            # Check this job's state, not the leftover inner-loop variable.
            if dCacheFileList is not None and 'finished' in jobState:
                outputFilename = "%s_%s" % (self.name, key)
                if any(outputFilename in s for s in dCacheFileList):
                    nComplete += 1

        for state, count in stateDict.items():
            setattr(self, "n" + state.capitalize(), count)
        self.nComplete = nComplete

    def readLogArch(self, logArchName):
        """Read the number of processed events from a job log archive.

        The job number is the part of the basename between the first '_' and
        the following '.', e.g. 'cmsRun_7.log.tar.gz' -> '7'. The value comes
        from the first <EventsRead> of the first <InputFile> in
        FrameworkJobReport-<job>.xml inside the archive.

        :returns: {'readEvents': n}; n stays 0 if the report cannot be read.
        """
        JobNumber = logArchName.split("/")[-1].split("_")[1].split(".")[0]
        log = {'readEvents': 0}
        with tarfile.open(logArchName, "r") as tar:
            try:
                JobXmlFile = tar.extractfile('FrameworkJobReport-%s.xml' % JobNumber)
                root = ET.fromstring(JobXmlFile.read())
                for child in root:
                    if child.tag == 'InputFile':
                        for subchild in child:
                            if subchild.tag == 'EventsRead':
                                nEvents = int(subchild.text)
                                log.update({'readEvents': nEvents})
                                break
                        break
            except Exception:
                print("Can not parse / read %s" % logArchName)
        return log
0617
0618
0619
0620
class TaskStats:
    """Job-state counters summed over a list of CrabTask-like objects."""

    # Per-task counters that are summed; nTasks is handled separately.
    _COUNTER_NAMES = ('nUnsubmitted', 'nIdle', 'nRunning', 'nTransferring',
                      'nCooloff', 'nFailed', 'nFinished', 'nComplete')

    def __init__(self, tasklist=None):
        """Start from zero, or from the sums over *tasklist* if given."""
        if tasklist is None:
            self.clearStats()
        else:
            self.updateStats(tasklist)

    def updateStats(self, tasklist):
        """Recompute all counters from *tasklist*.

        ``nTasks`` counts every task; tasks with ``isUpdating`` set do not
        contribute to the job counters.
        """
        self.clearStats()
        self.nTasks = len(tasklist)
        for task in tasklist:
            if task.isUpdating:
                continue
            for attr in self._COUNTER_NAMES:
                setattr(self, attr, getattr(self, attr) + getattr(task, attr))

    def clearStats(self):
        """Reset nTasks and all job counters to 0."""
        self.nTasks = 0
        for attr in self._COUNTER_NAMES:
            setattr(self, attr, 0)