#!/usr/bin/env python
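"""Multicrab submission helper for BPH NanoAOD production.

Reads dataset descriptions from a YAML file, validates them against the
schemas defined below, and submits (or queries the status of) one CRAB
task per sample and part.
"""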

import os
import sys
import datetime
from argparse import ArgumentParser
from fnmatch import fnmatch
from multiprocessing import Process

import yaml
from schema import Schema, And, Optional, SchemaError

# note: CRABClient must be imported before the CRABAPI modules in recent CRAB versions
import CRABClient
from CRABAPI.RawCommand import crabCommand
from CRABClient.ClientExceptions import ClientException
from CRABClient.UserUtilities import config
from http.client import HTTPException

# e.g. '2025May08'; used to label the production
production_tag = datetime.date.today().strftime('%Y%b%d')


def parse_args():
    parser = ArgumentParser(description="A multicrab submission script")
    parser.add_argument('-y', '--yaml', default='test_samples.yml', help='File with dataset descriptions')
    parser.add_argument('-c', '--cmd', default='submit', choices=['submit', 'status'], help='CRAB command')
    parser.add_argument('-f', '--filter', default='*', help='Filter samples; shell-style glob patterns (fnmatch) are allowed')
    parser.add_argument('-w', '--workarea', default='BPHNANO_%s' % production_tag, help='CRAB working area name')
    parser.add_argument('-o', '--outputdir', default='/store/group/cmst3/group/bpark/gmelachr/', help='High-level LFN output directory; outputs are saved under outputdir+workarea')
    parser.add_argument('-s', '--site', default='T2_CH_CERN', help='T2 or T3 site where the user has write access; can be verified with "crab checkwrite"')
    parser.add_argument('-t', '--tag', default=production_tag, help='Extra production tag')
    parser.add_argument('-p', '--psetcfg', default="../test/run_bphNano_cfg.py", help='CMSSW configuration (pset) file')
    parser.add_argument('-e', '--extra', nargs='*', default=list(), help='Optional extra input files')
    parser.add_argument('-tt', '--test', action='store_true', help='Flag a test job (processes only 10 units per task)')
    return parser.parse_args()
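
# Example invocation (script and filter names are illustrative):
#   python3 multicrab.py --yaml test_samples.yml --cmd submit --filter 'BuToKJpsi*'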


def submit(config):
    """Submit one CRAB task; failures are reported without stopping the loop."""
    try:
        crabCommand('submit', config=config)
    except HTTPException as hte:
        print("Failed submitting task: %s" % (hte.headers))
    except ClientException as cle:
        print("Failed submitting task: %s" % (cle))


def status(directory):
    """Query the status of an existing CRAB task directory."""
    try:
        crabCommand('status', dir=directory)
    except HTTPException as hte:
        print("Failed getting task status: %s" % (hte.headers))
    except ClientException as cle:
        print("Failed getting task status: %s" % (cle))


expected_schema = Schema({
    "common": {
        "data": {
            "lumimask": And(str, error="lumimask should be a string"),
            "splitting": And(int, error="splitting should be an integer"),
            "globaltag": And(str, error="globaltag should be a string"),
        },
        "mc": {
            "splitting": And(int, error="splitting should be an integer"),
            "globaltag": And(str, error="globaltag should be a string"),
        },
    },
    "samples": And(dict, error="samples should be a dict with keys dataset (str), isMC (bool). Optional keys: globaltag (str), parts (list(int))")
})

samples_schema = Schema({
    "dataset": And(str, error="dataset should be a string"),
    "isMC": And(bool, error="isMC should be a boolean"),
    Optional("decay"): And(str, error="decay should be a string"),
    Optional("goldenjson"): And(str, error="golden json file path should be a string"),
    Optional("globaltag"): And(str, error="globaltag should be a string"),
    Optional("parts"): [And(int, error="parts should be a list of integers")]
})
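
# Illustrative YAML file satisfying the two schemas above (all values are made up):
#
# common:
#   data:
#     lumimask: /path/to/golden.json
#     splitting: 50
#     globaltag: auto:run3_data
#   mc:
#     splitting: 10
#     globaltag: auto:run3_mc
# samples:
#   MySample_part%d:
#     dataset: /MyDataset/Run2024X-PromptReco-v%d/MINIAOD
#     isMC: false
#     parts: [1, 2]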


def validate_yaml(data):
    """Validate the parsed YAML against the schemas; exit on failure."""
    try:
        expected_schema.validate(data)
        for name, content in data["samples"].items():
            samples_schema.validate(content)
        print("YAML structure is valid.")
    except SchemaError as e:
        print("YAML structure is invalid:", e)
        sys.exit(1)


if __name__ == '__main__':

    args = parse_args()
    with open(args.yaml, "r") as f:
        samples = yaml.safe_load(f)  # parse the YAML file
    validate_yaml(samples)

    if args.cmd == "submit":
        print("")
        print(f"Submit CRAB jobs for {args.yaml} with filter {args.filter} applied")

        common_config = samples.get('common', {'data': {}, 'mc': {}})
        # loop over samples
        for sample, sample_info in samples['samples'].items():
            # datasets may be repeated, so check for separate parts

            config_ = config()

            config_.General.transferOutputs = True
            config_.General.transferLogs = True
            config_.General.workArea = args.workarea

            config_.Data.publication = False
            config_.Data.outLFNDirBase = os.path.join(args.outputdir, config_.General.workArea)
            config_.Data.inputDBS = 'global'

            config_.JobType.pluginName = 'Analysis'
            config_.JobType.psetName = args.psetcfg
            config_.JobType.maxJobRuntimeMin = 2700  # cannot be used with 'Automatic' splitting
            config_.JobType.allowUndistributedCMSSW = True
            config_.JobType.inputFiles = args.extra

            config_.Site.storageSite = args.site

            parts = sample_info.get('parts', [None])
            for part in parts:
                name = sample % part if part is not None else sample

                # keep only the samples matching the filter
                if not fnmatch(name, args.filter):
                    continue
                print(name)
                # set the per-sample work area first, then derive the output LFN from it
                config_.General.workArea = args.workarea + "_" + name
                config_.Data.outLFNDirBase = os.path.join(args.outputdir, config_.General.workArea)

                config_.Data.inputDataset = (sample_info['dataset'] % part
                                             if part is not None
                                             else sample_info['dataset'])

                data_type = 'mc' if sample_info['isMC'] else 'data'

                if sample_info['isMC']:
                    config_.Data.splitting = 'FileBased'
                    config_.Data.lumiMask = ''
                else:
                    config_.Data.splitting = 'LumiBased'
                    # a per-sample golden JSON overrides the common lumimask
                    config_.Data.lumiMask = sample_info.get(
                        'goldenjson', common_config[data_type].get('lumimask', None))

                config_.Data.unitsPerJob = common_config[data_type].get('splitting', None)

                # a per-sample globaltag overrides the common one; default to the auto tag
                globaltag = sample_info.get(
                    'globaltag', common_config[data_type].get('globaltag', "auto:run3_data"))

                decay = sample_info.get('decay', 'all')

                maxevents = -1

                config_.JobType.pyCfgParams = [
                    'isMC=%s' % sample_info['isMC'], 'reportEvery=1000',
                    'tag=%s' % production_tag,
                    'globalTag=%s' % globaltag,
                    'decay=%s' % decay,
                    'maxEvents=%s' % maxevents,
                ]

                if args.test:
                    config_.Data.totalUnits = 10

                config_.General.requestName = name + "_" + production_tag
                config_.JobType.outputFiles = ['_'.join(['bph_nano', production_tag, data_type, decay]) + '.root']

                print(f"Submit CRAB job for {name}")
                print(config_)
                # run each submission in its own process: calling crabCommand
                # repeatedly from a single process is known to misbehave
                p = Process(target=submit, args=(config_,))
                p.start()
                p.join()
    elif args.cmd == "status":
        print(f"Getting CRAB status for {args.workarea}")
        status(args.workarea)
    else:
        print(f"Invalid CRAB command: {args.cmd}")