File indexing completed on 2025-07-09 05:00:03
0001
0002 import sys, os
0003
0004 from itertools import cycle
0005
0006 from Configuration.PyReleaseValidation.MatrixReader import MatrixReader
0007 from Configuration.PyReleaseValidation.MatrixRunner import MatrixRunner
0008 from Configuration.PyReleaseValidation.MatrixInjector import MatrixInjector,performInjectionOptionTest
0009 from Configuration.PyReleaseValidation.MatrixUtil import cleanComputeCapabilities
0010
0011
def showRaw(opt):
    """Dump the raw workflow listing for the options in *opt*.

    A MatrixReader is built from ``opt`` and its ``showRaw`` method is called
    with the recycling/raw related options plus the selected workflow list.

    Returns:
        Always 0.
    """
    reader = MatrixReader(opt)
    reader.showRaw(
        opt.useInput,
        opt.refRel,
        opt.fromScratch,
        opt.raw,
        opt.step1Only,
        selected=opt.testList,
    )
    return 0
0018
0019
0020
def runSelected(opt):
    """Show or run the selected workflows, then optionally inject them.

    The matrix is read and prepared from ``opt``.  If ``opt.testList`` is
    non-empty every requested id must be defined in the matrix.  With
    ``opt.show`` the workflows are only listed; otherwise they are executed
    through a MatrixRunner.  When ``opt.wmcontrol`` is set and the run
    succeeded, the workflows are prepared, uploaded and submitted through a
    MatrixInjector.

    Returns:
        The status code: the value returned by ``runTests`` (or by the
        injector's ``prepare``), 0 in plain show mode, and 1 when injection
        was requested in show mode (nothing was run, so nothing can be
        injected).

    Raises:
        ValueError: if ``opt.testList`` names workflows unknown to the matrix.
    """
    mrd = MatrixReader(opt)
    mrd.prepare(opt.useInput, opt.refRel, opt.fromScratch)

    if opt.testList:
        definedSet = {dwf.numId for dwf in mrd.workFlows}
        undefSet = set(opt.testList) - definedSet
        if undefSet:
            raise ValueError('Undefined workflows: ' + ', '.join(map(str, undefSet)))
        # NOTE(review): the original computed a de-duplicated set into a local
        # that was never read when opt.allowDuplicates was off, so
        # opt.testList has always been passed on unchanged.  The dead
        # assignment is dropped here; confirm whether de-duplication is
        # expected to happen downstream.

    ret = 0
    # Stays None in show mode: no runner is created, hence no run directories.
    mRunnerHi = None
    if opt.show:
        mrd.show(opt.testList, opt.extended, opt.cafVeto)
        if opt.testList:
            print('selected items:', opt.testList)
    else:
        mRunnerHi = MatrixRunner(mrd.workFlows, opt.nProcs, opt.nThreads, opt.selected_gpus)
        ret = mRunnerHi.runTests(opt)

    if opt.wmcontrol:
        if ret != 0:
            print('Cannot go on with wmagent injection with failing workflows')
        elif mRunnerHi is None:
            # Previously this path crashed with NameError on mRunnerHi.runDirs.
            print('Cannot go on with wmagent injection in show mode: no workflow was run')
            ret = 1
        else:
            wfInjector = MatrixInjector(opt, mode=opt.wmcontrol, options=opt.wmoptions)
            ret = wfInjector.prepare(mrd, mRunnerHi.runDirs)
            if ret == 0:
                wfInjector.upload()
                wfInjector.submit()
    return ret
0054
0055
0056
0057 if __name__ == '__main__':
0058
0059
# Named shortcut lists of workflow ids; the keys are accepted by -l/--list
# in addition to explicit workflow numbers.
predefinedSet = {
    'run1_run2': [
        5.1, 8, 9.0, 25, 101.0,
        7.3, 1306.0, 1330, 135.4, 25202.0, 250202.181, 10224.0,
        4.22, 4.53, 1000, 1001,
        136.731, 136.793, 136.874,
    ],
    'run3': [
        11634.0, 13234.0, 12434.0, 12834.0, 12846.0, 13034.0,
        16834.0, 17034.0, 14034.0, 14234.0, 2500.3001,
        139.001,
        140.045,
        141.042,
        145.014, 145.104, 145.202, 145.301,
        145.408, 145.500, 145.604, 145.713,
    ],
    'phase2': [
        29634.0, 24834.911, 29634.911, 29834.999, 29696.0, 29700.0,
        29634.75,
    ],
    'heavyIons': [
        140.56,
        312.0,
    ],
    'jetmc': [5.1, 13, 15, 25, 38, 39],
    'metmc': [5.1, 15, 25, 37, 38, 39],
    'muonmc': [5.1, 124.4, 124.5, 20, 21, 22, 23, 25, 30],
    'ph2_hlt': [
        29634.75, 29634.751, 29634.752, 29634.753, 29634.754,
        29634.755, 29634.756, 29634.7561, 29634.7562, 29634.757,
        29634.758, 29634.759, 29634.77, 29634.771, 29634.772,
    ],
}

