File indexing completed on 2025-09-12 09:51:02
0001
0002 import sys, os
0003
0004 from itertools import cycle
0005
0006 from Configuration.PyReleaseValidation.MatrixReader import MatrixReader
0007 from Configuration.PyReleaseValidation.MatrixRunner import MatrixRunner
0008 from Configuration.PyReleaseValidation.MatrixInjector import MatrixInjector,performInjectionOptionTest
0009 from Configuration.PyReleaseValidation.MatrixUtil import cleanComputeCapabilities
0010
0011
0012 def showRaw(opt):
0013
0014 mrd = MatrixReader(opt)
0015 mrd.showRaw(opt.useInput, opt.refRel, opt.fromScratch, opt.raw, opt.step1Only, selected=opt.testList)
0016
0017 return 0
0018
0019
0020
0021 def runSelected(opt):
0022
0023 mrd = MatrixReader(opt)
0024 mrd.prepare(opt.useInput, opt.refRel, opt.fromScratch)
0025
0026
0027 if opt.testList:
0028 definedWf = [dwf.numId for dwf in mrd.workFlows]
0029 definedSet = set(definedWf)
0030 testSet = set(opt.testList)
0031 undefSet = testSet - definedSet
0032 if len(undefSet)>0: raise ValueError('Undefined workflows: '+', '.join(map(str,list(undefSet))))
0033 if not opt.allowDuplicates:
0034 testList = testSet
0035 ret = 0
0036 if opt.show:
0037 mrd.show(opt.testList, opt.extended, opt.cafVeto)
0038 if opt.testList : print('selected items:', opt.testList)
0039 else:
0040 mRunnerHi = MatrixRunner(mrd.workFlows, opt.nProcs, opt.nThreads, opt.selected_gpus)
0041 ret = mRunnerHi.runTests(opt)
0042
0043 if opt.wmcontrol:
0044 if ret!=0:
0045 print('Cannot go on with wmagent injection with failing workflows')
0046 else:
0047 wfInjector = MatrixInjector(opt,mode=opt.wmcontrol,options=opt.wmoptions)
0048 ret= wfInjector.prepare(mrd,
0049 mRunnerHi.runDirs)
0050 if ret==0:
0051 wfInjector.upload()
0052 wfInjector.submit()
0053 return ret
0054
0055
0056
0057 if __name__ == '__main__':
0058
0059
0060 predefinedSet={
0061
0062 'run1_run2' : [
0063
0064
0065 5.1,
0066 8,
0067 9.0,
0068 25,
0069 101.0,
0070
0071
0072 7.3,
0073 1306.0,
0074 1330,
0075 135.4,
0076 25202.0,
0077 250202.181,
0078 10224.0,
0079
0080
0081
0082 4.22,
0083 4.53,
0084 1000,
0085 1001,
0086
0087 136.731,
0088 136.793,
0089 136.874,
0090 ],
0091
0092 'run3' : [
0093
0094
0095 11634.0,
0096 13234.0,
0097 12434.0,
0098 12834.0,
0099 12846.0,
0100 13034.0,
0101 16834.0,
0102 17034.0,
0103 14034.0,
0104 14234.0,
0105 2500.3001,
0106
0107
0108
0109
0110 139.001,
0111
0112
0113 140.045,
0114
0115
0116 141.042,
0117
0118
0119 145.014,
0120 145.104,
0121 145.202,
0122 145.301,
0123 145.408,
0124 145.500,
0125 145.604,
0126 145.713,
0127 ],
0128
0129 'phase2' : [
0130
0131
0132 29634.0,
0133 24834.911,
0134 29634.911,
0135 29834.999,
0136 29696.0,
0137 29700.0,
0138
0139 29634.75,
0140 ],
0141
0142 'heavyIons' : [
0143
0144
0145
0146 140.56,
0147
0148 312.0,
0149 ],
0150
0151 'jetmc': [5.1, 13, 15, 25, 38, 39],
0152 'metmc' : [5.1, 15, 25, 37, 38, 39],
0153 'muonmc' : [5.1, 124.4, 124.5, 20, 21, 22, 23, 25, 30],
0154
0155 'ph2_hlt' : [29634.75,
0156 29634.7501,
0157 29634.751,
0158 29634.752,
0159 29634.753,
0160 29634.754,
0161 29634.755,
0162 29634.756,
0163 29634.7561,
0164 29634.7562,
0165 29634.757,
0166 29634.758,
0167 29634.759,
0168 29634.77,
0169 29634.771,
0170 29634.772,
0171 29634.773]
0172 }
0173
0174 predefinedSet['limited'] = (
0175 predefinedSet['run1_run2'] +
0176 predefinedSet['run3'] +
0177 predefinedSet['phase2'] +
0178 predefinedSet['heavyIons']
0179 )
0180
0181 import argparse
0182 usage = 'usage: runTheMatrix.py --show -s '
0183
0184 parser = argparse.ArgumentParser(usage,formatter_class=argparse.ArgumentDefaultsHelpFormatter)
0185
0186 parser.add_argument('-b','--batchName',
0187 help='relval batch: suffix to be appended to Campaign name',
0188 dest='batchName',
0189 default='')
0190
0191 parser.add_argument('-m','--memoryOffset',
0192 help='memory of the wf for single core',
0193 dest='memoryOffset',
0194 type=int,
0195 default=3000)
0196
0197 parser.add_argument('--startFrom',
0198 help='Start from a specific step (e.g. GEN,SIM,DIGI,RECO)',
0199 dest='startFrom',
0200 type=str,
0201 default=None)
0202
0203 parser.add_argument('--recycle',
0204 help='Input file to recycle. To be used if the first step is an input step or togehter with --startFrom. '
0205 'N.B.: runTheMatrix.py will create its own workdirectory so if yo use a relative path, be careful.',
0206 dest='recycle',
0207 type=str,
0208 default=None)
0209
0210 parser.add_argument('--allowDuplicates',
0211 help='Allow to have duplicate workflow numbers in the list',
0212 dest='allowDuplicates',
0213 default=False,
0214 action='store_true')
0215
0216 parser.add_argument('--addMemPerCore',
0217 help='increase of memory per each n > 1 core: memory(n_core) = memoryOffset + (n_core-1) * memPerCore',
0218 dest='memPerCore',
0219 type=int,
0220 default=1500)
0221
0222 parser.add_argument('-j','--nproc',
0223 help='number of processes. 0 Will use 4 processes, not execute anything but create the wfs',
0224 dest='nProcs',
0225 type=int,
0226 default=4)
0227
0228 parser.add_argument('-t','--nThreads',
0229 help='number of threads per process to use in cmsRun.',
0230 dest='nThreads',
0231 type=int,
0232 default=1)
0233
0234 parser.add_argument('--nStreams',
0235 help='number of streams to use in cmsRun.',
0236 dest='nStreams',
0237 type=int,
0238 default=0)
0239
0240 parser.add_argument('--nEvents',
0241 help='number of events to process in cmsRun. If 0 will use the standard 10 events.',
0242 dest='nEvents',
0243 type=int,
0244 default=0)
0245
0246 parser.add_argument('--numberEventsInLuminosityBlock',
0247 help='number of events in a luminosity block',
0248 dest='numberEventsInLuminosityBlock',
0249 type=int,
0250 default=-1)
0251
0252 parser.add_argument('-n','--showMatrix',
0253 help='Only show the worflows. Use --ext to show more',
0254 dest='show',
0255 default=False,
0256 action='store_true')
0257
0258 parser.add_argument('-c','--checkInputs',
0259 help='Check if the default inputs are well defined. To be used with --show',
0260 dest='checkInputs',
0261 default=False,
0262 action='store_true')
0263
0264 parser.add_argument('-e','--extended',
0265 help='Show details of workflows, used with --show',
0266 dest='extended',
0267 default=False,
0268 action='store_true')
0269
0270 parser.add_argument('-s','--selected',
0271 help='Run a pre-defined selected matrix of wf. Deprecated, please use -l limited',
0272 dest='restricted',
0273 default=False,
0274 action='store_true')
0275
0276 parser.add_argument('-l','--list',
0277 help='Comma separated list of workflow to be shown or ran. Possible keys are also '+str(predefinedSet.keys())+'. and wild card like muon, or mc',
0278 dest='testList',
0279 default=None)
0280
0281 parser.add_argument('-f','--failed-from',
0282 help='Provide a matrix report to specify the workflows to be run again. Augments the -l option if specified already',
0283 dest='failed_from',
0284 default=None)
0285
0286 parser.add_argument('-r','--raw',
0287 help='Temporary dump the .txt needed for prodAgent interface. To be discontinued soon. Argument must be the name of the set (standard, pileup,...)',
0288 dest='raw')
0289
0290 parser.add_argument('-i','--useInput',
0291 help='Use recyling where available. Either all, or a comma separated list of wf number.',
0292 dest='useInput',
0293 type=lambda x: x.split(','),
0294 default=None)
0295
0296 parser.add_argument('-w','--what',
0297 help='Specify the set to be used. Argument must be the name of a set (standard, pileup,...) or multiple sets separated by commas (--what standard,pileup )',
0298 dest='what',
0299 default='all')
0300
0301 parser.add_argument('--step1',
0302 help='Used with --raw. Limit the production to step1',
0303 dest='step1Only',
0304 default=False)
0305
0306 parser.add_argument('--maxSteps',
0307 help='Only run maximum on maxSteps. Used when we are only interested in first n steps.',
0308 dest='maxSteps',
0309 default=9999,
0310 type=int)
0311
0312 parser.add_argument('--fromScratch',
0313 help='Comma separated list of wf to be run without recycling. all is not supported as default.',
0314 dest='fromScratch',
0315 type=lambda x: x.split(','),
0316 default=None)
0317
0318 parser.add_argument('--refRelease',
0319 help='Allow to modify the recycling dataset version',
0320 dest='refRel',
0321 default=None)
0322
0323 parser.add_argument('--wmcontrol',
0324 help='Create the workflows for injection to WMAgent. In the WORKING. -wmcontrol init will create the the workflows, -wmcontrol test will dryRun a test, -wmcontrol submit will submit to wmagent',
0325 choices=['init','test','submit','force'],
0326 dest='wmcontrol',
0327 default=None)
0328
0329 parser.add_argument('--revertDqmio',
0330 help='When submitting workflows to wmcontrol, force DQM outout to use pool and not DQMIO',
0331 choices=['yes','no'],
0332 dest='revertDqmio',
0333 default='no')
0334
0335 parser.add_argument('--optionswm',
0336 help='Specify a few things for wm injection',
0337 default='',
0338 dest='wmoptions')
0339
0340 parser.add_argument('--keep',
0341 help='allow to specify for which comma separated steps the output is needed',
0342 default=None)
0343
0344 parser.add_argument('--label',
0345 help='allow to give a special label to the output dataset name',
0346 default='')
0347
0348 parser.add_argument('--command',
0349 help='provide a way to add additional command to all of the cmsDriver commands in the matrix',
0350 dest='command',
0351 action='append',
0352 default=None)
0353
0354 parser.add_argument('--apply',
0355 help='allow to use the --command only for 1 comma separeated',
0356 dest='apply',
0357 default=None)
0358
0359 parser.add_argument('--workflow',
0360 help='define a workflow to be created or altered from the matrix',
0361 action='append',
0362 dest='workflow',
0363 default=None)
0364
0365 parser.add_argument('--dryRun',
0366 help='do not run the wf at all',
0367 action='store_true',
0368 dest='dryRun',
0369 default=False)
0370
0371 parser.add_argument('--testbed',
0372 help='workflow injection to cmswebtest (you need dedicated rqmgr account)',
0373 dest='testbed',
0374 default=False,
0375 action='store_true')
0376
0377 parser.add_argument('--noCafVeto',
0378 help='Run from any source, ignoring the CAF label',
0379 dest='cafVeto',
0380 default=True,
0381 action='store_false')
0382
0383 parser.add_argument('--overWrite',
0384 help='Change the content of a step for another. List of pairs.',
0385 dest='overWrite',
0386 default=None)
0387
0388 parser.add_argument('--noRun',
0389 help='Remove all run list selection from wfs',
0390 dest='noRun',
0391 default=False,
0392 action='store_true')
0393
0394 parser.add_argument('--das-options',
0395 help='Options to be passed to dasgoclient.',
0396 dest='dasOptions',
0397 default="--limit 0",
0398 action='store')
0399
0400 parser.add_argument('--job-reports',
0401 help='Dump framework job reports',
0402 dest='jobReports',
0403 default=False,
0404 action='store_true')
0405
0406 parser.add_argument('--ibeos',
0407 help='Use IB EOS site configuration',
0408 dest='IBEos',
0409 default=False,
0410 action='store_true')
0411
0412 parser.add_argument('--sites',
0413 help='Run DAS query to get data from a specific site. Set it to empty string to search all sites.',
0414 dest='dasSites',
0415 default='T2_CH_CERN',
0416 action='store')
0417
0418 parser.add_argument('--interactive',
0419 help="Open the Matrix interactive shell",
0420 action='store_true',
0421 default=False)
0422
0423 parser.add_argument('--dbs-url',
0424 help='Overwrite DbsUrl value in JSON submitted to ReqMgr2',
0425 dest='dbsUrl',
0426 default=None,
0427 action='store')
0428
0429 gpugroup = parser.add_argument_group('GPU-related options','These options are only meaningful when --gpu is used, and is not set to forbidden.')
0430
0431 gpugroup.add_argument('--gpu','--requires-gpu',
0432 help='Enable GPU workflows. Possible options are "forbidden" (default), "required" (implied if no argument is given), or "optional".',
0433 dest='gpu',
0434 choices=['forbidden', 'optional', 'required'],
0435 nargs='?',
0436 const='required',
0437 default='forbidden',
0438 action='store')
0439
0440 gpugroup.add_argument('--gpu-memory',
0441 help='Specify the minimum amount of GPU memory required by the job, in MB.',
0442 dest='GPUMemoryMB',
0443 type=int,
0444 default=8000)
0445
0446 gpugroup.add_argument('--cuda-capabilities',
0447 help='Specify a comma-separated list of CUDA "compute capabilities", or GPU hardware architectures, that the job can use.',
0448 dest='CUDACapabilities',
0449 type=lambda x: x.split(','),
0450 default='6.0,6.1,6.2,7.0,7.2,7.5,8.0,8.6,8.7,8.9,9.0,12.0')
0451
0452
0453 cudart_version = None
0454 libcudart = os.path.realpath(os.path.expandvars('$CMSSW_RELEASE_BASE/external/$SCRAM_ARCH/lib/libcudart.so'))
0455 if os.path.isfile(libcudart):
0456 cudart_basename = os.path.basename(libcudart)
0457 cudart_version = '.'.join(cudart_basename.split('.')[2:4])
0458 gpugroup.add_argument('--cuda-runtime',
0459 help='Specify major and minor version of the CUDA runtime used to build the application.',
0460 dest='CUDARuntime',
0461 default=cudart_version)
0462
0463 gpugroup.add_argument('--force-gpu-name',
0464 help='Request a specific GPU model, e.g. "Tesla T4" or "NVIDIA GeForce RTX 2080". The default behaviour is to accept any supported GPU.',
0465 dest='GPUName',
0466 default='')
0467
0468 gpugroup.add_argument('--force-cuda-driver-version',
0469 help='Request a specific CUDA driver version, e.g. 470.57.02. The default behaviour is to accept any supported CUDA driver version.',
0470 dest='CUDADriverVersion',
0471 default='')
0472
0473 gpugroup.add_argument('--force-cuda-runtime-version',
0474 help='Request a specific CUDA runtime version, e.g. 11.4. The default behaviour is to accept any supported CUDA runtime version.',
0475 dest='CUDARuntimeVersion',
0476 default='')
0477
0478 opt = parser.parse_args()
0479 opt.selected_gpus = None
0480
0481 if not opt.wmcontrol and opt.gpu != 'forbidden':
0482
0483 print(">> Running with --gpu option. Checking the available and supported GPUs.")
0484 gpus = cleanComputeCapabilities("cuda")
0485 gpus = gpus + cleanComputeCapabilities("rocm", len(gpus))
0486 available_gpus = gpus
0487
0488 if len(available_gpus) == 0:
0489 if opt.gpu == 'required':
0490 raise Exception('Launched with --gpu required and no GPU available!')
0491 print(">> No GPU available!")
0492 else:
0493 print(">> GPUs available:")
0494 [print(f) for f in available_gpus]
0495
0496
0497 gpus = [g for g in gpus if not g.isCUDA() or (g.isCUDA() and g.capability in opt.CUDACapabilities)]
0498
0499
0500 if len(opt.GPUName) > 0:
0501 gpus = [g for g in gpus if g.name == opt.GPUName]
0502
0503 if available_gpus != gpus:
0504 if len(gpus) > 0:
0505 print(">> GPUs selected:")
0506 [print(f) for f in gpus]
0507 else:
0508 if opt.gpu == 'required':
0509 raise Exception('Launched with --gpu required and no GPU selected (among those available)!')
0510 print(">> No GPU selected!")
0511 else:
0512 print(">> All selected!")
0513
0514 if len(gpus) > 0:
0515 opt.selected_gpus = cycle(gpus)
0516 else:
0517 error_str = 'No GPU selected'
0518 if opt.gpu == 'required':
0519 raise Exception('Launched with --gpu required and no GPU available (among those available)!')
0520
0521 if opt.command: opt.command = ' '.join(opt.command)
0522 os.environ["CMSSW_DAS_QUERY_SITES"]=opt.dasSites
0523 if opt.failed_from:
0524 rerunthese=[]
0525 with open(opt.failed_from,'r') as report:
0526 for report_line in report:
0527 if 'FAILED' in report_line:
0528 to_run,_=report_line.split('_',1)
0529 rerunthese.append(to_run)
0530 if opt.testList:
0531 opt.testList+=','.join(['']+rerunthese)
0532 else:
0533 opt.testList = ','.join(rerunthese)
0534
0535 if opt.IBEos:
0536 os.environ["CMSSW_USE_IBEOS"]="true"
0537 if opt.restricted:
0538 print('Deprecated, please use -l limited')
0539 if opt.testList: opt.testList+=',limited'
0540 else: opt.testList='limited'
0541
0542 def stepOrIndex(s):
0543 if s.isdigit():
0544 return int(s)
0545 else:
0546 return s
0547 if opt.apply:
0548 opt.apply=map(stepOrIndex,opt.apply.split(','))
0549 if opt.keep:
0550 opt.keep=map(stepOrIndex,opt.keep.split(','))
0551
0552 if opt.testList:
0553 testList=[]
0554 for entry in opt.testList.split(','):
0555 if not entry: continue
0556 mapped=False
0557 for k in predefinedSet:
0558 if k.lower().startswith(entry.lower()) or k.lower().endswith(entry.lower()):
0559 testList.extend(predefinedSet[k])
0560 mapped=True
0561 break
0562 if not mapped:
0563 try:
0564 testList.append(float(entry))
0565 except:
0566 print(entry,'is not a possible selected entry')
0567
0568 opt.testList = list(testList)
0569
0570 if opt.wmcontrol:
0571 performInjectionOptionTest(opt)
0572 if opt.overWrite:
0573 opt.overWrite=eval(opt.overWrite)
0574 if opt.interactive:
0575 import cmd
0576 from colorama import Fore, Style
0577 from os import isatty
0578 import subprocess
0579 import time
0580
0581 class TheMatrix(cmd.Cmd):
0582 intro = "Welcome to the Matrix (? for help)"
0583 prompt = "matrix> "
0584
0585 def __init__(self, opt):
0586 cmd.Cmd.__init__(self)
0587 self.opt_ = opt
0588 self.matrices_ = {}
0589 tmp = MatrixReader(self.opt_)
0590 self.processes_ = dict()
0591 for what in tmp.files:
0592 what = what.replace("relval_", "")
0593 self.opt_.what = what
0594 self.matrices_[what] = MatrixReader(self.opt_)
0595 self.matrices_[what].prepare(
0596 self.opt_.useInput, self.opt_.refRel, self.opt_.fromScratch
0597 )
0598 os.system("clear")
0599
0600 def do_clear(self, arg):
0601 """Clear the screen, put prompt at the top"""
0602 os.system("clear")
0603
0604 def do_exit(self, arg):
0605 print("Leaving the Matrix")
0606 return True
0607
0608 def default(self, inp):
0609 if inp == "x" or inp == "q":
0610 return self.do_exit(inp)
0611 else:
0612 is_pipe = not isatty(sys.stdin.fileno())
0613 print(Fore.RED + "Error: " + Fore.RESET + "unrecognized command.")
0614
0615 if is_pipe:
0616 sys.exit(1)
0617
0618 def help_predefined(self):
0619 print(
0620 "\n".join(
0621 [
0622 "predefined [predef1 [...]]\n",
0623 "Run w/o argument, it will print the list of known predefined workflows.",
0624 "Run with space-separated predefined workflows, it will print the workflow-ids registered to them",
0625 ]
0626 )
0627 )
0628
0629 def complete_predefined(self, text, line, start_idx, end_idx):
0630 if text and len(text) > 0:
0631 return [t for t in predefinedSet.keys() if t.startswith(text)]
0632 else:
0633 return predefinedSet.keys()
0634
0635 def do_predefined(self, arg):
0636 """Print the list of predefined workflows"""
0637 print("List of predefined workflows")
0638 if arg:
0639 for w in arg.split():
0640 if w in predefinedSet.keys():
0641 print("Predefined Set: %s" % w)
0642 print(predefinedSet[w])
0643 else:
0644 print("Unknown Set: %s" % w)
0645 else:
0646 print(
0647 "[ "
0648 + Fore.RED
0649 + ", ".join([str(k) for k in predefinedSet.keys()])
0650 + Fore.RESET
0651 + " ]"
0652 )
0653
0654 def help_showWorkflow(self):
0655 print(
0656 "\n".join(
0657 [
0658 "showWorkflow [workflow1 [...]]\n",
0659 "Run w/o arguments, it will print the list of registered macro-workflows.",
0660 "Run with space-separated workflows, it will print the full list of workflow-ids registered to them",
0661 ]
0662 )
0663 )
0664
0665 def complete_showWorkflow(self, text, line, start_idx, end_idx):
0666 if text and len(text) > 0:
0667 return [t for t in self.matrices_.keys() if t.startswith(text)]
0668 else:
0669 return self.matrices_.keys()
0670
0671 def do_showWorkflow(self, arg):
0672 if arg == "":
0673 print("Available workflows:")
0674 for k in self.matrices_.keys():
0675 print(Fore.RED + Style.BRIGHT + k)
0676 print(Style.RESET_ALL)
0677 else:
0678 selected = arg.split()
0679 for k in selected:
0680 if k not in self.matrices_.keys():
0681 print("Unknown workflow %s: skipping" % k)
0682 else:
0683 for wfl in self.matrices_[k].workFlows:
0684 print(
0685 "%s %s"
0686 % (
0687 Fore.BLUE + str(wfl.numId) + Fore.RESET,
0688 Fore.GREEN + wfl.nameId + Fore.RESET,
0689 )
0690 )
0691 print(
0692 "%s contains %d workflows"
0693 % (
0694 Fore.RED + k + Fore.RESET,
0695 len(self.matrices_[k].workFlows),
0696 )
0697 )
0698
0699 def do_runWorkflow(self, arg):
0700
0701 args = arg.split()
0702 if len(args) < 2:
0703 print(Fore.RED + Style.BRIGHT + "Wrong number of parameters passed")
0704 print(Style.RESET_ALL)
0705 return
0706 workflow_class = args[0]
0707 workflow_id = args[1]
0708 passed_down_args = list()
0709 if len(args) > 2:
0710 passed_down_args = args[2:]
0711 print(
0712 Fore.YELLOW + Style.BRIGHT + "Running with the following options:\n"
0713 )
0714 print(
0715 Fore.GREEN
0716 + Style.BRIGHT
0717 + "Workflow class: {}".format(workflow_class)
0718 )
0719 print(
0720 Fore.GREEN + Style.BRIGHT + "Workflow ID: {}".format(workflow_id)
0721 )
0722 print(
0723 Fore.GREEN
0724 + Style.BRIGHT
0725 + "Additional runTheMatrix options: {}".format(passed_down_args)
0726 )
0727 print(Style.RESET_ALL)
0728 if workflow_class not in self.matrices_.keys():
0729 print(
0730 Fore.RED
0731 + Style.BRIGHT
0732 + "Unknown workflow selected: {}".format(workflow_class)
0733 )
0734 print("Available workflows:")
0735 for k in self.matrices_.keys():
0736 print(Fore.RED + Style.BRIGHT + k)
0737 print(Style.RESET_ALL)
0738 return
0739 wflnums = [x.numId for x in self.matrices_[workflow_class].workFlows]
0740 if float(workflow_id) not in wflnums:
0741 print(
0742 Fore.RED
0743 + Style.BRIGHT
0744 + "Unknown workflow {}".format(workflow_id)
0745 )
0746 print(Fore.GREEN + Style.BRIGHT)
0747 print(wflnums)
0748 print(Style.RESET_ALL)
0749 return
0750 if workflow_id in self.processes_.keys():
0751
0752 if self.processes_[workflow_id][0].poll() is None:
0753 print(
0754 Fore.RED
0755 + Style.BRIGHT
0756 + "Workflow {} already running!".format(workflow_id)
0757 )
0758 print(Style.RESET_ALL)
0759 return
0760
0761
0762 lognames = ["stdout", "stderr"]
0763 logfiles = tuple(
0764 "%s_%s_%s.log" % (workflow_class, workflow_id, name)
0765 for name in lognames
0766 )
0767 stdout = open(logfiles[0], "w")
0768 stderr = open(logfiles[1], "w")
0769 command = ("runTheMatrix.py", "-w", workflow_class, "-l", workflow_id)
0770 if len(passed_down_args) > 0:
0771 command += tuple(passed_down_args)
0772 print(command)
0773 p = subprocess.Popen(command, stdout=stdout, stderr=stderr)
0774 self.processes_[workflow_id] = (p, time.time())
0775
0776 def complete_runWorkflow(self, text, line, start_idx, end_idx):
0777 if text and len(text) > 0:
0778 return [t for t in self.matrices_.keys() if t.startswith(text)]
0779 else:
0780 return self.matrices_.keys()
0781
0782 def help_runWorkflow(self):
0783 print(
0784 "\n".join(
0785 [
0786 "runWorkflow workflow_class workflow_id\n",
0787 "This command will launch a new and independent process that invokes",
0788 "the command:\n",
0789 "runTheMatrix.py -w workflow_class -l workflow_id [runTheMatrix.py options]",
0790 "\nYou can specify just one workflow_class and workflow_id per invocation.",
0791 "The job will continue even after quitting the interactive session.",
0792 "stdout and stderr of the new process will be automatically",
0793 "redirected to 2 logfiles whose names contain the workflow_class",
0794 "and workflow_id. Mutiple command can be issued one after the other.",
0795 "The working directory of the new process will be the directory",
0796 "from which the interactive session has started.",
0797 "Autocompletion is available for workflow_class, but",
0798 "not for workflow_id. Supplying a wrong workflow_class or",
0799 "a non-existing workflow_id for a valid workflow_class",
0800 "will trigger an error and no process will be invoked.",
0801 "The interactive shell will keep track of all active processes",
0802 "and will prevent the accidental resubmission of an already",
0803 "active jobs.",
0804 ]
0805 )
0806 )
0807
0808 def do_jobs(self, args):
0809 print(Fore.GREEN + Style.BRIGHT + "List of jobs:")
0810 for w in self.processes_.keys():
0811 if self.processes_[w][0].poll() is None:
0812 print(
0813 Fore.YELLOW
0814 + Style.BRIGHT
0815 + "Active job: {} since {:.2f} seconds.".format(
0816 w, time.time() - self.processes_[w][1]
0817 )
0818 )
0819 else:
0820 print(Fore.RED + Style.BRIGHT + "Done job: {}".format(w))
0821 print(Style.RESET_ALL)
0822
0823 def help_jobs(self):
0824 print(
0825 "\n".join(
0826 [
0827 "Print a full list of active and done jobs submitted",
0828 "in the ongoing interactive session",
0829 ]
0830 )
0831 )
0832
0833 def do_searchInCommands(self, arg):
0834 args = arg.split()
0835 if len(args) < 3:
0836 print("searchInCommands name regexp regexp")
0837 return
0838 if args[0] not in self.matrices_.keys():
0839 print("Unknown workflow")
0840 return
0841 import re
0842
0843 pattern_dataset = None
0844 pattern_command = None
0845 try:
0846 pattern_dataset = re.compile(args[1])
0847 pattern_command = re.compile(args[2])
0848 except:
0849 print("Failed to compile regexp %s" % args[1])
0850 return
0851 counter = 0
0852 cached = []
0853 cached_steps = {}
0854 for wfl in self.matrices_[args[0]].workFlows:
0855 if re.match(pattern_dataset, wfl.nameId):
0856 for step, command in enumerate(wfl.cmds):
0857 if re.match(pattern_command, command):
0858 if wfl.numId not in cached:
0859 cached.append(wfl.numId)
0860 cached_steps[wfl.nameId] = {
0861 "steps": [],
0862 "numId": wfl.numId,
0863 }
0864 cached_steps[wfl.nameId]["steps"].append(step)
0865 else:
0866 cached_steps[wfl.nameId]["steps"].append(step)
0867 counter += 1
0868 for wfl in cached_steps:
0869 print(
0870 "%s %s [%s]"
0871 % (
0872 Fore.BLUE + str(cached_steps[wfl]["numId"]) + Fore.RESET,
0873 Fore.GREEN + wfl + Fore.RESET,
0874 Fore.YELLOW
0875 + " ".join(
0876 [str(command) for command in cached_steps[wfl]["steps"]]
0877 )
0878 + Fore.RESET,
0879 )
0880 )
0881 print(
0882 "Found %s compatible commands inside %s workflows inside %s."
0883 % (
0884 Fore.RED + str(counter) + Fore.RESET,
0885 Fore.BLUE + str(len(cached_steps.keys())),
0886 Fore.YELLOW + str(args[0]),
0887 )
0888 + Fore.RESET
0889 )
0890
0891 def help_searchInCommands(self):
0892 print(
0893 "\n".join(
0894 [
0895 "searchInCommands wfl_name dataset_search_regexp command_search_regexp\n",
0896 "This command will search for a match within all workflows registered to wfl_name.",
0897 "The search is done on both the workflow name, via the dataset_search_regexp, and the actual cmsDriver steps registered to it, via command_search_regexp.",
0898 ]
0899 )
0900 )
0901
0902 def complete_searchInCommands(self, text, line, start_idx, end_idx):
0903 if text and len(text) > 0:
0904 return [t for t in self.matrices_.keys() if t.startswith(text)]
0905 else:
0906 return self.matrices_.keys()
0907
0908 def help_searchInWorkflow(self):
0909 print(
0910 "\n".join(
0911 [
0912 "searchInWorkflow wfl_name search_regexp\n",
0913 "This command will search for a match within all workflows registered to wfl_name.",
0914 "The search is done on both the workflow name and the names of steps registered to it.",
0915 ]
0916 )
0917 )
0918
0919 def complete_searchInWorkflow(self, text, line, start_idx, end_idx):
0920 if text and len(text) > 0:
0921 return [t for t in self.matrices_.keys() if t.startswith(text)]
0922 else:
0923 return self.matrices_.keys()
0924
0925 def do_searchInWorkflow(self, arg):
0926 args = arg.split()
0927 if len(args) < 2:
0928 print("searchInWorkflow name regexp")
0929 return
0930 if args[0] not in self.matrices_.keys():
0931 print("Unknown workflow")
0932 return
0933 import re
0934
0935 pattern = None
0936 try:
0937 pattern = re.compile(args[1])
0938 except:
0939 print("Failed to compile regexp %s" % args[1])
0940 return
0941 counter = 0
0942 for wfl in self.matrices_[args[0]].workFlows:
0943 if re.match(pattern, wfl.nameId):
0944 print(
0945 "%s %s"
0946 % (
0947 Fore.BLUE + str(wfl.numId) + Fore.RESET,
0948 Fore.GREEN + wfl.nameId + Fore.RESET,
0949 )
0950 )
0951 counter += 1
0952 print(
0953 "Found %s compatible workflows inside %s"
0954 % (Fore.RED + str(counter) + Fore.RESET, Fore.YELLOW + str(args[0]))
0955 + Fore.RESET
0956 )
0957
0958 def help_search(self):
0959 print(
0960 "\n".join(
0961 [
0962 "search search_regexp\n",
0963 "This command will search for a match within all workflows registered.",
0964 "The search is done on both the workflow name and the names of steps registered to it.",
0965 ]
0966 )
0967 )
0968
0969 def do_search(self, arg):
0970 args = arg.split()
0971 if len(args) < 1:
0972 print("search regexp")
0973 return
0974 for wfl in self.matrices_.keys():
0975 self.do_searchInWorkflow(" ".join([wfl, args[0]]))
0976
0977 def help_dumpWorkflowId(self):
0978 print(
0979 "\n".join(
0980 [
0981 "dumpWorkflowId [wfl-id1 [...]]\n",
0982 "Dumps the details (cmsDriver commands for all steps) of the space-separated workflow-ids in input.",
0983 ]
0984 )
0985 )
0986
0987 def do_dumpWorkflowId(self, arg):
0988 wflids = arg.split()
0989 if len(wflids) == 0:
0990 print("dumpWorkflowId [wfl-id1 [...]]")
0991 return
0992
0993 fmt = "[%s]: %s\n"
0994 maxLen = 100
0995 for wflid in wflids:
0996 dump = True
0997 for key, mrd in self.matrices_.items():
0998 for wfl in mrd.workFlows:
0999 if wfl.numId == float(wflid):
1000 if dump:
1001 dump = False
1002 print(
1003 Fore.GREEN
1004 + str(wfl.numId)
1005 + Fore.RESET
1006 + " "
1007 + Fore.YELLOW
1008 + wfl.nameId
1009 + Fore.RESET
1010 )
1011 for i, s in enumerate(wfl.cmds):
1012 print(
1013 fmt
1014 % (
1015 Fore.RED + str(i + 1) + Fore.RESET,
1016 (str(s) + " "),
1017 )
1018 )
1019 print("\nWorkflow found in %s." % key)
1020 else:
1021 print("Workflow also found in %s." % key)
1022
1023 do_EOF = do_exit
1024
1025 TheMatrix(opt).cmdloop()
1026 sys.exit(0)
1027
1028 if opt.raw and opt.show:
1029 ret = showRaw(opt)
1030 else:
1031 ret = runSelected(opt)
1032
1033
1034 sys.exit(ret)