0004 from builtins import range
0005 import os
0006 import re
0007 import sys
0008 import glob
0009 import json
0010 import math
0011 import bisect
0012 import random
0013 import signal
0014 if sys.version_info[0]>2:
0015 import _pickle as cPickle
0016 else:
0017 import cPickle
0018 import difflib
0019 import argparse
0020 import functools
0021 import itertools
0022 import subprocess
0023 import collections
0024 import multiprocessing
0025 import FWCore.PythonUtilities.LumiList as LumiList
0026 import Utilities.General.cmssw_das_client as cmssw_das_client
0027 import Alignment.MillePedeAlignmentAlgorithm.mpslib.tools as mps_tools
0028
0029
0030
0031 def main(argv = None):
0032 """
0033 Main routine. Not called, if this module is loaded via `import`.
0034
0035 Arguments:
0036 - `argv`: Command line arguments passed to the script.
0037 """
0038
0039 if argv == None:
0040 argv = sys.argv[1:]
0041
0042 file_list_creator = FileListCreator(argv)
0043 file_list_creator.create()
0044
0045
0046
0047 class FileListCreator(object):
0048 """Create file lists for alignment and validation for a given dataset.
0049 """
0050
0051 def __init__(self, argv):
0052 """Constructor taking the command line arguments.
0053
0054 Arguments:
        - `argv`: command line arguments
0056 """
0057
0058 self._first_dataset_ini = True
0059 self._parser = self._define_parser()
0060 self._args = self._parser.parse_args(argv)
0061
0062 if not mps_tools.check_proxy():
0063 print_msg(
0064 "Please create proxy via 'voms-proxy-init -voms cms -rfc'.")
0065 sys.exit(1)
0066
0067 self._dataset_regex = re.compile(r"^/([^/]+)/([^/]+)/([^/]+)$")
0068 self._validate_input()
0069
0070 if self._args.test_mode:
0071 import Configuration.PyReleaseValidation.relval_steps as rvs
0072 import Configuration.PyReleaseValidation.relval_production as rvp
0073 self._args.datasets = [rvs.steps[rvp.workflows[1000][1][0]]["INPUT"].dataSet]
0074 self._validate_input()
0075
0076 self._datasets = sorted([dataset
0077 for pattern in self._args.datasets
0078 for dataset in get_datasets(pattern)
0079 if re.search(self._args.dataset_filter, dataset)])
0080 if len(self._datasets) == 0:
0081 print_msg("Found no dataset matching the pattern(s):")
0082 for d in self._args.datasets: print_msg("\t"+d)
0083 sys.exit(1)
0084
0085 self._formatted_dataset = merge_strings(
0086 [re.sub(self._dataset_regex, r"\1_\2_\3", dataset)
0087 for dataset in self._datasets])
0088 self._output_dir = os.path.join(self._args.output_dir,
0089 self._formatted_dataset)
0090 self._output_dir = os.path.abspath(self._output_dir)
0091 self._cache = _DasCache(self._output_dir)
0092 self._prepare_iov_datastructures()
0093 self._prepare_run_datastructures()
0094
0095 try:
0096 os.makedirs(self._output_dir)
0097 except OSError as e:
            if e.errno == 17:
0099 if self._args.force:
0100 pass
0101 elif self._args.use_cache:
0102 self._cache.load()
0103 else:
0104 print_msg("Directory '{}' already exists from previous runs"
0105 " of the script. Use '--use-cache' if you want to"
0106 " use the cached DAS-query results Or use "
0107 "'--force' to remove it."
0108 .format(self._output_dir))
0109 sys.exit(1)
0110 files = glob.glob(os.path.join(self._output_dir, "*"))
0111 for f in files: os.remove(f)
0112 else:
0113 raise
0114
0115
0116 def create(self):
0117 """Creates file list. To be called by user of the class."""
0118
0119 self._request_dataset_information()
0120 self._create_file_lists()
0121 self._print_eventcounts()
0122 self._write_file_lists()
0123
0124
0125 _event_count_log = "event_count_info.log"
0126
0127
0128 def _define_parser(self):
0129 """Definition of command line argument parser."""
0130
0131 parser = argparse.ArgumentParser(
0132 description = "Create file lists for alignment",
0133 epilog = ("The tool will create a directory containing all file "
0134 "lists and a log file with all relevant event counts "
0135 "('{}').".format(FileListCreator._event_count_log)))
0136 parser.add_argument("-i", "--input", dest = "datasets", required = True,
0137 metavar = "DATASET", action = "append",
0138 help = ("CMS dataset name; supports wildcards; "
0139 "use multiple times for multiple datasets"))
0140 parser.add_argument("--dataset-filter", default = "",
0141 help = "regex to match within in the datasets matched,"
0142 "in case the wildcard isn't flexible enough")
0143 parser.add_argument("-j", "--json", dest = "json", metavar = "PATH",
0144 help = "path to JSON file (optional)")
0145 parser.add_argument("-f", "--fraction", dest = "fraction",
0146 type = float, default = 1,
0147 help = "max. fraction of files used for alignment")
0148 parser.add_argument("--iov", dest = "iovs", metavar = "RUN", type = int,
0149 action = "append", default = [],
0150 help = ("define IOV by specifying first run; for "
0151 "multiple IOVs use this option multiple "
0152 "times; files from runs before the lowest "
0153 "IOV are discarded (default: 1)"))
0154 parser.add_argument("--miniiov", dest="miniiovs", metavar="RUN", type=int,
0155 action="append", default=[],
0156 help=("in addition to the standard IOVs, break up hippy jobs "
0157 "at these points, so that jobs from before and after "
0158 "these runs are not in the same job"))
0159 parser.add_argument("-r", "--random", action = "store_true",
0160 default = False, help = "select files randomly")
0161 parser.add_argument("-n", "--events-for-alignment", "--maxevents",
0162 dest = "events", type = int, metavar = "NUMBER",
0163 help = ("number of events needed for alignment; the"
0164 " remaining events in the dataset are used "
0165 "for validation; if n<=0, all events are "
0166 "used for validation"))
0167 parser.add_argument("--all-events", action = "store_true",
0168 help = "Use all events for alignment")
0169 parser.add_argument("--tracks-for-alignment", dest = "tracks",
0170 type = int, metavar = "NUMBER",
0171 help = "number of tracks needed for alignment")
0172 parser.add_argument("--track-rate", dest = "rate", type = float,
0173 metavar = "NUMBER",
0174 help = "number of tracks per event")
0175 parser.add_argument("--run-by-run", dest = "run_by_run",
0176 action = "store_true", default = False,
0177 help = "create validation file list for each run")
0178 parser.add_argument("--minimum-events-in-iov",
0179 dest = "minimum_events_in_iov", metavar = "NUMBER",
0180 type = int, default = 100000,
0181 help = ("minimum number of events for alignment per"
0182 " IOV; this option has a higher priority "
0183 "than '-f/--fraction' "
0184 "(default: %(default)s)"))
0185 parser.add_argument("--minimum-events-validation",
0186 dest = "minimum_events_validation",
0187 metavar = "NUMBER", type = int, default = 1,
0188 help = ("minimum number of events for validation; "
0189 "applies to IOVs; in case of --run-by-run "
0190 "it applies to runs runs "
0191 "(default: %(default)s)"))
0192 parser.add_argument("--use-cache", dest = "use_cache",
0193 action = "store_true", default = False,
0194 help = "use DAS-query results of previous run")
0195 parser.add_argument("-o", "--output-dir", dest = "output_dir",
0196 metavar = "PATH", default = os.getcwd(),
0197 help = "output base directory (default: %(default)s)")
0198 parser.add_argument("--create-ini", dest = "create_ini",
0199 action = "store_true", default = False,
0200 help = ("create dataset ini file based on the "
0201 "created file lists"))
0202 parser.add_argument("--force", action = "store_true", default = False,
0203 help = ("remove output directory from previous "
0204 "runs, if existing"))
0205 parser.add_argument("--hippy-events-per-job", type = int, default = 1,
0206 help = ("approximate number of events in each job for HipPy"))
0207 parser.add_argument("--test-mode", dest = "test_mode",
0208 action = "store_true", default = False,
0209 help = argparse.SUPPRESS)
0210 return parser
0211
0212
0213 def _validate_input(self):
0214 """Validate command line arguments."""
0215
0216 if self._args.events is None:
0217 if self._args.all_events:
0218 self._args.events = float("inf")
0219 print_msg("Using all tracks for alignment")
0220 elif (self._args.tracks is None) and (self._args.rate is None):
0221 msg = ("either -n/--events-for-alignment, --all-events, or both of "
0222 "--tracks-for-alignment and --track-rate are required")
0223 self._parser.error(msg)
            elif (((self._args.tracks is not None) and (self._args.rate is None)) or
                  ((self._args.rate is not None) and (self._args.tracks is None))):
0226 msg = ("--tracks-for-alignment and --track-rate must be used "
0227 "together")
0228 self._parser.error(msg)
0229 else:
0230 self._args.events = int(math.ceil(self._args.tracks /
0231 self._args.rate))
0232 print_msg("Requested {0:d} tracks with {1:.2f} tracks/event "
0233 "-> {2:d} events for alignment."
0234 .format(self._args.tracks, self._args.rate,
0235 self._args.events))
0236 else:
0237 if (self._args.tracks is not None) or (self._args.rate is not None) or self._args.all_events:
0238 msg = ("-n/--events-for-alignment must not be used with "
0239 "--tracks-for-alignment, --track-rate, or --all-events")
0240 self._parser.error(msg)
0241 print_msg("Requested {0:d} events for alignment."
0242 .format(self._args.events))
0243
0244 for dataset in self._args.datasets:
0245 if not re.match(self._dataset_regex, dataset):
0246 print_msg("Dataset pattern '"+dataset+"' is not in CMS format.")
0247 sys.exit(1)
0248
0249 nonzero_events_per_iov = (self._args.minimum_events_in_iov > 0)
0250 if nonzero_events_per_iov and self._args.fraction <= 0:
0251 print_msg("Setting minimum number of events per IOV for alignment "
0252 "to 0 because a non-positive fraction of alignment events"
0253 " is chosen: {}".format(self._args.fraction))
0254 nonzero_events_per_iov = False
0255 self._args.minimum_events_in_iov = 0
0256 if nonzero_events_per_iov and self._args.events <= 0:
0257 print_msg("Setting minimum number of events per IOV for alignment "
0258 "to 0 because a non-positive number of alignment events"
0259 " is chosen: {}".format(self._args.events))
0260 nonzero_events_per_iov = False
0261 self._args.minimum_events_in_iov = 0
0262
0263
0264 def _prepare_iov_datastructures(self):
0265 """Create the needed objects for IOV handling."""
0266
0267 self._iovs = sorted(set(self._args.iovs))
0268 if len(self._iovs) == 0: self._iovs.append(1)
0269 self._iov_info_alignment = {iov: {"events": 0, "files": []}
0270 for iov in self._iovs}
0271 self._iov_info_validation = {iov: {"events": 0, "files": []}
0272 for iov in self._iovs}
0273
0274 self._miniiovs = sorted(set(self._iovs) | set(self._args.miniiovs))
0275
0276
0277 def _get_iovs(self, runs, useminiiovs=False):
0278 """
0279 Return the IOV start for `run`. Returns 'None' if the run is before any
0280 defined IOV.
0281
0282 Arguments:
0283 - `runs`: run numbers
0284 """
0285
0286 iovlist = self._miniiovs if useminiiovs else self._iovs
0287
0288 iovs = []
0289 for run in runs:
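            # bisect.bisect returns the insertion point to the right of `run`;
            # the element before it is thus the latest IOV boundary <= run, and
            # an index of 0 means the run predates all defined IOVs.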
0290 iov_index = bisect.bisect(iovlist, run)
0291 if iov_index > 0: iovs.append(iovlist[iov_index-1])
0292 return iovs
0293
0294
0295 def _prepare_run_datastructures(self):
0296 """Create the needed objects for run-by-run validation file lists."""
0297
0298 self._run_info = {}
0299
0300
0301 def _add_file_info(self, container, keys, fileinfo):
0302 """Add file with `file_name` to `container` using `key`.
0303
0304 Arguments:
0305 - `container`: dictionary holding information on files and event counts
0306 - `keys`: keys to which the info should be added; will be created if not
0307 existing
0308 - `file_name`: name of a dataset file
0309 """
0310
0311 for key in keys:
0312 if key not in container:
0313 container[key] = {"events": 0,
0314 "files": []}
0315 container[key]["events"] += fileinfo.nevents / len(keys)
0316 if fileinfo not in container[key]["files"]:
0317 container[key]["files"].append(fileinfo)
0318
0319
0320 def _remove_file_info(self, container, keys, fileinfo):
0321 """Remove file with `file_name` to `container` using `key`.
0322
0323 Arguments:
0324 - `container`: dictionary holding information on files and event counts
0325 - `keys`: keys from which the info should be removed
0326 - `file_name`: name of a dataset file
0327 - `event_count`: number of events in `file_name`
0328 """
0329
0330 for key in keys:
0331 if key not in container: continue
0332 try:
0333 index = container[key]["files"].index(fileinfo)
0334 except ValueError:
0335 return
0336 del container[key]["files"][index]
0337 container[key]["events"] -= fileinfo.nevents / len(keys)
0338
0339
0340 def _request_dataset_information(self):
0341 """Retrieve general dataset information and create file list."""
0342
0343 if not self._cache.empty:
0344 print_msg("Using cached information.")
0345 (self._events_in_dataset,
0346 self._files,
0347 self._file_info,
0348 self._max_run) = self._cache.get()
0349 self.rereco = any(len(fileinfo.runs)>1 for fileinfo in self._file_info)
0350 if self._args.random: random.shuffle(self._files)
0351 return
0352
0353
0354
0355
0356 number_of_processes = multiprocessing.cpu_count() - 1
0357 number_of_processes = (number_of_processes
0358 if number_of_processes > 0
0359 else 1)
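        # ignore SIGINT in the worker processes so that a Ctrl-C only reaches
        # the main process, which owns the pool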
0360 pool = multiprocessing.Pool(
0361 processes = number_of_processes,
0362 initializer = lambda: signal.signal(signal.SIGINT, signal.SIG_IGN))
0363
0364 print_msg("Requesting information for the following dataset(s):")
0365 for d in self._datasets: print_msg("\t"+d)
0366 print_msg("This may take a while...")
0367
0368 result = pool.map_async(get_events_per_dataset, self._datasets).get(3600)
0369 self._events_in_dataset = sum(result)
0370
0371 result = pool.map_async(get_max_run, self._datasets).get(3600)
0372 self._max_run = max(result)
0373
0374 result = sum(pool.map_async(get_file_info, self._datasets).get(3600), [])
0375 files = pool.map_async(_make_file_info, result).get(3600)
0376 self._file_info = sorted(fileinfo for fileinfo in files)
0377
0378 self.rereco = any(len(fileinfo.runs)>1 for fileinfo in self._file_info)
0379
0380 if self._args.test_mode:
0381 self._file_info = self._file_info[-200:]
0382 self._files = [fileinfo.name for fileinfo in self._file_info]
0383
0384
0385 self._cache.set(self._events_in_dataset, self._files, self._file_info,
0386 self._max_run)
0387 self._cache.dump()
0388 if self._args.random:
0389 random.shuffle(self._file_info)
0390 self._files = [fileinfo.name for fileinfo in self._file_info]
0391
0392 def _create_file_lists(self):
0393 """Create file lists for alignment and validation."""
0394
0395
0396 self._files_alignment = []
0397 self._files_validation = []
0398 self._events_for_alignment = 0
0399 self._events_for_validation = 0
0400
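        # number of files that may be used for alignment according to
        # '-f/--fraction'; files beyond this index go to the validation list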
0401 max_range = (0
0402 if self._args.events <= 0
0403 else int(math.ceil(len(self._files)*self._args.fraction)))
0404 use_for_alignment = True
0405 for i, fileinfo in enumerate(self._file_info):
0406 enough_events = self._events_for_alignment >= self._args.events
0407 fraction_exceeded = i >= max_range
0408 if enough_events or fraction_exceeded: use_for_alignment = False
0409
0410 dataset, f, number_of_events, runs = fileinfo
0411
0412 iovs = self._get_iovs(runs)
0413 if use_for_alignment:
0414 if iovs:
0415 self._events_for_alignment += number_of_events
0416 self._files_alignment.append(fileinfo)
0417 self._add_file_info(self._iov_info_alignment, iovs, fileinfo)
0418 else:
0419 max_range += 1
0420 else:
0421 if iovs:
0422 self._events_for_validation += number_of_events
0423 self._files_validation.append(fileinfo)
0424 self._add_file_info(self._iov_info_validation, iovs, fileinfo)
0425 if self._args.run_by_run:
0426 self._add_file_info(self._run_info, runs, fileinfo)
0427
0428 self._fulfill_iov_eventcount()
0429
0430 self._split_hippy_jobs()
0431
0432
0433 def _fulfill_iov_eventcount(self):
0434 """
0435 Try to fulfill the requirement on the minimum number of events per IOV
0436 in the alignment file list by picking files from the validation list.
0437 """
0438
0439 for iov in self._iovs:
0440 if self._iov_info_alignment[iov]["events"] >= self._args.minimum_events_in_iov: continue
0441 for fileinfo in self._files_validation[:]:
0442 dataset, f, number_of_events, runs = fileinfo
0443 iovs = self._get_iovs(runs)
0444 if iov in iovs:
0445 self._files_alignment.append(fileinfo)
0446 self._events_for_alignment += number_of_events
0447 self._add_file_info(self._iov_info_alignment, iovs, fileinfo)
0448
0449 self._events_for_validation -= number_of_events
0450 self._remove_file_info(self._iov_info_validation, iovs, fileinfo)
0451 if self._args.run_by_run:
0452 self._remove_file_info(self._run_info, runs, fileinfo)
0453 self._files_validation.remove(fileinfo)
0454
0455 if (self._iov_info_alignment[iov]["events"]
0456 >= self._args.minimum_events_in_iov):
0457 break
0458
0459 def _split_hippy_jobs(self):
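        """Group the alignment files into HipPy jobs per dataset and mini-IOV."""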
0460 hippyjobs = {}
0461 for dataset, miniiov in itertools.product(self._datasets, self._miniiovs):
0462 jobsforminiiov = []
0463 hippyjobs[dataset,miniiov] = jobsforminiiov
0464 eventsinthisjob = float("inf")
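            # start with an 'infinite' event count so that the first matching
            # file always opens a new job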
0465 for fileinfo in self._files_alignment:
0466 if fileinfo.dataset != dataset: continue
0467 miniiovs = set(self._get_iovs(fileinfo.runs, useminiiovs=True))
0468 if miniiov not in miniiovs: continue
0469 if len(miniiovs) > 1:
0470 hippyjobs[dataset,miniiov] = []
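                    # a file spanning more than one mini-IOV disables HipPy job
                    # creation for this (dataset, mini-IOV) combination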
0471 if eventsinthisjob >= self._args.hippy_events_per_job:
0472 currentjob = []
0473 jobsforminiiov.append(currentjob)
0474 eventsinthisjob = 0
0475 currentjob.append(fileinfo)
0476 currentjob.sort()
0477 eventsinthisjob += fileinfo.nevents
0478
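        # merge the mini-IOV jobs back into their enclosing IOV: a mini-IOV
        # belongs to the largest IOV boundary that is <= the mini-IOV start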
0479 self._hippy_jobs = {
0480 (dataset, iov): sum((hippyjobs[dataset, miniiov]
0481 for miniiov in self._miniiovs
0482 if iov == max(_ for _ in self._iovs if _ <= miniiov)), []
0483 )
0484 for dataset, iov in itertools.product(self._datasets, self._iovs)
0485 }
0486
0487 def _print_eventcounts(self):
0488 """Print the event counts per file list and per IOV."""
0489
0490 log = os.path.join(self._output_dir, FileListCreator._event_count_log)
0491
0492 print_msg("Using {0:d} events for alignment ({1:.2f}%)."
0493 .format(self._events_for_alignment,
0494 100.0*
0495 self._events_for_alignment/self._events_in_dataset),
0496 log_file = log)
0497 for iov in sorted(self._iov_info_alignment):
0498 print_msg(("Approximate events" if self.rereco else "Events") + " for alignment in IOV since {0:f}: {1:f}"
0499 .format(iov, self._iov_info_alignment[iov]["events"]),
0500 log_file = log)
0501
0502 print_msg("Using {0:d} events for validation ({1:.2f}%)."
0503 .format(self._events_for_validation,
0504 100.0*
0505 self._events_for_validation/self._events_in_dataset),
0506 log_file = log)
0507
0508 for iov in sorted(self._iov_info_validation):
0509 msg = ("Approximate events" if self.rereco else "Events") + " for validation in IOV since {0:f}: {1:f}".format(
0510 iov, self._iov_info_validation[iov]["events"])
0511 if (self._iov_info_validation[iov]["events"]
0512 < self._args.minimum_events_validation):
0513 msg += " (not enough events -> no dataset file will be created)"
0514 print_msg(msg, log_file = log)
0515
0516 for run in sorted(self._run_info):
0517 msg = ("Approximate events" if self.rereco else "Events") + " for validation in run {0:f}: {1:f}".format(
0518 run, self._run_info[run]["events"])
0519 if (self._run_info[run]["events"]
0520 < self._args.minimum_events_validation):
0521 msg += " (not enough events -> no dataset file will be created)"
0522 print_msg(msg, log_file = log)
0523
0524 unused_events = (self._events_in_dataset
0525 - self._events_for_validation
0526 - self._events_for_alignment)
        if unused_events > 0 and self._events_in_dataset != 0:
0528 print_msg("Unused events: {0:d} ({1:.2f}%)"
0529 .format(unused_events,
0530 100.0*unused_events/self._events_in_dataset),
0531 log_file = log)
0532
0533
0534 def _create_dataset_ini_section(self, name, collection, json_file = None):
0535 """Write dataset ini snippet.
0536
0537 Arguments:
0538 - `name`: name of the dataset section
0539 - `collection`: track collection of this dataset
0540 - `json_file`: JSON file to be used for this dataset (optional)
0541 """
0542
0543 if json_file:
0544 splitted = name.split("_since")
0545 file_list = "_since".join(splitted[:-1]
0546 if len(splitted) > 1
0547 else splitted)
0548 else:
0549 file_list = name
0550 output = "[dataset:{}]\n".format(name)
0551 output += "collection = {}\n".format(collection)
0552 output += "inputFileList = ${{datasetdir}}/{}.txt\n".format(file_list)
0553 output += "json = ${{datasetdir}}/{}\n".format(json_file) if json_file else ""
0554
0555 if collection in ("ALCARECOTkAlCosmicsCTF0T",
0556 "ALCARECOTkAlCosmicsInCollisions"):
0557 if self._first_dataset_ini:
0558 print_msg("\tDetermined cosmics dataset, i.e. please replace "
0559 "'DUMMY_DECO_MODE_FLAG' and 'DUMMY_ZERO_TESLA_FLAG' "
0560 "with the correct values.")
0561 self._first_dataset_ini = False
0562 output += "cosmicsDecoMode = DUMMY_DECO_MODE_FLAG\n"
0563 output += "cosmicsZeroTesla = DUMMY_ZERO_TESLA_FLAG\n"
0564 output += "\n"
0565
0566 return output
0567
0568
0569 def _create_json_file(self, name, first, last = None):
0570 """
0571 Create JSON file with `name` covering runs from `first` to `last`. If a
0572 global JSON is provided, the resulting file is the intersection of the
0573 file created here and the global one.
0574 Returns the name of the created JSON file.
0575
0576 Arguments:
        - `name`: name of the created JSON file
0578 - `first`: first run covered by the JSON file
0579 - `last`: last run covered by the JSON file
0580
0581 """
0582
0583 if last is None: last = self._max_run
0584 name += "_JSON.txt"
0585 print_msg("Creating JSON file: "+name)
0586
0587 json_file = LumiList.LumiList(runs = range(first, last+1))
0588 if self._args.json:
0589 global_json = LumiList.LumiList(filename = self._args.json)
0590 json_file = json_file & global_json
0591 json_file.writeJSON(os.path.join(self._output_dir, name))
0592
0593 return name
0594
0595
0596 def _get_track_collection(self, edm_file):
0597 """Extract track collection from given `edm_file`.
0598
0599 Arguments:
0600 - `edm_file`: CMSSW dataset file
0601 """
0602
0603
0604 cmd = ["edmDumpEventContent", r"root://cms-xrd-global.cern.ch/"+edm_file]
0605 try:
            event_content = subprocess.check_output(
                cmd, universal_newlines = True).split("\n")
0607 except subprocess.CalledProcessError as e:
0608 splitted = edm_file.split("/")
0609 try:
0610 alcareco = splitted[splitted.index("ALCARECO")+1].split("-")[0]
0611 alcareco = alcareco.replace("TkAlCosmics0T", "TkAlCosmicsCTF0T")
0612 alcareco = "ALCARECO" + alcareco
0613 print_msg("\tDetermined track collection as '{}'.".format(alcareco))
0614 return alcareco
0615 except ValueError:
0616 if "RECO" in splitted:
0617 print_msg("\tDetermined track collection as 'generalTracks'.")
0618 return "generalTracks"
0619 else:
0620 print_msg("\tCould not determine track collection "
0621 "automatically.")
0622 print_msg("\tPlease replace 'DUMMY_TRACK_COLLECTION' with "
0623 "the correct value.")
0624 return "DUMMY_TRACK_COLLECTION"
0625
0626 track_collections = []
0627 for line in event_content:
0628 splitted = line.split()
0629 if len(splitted) > 0 and splitted[0] == r"vector<reco::Track>":
0630 track_collections.append(splitted[1].strip().strip('"'))
0631 if len(track_collections) == 0:
0632 print_msg("No track collection found in file '{}'.".format(edm_file))
0633 sys.exit(1)
0634 elif len(track_collections) == 1:
0635 print_msg("\tDetermined track collection as "
0636 "'{}'.".format(track_collections[0]))
0637 return track_collections[0]
0638 else:
            alcareco_tracks = [coll for coll in track_collections
                               if coll.startswith("ALCARECO")]
0641 if len(alcareco_tracks) == 0 and "generalTracks" in track_collections:
0642 print_msg("\tDetermined track collection as 'generalTracks'.")
0643 return "generalTracks"
0644 elif len(alcareco_tracks) == 1:
0645 print_msg("\tDetermined track collection as "
0646 "'{}'.".format(alcareco_tracks[0]))
0647 return alcareco_tracks[0]
0648 print_msg("\tCould not unambiguously determine track collection in "
0649 "file '{}':".format(edm_file))
0650 print_msg("\tPlease replace 'DUMMY_TRACK_COLLECTION' with "
0651 "the correct value from the following list.")
0652 for collection in track_collections:
0653 print_msg("\t - "+collection)
0654 return "DUMMY_TRACK_COLLECTION"
0655
0656
0657 def _write_file_lists(self):
0658 """Write file lists to disk."""
0659
0660 self._create_dataset_txt(self._formatted_dataset, self._files_alignment)
0661 self._create_hippy_txt(self._formatted_dataset, sum(self._hippy_jobs.values(), []))
0662 self._create_dataset_cff(
0663 "_".join(["Alignment", self._formatted_dataset]),
0664 self._files_alignment)
0665
0666 self._create_dataset_cff(
0667 "_".join(["Validation", self._formatted_dataset]),
0668 self._files_validation)
0669
0670
0671 if self._args.create_ini:
0672 dataset_ini_general = "[general]\n"
0673 dataset_ini_general += "datasetdir = {}\n".format(self._output_dir)
0674 dataset_ini_general += ("json = {}\n\n".format(self._args.json)
0675 if self._args.json
0676 else "\n")
0677
0678 ini_path = self._formatted_dataset + ".ini"
0679 print_msg("Creating dataset ini file: " + ini_path)
0680 ini_path = os.path.join(self._output_dir, ini_path)
0681
0682 collection = self._get_track_collection(self._files[0])
0683
0684 with open(ini_path, "w") as f:
0685 f.write(dataset_ini_general)
0686 f.write(self._create_dataset_ini_section(
0687 self._formatted_dataset, collection))
0688
0689 iov_wise_ini = dataset_ini_general
0690
0691 for i,iov in enumerate(sorted(self._iovs)):
0692 iov_str = "since{0:d}".format(iov)
0693 iov_str = "_".join([self._formatted_dataset, iov_str])
0694
0695 if self.rereco:
0696 if i == len(self._iovs) - 1:
0697 last = None
0698 else:
0699 last = sorted(self._iovs)[i+1] - 1
0700 local_json = self._create_json_file(iov_str, iov, last)
0701 else:
0702 local_json = None
0703
0704 if self._args.create_ini:
0705 iov_wise_ini += self._create_dataset_ini_section(iov_str,
0706 collection,
0707 local_json)
0708
0709 self._create_dataset_txt(iov_str,
0710 self._iov_info_alignment[iov]["files"])
0711 self._create_hippy_txt(iov_str, sum((self._hippy_jobs[dataset,iov] for dataset in self._datasets), []))
0712 self._create_dataset_cff(
0713 "_".join(["Alignment", iov_str]),
0714 self._iov_info_alignment[iov]["files"],
0715 json_file=local_json)
0716
0717 if (self._iov_info_validation[iov]["events"]
0718 < self._args.minimum_events_validation):
0719 continue
0720 self._create_dataset_cff(
0721 "_".join(["Validation", iov_str]),
0722 self._iov_info_validation[iov]["files"],
0723 json_file=local_json)
0724
0725 if self._args.create_ini and iov_wise_ini != dataset_ini_general:
0726 ini_path = self._formatted_dataset + "_IOVs.ini"
0727 print_msg("Creating dataset ini file: " + ini_path)
0728 ini_path = os.path.join(self._output_dir, ini_path)
0729 with open(ini_path, "w") as f: f.write(iov_wise_ini)
0730
0731 for run in sorted(self._run_info):
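            # run-by-run lists are not created for ReReco-like datasets whose
            # files span several runs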
            if self.rereco: continue
0733 if (self._run_info[run]["events"]
0734 < self._args.minimum_events_validation):
0735 continue
0736 self._create_dataset_cff(
0737 "_".join(["Validation", self._formatted_dataset, str(run)]),
0738 self._run_info[run]["files"])
0739
0740
0741 def _create_dataset_txt(self, name, file_list):
0742 """Write alignment file list to disk.
0743
0744 Arguments:
0745 - `name`: name of the file list
0746 - `file_list`: list of files to write to `name`
0747 """
0748
0749 name += ".txt"
0750 print_msg("Creating dataset file list: "+name)
0751 with open(os.path.join(self._output_dir, name), "w") as f:
0752 f.write("\n".join(fileinfo.name for fileinfo in file_list))
0753
0754
0755 def _create_hippy_txt(self, name, job_list):
0756 name += "_hippy.txt"
0757 print_msg("Creating dataset file list for HipPy: "+name)
0758 with open(os.path.join(self._output_dir, name), "w") as f:
0759 f.write("\n".join(",".join("'"+fileinfo.name+"'" for fileinfo in job) for job in job_list)+"\n")
0760
0761
0762 def _create_dataset_cff(self, name, file_list, json_file = None):
0763 """
0764 Create configuration fragment to define a dataset.
0765
0766 Arguments:
0767 - `name`: name of the configuration fragment
0768 - `file_list`: list of files to write to `name`
0769 - `json_file`: JSON file to be used for this dataset (optional)
0770 """
0771
0772 if json_file is None: json_file = self._args.json
0773 if json_file is not None:
0774 json_file = os.path.join(self._output_dir, json_file)
0775
0776 name = "_".join(["Dataset",name, "cff.py"])
0777 print_msg("Creating dataset configuration fragment: "+name)
0778
0779 file_list_str = ""
0780 for sub_list in get_chunks(file_list, 255):
0781 file_list_str += ("readFiles.extend([\n'"+
0782 "',\n'".join(fileinfo.name for fileinfo in sub_list)+
0783 "'\n])\n")
0784
0785 fragment = FileListCreator._dataset_template.format(
0786 lumi_def = ("import FWCore.PythonUtilities.LumiList as LumiList\n\n"
0787 "lumiSecs = cms.untracked.VLuminosityBlockRange()\n"
0788 "goodLumiSecs = LumiList.LumiList(filename = "
0789 "'{0:s}').getCMSSWString().split(',')"
0790 .format(json_file)
0791 if json_file else ""),
0792 lumi_arg = ("lumisToProcess = lumiSecs,\n "
0793 if json_file else ""),
0794 lumi_extend = "lumiSecs.extend(goodLumiSecs)" if json_file else "",
0795 files = file_list_str)
0796
0797 with open(os.path.join(self._output_dir, name), "w") as f:
0798 f.write(fragment)
0799
0800
0801 _dataset_template = """\
0802 import FWCore.ParameterSet.Config as cms
0803 {lumi_def:s}
0804 readFiles = cms.untracked.vstring()
0805 source = cms.Source("PoolSource",
0806 {lumi_arg:s}fileNames = readFiles)
0807 {files:s}{lumi_extend:s}
0808 maxEvents = cms.untracked.PSet(input = cms.untracked.int32(-1))
0809 """
0810
0811
0812 class _DasCache(object):
0813 """Helper class to cache information from DAS requests."""
0814
0815 def __init__(self, file_list_id):
0816 """Constructor of the cache.
0817
0818 Arguments:
0819 - `file_list_id`: ID of the cached file lists
0820 """
0821
0822 self._file_list_id = file_list_id
0823 self._cache_file_name = os.path.join(file_list_id, ".das_cache.pkl")
0824 self.reset()
0825
0826
0827 def reset(self):
0828 """Reset the cache contents and the 'empty' flag."""
0829
0830 self._empty = True
0831 self._events_in_dataset = 0
0832 self._files = []
0833 self._file_info = []
0834 self._max_run = None
0835
0836
0837 def set(self, total_events, file_list, file_info, max_run):
0838 """Set the content of the cache.
0839
0840 Arguments:
0841 - `total_events`: total number of events in dataset
0842 - `file_list`: list of files in dataset
        - `file_info`: list of `FileInfo` objects of the dataset files
0844 - `max_run`: highest run number contained in the dataset
0845 """
0846
0847 self._events_in_dataset = total_events
0848 self._files = file_list
0849 self._file_info = file_info
0850 self._max_run = max_run
0851 self._empty = False
0852
0853
0854 def get(self):
0855 """
        Get the content of the cache as tuple:
        result = (total number of events in dataset,
                  list of files in dataset,
                  list of `FileInfo` objects with events and runs per file,
                  highest run number in the dataset)
0860 """
0861
0862 return self._events_in_dataset, self._files, self._file_info, self._max_run
0863
0864
0865 def load(self):
0866 """Loads the cached contents."""
0867
0868 if not self.empty:
0869 print_msg("Overriding file information with cached information.")
0870 try:
0871 with open(self._cache_file_name, "rb") as f:
0872 tmp_dict = cPickle.load(f)
0873 self.__dict__.update(tmp_dict)
0874 except IOError as e:
            if e.errno == 2:
0876 msg = "Failed to load cache for '{}'.".format(self._file_list_id)
0877 if not self.empty:
0878 msg += " Keeping the previous file information."
0879 print_msg(msg)
0880 else:
0881 raise
0882
0883
0884 def dump(self):
0885 """Dumps the contents to the cache file."""
0886
0887 if self.empty:
0888 print_msg("Cache is empty. Not writing to file.")
0889 return
0890
0891 with open(self._cache_file_name, "wb") as f:
0892 cPickle.dump(self.__dict__, f, 2)
0893
0894
0895 @property
0896 def empty(self):
0897 """
0898 Flag indicating whether the cache is empty or has been filled (possibly
0899 with nothing).
0900 """
0901
0902 return self._empty
0903
0904
0905
0906
0907 def das_client(query, check_key = None):
0908 """
0909 Submit `query` to DAS client and handle possible errors.
0910 Further treatment of the output might be necessary.
0911
0912 Arguments:
0913 - `query`: DAS query
0914 - `check_key`: optional key to be checked for; retriggers query if needed
0915 """
0916
0917 error = True
0918 das_data = {'status': 'error'}
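    # retry up to five times; transient failures (I/O errors, truncated JSON
    # replies, missing keys in the result) just trigger another attempt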
0919 for i in range(5):
0920 try:
0921 das_data = cmssw_das_client.get_data(query, limit = 0)
0922 except IOError as e:
0923 if e.errno == 14:
0924 continue
0925 except ValueError as e:
0926 if str(e) == "No JSON object could be decoded":
0927 das_data['reason'] = str(e)
0928 continue
0929
0930 if das_data["status"] == "ok":
0931 if das_data["nresults"] == 0 or check_key is None:
0932 error = False
0933 break
0934
0935 result_count = 0
0936 for d in find_key(das_data["data"], [check_key]):
0937 result_count += len(d)
0938 if result_count == 0:
0939 das_data["status"] = "error"
0940 das_data["reason"] = ("DAS did not return required data.")
0941 continue
0942 else:
0943 error = False
0944 break
0945
0946 if das_data["status"] == "error":
0947 print_msg("DAS query '{}' failed 5 times. "
0948 "The last time for the the following reason:".format(query))
0949 print(das_data.get("reason", "ERROR:UNKNOWN"))
0950 sys.exit(1)
0951 return das_data["data"]
0952
0953
0954 def find_key(collection, key_chain):
0955 """Searches for `key` in `collection` and returns first corresponding value.
0956
0957 Arguments:
0958 - `collection`: list of dictionaries
0959 - `key_chain`: chain of keys to be searched for
0960 """
0961
0962 result = None
0963 for i,key in enumerate(key_chain):
0964 for item in collection:
0965 if key in item:
0966 if i == len(key_chain) - 1:
0967 result = item[key]
0968 else:
0969 try:
0970 result = find_key(item[key], key_chain[i+1:])
0971 except LookupError:
0972 pass
0973 else:
0974 pass
0975
0976 if result is not None: return result
0977 raise LookupError(key_chain, collection)
0978
0979
0980 def print_msg(text, line_break = True, log_file = None):
0981 """Formatted printing of `text`.
0982
0983 Arguments:
0984 - `text`: string to be printed
0985 """
0986
0987 msg = " >>> " + str(text)
0988 if line_break:
0989 print(msg)
0990 else:
0991 print(msg, end=' ')
0992 sys.stdout.flush()
0993 if log_file:
0994 with open(log_file, "a") as f: f.write(msg+"\n")
0995 return msg
0996
0997
0998 def get_runs(file_name):
0999 """
    Try to guess the run number from `file_name`. If it cannot be determined
    from the name, get the run numbers from DAS (slow!).
1002
1003 Arguments:
1004 - `file_name`: name of the considered file
1005 """
1006 try:
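        # CMS LFNs encode the run number in the directory structure,
        # e.g. .../000/315/252/00000/<file>.root -> run 315252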
1007 return [int("".join(file_name.split("/")[-4:-2]))]
1008 except ValueError:
1009 query = "run file="+file_name+" system=dbs3"
1010 return [int(_) for _ in find_key(das_client(query), ["run", "run_number"])]
1011
1012
1013 def get_max_run(dataset_name):
1014 """Retrieve the maximum run number in `dataset_name`.
1015
1016 Arguments:
1017 - `dataset_name`: name of the dataset
1018 """
1019
1020 data = das_client("run dataset={0:s} system=dbs3".format(dataset_name))
1021 runs = [f["run"][0]["run_number"] for f in data]
1022 return max(runs)
1023
1024
1025 def get_files(dataset_name):
1026 """Retrieve list of files in `dataset_name`.
1027
1028 Arguments:
1029 - `dataset_name`: name of the dataset
1030 """
1031
1032 data = das_client(("file dataset={0:s} system=dbs3 detail=True | "+
1033 "grep file.name, file.nevents > 0").format(dataset_name),
1034 "file")
1035 return [find_key(f["file"], ["name"]) for f in data]
1036
1037
1038 def get_datasets(dataset_pattern):
1039 """Retrieve list of dataset matching `dataset_pattern`.
1040
1041 Arguments:
1042 - `dataset_pattern`: pattern of dataset names
1043 """
1044
1045 data = das_client("dataset dataset={0:s} system=dbs3 detail=True"
1046 "| grep dataset.name".format(dataset_pattern), "dataset")
1047 return sorted(set([find_key(f["dataset"], ["name"]) for f in data]))
1048
1049
1050 def get_events_per_dataset(dataset_name):
1051 """Retrieve the number of a events in `dataset_name`.
1052
1053 Arguments:
1054 - `dataset_name`: name of a dataset
1055 """
1056
1057 return _get_events("dataset", dataset_name)
1058
1059
1060 def get_events_per_file(file_name):
1061 """Retrieve the number of a events in `file_name`.
1062
1063 Arguments:
1064 - `file_name`: name of a dataset file
1065 """
1066
1067 return _get_events("file", file_name)
1068
1069
1070 def _get_events(entity, name):
1071 """Retrieve the number of events from `entity` called `name`.
1072
1073 Arguments:
1074 - `entity`: type of entity
1075 - `name`: name of entity
1076 """
1077
1078 data = das_client("{0:s}={1:s} system=dbs3 detail=True | grep {0:s}.nevents"
1079 .format(entity, name), entity)
1080 return int(find_key(data, [entity, "nevents"]))
1081
1082
1083 def _get_properties(name, entity, properties, filters = None, sub_entity = None,
1084 aggregators = None):
1085 """Retrieve `properties` from `entity` called `name`.
1086
1087 Arguments:
1088 - `name`: name of entity
1089 - `entity`: type of entity
1090 - `properties`: list of property names
1091 - `filters`: list of filters on properties
1092 - `sub_entity`: type of entity from which to extract the properties;
1093 defaults to `entity`
1094 - `aggregators`: additional aggregators/filters to amend to query
1095 """
1096
1097 if sub_entity is None: sub_entity = entity
1098 if filters is None: filters = []
1099 props = ["{0:s}.{1:s}".format(sub_entity,prop.split()[0])
1100 for prop in properties]
1101 conditions = ["{0:s}.{1:s}".format(sub_entity, filt)
1102 for filt in filters]
1103 add_ons = "" if aggregators is None else " | "+" | ".join(aggregators)
1104
1105 data = das_client("{0:s} {1:s}={2:s} system=dbs3 detail=True | grep {3:s}{4:s}"
1106 .format(sub_entity, entity, name,
1107 ", ".join(props+conditions), add_ons), sub_entity)
1108 return [[find_key(f[sub_entity], [prop]) for prop in properties] for f in data]
1109
1110 def get_file_info(dataset):
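    """Return a list of (dataset, file name, number of events) tuples for `dataset`."""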
1111 result = _get_properties(name=dataset,
1112 properties = ["name", "nevents"],
1113 filters = ["nevents > 0"],
1114 entity = "dataset",
1115 sub_entity = "file")
1116 return [(dataset, name, nevents) for name, nevents in result]
1117
1118
1119
1120 FileInfo = collections.namedtuple("FileInfo", "dataset name nevents runs")
1121
1122 def _make_file_info(dataset_name_nevents):
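    """Expand a (dataset, name, nevents) tuple into a `FileInfo` with its runs."""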
1123 return FileInfo(*dataset_name_nevents, runs=get_runs(dataset_name_nevents[1]))
1124
1125 def get_chunks(long_list, chunk_size):
1126 """
    Generate sub-lists of `long_list` with a maximum size of
    `chunk_size`.
1129
1130 Arguments:
1131 - `long_list`: original list
1132 - `chunk_size`: maximum size of created sub-lists
1133 """
1134
1135 for i in range(0, len(long_list), chunk_size):
1136 yield long_list[i:i+chunk_size]
1137
1138
1139 def merge_strings(strings):
1140 """Merge strings in `strings` into a common string.
1141
1142 Arguments:
1143 - `strings`: list of strings
1144 """
1145
1146 if type(strings) == str:
1147 return strings
1148 elif len(strings) == 0:
1149 return ""
1150 elif len(strings) == 1:
1151 return strings[0]
1152 elif len(strings) == 2:
1153 first = strings[0]
1154 second = strings[1]
1155 else:
1156 first = merge_strings(strings[:-1])
1157 second = strings[-1]
1158
1159 merged_string = ""
1160 blocks = difflib.SequenceMatcher(None, first, second).get_matching_blocks()
1161
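    # walk through the matching blocks, keeping the non-matching pieces of both
    # strings as well as the common part, in their original order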
1162 last_i, last_j, last_n = 0, 0, 0
1163 for i, j, n in blocks:
1164 merged_string += first[last_i+last_n:i]
1165 merged_string += second[last_j+last_n:j]
1166 merged_string += first[i:i+n]
1167 last_i, last_j, last_n = i, j, n
1168
1169 return str(merged_string)
1170
1171
1172
1173 if __name__ == "__main__":
1174 try:
1175 main()
1176 except KeyboardInterrupt:
1177 pass