# 'limited' is the concatenation, in this order, of the four sets below.
predefinedSet['limited'] = [
    wf
    for name in ('run1_run2', 'run3', 'phase2', 'heavyIons')
    for wf in predefinedSet[name]
]
0178
import argparse

# The first positional parameter of ArgumentParser is ``prog``, not ``usage``;
# the usage text is therefore passed by keyword.  argparse prepends
# "usage: " itself, so the text must not repeat it.
usage = 'runTheMatrix.py --show -s '

parser = argparse.ArgumentParser(usage=usage,
                                 formatter_class=argparse.ArgumentDefaultsHelpFormatter)

parser.add_argument('-b', '--batchName', dest='batchName', default='',
                    help='relval batch: suffix to be appended to Campaign name')

parser.add_argument('-m', '--memoryOffset', dest='memoryOffset', type=int, default=3000,
                    help='memory of the wf for single core')

parser.add_argument('--startFrom', dest='startFrom', type=str, default=None,
                    help='Start from a specific step (e.g. GEN,SIM,DIGI,RECO)')

parser.add_argument('--recycle', dest='recycle', type=str, default=None,
                    help='Input file to recycle. To be used if the first step is an input step or togehter with --startFrom. '
                         'N.B.: runTheMatrix.py will create its own workdirectory so if yo use a relative path, be careful.')

parser.add_argument('--allowDuplicates', dest='allowDuplicates', default=False, action='store_true',
                    help='Allow to have duplicate workflow numbers in the list')

parser.add_argument('--addMemPerCore', dest='memPerCore', type=int, default=1500,
                    help='increase of memory per each n > 1 core: memory(n_core) = memoryOffset + (n_core-1) * memPerCore')

parser.add_argument('-j', '--nproc', dest='nProcs', type=int, default=4,
                    help='number of processes. 0 Will use 4 processes, not execute anything but create the wfs')

parser.add_argument('-t', '--nThreads', dest='nThreads', type=int, default=1,
                    help='number of threads per process to use in cmsRun.')

parser.add_argument('--nStreams', dest='nStreams', type=int, default=0,
                    help='number of streams to use in cmsRun.')

parser.add_argument('--nEvents', dest='nEvents', type=int, default=0,
                    help='number of events to process in cmsRun. If 0 will use the standard 10 events.')

parser.add_argument('--numberEventsInLuminosityBlock', dest='numberEventsInLuminosityBlock', type=int, default=-1,
                    help='number of events in a luminosity block')

parser.add_argument('-n', '--showMatrix', dest='show', default=False, action='store_true',
                    help='Only show the worflows. Use --ext to show more')

parser.add_argument('-c', '--checkInputs', dest='checkInputs', default=False, action='store_true',
                    help='Check if the default inputs are well defined. To be used with --show')

parser.add_argument('-e', '--extended', dest='extended', default=False, action='store_true',
                    help='Show details of workflows, used with --show')

parser.add_argument('-s', '--selected', dest='restricted', default=False, action='store_true',
                    help='Run a pre-defined selected matrix of wf. Deprecated, please use -l limited')

# list(...) so the help shows the plain key names instead of "dict_keys([...])".
parser.add_argument('-l', '--list', dest='testList', default=None,
                    help='Comma separated list of workflow to be shown or ran. Possible keys are also '+str(list(predefinedSet))+'. and wild card like muon, or mc')

parser.add_argument('-f', '--failed-from', dest='failed_from', default=None,
                    help='Provide a matrix report to specify the workflows to be run again. Augments the -l option if specified already')

parser.add_argument('-r', '--raw', dest='raw',
                    help='Temporary dump the .txt needed for prodAgent interface. To be discontinued soon. Argument must be the name of the set (standard, pileup,...)')

parser.add_argument('-i', '--useInput', dest='useInput', type=lambda x: x.split(','), default=None,
                    help='Use recyling where available. Either all, or a comma separated list of wf number.')

