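"""Multicrab submission script for BPH NanoAOD production.

Typical invocations (script and sample-file names below are illustrative):

    python multicrab_bphnano.py -y samples.yml -f 'BuToKJpsi*' -c submit
    python multicrab_bphnano.py -c status -d <crab_task_directory>
"""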

import os
import sys
import datetime
from argparse import ArgumentParser
from fnmatch import fnmatch
from multiprocessing import Process
from http.client import HTTPException

import yaml
from schema import Schema, And, Optional, SchemaError

import CRABClient  # imported first so CRABAPI can locate the client setup
from CRABAPI.RawCommand import crabCommand
from CRABClient.ClientExceptions import ClientException
from CRABClient.UserUtilities import config

# Date tag stamped into work areas, request names and output files, e.g. '2025May08'.
production_tag = datetime.date.today().strftime('%Y%b%d')


def parse_args():
    parser = ArgumentParser(description="A multicrab submission script")
    parser.add_argument('-y', '--yaml', default='test_samples.yml', help='File with dataset descriptions')
    parser.add_argument('-c', '--cmd', default='submit', choices=['submit', 'status'], help='Crab command')
    parser.add_argument('-f', '--filter', default='*', help='Filter samples; Unix shell-style wildcards (fnmatch) allowed')
    parser.add_argument('-w', '--workarea', default='BPHNANO_%s' % production_tag, help='Crab working area name')
    parser.add_argument('-o', '--outputdir', default='/store/group/cmst3/group/bpark/gmelachr/', help='LFN output high-level directory; the output will be saved in outputdir + workarea')
    parser.add_argument('-s', '--site', default='T2_CH_CERN', help='T2 or T3 site where the user has write access; verify with "crab checkwrite"')
    parser.add_argument('-t', '--tag', default=production_tag, help='Extra production tag')
    parser.add_argument('-p', '--psetcfg', default="../test/run_bphNano_cfg.py", help='Plugin configuration file')
    parser.add_argument('-e', '--extra', nargs='*', default=list(), help='Optional extra input files')
    parser.add_argument('-d', '--dir', help='Existing Crab task directory (required with --cmd status)')
    parser.add_argument('-tt', '--test', action='store_true', help='Flag a test job')
    return parser.parse_args()


def submit(config):
    """Submit one CRAB task, reporting client and server errors without raising."""
    try:
        crabCommand('submit', config=config)
    except HTTPException as hte:
        print("Failed submitting task: %s" % (hte.headers))
    except ClientException as cle:
        print("Failed submitting task: %s" % (cle))


def status(directory):
    """Query the status of an existing CRAB task directory."""
    try:
        crabCommand('status', dir=directory)
    except HTTPException as hte:
        print("Failed getting status: %s" % (hte.headers))
    except ClientException as cle:
        print("Failed getting status: %s" % (cle))
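

# The sample description file is expected to look like the following
# (illustrative values, not taken from a real production configuration):
#
#   common:
#     data:
#       lumimask: 'Cert_GoldenJSON.json'
#       splitting: 50
#       globaltag: 'auto:run3_data'
#     mc:
#       splitting: 5
#       globaltag: 'auto:run3_mc'
#   samples:
#     MySample_part%d:
#       dataset: '/MySample_part%d/SomeCampaign/MINIAOD'
#       isMC: false
#       parts: [1, 2]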
expected_schema = Schema({
    "common": {
        "data": {
            "lumimask": And(str, error="lumimask should be a string"),
            "splitting": And(int, error="splitting should be an integer"),
            "globaltag": And(str, error="globaltag should be a string"),
        },
        "mc": {
            "splitting": And(int, error="splitting should be an integer"),
            "globaltag": And(str, error="globaltag should be a string"),
        },
    },
    "samples": And(dict, error="samples should be a dict with keys dataset (str), isMC (bool). Optional keys: globaltag (str), parts (list(int))")
})

samples_schema = Schema({
    "dataset": And(str, error="dataset should be a string"),
    "isMC": And(bool, error="isMC should be a boolean"),
    Optional("decay"): And(str, error="decay should be a string"),
    Optional("goldenjson"): And(str, error="golden json file path should be a string"),
    Optional("globaltag"): And(str, error="globaltag should be a string"),
    Optional("parts"): [And(int, error="parts should be a list of integers")]
})


def validate_yaml(data):
    """Check the overall layout, then each sample entry; exit on failure."""
    try:
        expected_schema.validate(data)
        for name, content in data["samples"].items():
            samples_schema.validate(content)
        print("YAML structure is valid.")
    except SchemaError as e:
        print("YAML structure is invalid:", e)
        sys.exit(1)
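

# Driver: read and validate the YAML description, then either submit one
# CRAB task per (sample, part) combination or query an existing task directory.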
if __name__ == '__main__':
    args = parse_args()
    with open(args.yaml, "r") as f:
        samples = yaml.safe_load(f)
    validate_yaml(samples)

    if args.cmd == "submit":
        print("")
        print(f"Submit Crab jobs for {args.yaml} with filter {args.filter} applied")

        common_config = samples.get('common', {'data': {}, 'mc': {}})

        for sample, sample_info in samples['samples'].items():

            # Settings shared by every part of this sample.
            config_ = config()

            config_.General.transferOutputs = True
            config_.General.transferLogs = True

            config_.Data.publication = False
            config_.Data.inputDBS = 'global'

            config_.JobType.pluginName = 'Analysis'
            config_.JobType.psetName = args.psetcfg
            config_.JobType.maxJobRuntimeMin = 2700
            config_.JobType.allowUndistributedCMSSW = True
            config_.JobType.inputFiles = args.extra

            config_.Site.storageSite = args.site
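
            # A sample name (and its dataset) may carry a '%d' placeholder that
            # is expanded with each entry of 'parts', e.g. (illustrative)
            # 'MySample_part%d' with parts [1, 2] -> 'MySample_part1', 'MySample_part2'.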
            parts = sample_info.get('parts', [None])
            for part in parts:
                name = sample % part if part is not None else sample

                if not fnmatch(name, args.filter):
                    continue
                print(name)

                # Per-part work area; set it before deriving the output LFN from it.
                config_.General.workArea = args.workarea + "_" + name
                config_.Data.outLFNDirBase = os.path.join(args.outputdir, config_.General.workArea)

                config_.Data.inputDataset = sample_info['dataset'] % part \
                    if part is not None else \
                    sample_info['dataset']

                data_type = 'mc' if sample_info['isMC'] else 'data'

                config_.Data.splitting = 'FileBased' if sample_info['isMC'] else 'LumiBased'

                if sample_info['isMC']:
                    config_.Data.lumiMask = ''
                else:
                    # Per-sample golden JSON wins; otherwise use the common lumimask.
                    config_.Data.lumiMask = sample_info.get('goldenjson', common_config['data'].get('lumimask', None))

                config_.Data.unitsPerJob = common_config[data_type].get('splitting', None)

                # Sample-level globaltag, else the common per-datatype one, else the auto key.
                globaltag = sample_info.get('globaltag', common_config[data_type].get('globaltag', 'auto:run3_data'))

                decay = sample_info.get('decay', 'all')

                maxevents = -1

                # Arguments forwarded to the cmsRun configuration (assumes the
                # pset parses them with VarParsing).
                config_.JobType.pyCfgParams = [
                    'isMC=%s' % sample_info['isMC'], 'reportEvery=1000',
                    'tag=%s' % production_tag,
                    'globalTag=%s' % globaltag,
                    'decay=%s' % decay,
                    'maxEvents=%s' % maxevents,
                ]

                if args.test:
                    config_.Data.totalUnits = 10

                config_.General.requestName = name + "_" + production_tag
                config_.JobType.outputFiles = ['_'.join(['bph_nano', production_tag, 'mc' if sample_info['isMC'] else 'data', decay]) + '.root']

                print(f"Submit Crab job for {name}")
                print(config_)
                # Submit from a child process: crabCommand keeps module-level
                # state, so repeated submissions from one process can clash.
                p = Process(target=submit, args=(config_,))
                p.start()
                p.join()

    elif args.cmd == "status":
        print(f"Getting crab status for {args.dir}")
        status(args.dir)
    else:
        print(f"Invalid Crab command: {args.cmd}")