0001
0002
0003 from __future__ import print_function
0004
0005 from builtins import range
0006 import os
0007 import re
0008 import sys
0009 import glob
0010 import json
0011 import math
0012 import bisect
0013 import random
0014 import signal
0015 if sys.version_info[0]>2:
0016 import _pickle as cPickle
0017 else:
0018 import cPickle
0019 import difflib
0020 import argparse
0021 import functools
0022 import itertools
0023 import subprocess
0024 import collections
0025 import multiprocessing
0026 import FWCore.PythonUtilities.LumiList as LumiList
0027 import Utilities.General.cmssw_das_client as cmssw_das_client
0028 import Alignment.MillePedeAlignmentAlgorithm.mpslib.tools as mps_tools
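# Typical invocation (dataset pattern, run number, and output path below are
# purely illustrative; the options are defined in FileListCreator._define_parser):
#   <this script> -i /MinimumBias/Run2018A-TkAlMinBias-v1/ALCARECO \
#                 --iov 315252 -n 1000000 --create-ini -o file_lists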
0029
0030
0031
0032 def main(argv = None):
0033 """
0034 Main routine. Not called if this module is loaded via `import`.
0035
0036 Arguments:
0037 - `argv`: Command line arguments passed to the script.
0038 """
0039
0040 if argv is None:
0041 argv = sys.argv[1:]
0042
0043 file_list_creator = FileListCreator(argv)
0044 file_list_creator.create()
0045
0046
0047
0048 class FileListCreator(object):
0049 """Create file lists for alignment and validation for a given dataset.
0050 """
0051
0052 def __init__(self, argv):
0053 """Constructor taking the command line arguments.
0054
0055 Arguments:
0056 - `argv`: command line arguments
0057 """
0058
0059 self._first_dataset_ini = True
0060 self._parser = self._define_parser()
0061 self._args = self._parser.parse_args(argv)
0062
0063 if not mps_tools.check_proxy():
0064 print_msg(
0065 "Please create proxy via 'voms-proxy-init -voms cms -rfc'.")
0066 sys.exit(1)
0067
0068 self._dataset_regex = re.compile(r"^/([^/]+)/([^/]+)/([^/]+)$")
0069 self._validate_input()
0070
0071 if self._args.test_mode:
0072 import Configuration.PyReleaseValidation.relval_steps as rvs
0073 import Configuration.PyReleaseValidation.relval_production as rvp
0074 self._args.datasets = [rvs.steps[rvp.workflows[1000][1][0]]["INPUT"].dataSet]
0075 self._validate_input()
0076
0077 self._datasets = sorted([dataset
0078 for pattern in self._args.datasets
0079 for dataset in get_datasets(pattern)
0080 if re.search(self._args.dataset_filter, dataset)])
0081 if len(self._datasets) == 0:
0082 print_msg("Found no dataset matching the pattern(s):")
0083 for d in self._args.datasets: print_msg("\t"+d)
0084 sys.exit(1)
0085
0086 self._formatted_dataset = merge_strings(
0087 [re.sub(self._dataset_regex, r"\1_\2_\3", dataset)
0088 for dataset in self._datasets])
0089 self._output_dir = os.path.join(self._args.output_dir,
0090 self._formatted_dataset)
0091 self._output_dir = os.path.abspath(self._output_dir)
0092 self._cache = _DasCache(self._output_dir)
0093 self._prepare_iov_datastructures()
0094 self._prepare_run_datastructures()
0095
0096 try:
0097 os.makedirs(self._output_dir)
0098 except OSError as e:
0099 if e.errno == 17:  # EEXIST, i.e. the directory already exists
0100 if self._args.force:
0101 pass
0102 elif self._args.use_cache:
0103 self._cache.load()
0104 else:
0105 print_msg("Directory '{}' already exists from previous runs"
0106 " of the script. Use '--use-cache' if you want to"
0107 " use the cached DAS-query results Or use "
0108 "'--force' to remove it."
0109 .format(self._output_dir))
0110 sys.exit(1)
0111 files = glob.glob(os.path.join(self._output_dir, "*"))
0112 for f in files: os.remove(f)
0113 else:
0114 raise
0115
0116
0117 def create(self):
0118 """Creates file list. To be called by user of the class."""
0119
0120 self._request_dataset_information()
0121 self._create_file_lists()
0122 self._print_eventcounts()
0123 self._write_file_lists()
0124
0125
0126 _event_count_log = "event_count_info.log"
0127
0128
0129 def _define_parser(self):
0130 """Definition of command line argument parser."""
0131
0132 parser = argparse.ArgumentParser(
0133 description = "Create file lists for alignment",
0134 epilog = ("The tool will create a directory containing all file "
0135 "lists and a log file with all relevant event counts "
0136 "('{}').".format(FileListCreator._event_count_log)))
0137 parser.add_argument("-i", "--input", dest = "datasets", required = True,
0138 metavar = "DATASET", action = "append",
0139 help = ("CMS dataset name; supports wildcards; "
0140 "use multiple times for multiple datasets"))
0141 parser.add_argument("--dataset-filter", default = "",
0142 help = "regex to match within in the datasets matched,"
0143 "in case the wildcard isn't flexible enough")
0144 parser.add_argument("-j", "--json", dest = "json", metavar = "PATH",
0145 help = "path to JSON file (optional)")
0146 parser.add_argument("-f", "--fraction", dest = "fraction",
0147 type = float, default = 1,
0148 help = "max. fraction of files used for alignment")
0149 parser.add_argument("--iov", dest = "iovs", metavar = "RUN", type = int,
0150 action = "append", default = [],
0151 help = ("define IOV by specifying first run; for "
0152 "multiple IOVs use this option multiple "
0153 "times; files from runs before the lowest "
0154 "IOV are discarded (default: 1)"))
0155 parser.add_argument("--miniiov", dest="miniiovs", metavar="RUN", type=int,
0156 action="append", default=[],
0157 help=("in addition to the standard IOVs, break up hippy jobs "
0158 "at these points, so that jobs from before and after "
0159 "these runs are not in the same job"))
0160 parser.add_argument("-r", "--random", action = "store_true",
0161 default = False, help = "select files randomly")
0162 parser.add_argument("-n", "--events-for-alignment", "--maxevents",
0163 dest = "events", type = int, metavar = "NUMBER",
0164 help = ("number of events needed for alignment; the"
0165 " remaining events in the dataset are used "
0166 "for validation; if n<=0, all events are "
0167 "used for validation"))
0168 parser.add_argument("--all-events", action = "store_true",
0169 help = "Use all events for alignment")
0170 parser.add_argument("--tracks-for-alignment", dest = "tracks",
0171 type = int, metavar = "NUMBER",
0172 help = "number of tracks needed for alignment")
0173 parser.add_argument("--track-rate", dest = "rate", type = float,
0174 metavar = "NUMBER",
0175 help = "number of tracks per event")
0176 parser.add_argument("--run-by-run", dest = "run_by_run",
0177 action = "store_true", default = False,
0178 help = "create validation file list for each run")
0179 parser.add_argument("--minimum-events-in-iov",
0180 dest = "minimum_events_in_iov", metavar = "NUMBER",
0181 type = int, default = 100000,
0182 help = ("minimum number of events for alignment per"
0183 " IOV; this option has a higher priority "
0184 "than '-f/--fraction' "
0185 "(default: %(default)s)"))
0186 parser.add_argument("--minimum-events-validation",
0187 dest = "minimum_events_validation",
0188 metavar = "NUMBER", type = int, default = 1,
0189 help = ("minimum number of events for validation; "
0190 "applies to IOVs; in case of --run-by-run "
0191 "it applies to runs runs "
0192 "(default: %(default)s)"))
0193 parser.add_argument("--use-cache", dest = "use_cache",
0194 action = "store_true", default = False,
0195 help = "use DAS-query results of previous run")
0196 parser.add_argument("-o", "--output-dir", dest = "output_dir",
0197 metavar = "PATH", default = os.getcwd(),
0198 help = "output base directory (default: %(default)s)")
0199 parser.add_argument("--create-ini", dest = "create_ini",
0200 action = "store_true", default = False,
0201 help = ("create dataset ini file based on the "
0202 "created file lists"))
0203 parser.add_argument("--force", action = "store_true", default = False,
0204 help = ("remove output directory from previous "
0205 "runs, if existing"))
0206 parser.add_argument("--hippy-events-per-job", type = int, default = 1,
0207 help = ("approximate number of events in each job for HipPy"))
0208 parser.add_argument("--test-mode", dest = "test_mode",
0209 action = "store_true", default = False,
0210 help = argparse.SUPPRESS)
0211 return parser
0212
0213
0214 def _validate_input(self):
0215 """Validate command line arguments."""
0216
0217 if self._args.events is None:
0218 if self._args.all_events:
0219 self._args.events = float("inf")
0220 print_msg("Using all tracks for alignment")
0221 elif (self._args.tracks is None) and (self._args.rate is None):
0222 msg = ("either -n/--events-for-alignment, --all-events, or both of "
0223 "--tracks-for-alignment and --track-rate are required")
0224 self._parser.error(msg)
0225 elif (((self._args.tracks is not None) and (self._args.rate is None)) or
0226 ((self._args.rate is not None) and (self._args.tracks is None))):
0227 msg = ("--tracks-for-alignment and --track-rate must be used "
0228 "together")
0229 self._parser.error(msg)
0230 else:
0231 self._args.events = int(math.ceil(self._args.tracks /
0232 self._args.rate))
0233 print_msg("Requested {0:d} tracks with {1:.2f} tracks/event "
0234 "-> {2:d} events for alignment."
0235 .format(self._args.tracks, self._args.rate,
0236 self._args.events))
0237 else:
0238 if (self._args.tracks is not None) or (self._args.rate is not None) or self._args.all_events:
0239 msg = ("-n/--events-for-alignment must not be used with "
0240 "--tracks-for-alignment, --track-rate, or --all-events")
0241 self._parser.error(msg)
0242 print_msg("Requested {0:d} events for alignment."
0243 .format(self._args.events))
0244
0245 for dataset in self._args.datasets:
0246 if not re.match(self._dataset_regex, dataset):
0247 print_msg("Dataset pattern '"+dataset+"' is not in CMS format.")
0248 sys.exit(1)
0249
0250 nonzero_events_per_iov = (self._args.minimum_events_in_iov > 0)
0251 if nonzero_events_per_iov and self._args.fraction <= 0:
0252 print_msg("Setting minimum number of events per IOV for alignment "
0253 "to 0 because a non-positive fraction of alignment events"
0254 " is chosen: {}".format(self._args.fraction))
0255 nonzero_events_per_iov = False
0256 self._args.minimum_events_in_iov = 0
0257 if nonzero_events_per_iov and self._args.events <= 0:
0258 print_msg("Setting minimum number of events per IOV for alignment "
0259 "to 0 because a non-positive number of alignment events"
0260 " is chosen: {}".format(self._args.events))
0261 nonzero_events_per_iov = False
0262 self._args.minimum_events_in_iov = 0
0263
0264
0265 def _prepare_iov_datastructures(self):
0266 """Create the needed objects for IOV handling."""
0267
0268 self._iovs = sorted(set(self._args.iovs))
0269 if len(self._iovs) == 0: self._iovs.append(1)
0270 self._iov_info_alignment = {iov: {"events": 0, "files": []}
0271 for iov in self._iovs}
0272 self._iov_info_validation = {iov: {"events": 0, "files": []}
0273 for iov in self._iovs}
0274
0275 self._miniiovs = sorted(set(self._iovs) | set(self._args.miniiovs))
0276
0277
0278 def _get_iovs(self, runs, useminiiovs=False):
0279 """
0280 Return the list of IOV start runs for the given `runs`. Runs before the
0281 first defined IOV are skipped.
0282
0283 Arguments:
0284 - `runs`: run numbers
0285 """
0286
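# bisect.bisect gives the insertion index of `run` in the sorted IOV list;
# index 0 means the run predates all IOVs and is skipped, otherwise the
# IOV starting at iovlist[iov_index-1] is the one covering the run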
0287 iovlist = self._miniiovs if useminiiovs else self._iovs
0288
0289 iovs = []
0290 for run in runs:
0291 iov_index = bisect.bisect(iovlist, run)
0292 if iov_index > 0: iovs.append(iovlist[iov_index-1])
0293 return iovs
0294
0295
0296 def _prepare_run_datastructures(self):
0297 """Create the needed objects for run-by-run validation file lists."""
0298
0299 self._run_info = {}
0300
0301
0302 def _add_file_info(self, container, keys, fileinfo):
0303 """Add file with `file_name` to `container` using `key`.
0304
0305 Arguments:
0306 - `container`: dictionary holding information on files and event counts
0307 - `keys`: keys to which the info should be added; will be created if not
0308 existing
0309 - `fileinfo`: FileInfo object describing a dataset file
0310 """
0311
0312 for key in keys:
0313 if key not in container:
0314 container[key] = {"events": 0,
0315 "files": []}
0316 container[key]["events"] += fileinfo.nevents / len(keys)
0317 if fileinfo not in container[key]["files"]:
0318 container[key]["files"].append(fileinfo)
0319
0320
0321 def _remove_file_info(self, container, keys, fileinfo):
0322 """Remove file with `file_name` to `container` using `key`.
0323
0324 Arguments:
0325 - `container`: dictionary holding information on files and event counts
0326 - `keys`: keys from which the info should be removed
0327 - `fileinfo`: FileInfo object describing a dataset file
0329 """
0330
0331 for key in keys:
0332 if key not in container: continue
0333 try:
0334 index = container[key]["files"].index(fileinfo)
0335 except ValueError:
0336 return
0337 del container[key]["files"][index]
0338 container[key]["events"] -= fileinfo.nevents / len(keys)
0339
0340
0341 def _request_dataset_information(self):
0342 """Retrieve general dataset information and create file list."""
0343
0344 if not self._cache.empty:
0345 print_msg("Using cached information.")
0346 (self._events_in_dataset,
0347 self._files,
0348 self._file_info,
0349 self._max_run) = self._cache.get()
0350 self.rereco = any(len(fileinfo.runs)>1 for fileinfo in self._file_info)
0351 if self._args.random: random.shuffle(self._files)
0352 return
0353
0354
0355
0356
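# query DAS in parallel; the worker processes ignore SIGINT so that Ctrl-C is
# handled only by the main process, and the finite timeouts passed to .get()
# below keep the main process responsive to KeyboardInterrupt while waiting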
0357 number_of_processes = multiprocessing.cpu_count() - 1
0358 number_of_processes = (number_of_processes
0359 if number_of_processes > 0
0360 else 1)
0361 pool = multiprocessing.Pool(
0362 processes = number_of_processes,
0363 initializer = lambda: signal.signal(signal.SIGINT, signal.SIG_IGN))
0364
0365 print_msg("Requesting information for the following dataset(s):")
0366 for d in self._datasets: print_msg("\t"+d)
0367 print_msg("This may take a while...")
0368
0369 result = pool.map_async(get_events_per_dataset, self._datasets).get(3600)
0370 self._events_in_dataset = sum(result)
0371
0372 result = pool.map_async(get_max_run, self._datasets).get(3600)
0373 self._max_run = max(result)
0374
0375 result = sum(pool.map_async(get_file_info, self._datasets).get(3600), [])
0376 files = pool.map_async(_make_file_info, result).get(3600)
0377 self._file_info = sorted(fileinfo for fileinfo in files)
0378
0379 self.rereco = any(len(fileinfo.runs)>1 for fileinfo in self._file_info)
0380
0381 if self._args.test_mode:
0382 self._file_info = self._file_info[-200:]
0383 self._files = [fileinfo.name for fileinfo in self._file_info]
0384
0385
0386 self._cache.set(self._events_in_dataset, self._files, self._file_info,
0387 self._max_run)
0388 self._cache.dump()
0389 if self._args.random:
0390 random.shuffle(self._file_info)
0391 self._files = [fileinfo.name for fileinfo in self._file_info]
0392
0393 def _create_file_lists(self):
0394 """Create file lists for alignment and validation."""
0395
0396
0397 self._files_alignment = []
0398 self._files_validation = []
0399 self._events_for_alignment = 0
0400 self._events_for_validation = 0
0401
0402 max_range = (0
0403 if self._args.events <= 0
0404 else int(math.ceil(len(self._files)*self._args.fraction)))
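# at most this many files (the requested fraction of the dataset) go into the
# alignment list; the limit is raised below for files outside all IOVs so that
# they do not count against the fraction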
0405 use_for_alignment = True
0406 for i, fileinfo in enumerate(self._file_info):
0407 enough_events = self._events_for_alignment >= self._args.events
0408 fraction_exceeded = i >= max_range
0409 if enough_events or fraction_exceeded: use_for_alignment = False
0410
0411 dataset, f, number_of_events, runs = fileinfo
0412
0413 iovs = self._get_iovs(runs)
0414 if use_for_alignment:
0415 if iovs:
0416 self._events_for_alignment += number_of_events
0417 self._files_alignment.append(fileinfo)
0418 self._add_file_info(self._iov_info_alignment, iovs, fileinfo)
0419 else:
0420 max_range += 1
0421 else:
0422 if iovs:
0423 self._events_for_validation += number_of_events
0424 self._files_validation.append(fileinfo)
0425 self._add_file_info(self._iov_info_validation, iovs, fileinfo)
0426 if self._args.run_by_run:
0427 self._add_file_info(self._run_info, runs, fileinfo)
0428
0429 self._fulfill_iov_eventcount()
0430
0431 self._split_hippy_jobs()
0432
0433
0434 def _fulfill_iov_eventcount(self):
0435 """
0436 Try to fulfill the requirement on the minimum number of events per IOV
0437 in the alignment file list by picking files from the validation list.
0438 """
0439
0440 for iov in self._iovs:
0441 if self._iov_info_alignment[iov]["events"] >= self._args.minimum_events_in_iov: continue
0442 for fileinfo in self._files_validation[:]:
0443 dataset, f, number_of_events, runs = fileinfo
0444 iovs = self._get_iovs(runs)
0445 if iov in iovs:
0446 self._files_alignment.append(fileinfo)
0447 self._events_for_alignment += number_of_events
0448 self._add_file_info(self._iov_info_alignment, iovs, fileinfo)
0449
0450 self._events_for_validation -= number_of_events
0451 self._remove_file_info(self._iov_info_validation, iovs, fileinfo)
0452 if self._args.run_by_run:
0453 self._remove_file_info(self._run_info, runs, fileinfo)
0454 self._files_validation.remove(fileinfo)
0455
0456 if (self._iov_info_alignment[iov]["events"]
0457 >= self._args.minimum_events_in_iov):
0458 break
0459
0460 def _split_hippy_jobs(self):
0461 hippyjobs = {}
0462 for dataset, miniiov in itertools.product(self._datasets, self._miniiovs):
0463 jobsforminiiov = []
0464 hippyjobs[dataset,miniiov] = jobsforminiiov
0465 eventsinthisjob = float("inf")
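# start with "infinitely many" events so that the very first matching file
# always opens a new job for this (dataset, mini-IOV) pair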
0466 for fileinfo in self._files_alignment:
0467 if fileinfo.dataset != dataset: continue
0468 miniiovs = set(self._get_iovs(fileinfo.runs, useminiiovs=True))
0469 if miniiov not in miniiovs: continue
0470 if len(miniiovs) > 1:
0471 hippyjobs[dataset,miniiov] = []
0472 if eventsinthisjob >= self._args.hippy_events_per_job:
0473 currentjob = []
0474 jobsforminiiov.append(currentjob)
0475 eventsinthisjob = 0
0476 currentjob.append(fileinfo)
0477 currentjob.sort()
0478 eventsinthisjob += fileinfo.nevents
0479
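# regroup the per-mini-IOV jobs by their enclosing standard IOV: each mini-IOV
# belongs to the largest IOV boundary that does not exceed it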
0480 self._hippy_jobs = {
0481 (dataset, iov): sum((hippyjobs[dataset, miniiov]
0482 for miniiov in self._miniiovs
0483 if iov == max(_ for _ in self._iovs if _ <= miniiov)), []
0484 )
0485 for dataset, iov in itertools.product(self._datasets, self._iovs)
0486 }
0487
0488 def _print_eventcounts(self):
0489 """Print the event counts per file list and per IOV."""
0490
0491 log = os.path.join(self._output_dir, FileListCreator._event_count_log)
0492
0493 print_msg("Using {0:d} events for alignment ({1:.2f}%)."
0494 .format(self._events_for_alignment,
0495 100.0*
0496 self._events_for_alignment/self._events_in_dataset),
0497 log_file = log)
0498 for iov in sorted(self._iov_info_alignment):
0499 print_msg(("Approximate events" if self.rereco else "Events") + " for alignment in IOV since {0:d}: {1:f}"
0500 .format(iov, self._iov_info_alignment[iov]["events"]),
0501 log_file = log)
0502
0503 print_msg("Using {0:d} events for validation ({1:.2f}%)."
0504 .format(self._events_for_validation,
0505 100.0*
0506 self._events_for_validation/self._events_in_dataset),
0507 log_file = log)
0508
0509 for iov in sorted(self._iov_info_validation):
0510 msg = ("Approximate events" if self.rereco else "Events") + " for validation in IOV since {0:d}: {1:f}".format(
0511 iov, self._iov_info_validation[iov]["events"])
0512 if (self._iov_info_validation[iov]["events"]
0513 < self._args.minimum_events_validation):
0514 msg += " (not enough events -> no dataset file will be created)"
0515 print_msg(msg, log_file = log)
0516
0517 for run in sorted(self._run_info):
0518 msg = ("Approximate events" if self.rereco else "Events") + " for validation in run {0:d}: {1:f}".format(
0519 run, self._run_info[run]["events"])
0520 if (self._run_info[run]["events"]
0521 < self._args.minimum_events_validation):
0522 msg += " (not enough events -> no dataset file will be created)"
0523 print_msg(msg, log_file = log)
0524
0525 unused_events = (self._events_in_dataset
0526 - self._events_for_validation
0527 - self._events_for_alignment)
0528 if unused_events > 0 and self._events_in_dataset != 0:
0529 print_msg("Unused events: {0:d} ({1:.2f}%)"
0530 .format(unused_events,
0531 100.0*unused_events/self._events_in_dataset),
0532 log_file = log)
0533
0534
0535 def _create_dataset_ini_section(self, name, collection, json_file = None):
0536 """Write dataset ini snippet.
0537
0538 Arguments:
0539 - `name`: name of the dataset section
0540 - `collection`: track collection of this dataset
0541 - `json_file`: JSON file to be used for this dataset (optional)
0542 """
0543
0544 if json_file:
0545 splitted = name.split("_since")
0546 file_list = "_since".join(splitted[:-1]
0547 if len(splitted) > 1
0548 else splitted)
0549 else:
0550 file_list = name
0551 output = "[dataset:{}]\n".format(name)
0552 output += "collection = {}\n".format(collection)
0553 output += "inputFileList = ${{datasetdir}}/{}.txt\n".format(file_list)
0554 output += "json = ${{datasetdir}}/{}\n".format(json_file) if json_file else ""
0555
0556 if collection in ("ALCARECOTkAlCosmicsCTF0T",
0557 "ALCARECOTkAlCosmicsInCollisions"):
0558 if self._first_dataset_ini:
0559 print_msg("\tDetermined cosmics dataset, i.e. please replace "
0560 "'DUMMY_DECO_MODE_FLAG' and 'DUMMY_ZERO_TESLA_FLAG' "
0561 "with the correct values.")
0562 self._first_dataset_ini = False
0563 output += "cosmicsDecoMode = DUMMY_DECO_MODE_FLAG\n"
0564 output += "cosmicsZeroTesla = DUMMY_ZERO_TESLA_FLAG\n"
0565 output += "\n"
0566
0567 return output
0568
0569
0570 def _create_json_file(self, name, first, last = None):
0571 """
0572 Create JSON file with `name` covering runs from `first` to `last`. If a
0573 global JSON is provided, the resulting file is the intersection of the
0574 file created here and the global one.
0575 Returns the name of the created JSON file.
0576
0577 Arguments:
0578 - `name`: name of the created JSON file
0579 - `first`: first run covered by the JSON file
0580 - `last`: last run covered by the JSON file
0581
0582 """
0583
0584 if last is None: last = self._max_run
0585 name += "_JSON.txt"
0586 print_msg("Creating JSON file: "+name)
0587
0588 json_file = LumiList.LumiList(runs = range(first, last+1))
0589 if self._args.json:
0590 global_json = LumiList.LumiList(filename = self._args.json)
0591 json_file = json_file & global_json
0592 json_file.writeJSON(os.path.join(self._output_dir, name))
0593
0594 return name
0595
0596
0597 def _get_track_collection(self, edm_file):
0598 """Extract track collection from given `edm_file`.
0599
0600 Arguments:
0601 - `edm_file`: CMSSW dataset file
0602 """
0603
0604
0605 cmd = ["edmDumpEventContent", r"root://cms-xrd-global.cern.ch/"+edm_file]
0606 try:
0607 event_content = subprocess.check_output(cmd).decode().split("\n")
0608 except subprocess.CalledProcessError as e:
0609 splitted = edm_file.split("/")
0610 try:
0611 alcareco = splitted[splitted.index("ALCARECO")+1].split("-")[0]
0612 alcareco = alcareco.replace("TkAlCosmics0T", "TkAlCosmicsCTF0T")
0613 alcareco = "ALCARECO" + alcareco
0614 print_msg("\tDetermined track collection as '{}'.".format(alcareco))
0615 return alcareco
0616 except ValueError:
0617 if "RECO" in splitted:
0618 print_msg("\tDetermined track collection as 'generalTracks'.")
0619 return "generalTracks"
0620 else:
0621 print_msg("\tCould not determine track collection "
0622 "automatically.")
0623 print_msg("\tPlease replace 'DUMMY_TRACK_COLLECTION' with "
0624 "the correct value.")
0625 return "DUMMY_TRACK_COLLECTION"
0626
0627 track_collections = []
0628 for line in event_content:
0629 splitted = line.split()
0630 if len(splitted) > 0 and splitted[0] == r"vector<reco::Track>":
0631 track_collections.append(splitted[1].strip().strip('"'))
0632 if len(track_collections) == 0:
0633 print_msg("No track collection found in file '{}'.".format(edm_file))
0634 sys.exit(1)
0635 elif len(track_collections) == 1:
0636 print_msg("\tDetermined track collection as "
0637 "'{}'.".format(track_collections[0]))
0638 return track_collections[0]
0639 else:
0640 alcareco_tracks = [x for x in track_collections
0641 if x.startswith("ALCARECO")]
0642 if len(alcareco_tracks) == 0 and "generalTracks" in track_collections:
0643 print_msg("\tDetermined track collection as 'generalTracks'.")
0644 return "generalTracks"
0645 elif len(alcareco_tracks) == 1:
0646 print_msg("\tDetermined track collection as "
0647 "'{}'.".format(alcareco_tracks[0]))
0648 return alcareco_tracks[0]
0649 print_msg("\tCould not unambiguously determine track collection in "
0650 "file '{}':".format(edm_file))
0651 print_msg("\tPlease replace 'DUMMY_TRACK_COLLECTION' with "
0652 "the correct value from the following list.")
0653 for collection in track_collections:
0654 print_msg("\t - "+collection)
0655 return "DUMMY_TRACK_COLLECTION"
0656
0657
0658 def _write_file_lists(self):
0659 """Write file lists to disk."""
0660
0661 self._create_dataset_txt(self._formatted_dataset, self._files_alignment)
0662 self._create_hippy_txt(self._formatted_dataset, sum(self._hippy_jobs.values(), []))
0663 self._create_dataset_cff(
0664 "_".join(["Alignment", self._formatted_dataset]),
0665 self._files_alignment)
0666
0667 self._create_dataset_cff(
0668 "_".join(["Validation", self._formatted_dataset]),
0669 self._files_validation)
0670
0671
0672 if self._args.create_ini:
0673 dataset_ini_general = "[general]\n"
0674 dataset_ini_general += "datasetdir = {}\n".format(self._output_dir)
0675 dataset_ini_general += ("json = {}\n\n".format(self._args.json)
0676 if self._args.json
0677 else "\n")
0678
0679 ini_path = self._formatted_dataset + ".ini"
0680 print_msg("Creating dataset ini file: " + ini_path)
0681 ini_path = os.path.join(self._output_dir, ini_path)
0682
0683 collection = self._get_track_collection(self._files[0])
0684
0685 with open(ini_path, "w") as f:
0686 f.write(dataset_ini_general)
0687 f.write(self._create_dataset_ini_section(
0688 self._formatted_dataset, collection))
0689
0690 iov_wise_ini = dataset_ini_general
0691
0692 for i,iov in enumerate(sorted(self._iovs)):
0693 iov_str = "since{0:d}".format(iov)
0694 iov_str = "_".join([self._formatted_dataset, iov_str])
0695
0696 if self.rereco:
0697 if i == len(self._iovs) - 1:
0698 last = None
0699 else:
0700 last = sorted(self._iovs)[i+1] - 1
0701 local_json = self._create_json_file(iov_str, iov, last)
0702 else:
0703 local_json = None
0704
0705 if self._args.create_ini:
0706 iov_wise_ini += self._create_dataset_ini_section(iov_str,
0707 collection,
0708 local_json)
0709
0710 self._create_dataset_txt(iov_str,
0711 self._iov_info_alignment[iov]["files"])
0712 self._create_hippy_txt(iov_str, sum((self._hippy_jobs[dataset,iov] for dataset in self._datasets), []))
0713 self._create_dataset_cff(
0714 "_".join(["Alignment", iov_str]),
0715 self._iov_info_alignment[iov]["files"],
0716 json_file=local_json)
0717
0718 if (self._iov_info_validation[iov]["events"]
0719 < self._args.minimum_events_validation):
0720 continue
0721 self._create_dataset_cff(
0722 "_".join(["Validation", iov_str]),
0723 self._iov_info_validation[iov]["files"],
0724 json_file=local_json)
0725
0726 if self._args.create_ini and iov_wise_ini != dataset_ini_general:
0727 ini_path = self._formatted_dataset + "_IOVs.ini"
0728 print_msg("Creating dataset ini file: " + ini_path)
0729 ini_path = os.path.join(self._output_dir, ini_path)
0730 with open(ini_path, "w") as f: f.write(iov_wise_ini)
0731
0732 for run in sorted(self._run_info):
0733 if self.rereco: continue
0734 if (self._run_info[run]["events"]
0735 < self._args.minimum_events_validation):
0736 continue
0737 self._create_dataset_cff(
0738 "_".join(["Validation", self._formatted_dataset, str(run)]),
0739 self._run_info[run]["files"])
0740
0741
0742 def _create_dataset_txt(self, name, file_list):
0743 """Write alignment file list to disk.
0744
0745 Arguments:
0746 - `name`: name of the file list
0747 - `file_list`: list of files to write to `name`
0748 """
0749
0750 name += ".txt"
0751 print_msg("Creating dataset file list: "+name)
0752 with open(os.path.join(self._output_dir, name), "w") as f:
0753 f.write("\n".join(fileinfo.name for fileinfo in file_list))
0754
0755
0756 def _create_hippy_txt(self, name, job_list):
0757 name += "_hippy.txt"
0758 print_msg("Creating dataset file list for HipPy: "+name)
0759 with open(os.path.join(self._output_dir, name), "w") as f:
0760 f.write("\n".join(",".join("'"+fileinfo.name+"'" for fileinfo in job) for job in job_list)+"\n")
0761
0762
0763 def _create_dataset_cff(self, name, file_list, json_file = None):
0764 """
0765 Create configuration fragment to define a dataset.
0766
0767 Arguments:
0768 - `name`: name of the configuration fragment
0769 - `file_list`: list of files to write to `name`
0770 - `json_file`: JSON file to be used for this dataset (optional)
0771 """
0772
0773 if json_file is None: json_file = self._args.json
0774 if json_file is not None:
0775 json_file = os.path.join(self._output_dir, json_file)
0776
0777 name = "_".join(["Dataset",name, "cff.py"])
0778 print_msg("Creating dataset configuration fragment: "+name)
0779
0780 file_list_str = ""
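# extend readFiles in chunks of 255 files per call, as is customary for large
# file lists in CMSSW configuration fragments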
0781 for sub_list in get_chunks(file_list, 255):
0782 file_list_str += ("readFiles.extend([\n'"+
0783 "',\n'".join(fileinfo.name for fileinfo in sub_list)+
0784 "'\n])\n")
0785
0786 fragment = FileListCreator._dataset_template.format(
0787 lumi_def = ("import FWCore.PythonUtilities.LumiList as LumiList\n\n"
0788 "lumiSecs = cms.untracked.VLuminosityBlockRange()\n"
0789 "goodLumiSecs = LumiList.LumiList(filename = "
0790 "'{0:s}').getCMSSWString().split(',')"
0791 .format(json_file)
0792 if json_file else ""),
0793 lumi_arg = ("lumisToProcess = lumiSecs,\n "
0794 if json_file else ""),
0795 lumi_extend = "lumiSecs.extend(goodLumiSecs)" if json_file else "",
0796 files = file_list_str)
0797
0798 with open(os.path.join(self._output_dir, name), "w") as f:
0799 f.write(fragment)
0800
0801
0802 _dataset_template = """\
0803 import FWCore.ParameterSet.Config as cms
0804 {lumi_def:s}
0805 readFiles = cms.untracked.vstring()
0806 source = cms.Source("PoolSource",
0807 {lumi_arg:s}fileNames = readFiles)
0808 {files:s}{lumi_extend:s}
0809 maxEvents = cms.untracked.PSet(input = cms.untracked.int32(-1))
0810 """
0811
0812
0813 class _DasCache(object):
0814 """Helper class to cache information from DAS requests."""
0815
0816 def __init__(self, file_list_id):
0817 """Constructor of the cache.
0818
0819 Arguments:
0820 - `file_list_id`: ID of the cached file lists
0821 """
0822
0823 self._file_list_id = file_list_id
0824 self._cache_file_name = os.path.join(file_list_id, ".das_cache.pkl")
0825 self.reset()
0826
0827
0828 def reset(self):
0829 """Reset the cache contents and the 'empty' flag."""
0830
0831 self._empty = True
0832 self._events_in_dataset = 0
0833 self._files = []
0834 self._file_info = []
0835 self._max_run = None
0836
0837
0838 def set(self, total_events, file_list, file_info, max_run):
0839 """Set the content of the cache.
0840
0841 Arguments:
0842 - `total_events`: total number of events in dataset
0843 - `file_list`: list of files in dataset
0844 - `file_info`: dictionary with numbers of events per file
0845 - `max_run`: highest run number contained in the dataset
0846 """
0847
0848 self._events_in_dataset = total_events
0849 self._files = file_list
0850 self._file_info = file_info
0851 self._max_run = max_run
0852 self._empty = False
0853
0854
0855 def get(self):
0856 """
0857 Get the content of the cache as tuple:
0858 result = (total number of events in dataset,
0859 list of files in dataset,
0860 list of per-file information (events and runs), highest run number)
0861 """
0862
0863 return self._events_in_dataset, self._files, self._file_info, self._max_run
0864
0865
0866 def load(self):
0867 """Loads the cached contents."""
0868
0869 if not self.empty:
0870 print_msg("Overriding file information with cached information.")
0871 try:
0872 with open(self._cache_file_name, "rb") as f:
0873 tmp_dict = cPickle.load(f)
0874 self.__dict__.update(tmp_dict)
0875 except IOError as e:
0876 if e.args == (2, "No such file or directory"):
0877 msg = "Failed to load cache for '{}'.".format(self._file_list_id)
0878 if not self.empty:
0879 msg += " Keeping the previous file information."
0880 print_msg(msg)
0881 else:
0882 raise
0883
0884
0885 def dump(self):
0886 """Dumps the contents to the cache file."""
0887
0888 if self.empty:
0889 print_msg("Cache is empty. Not writing to file.")
0890 return
0891
0892 with open(self._cache_file_name, "wb") as f:
0893 cPickle.dump(self.__dict__, f, 2)
0894
0895
0896 @property
0897 def empty(self):
0898 """
0899 Flag indicating whether the cache is empty or has been filled (possibly
0900 with nothing).
0901 """
0902
0903 return self._empty
0904
0905
0906
0907
0908 def das_client(query, check_key = None):
0909 """
0910 Submit `query` to DAS client and handle possible errors.
0911 Further treatment of the output might be necessary.
0912
0913 Arguments:
0914 - `query`: DAS query
0915 - `check_key`: optional key to be checked for; retriggers query if needed
0916 """
0917
0918 error = True
0919 das_data = {'status': 'error'}
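# retry up to five times; a try counts as failed if DAS reports an error, the
# response is not valid JSON, or the requested `check_key` is missing from the
# returned data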
0920 for i in range(5):
0921 try:
0922 das_data = cmssw_das_client.get_data(query, limit = 0)
0923 except IOError as e:
0924 if e.errno == 14:
0925 continue
0926 except ValueError as e:
0927 if str(e) == "No JSON object could be decoded":
0928 das_data['reason'] = str(e)
0929 continue
0930
0931 if das_data["status"] == "ok":
0932 if das_data["nresults"] == 0 or check_key is None:
0933 error = False
0934 break
0935
0936 result_count = 0
0937 for d in find_key(das_data["data"], [check_key]):
0938 result_count += len(d)
0939 if result_count == 0:
0940 das_data["status"] = "error"
0941 das_data["reason"] = ("DAS did not return required data.")
0942 continue
0943 else:
0944 error = False
0945 break
0946
0947 if das_data["status"] == "error":
0948 print_msg("DAS query '{}' failed 5 times. "
0949 "The last time for the the following reason:".format(query))
0950 print(das_data.get("reason", "ERROR:UNKNOWN"))
0951 sys.exit(1)
0952 return das_data["data"]
0953
0954
0955 def find_key(collection, key_chain):
0956 """Searches for `key` in `collection` and returns first corresponding value.
0957
0958 Arguments:
0959 - `collection`: list of dictionaries
0960 - `key_chain`: chain of keys to be searched for
0961 """
0962
0963 result = None
0964 for i,key in enumerate(key_chain):
0965 for item in collection:
0966 if key in item:
0967 if i == len(key_chain) - 1:
0968 result = item[key]
0969 else:
0970 try:
0971 result = find_key(item[key], key_chain[i+1:])
0972 except LookupError:
0973 pass
0974 else:
0975 pass
0976
0977 if result is not None: return result
0978 raise LookupError(key_chain, collection)
0979
0980
0981 def print_msg(text, line_break = True, log_file = None):
0982 """Formatted printing of `text`.
0983
0984 Arguments:
0985 - `text`: string to be printed
0986 """
0987
0988 msg = " >>> " + str(text)
0989 if line_break:
0990 print(msg)
0991 else:
0992 print(msg, end=' ')
0993 sys.stdout.flush()
0994 if log_file:
0995 with open(log_file, "a") as f: f.write(msg+"\n")
0996 return msg
0997
0998
0999 def get_runs(file_name):
1000 """
1001 Try to guess the run number from `file_name`. If it cannot be determined
1002 from the name, the run numbers are retrieved from DAS (slow!).
1003
1004 Arguments:
1005 - `file_name`: name of the considered file
1006 """
1007 try:
1008 return [int("".join(file_name.split("/")[-4:-2]))]
1009 except ValueError:
1010 query = "run file="+file_name+" system=dbs3"
1011 return [int(_) for _ in find_key(das_client(query), ["run", "run_number"])]
1012
1013
1014 def get_max_run(dataset_name):
1015 """Retrieve the maximum run number in `dataset_name`.
1016
1017 Arguments:
1018 - `dataset_name`: name of the dataset
1019 """
1020
1021 data = das_client("run dataset={0:s} system=dbs3".format(dataset_name))
1022 runs = [f["run"][0]["run_number"] for f in data]
1023 return max(runs)
1024
1025
1026 def get_files(dataset_name):
1027 """Retrieve list of files in `dataset_name`.
1028
1029 Arguments:
1030 - `dataset_name`: name of the dataset
1031 """
1032
1033 data = das_client(("file dataset={0:s} system=dbs3 detail=True | "+
1034 "grep file.name, file.nevents > 0").format(dataset_name),
1035 "file")
1036 return [find_key(f["file"], ["name"]) for f in data]
1037
1038
1039 def get_datasets(dataset_pattern):
1040 """Retrieve list of dataset matching `dataset_pattern`.
1041
1042 Arguments:
1043 - `dataset_pattern`: pattern of dataset names
1044 """
1045
1046 data = das_client("dataset dataset={0:s} system=dbs3 detail=True"
1047 "| grep dataset.name".format(dataset_pattern), "dataset")
1048 return sorted(set([find_key(f["dataset"], ["name"]) for f in data]))
1049
1050
1051 def get_events_per_dataset(dataset_name):
1052 """Retrieve the number of a events in `dataset_name`.
1053
1054 Arguments:
1055 - `dataset_name`: name of a dataset
1056 """
1057
1058 return _get_events("dataset", dataset_name)
1059
1060
1061 def get_events_per_file(file_name):
1062 """Retrieve the number of a events in `file_name`.
1063
1064 Arguments:
1065 - `file_name`: name of a dataset file
1066 """
1067
1068 return _get_events("file", file_name)
1069
1070
1071 def _get_events(entity, name):
1072 """Retrieve the number of events from `entity` called `name`.
1073
1074 Arguments:
1075 - `entity`: type of entity
1076 - `name`: name of entity
1077 """
1078
1079 data = das_client("{0:s}={1:s} system=dbs3 detail=True | grep {0:s}.nevents"
1080 .format(entity, name), entity)
1081 return int(find_key(data, [entity, "nevents"]))
1082
1083
1084 def _get_properties(name, entity, properties, filters = None, sub_entity = None,
1085 aggregators = None):
1086 """Retrieve `properties` from `entity` called `name`.
1087
1088 Arguments:
1089 - `name`: name of entity
1090 - `entity`: type of entity
1091 - `properties`: list of property names
1092 - `filters`: list of filters on properties
1093 - `sub_entity`: type of entity from which to extract the properties;
1094 defaults to `entity`
1095 - `aggregators`: additional aggregators/filters to amend to query
1096 """
1097
1098 if sub_entity is None: sub_entity = entity
1099 if filters is None: filters = []
1100 props = ["{0:s}.{1:s}".format(sub_entity,prop.split()[0])
1101 for prop in properties]
1102 conditions = ["{0:s}.{1:s}".format(sub_entity, filt)
1103 for filt in filters]
1104 add_ons = "" if aggregators is None else " | "+" | ".join(aggregators)
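# e.g. the call in get_file_info below composes the query
# "file dataset=<name> system=dbs3 detail=True | grep file.name, file.nevents, file.nevents > 0"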
1105
1106 data = das_client("{0:s} {1:s}={2:s} system=dbs3 detail=True | grep {3:s}{4:s}"
1107 .format(sub_entity, entity, name,
1108 ", ".join(props+conditions), add_ons), sub_entity)
1109 return [[find_key(f[sub_entity], [prop]) for prop in properties] for f in data]
1110
1111 def get_file_info(dataset):
1112 result = _get_properties(name=dataset,
1113 properties = ["name", "nevents"],
1114 filters = ["nevents > 0"],
1115 entity = "dataset",
1116 sub_entity = "file")
1117 return [(dataset, name, nevents) for name, nevents in result]
1118
1119
1120
1121 FileInfo = collections.namedtuple("FileInfo", "dataset name nevents runs")
1122
1123 def _make_file_info(dataset_name_nevents):
1124 return FileInfo(*dataset_name_nevents, runs=get_runs(dataset_name_nevents[1]))
1125
1126 def get_chunks(long_list, chunk_size):
1127 """
1128 Generates list of sub-lists of `long_list` with a maximum size of
1129 `chunk_size`.
1130
1131 Arguments:
1132 - `long_list`: original list
1133 - `chunk_size`: maximum size of created sub-lists
1134 """
1135
1136 for i in range(0, len(long_list), chunk_size):
1137 yield long_list[i:i+chunk_size]
1138
1139
1140 def merge_strings(strings):
1141 """Merge strings in `strings` into a common string.
1142
1143 Arguments:
1144 - `strings`: list of strings
1145 """
1146
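# pairwise merge: difflib's matching blocks mark the substrings common to two
# names; the merged string keeps each common block once and splices in the
# differing pieces from both inputs in between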
1147 if isinstance(strings, str):
1148 return strings
1149 elif len(strings) == 0:
1150 return ""
1151 elif len(strings) == 1:
1152 return strings[0]
1153 elif len(strings) == 2:
1154 first = strings[0]
1155 second = strings[1]
1156 else:
1157 first = merge_strings(strings[:-1])
1158 second = strings[-1]
1159
1160 merged_string = ""
1161 blocks = difflib.SequenceMatcher(None, first, second).get_matching_blocks()
1162
1163 last_i, last_j, last_n = 0, 0, 0
1164 for i, j, n in blocks:
1165 merged_string += first[last_i+last_n:i]
1166 merged_string += second[last_j+last_n:j]
1167 merged_string += first[i:i+n]
1168 last_i, last_j, last_n = i, j, n
1169
1170 return str(merged_string)
1171
1172
1173
1174 if __name__ == "__main__":
1175 try:
1176 main()
1177 except KeyboardInterrupt:
1178 pass