parser.add_argument('-w', '--what', dest='what', default='all',
                    help='Specify the set to be used. Argument must be the name of a set (standard, pileup,...) or multiple sets separated by commas (--what standard,pileup )')

# NOTE(review): no action is given, so --step1 consumes a value and any
# non-empty string is truthy; left as is to keep existing command lines valid.
parser.add_argument('--step1', dest='step1Only', default=False,
                    help='Used with --raw. Limit the production to step1')

parser.add_argument('--maxSteps', dest='maxSteps', default=9999, type=int,
                    help='Only run maximum on maxSteps. Used when we are only interested in first n steps.')

parser.add_argument('--fromScratch', dest='fromScratch', type=lambda x: x.split(','), default=None,
                    help='Comma separated list of wf to be run without recycling. all is not supported as default.')

parser.add_argument('--refRelease', dest='refRel', default=None,
                    help='Allow to modify the recycling dataset version')

parser.add_argument('--wmcontrol', dest='wmcontrol', choices=['init', 'test', 'submit', 'force'], default=None,
                    help='Create the workflows for injection to WMAgent. In the WORKING. -wmcontrol init will create the the workflows, -wmcontrol test will dryRun a test, -wmcontrol submit will submit to wmagent')

parser.add_argument('--revertDqmio', dest='revertDqmio', choices=['yes', 'no'], default='no',
                    help='When submitting workflows to wmcontrol, force DQM outout to use pool and not DQMIO')

parser.add_argument('--optionswm', dest='wmoptions', default='',
                    help='Specify a few things for wm injection')

parser.add_argument('--keep', default=None,
                    help='allow to specify for which comma separated steps the output is needed')

parser.add_argument('--label', default='',
                    help='allow to give a special label to the output dataset name')

parser.add_argument('--command', dest='command', action='append', default=None,
                    help='provide a way to add additional command to all of the cmsDriver commands in the matrix')

parser.add_argument('--apply', dest='apply', default=None,
                    help='allow to use the --command only for 1 comma separeated')

parser.add_argument('--workflow', dest='workflow', action='append', default=None,
                    help='define a workflow to be created or altered from the matrix')

parser.add_argument('--dryRun', dest='dryRun', action='store_true', default=False,
                    help='do not run the wf at all')

parser.add_argument('--testbed', dest='testbed', default=False, action='store_true',
                    help='workflow injection to cmswebtest (you need dedicated rqmgr account)')

# store_false: opt.cafVeto is True unless --noCafVeto is given.
parser.add_argument('--noCafVeto', dest='cafVeto', default=True, action='store_false',
                    help='Run from any source, ignoring the CAF label')

parser.add_argument('--overWrite', dest='overWrite', default=None,
                    help='Change the content of a step for another. List of pairs.')

parser.add_argument('--noRun', dest='noRun', default=False, action='store_true',
                    help='Remove all run list selection from wfs')

parser.add_argument('--das-options', dest='dasOptions', default="--limit 0", action='store',
                    help='Options to be passed to dasgoclient.')

parser.add_argument('--job-reports', dest='jobReports', default=False, action='store_true',
                    help='Dump framework job reports')

parser.add_argument('--ibeos', dest='IBEos', default=False, action='store_true',
                    help='Use IB EOS site configuration')

parser.add_argument('--sites', dest='dasSites', default='T2_CH_CERN', action='store',
                    help='Run DAS query to get data from a specific site. Set it to empty string to search all sites.')

parser.add_argument('--interactive', action='store_true', default=False,
                    help="Open the Matrix interactive shell")

parser.add_argument('--dbs-url', dest='dbsUrl', default=None, action='store',
                    help='Overwrite DbsUrl value in JSON submitted to ReqMgr2')
0426
# Dedicated help section for the GPU options.
gpugroup = parser.add_argument_group(
    'GPU-related options',
    'These options are only meaningful when --gpu is used, and is not set to forbidden.',
)

# nargs='?' with const: a bare "--gpu" means "required".
gpugroup.add_argument(
    '--gpu', '--requires-gpu',
    dest='gpu',
    action='store',
    nargs='?',
    const='required',
    default='forbidden',
    choices=['forbidden', 'optional', 'required'],
    help='Enable GPU workflows. Possible options are "forbidden" (default), "required" (implied if no argument is given), or "optional".',
)

gpugroup.add_argument(
    '--gpu-memory',
    dest='GPUMemoryMB',
    type=int,
    default=8000,
    help='Specify the minimum amount of GPU memory required by the job, in MB.',
)

# The default is a string, so argparse runs it through ``type`` as well and
# opt.CUDACapabilities is a list either way.
gpugroup.add_argument(
    '--cuda-capabilities',
    dest='CUDACapabilities',
    type=lambda x: x.split(','),
    default='6.0,6.1,6.2,7.0,7.2,7.5,8.0,8.6,8.7,8.9,9.0,12.0',
    help='Specify a comma-separated list of CUDA "compute capabilities", or GPU hardware architectures, that the job can use.',
)

# Derive the default CUDA runtime version from the resolved file name of the
# release's libcudart.so: the third and fourth dot-separated fields are used.
cudart_version = None
libcudart = os.path.realpath(
    os.path.expandvars('$CMSSW_RELEASE_BASE/external/$SCRAM_ARCH/lib/libcudart.so')
)
if os.path.isfile(libcudart):
    cudart_basename = os.path.basename(libcudart)
    cudart_fields = cudart_basename.split('.')
    cudart_version = '.'.join(cudart_fields[2:4])

gpugroup.add_argument(
    '--cuda-runtime',
    dest='CUDARuntime',
    default=cudart_version,
    help='Specify major and minor version of the CUDA runtime used to build the application.',
)

gpugroup.add_argument(
    '--force-gpu-name',
    dest='GPUName',
    default='',
    help='Request a specific GPU model, e.g. "Tesla T4" or "NVIDIA GeForce RTX 2080". The default behaviour is to accept any supported GPU.',
)

gpugroup.add_argument(
    '--force-cuda-driver-version',
    dest='CUDADriverVersion',
    default='',
    help='Request a specific CUDA driver version, e.g. 470.57.02. The default behaviour is to accept any supported CUDA driver version.',
)

gpugroup.add_argument(
    '--force-cuda-runtime-version',
    dest='CUDARuntimeVersion',
    default='',
    help='Request a specific CUDA runtime version, e.g. 11.4. The default behaviour is to accept any supported CUDA runtime version.',
)
0475
opt = parser.parse_args()
# Stays None unless at least one GPU passes the selection below.
opt.selected_gpus = None

# GPU discovery is skipped for WMAgent injection and when GPUs are forbidden.
if not opt.wmcontrol and opt.gpu != 'forbidden':

    print(">> Running with --gpu option. Checking the available and supported GPUs.")
    gpus = cleanComputeCapabilities("cuda")
    # The number of CUDA devices already found is passed as second argument.
    gpus = gpus + cleanComputeCapabilities("rocm", len(gpus))
    available_gpus = gpus

    if len(available_gpus) == 0:
        if opt.gpu == 'required':
            raise Exception('Launched with --gpu required and no GPU available!')
        print(">> No GPU available!")
    else:
        print(">> GPUs available:")
        for gpu in available_gpus:
            print(gpu)

    # Non-CUDA devices always pass; CUDA devices must have an accepted capability.
    gpus = [g for g in gpus if not g.isCUDA() or g.capability in opt.CUDACapabilities]

    # Optional restriction to one specific GPU model.
    if len(opt.GPUName) > 0:
        gpus = [g for g in gpus if g.name == opt.GPUName]

    if available_gpus != gpus:
        if len(gpus) > 0:
            print(">> GPUs selected:")
            for gpu in gpus:
                print(gpu)
        else:
            if opt.gpu == 'required':
                raise Exception('Launched with --gpu required and no GPU selected (among those available)!')
            print(">> No GPU selected!")
    else:
        print(">> All selected!")

    # With --gpu required and an empty selection one of the two raises above
    # has already fired, so no further check is needed here.
    if len(gpus) > 0:
        # Endless round-robin iterator over the selected GPUs.
        opt.selected_gpus = cycle(gpus)
0518
# --command may be repeated; the pieces are joined into one string.
if opt.command:
    opt.command = ' '.join(opt.command)
os.environ["CMSSW_DAS_QUERY_SITES"] = opt.dasSites

# Collect the workflows marked FAILED in a previous matrix report and add
# them to (or use them as) the -l selection.
if opt.failed_from:
    rerunthese = []
    with open(opt.failed_from, 'r') as report:
        for report_line in report:
            if 'FAILED' not in report_line:
                continue
            # The workflow id is the text before the first underscore; a line
            # without one is skipped instead of aborting on tuple unpacking.
            to_run, sep, _ = report_line.partition('_')
            if sep:
                rerunthese.append(to_run)
    if opt.testList:
        opt.testList += ','.join([''] + rerunthese)
    else:
        opt.testList = ','.join(rerunthese)

if opt.IBEos:
    os.environ["CMSSW_USE_IBEOS"] = "true"
if opt.restricted:
    print('Deprecated, please use -l limited')
    if opt.testList:
        opt.testList += ',limited'
    else:
        opt.testList = 'limited'


def stepOrIndex(s):
    """Return *s* as an int when it consists only of digits, else unchanged."""
    return int(s) if s.isdigit() else s


# Real lists, not map() iterators: a Python 3 iterator is exhausted after the
# first membership test or loop, which would silently disable later lookups.
if opt.apply:
    opt.apply = [stepOrIndex(s) for s in opt.apply.split(',')]
if opt.keep:
    opt.keep = [stepOrIndex(s) for s in opt.keep.split(',')]

# Expand the comma separated -l selection into a list of workflow numbers.
if opt.testList:
    testList = []
    for entry in opt.testList.split(','):
        if not entry:
            continue
        # Numbers are tried first: otherwise "2" or "3" is captured by the
        # suffix wildcard (run1_run2, run3) and expands to a whole set.
        try:
            testList.append(float(entry))
            continue
        except ValueError:
            pass
        wanted = entry.lower()
        for k in predefinedSet:
            # Wildcard: the entry may be a prefix or a suffix of a set name;
            # only the first matching set is used.
            if k.lower().startswith(wanted) or k.lower().endswith(wanted):
                testList.extend(predefinedSet[k])
                break
        else:
            print(entry, 'is not a possible selected entry')

    opt.testList = list(testList)

if opt.wmcontrol:
    performInjectionOptionTest(opt)
if opt.overWrite:
    # NOTE(review): eval() of a command-line string executes arbitrary code;
    # if only literal lists of pairs are expected, ast.literal_eval would be
    # the safe replacement. Left unchanged to keep accepted inputs identical.
    opt.overWrite = eval(opt.overWrite)
0571 opt.overWrite=eval(opt.overWrite)
0572 if opt.interactive:
0573 import cmd
0574 from colorama import Fore, Style
0575 from os import isatty
0576 import subprocess
0577 import time
0578
class TheMatrix(cmd.Cmd):
    """Interactive shell to browse, search and launch matrix workflows.

    One prepared MatrixReader is kept per matrix file in ``self.matrices_``
    (keyed by the file name without the ``relval_`` prefix).  Processes
    started by ``runWorkflow`` are tracked in ``self.processes_`` as
    ``workflow_id -> (Popen, start_time)``.
    """

    intro = "Welcome to the Matrix (? for help)"
    prompt = "matrix> "

    def __init__(self, opt):
        """Read and prepare every matrix listed by a MatrixReader built from *opt*."""
        cmd.Cmd.__init__(self)
        self.opt_ = opt
        self.matrices_ = {}
        tmp = MatrixReader(self.opt_)
        self.processes_ = dict()
        for what in tmp.files:
            what = what.replace("relval_", "")
            # opt.what is overwritten so each reader loads a single matrix.
            self.opt_.what = what
            self.matrices_[what] = MatrixReader(self.opt_)
            self.matrices_[what].prepare(
                self.opt_.useInput, self.opt_.refRel, self.opt_.fromScratch
            )
        os.system("clear")

    def _complete_matrix_names(self, text):
        """Return, as a list, the matrix names starting with *text* (all if empty).

        cmd.Cmd indexes the completion result, so a dict view is not enough.
        """
        return [name for name in self.matrices_ if not text or name.startswith(text)]

    def do_clear(self, arg):
        """Clear the screen, put prompt at the top"""
        os.system("clear")

    # No docstring on purpose: cmd uses it as help text and 'exit'/'EOF'
    # are currently listed as undocumented commands.
    def do_exit(self, arg):
        print("Leaving the Matrix")
        # A true return value stops cmdloop().
        return True

    def default(self, inp):
        """Handle unknown input: 'x' and 'q' exit, anything else is an error.

        When stdin is not a terminal the process exits with status 1 on the
        first unrecognized command.
        """
        if inp == "x" or inp == "q":
            return self.do_exit(inp)
        else:
            is_pipe = not isatty(sys.stdin.fileno())
            print(Fore.RED + "Error: " + Fore.RESET + "unrecognized command.")

            if is_pipe:
                sys.exit(1)

    def help_predefined(self):
        print(
            "\n".join(
                [
                    "predefined [predef1 [...]]\n",
                    "Run w/o argument, it will print the list of known predefined workflows.",
                    "Run with space-separated predefined workflows, it will print the workflow-ids registered to them",
                ]
            )
        )

    def complete_predefined(self, text, line, start_idx, end_idx):
        """Complete on the names of the predefined sets."""
        return [name for name in predefinedSet if not text or name.startswith(text)]

    def do_predefined(self, arg):
        """Print the list of predefined workflows"""
        print("List of predefined workflows")
        if arg:
            for w in arg.split():
                if w in predefinedSet.keys():
                    print("Predefined Set: %s" % w)
                    print(predefinedSet[w])
                else:
                    print("Unknown Set: %s" % w)
        else:
            print(
                "[ "
                + Fore.RED
                + ", ".join([str(k) for k in predefinedSet.keys()])
                + Fore.RESET
                + " ]"
            )

    def help_showWorkflow(self):
        print(
            "\n".join(
                [
                    "showWorkflow [workflow1 [...]]\n",
                    "Run w/o arguments, it will print the list of registered macro-workflows.",
                    "Run with space-separated workflows, it will print the full list of workflow-ids registered to them",
                ]
            )
        )

    def complete_showWorkflow(self, text, line, start_idx, end_idx):
        """Complete on matrix names."""
        return self._complete_matrix_names(text)

    def do_showWorkflow(self, arg):
        """List the matrices, or the workflows of the matrices named in *arg*."""
        if arg == "":
            print("Available workflows:")
            for k in self.matrices_.keys():
                print(Fore.RED + Style.BRIGHT + k)
            print(Style.RESET_ALL)
            return
        for k in arg.split():
            if k not in self.matrices_:
                print("Unknown workflow %s: skipping" % k)
                continue
            for wfl in self.matrices_[k].workFlows:
                print(
                    "%s %s"
                    % (
                        Fore.BLUE + str(wfl.numId) + Fore.RESET,
                        Fore.GREEN + wfl.nameId + Fore.RESET,
                    )
                )
            print(
                "%s contains %d workflows"
                % (
                    Fore.RED + k + Fore.RESET,
                    len(self.matrices_[k].workFlows),
                )
            )

    def do_runWorkflow(self, arg):
        """Launch 'runTheMatrix.py -w <class> -l <id> [options]' as a subprocess.

        *arg* is 'workflow_class workflow_id [extra options...]'.  Nothing is
        started when the class or id is unknown, or when a process for the
        same id is still running.  stdout/stderr of the child go to
        '<class>_<id>_stdout.log' and '<class>_<id>_stderr.log'.
        """
        args = arg.split()
        if len(args) < 2:
            print(Fore.RED + Style.BRIGHT + "Wrong number of parameters passed")
            print(Style.RESET_ALL)
            return
        workflow_class = args[0]
        workflow_id = args[1]
        passed_down_args = args[2:]
        print(
            Fore.YELLOW + Style.BRIGHT + "Running with the following options:\n"
        )
        print(
            Fore.GREEN
            + Style.BRIGHT
            + "Workflow class: {}".format(workflow_class)
        )
        print(
            Fore.GREEN + Style.BRIGHT + "Workflow ID: {}".format(workflow_id)
        )
        print(
            Fore.GREEN
            + Style.BRIGHT
            + "Additional runTheMatrix options: {}".format(passed_down_args)
        )
        print(Style.RESET_ALL)
        if workflow_class not in self.matrices_:
            print(
                Fore.RED
                + Style.BRIGHT
                + "Unknown workflow selected: {}".format(workflow_class)
            )
            print("Available workflows:")
            for k in self.matrices_.keys():
                print(Fore.RED + Style.BRIGHT + k)
            print(Style.RESET_ALL)
            return
        wflnums = [x.numId for x in self.matrices_[workflow_class].workFlows]
        # A non-numeric id is reported as unknown instead of letting the
        # ValueError from float() terminate the whole shell.
        try:
            workflow_num = float(workflow_id)
        except ValueError:
            workflow_num = None
        if workflow_num is None or workflow_num not in wflnums:
            print(
                Fore.RED
                + Style.BRIGHT
                + "Unknown workflow {}".format(workflow_id)
            )
            print(Fore.GREEN + Style.BRIGHT)
            print(wflnums)
            print(Style.RESET_ALL)
            return
        if workflow_id in self.processes_:
            # poll() is None while the process is still running.
            if self.processes_[workflow_id][0].poll() is None:
                print(
                    Fore.RED
                    + Style.BRIGHT
                    + "Workflow {} already running!".format(workflow_id)
                )
                print(Style.RESET_ALL)
                return

        logfiles = tuple(
            "%s_%s_%s.log" % (workflow_class, workflow_id, name)
            for name in ("stdout", "stderr")
        )
        command = ("runTheMatrix.py", "-w", workflow_class, "-l", workflow_id)
        command += tuple(passed_down_args)
        print(command)
        # The child receives its own copies of the descriptors at spawn time,
        # so the parent's handles are closed right after Popen returns.
        with open(logfiles[0], "w") as stdout, open(logfiles[1], "w") as stderr:
            p = subprocess.Popen(command, stdout=stdout, stderr=stderr)
        self.processes_[workflow_id] = (p, time.time())

    def complete_runWorkflow(self, text, line, start_idx, end_idx):
        """Complete on matrix names."""
        return self._complete_matrix_names(text)

    def help_runWorkflow(self):
        print(
            "\n".join(
                [
                    "runWorkflow workflow_class workflow_id\n",
                    "This command will launch a new and independent process that invokes",
                    "the command:\n",
                    "runTheMatrix.py -w workflow_class -l workflow_id [runTheMatrix.py options]",
                    "\nYou can specify just one workflow_class and workflow_id per invocation.",
                    "The job will continue even after quitting the interactive session.",
                    "stdout and stderr of the new process will be automatically",
                    "redirected to 2 logfiles whose names contain the workflow_class",
                    "and workflow_id. Mutiple command can be issued one after the other.",
                    "The working directory of the new process will be the directory",
                    "from which the interactive session has started.",
                    "Autocompletion is available for workflow_class, but",
                    "not for workflow_id. Supplying a wrong workflow_class or",
                    "a non-existing workflow_id for a valid workflow_class",
                    "will trigger an error and no process will be invoked.",
                    "The interactive shell will keep track of all active processes",
                    "and will prevent the accidental resubmission of an already",
                    "active jobs.",
                ]
            )
        )

    def do_jobs(self, args):
        """Print every process started in this session as active or done."""
        print(Fore.GREEN + Style.BRIGHT + "List of jobs:")
        for w, (process, started) in self.processes_.items():
            if process.poll() is None:
                print(
                    Fore.YELLOW
                    + Style.BRIGHT
                    + "Active job: {} since {:.2f} seconds.".format(
                        w, time.time() - started
                    )
                )
            else:
                print(Fore.RED + Style.BRIGHT + "Done job: {}".format(w))
        print(Style.RESET_ALL)

    def help_jobs(self):
        print(
            "\n".join(
                [
                    "Print a full list of active and done jobs submitted",
                    "in the ongoing interactive session",
                ]
            )
        )

    def do_searchInCommands(self, arg):
        """Search one matrix by workflow-name regexp and step-command regexp.

        *arg* is 'matrix_name name_regexp command_regexp'.  Both regexps are
        applied with ``match`` (anchored at the start of the string).
        """
        args = arg.split()
        if len(args) < 3:
            print("searchInCommands name regexp regexp")
            return
        if args[0] not in self.matrices_:
            print("Unknown workflow")
            return
        import re

        compiled = []
        for expression in args[1:3]:
            try:
                compiled.append(re.compile(expression))
            except re.error:
                # Name the expression that actually failed.
                print("Failed to compile regexp %s" % expression)
                return
        pattern_dataset, pattern_command = compiled

        counter = 0
        seen = set()
        cached_steps = {}
        for wfl in self.matrices_[args[0]].workFlows:
            if not pattern_dataset.match(wfl.nameId):
                continue
            for step, command in enumerate(wfl.cmds):
                if not pattern_command.match(command):
                    continue
                # First hit for this workflow number: (re)create its entry.
                if wfl.numId not in seen:
                    seen.add(wfl.numId)
                    cached_steps[wfl.nameId] = {
                        "steps": [],
                        "numId": wfl.numId,
                    }
                cached_steps[wfl.nameId]["steps"].append(step)
                counter += 1
        for name, found in cached_steps.items():
            print(
                "%s %s [%s]"
                % (
                    Fore.BLUE + str(found["numId"]) + Fore.RESET,
                    Fore.GREEN + name + Fore.RESET,
                    Fore.YELLOW
                    + " ".join([str(step) for step in found["steps"]])
                    + Fore.RESET,
                )
            )
        print(
            "Found %s compatible commands inside %s workflows inside %s."
            % (
                Fore.RED + str(counter) + Fore.RESET,
                Fore.BLUE + str(len(cached_steps)),
                Fore.YELLOW + str(args[0]),
            )
            + Fore.RESET
        )

    def help_searchInCommands(self):
        print(
            "\n".join(
                [
                    "searchInCommands wfl_name dataset_search_regexp command_search_regexp\n",
                    "This command will search for a match within all workflows registered to wfl_name.",
                    "The search is done on both the workflow name, via the dataset_search_regexp, and the actual cmsDriver steps registered to it, via command_search_regexp.",
                ]
            )
        )

    def complete_searchInCommands(self, text, line, start_idx, end_idx):
        """Complete on matrix names."""
        return self._complete_matrix_names(text)

    def help_searchInWorkflow(self):
        print(
            "\n".join(
                [
                    "searchInWorkflow wfl_name search_regexp\n",
                    "This command will search for a match within all workflows registered to wfl_name.",
                    "The search is done on both the workflow name and the names of steps registered to it.",
                ]
            )
        )

    def complete_searchInWorkflow(self, text, line, start_idx, end_idx):
        """Complete on matrix names."""
        return self._complete_matrix_names(text)

    def do_searchInWorkflow(self, arg):
        """Print the workflows of one matrix whose name matches a regexp.

        *arg* is 'matrix_name regexp'; the regexp is applied with ``match``.
        """
        args = arg.split()
        if len(args) < 2:
            print("searchInWorkflow name regexp")
            return
        if args[0] not in self.matrices_:
            print("Unknown workflow")
            return
        import re

        try:
            pattern = re.compile(args[1])
        except re.error:
            print("Failed to compile regexp %s" % args[1])
            return
        counter = 0
        for wfl in self.matrices_[args[0]].workFlows:
            if pattern.match(wfl.nameId):
                print(
                    "%s %s"
                    % (
                        Fore.BLUE + str(wfl.numId) + Fore.RESET,
                        Fore.GREEN + wfl.nameId + Fore.RESET,
                    )
                )
                counter += 1
        print(
            "Found %s compatible workflows inside %s"
            % (Fore.RED + str(counter) + Fore.RESET, Fore.YELLOW + str(args[0]))
            + Fore.RESET
        )

    def help_search(self):
        print(
            "\n".join(
                [
                    "search search_regexp\n",
                    "This command will search for a match within all workflows registered.",
                    "The search is done on both the workflow name and the names of steps registered to it.",
                ]
            )
        )

    def do_search(self, arg):
        """Run searchInWorkflow with the first word of *arg* on every matrix."""
        args = arg.split()
        if len(args) < 1:
            print("search regexp")
            return
        for wfl in self.matrices_.keys():
            self.do_searchInWorkflow(" ".join([wfl, args[0]]))

    def help_dumpWorkflowId(self):
        print(
            "\n".join(
                [
                    "dumpWorkflowId [wfl-id1 [...]]\n",
                    "Dumps the details (cmsDriver commands for all steps) of the space-separated workflow-ids in input.",
                ]
            )
        )

    def do_dumpWorkflowId(self, arg):
        """Print the step commands of each workflow id given in *arg*.

        The commands are printed for the first matrix containing the id;
        further matrices holding the same id are only mentioned.
        """
        wflids = arg.split()
        if len(wflids) == 0:
            print("dumpWorkflowId [wfl-id1 [...]]")
            return

        fmt = "[%s]: %s\n"
        for wflid in wflids:
            # A non-numeric id is skipped instead of letting the ValueError
            # from float() terminate the shell.
            try:
                wanted = float(wflid)
            except ValueError:
                print("Invalid workflow id %s: skipping" % wflid)
                continue
            dump = True
            for key, mrd in self.matrices_.items():
                for wfl in mrd.workFlows:
                    if wfl.numId != wanted:
                        continue
                    if dump:
                        dump = False
                        print(
                            Fore.GREEN
                            + str(wfl.numId)
                            + Fore.RESET
                            + " "
                            + Fore.YELLOW
                            + wfl.nameId
                            + Fore.RESET
                        )
                        for i, s in enumerate(wfl.cmds):
                            print(
                                fmt
                                % (
                                    Fore.RED + str(i + 1) + Fore.RESET,
                                    (str(s) + " "),
                                )
                            )
                        print("\nWorkflow found in %s." % key)
                    else:
                        print("Workflow also found in %s." % key)

    # End-of-file on stdin leaves the shell exactly like 'exit'.
    do_EOF = do_exit
1022
1023 TheMatrix(opt).cmdloop()
1024 sys.exit(0)
1025
# Raw dump only when both --raw and --showMatrix are given; every other
# combination goes through the normal show/run path. The chosen function's
# return value becomes the process exit status.
ret = showRaw(opt) if (opt.raw and opt.show) else runSelected(opt)

sys.exit(ret)
1032 sys.exit(ret)