0001 #!/usr/bin/env python3
0002 
0003 from __future__ import print_function
0004 
0005 from builtins import range
0006 import os
0007 import re
0008 import sys
0009 import glob
0010 import json
0011 import math
0012 import bisect
0013 import random
0014 import signal
0015 if sys.version_info[0]>2:
0016   import _pickle as cPickle
0017 else:
0018   import cPickle
0019 import difflib
0020 import argparse
0021 import functools
0022 import itertools
0023 import subprocess
0024 import collections
0025 import multiprocessing
0026 import FWCore.PythonUtilities.LumiList as LumiList
0027 import Utilities.General.cmssw_das_client as cmssw_das_client
0028 import Alignment.MillePedeAlignmentAlgorithm.mpslib.tools as mps_tools
0029 
0030 
0031 ################################################################################
0032 def main(argv = None):
0033     """
0034     Main routine. Not called if this module is loaded via `import`.
0035 
0036     Arguments:
0037     - `argv`: Command line arguments passed to the script.
0038     """
0039 
0040     if argv is None:
0041         argv = sys.argv[1:]
0042 
0043     file_list_creator = FileListCreator(argv)
0044     file_list_creator.create()
0045 
0046 
0047 ################################################################################
0048 class FileListCreator(object):
0049     """Create file lists for alignment and validation for a given dataset.
0050     """
0051 
0052     def __init__(self, argv):
0053         """Constructor taking the command line arguments.
0054 
0055         Arguments:
0056         - `argv`: command line arguments passed to the script
0057         """
0058 
0059         self._first_dataset_ini = True
0060         self._parser = self._define_parser()
0061         self._args = self._parser.parse_args(argv)
0062 
0063         if not mps_tools.check_proxy():
0064             print_msg(
0065                 "Please create proxy via 'voms-proxy-init -voms cms -rfc'.")
0066             sys.exit(1)
0067 
0068         self._dataset_regex = re.compile(r"^/([^/]+)/([^/]+)/([^/]+)$")
0069         self._validate_input()
0070 
0071         if self._args.test_mode:
0072             import Configuration.PyReleaseValidation.relval_steps as rvs
0073             import Configuration.PyReleaseValidation.relval_production as rvp
0074             self._args.datasets = [rvs.steps[rvp.workflows[1000][1][0]]["INPUT"].dataSet]
0075             self._validate_input() # ensure that this change is valid
0076 
0077         self._datasets = sorted([dataset
0078                                  for pattern in self._args.datasets
0079                                  for dataset in get_datasets(pattern)
0080                                  if re.search(self._args.dataset_filter, dataset)])
0081         if len(self._datasets) == 0:
0082             print_msg("Found no dataset matching the pattern(s):")
0083             for d in self._args.datasets: print_msg("\t"+d)
0084             sys.exit(1)
0085 
0086         self._formatted_dataset = merge_strings(
0087             [re.sub(self._dataset_regex, r"\1_\2_\3", dataset)
0088              for dataset in self._datasets])
0089         self._output_dir = os.path.join(self._args.output_dir,
0090                                         self._formatted_dataset)
0091         self._output_dir = os.path.abspath(self._output_dir)
0092         self._cache = _DasCache(self._output_dir)
0093         self._prepare_iov_datastructures()
0094         self._prepare_run_datastructures()
0095 
0096         try:
0097             os.makedirs(self._output_dir)
0098         except OSError as e:
0099             if e.args == (17, "File exists"):
0100                 if self._args.force:
0101                     pass        # do nothing, just clear the existing output
0102                 elif self._args.use_cache:
0103                     self._cache.load() # load cache before clearing the output
0104                 else:
0105                     print_msg("Directory '{}' already exists from previous runs"
0106                               " of the script. Use '--use-cache' if you want to"
0107                               " use the cached DAS-query results or use "
0108                               "'--force' to remove it."
0109                               .format(self._output_dir))
0110                     sys.exit(1)
0111                 files = glob.glob(os.path.join(self._output_dir, "*"))
0112                 for f in files: os.remove(f)
0113             else:
0114                 raise
0115 
0116 
0117     def create(self):
0118         """Creates the file lists. To be called by the user of the class."""
0119 
0120         self._request_dataset_information()
0121         self._create_file_lists()
0122         self._print_eventcounts()
0123         self._write_file_lists()
0124 
0125 
0126     _event_count_log = "event_count_info.log"
0127 
0128 
0129     def _define_parser(self):
0130         """Definition of command line argument parser."""
0131 
0132         parser = argparse.ArgumentParser(
0133             description = "Create file lists for alignment",
0134             epilog = ("The tool will create a directory containing all file "
0135                       "lists and a log file with all relevant event counts "
0136                       "('{}').".format(FileListCreator._event_count_log)))
0137         parser.add_argument("-i", "--input", dest = "datasets", required = True,
0138                             metavar = "DATASET", action = "append",
0139                             help = ("CMS dataset name; supports wildcards; "
0140                                     "use multiple times for multiple datasets"))
0141         parser.add_argument("--dataset-filter", default = "",
0142                             help = "regex to further filter the matched datasets,"
0143                                    " in case the wildcard isn't flexible enough")
0144         parser.add_argument("-j", "--json", dest = "json", metavar = "PATH",
0145                             help = "path to JSON file (optional)")
0146         parser.add_argument("-f", "--fraction", dest = "fraction",
0147                             type = float, default = 1,
0148                             help = "max. fraction of files used for alignment")
0149         parser.add_argument("--iov", dest = "iovs", metavar = "RUN", type = int,
0150                             action = "append", default = [],
0151                             help = ("define IOV by specifying first run; for "
0152                                     "multiple IOVs use this option multiple "
0153                                     "times; files from runs before the lowest "
0154                                     "IOV are discarded (default: 1)"))
0155         parser.add_argument("--miniiov", dest="miniiovs", metavar="RUN", type=int,
0156                             action="append", default=[],
0157                             help=("in addition to the standard IOVs, break up HipPy jobs "
0158                                   "at these points, so that files from before and after "
0159                                   "these runs do not end up in the same job"))
0160         parser.add_argument("-r", "--random", action = "store_true",
0161                             default = False, help = "select files randomly")
0162         parser.add_argument("-n", "--events-for-alignment", "--maxevents",
0163                             dest = "events", type = int, metavar = "NUMBER",
0164                             help = ("number of events needed for alignment; the"
0165                                     " remaining events in the dataset are used "
0166                                     "for validation; if n<=0, all events are "
0167                                     "used for validation"))
0168         parser.add_argument("--all-events", action = "store_true",
0169                             help = "Use all events for alignment")
0170         parser.add_argument("--tracks-for-alignment", dest = "tracks",
0171                             type = int, metavar = "NUMBER",
0172                             help = "number of tracks needed for alignment")
0173         parser.add_argument("--track-rate", dest = "rate", type = float,
0174                             metavar = "NUMBER",
0175                             help = "number of tracks per event")
0176         parser.add_argument("--run-by-run", dest = "run_by_run",
0177                             action = "store_true", default = False,
0178                             help = "create validation file list for each run")
0179         parser.add_argument("--minimum-events-in-iov",
0180                             dest = "minimum_events_in_iov", metavar = "NUMBER",
0181                             type = int, default = 100000,
0182                             help = ("minimum number of events for alignment per"
0183                                     " IOV; this option has a higher priority "
0184                                     "than '-f/--fraction' "
0185                                     "(default: %(default)s)"))
0186         parser.add_argument("--minimum-events-validation",
0187                             dest = "minimum_events_validation",
0188                             metavar = "NUMBER", type = int, default = 1,
0189                             help = ("minimum number of events for validation; "
0190                                     "applies to IOVs; in case of --run-by-run "
0191                                     "it applies to runs "
0192                                     "(default: %(default)s)"))
0193         parser.add_argument("--use-cache", dest = "use_cache",
0194                             action = "store_true", default = False,
0195                             help = "use DAS-query results of previous run")
0196         parser.add_argument("-o", "--output-dir", dest = "output_dir",
0197                             metavar = "PATH", default = os.getcwd(),
0198                             help = "output base directory (default: %(default)s)")
0199         parser.add_argument("--create-ini", dest = "create_ini",
0200                             action = "store_true", default = False,
0201                             help = ("create dataset ini file based on the "
0202                                     "created file lists"))
0203         parser.add_argument("--force", action = "store_true", default = False,
0204                             help = ("remove output directory from previous "
0205                                     "runs, if existing"))
0206         parser.add_argument("--hippy-events-per-job", type = int, default = 1,
0207                             help = ("approximate number of events in each job for HipPy"))
0208         parser.add_argument("--test-mode", dest = "test_mode",
0209                             action = "store_true", default = False,
0210                             help = argparse.SUPPRESS) # hidden option
0211         return parser
0212 
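         # A minimal, illustrative invocation of this script (dataset name, IOV
         # boundaries and output path are made-up placeholders; the executable name
         # depends on how the script is installed in your release):
         #
         #   create_file_lists.py \
         #       -i "/MinimumBias/Run2018A-TkAlMinBias-v1/ALCARECO" \
         #       --iov 315000 --iov 317000 \
         #       --events-for-alignment 1000000 --fraction 0.5 \
         #       --create-ini -o my_file_lists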
0213 
0214     def _validate_input(self):
0215         """Validate command line arguments."""
0216 
0217         if self._args.events is None:
0218             if self._args.all_events:
0219                 self._args.events = float("inf")
0220                 print_msg("Using all events for alignment.")
0221             elif (self._args.tracks is None) and (self._args.rate is None):
0222                 msg = ("either -n/--events-for-alignment, --all-events, or both of "
0223                        "--tracks-for-alignment and --track-rate are required")
0224                 self._parser.error(msg)
0225             elif (((self._args.tracks is not None) and (self._args.rate is None)) or
0226                   ((self._args.rate is not None) and (self._args.tracks is None))):
0227                 msg = ("--tracks-for-alignment and --track-rate must be used "
0228                        "together")
0229                 self._parser.error(msg)
0230             else:
0231                 self._args.events = int(math.ceil(self._args.tracks /
0232                                                   self._args.rate))
0233                 print_msg("Requested {0:d} tracks with {1:.2f} tracks/event "
0234                           "-> {2:d} events for alignment."
0235                           .format(self._args.tracks, self._args.rate,
0236                                   self._args.events))
0237         else:
0238             if (self._args.tracks is not None) or (self._args.rate is not None) or self._args.all_events:
0239                 msg = ("-n/--events-for-alignment must not be used with "
0240                        "--tracks-for-alignment, --track-rate, or --all-events")
0241                 self._parser.error(msg)
0242             print_msg("Requested {0:d} events for alignment."
0243                       .format(self._args.events))
0244 
0245         for dataset in self._args.datasets:
0246             if not re.match(self._dataset_regex, dataset):
0247                 print_msg("Dataset pattern '"+dataset+"' is not in CMS format.")
0248                 sys.exit(1)
0249 
0250         nonzero_events_per_iov = (self._args.minimum_events_in_iov > 0)
0251         if nonzero_events_per_iov and self._args.fraction <= 0:
0252             print_msg("Setting minimum number of events per IOV for alignment "
0253                       "to 0 because a non-positive fraction of alignment events"
0254                       " is chosen: {}".format(self._args.fraction))
0255             nonzero_events_per_iov = False
0256             self._args.minimum_events_in_iov = 0
0257         if nonzero_events_per_iov and self._args.events <= 0:
0258             print_msg("Setting minimum number of events per IOV for alignment "
0259                       "to 0 because a non-positive number of alignment events"
0260                       " is chosen: {}".format(self._args.events))
0261             nonzero_events_per_iov = False
0262             self._args.minimum_events_in_iov = 0
0263 
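         # Worked example (made-up numbers) for the tracks/rate branch above:
         # requesting --tracks-for-alignment 2000000 together with --track-rate 2.5
         # translates to ceil(2000000 / 2.5) = 800000 events for alignment.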
0264 
0265     def _prepare_iov_datastructures(self):
0266         """Create the needed objects for IOV handling."""
0267 
0268         self._iovs = sorted(set(self._args.iovs))
0269         if len(self._iovs) == 0: self._iovs.append(1)
0270         self._iov_info_alignment = {iov: {"events": 0, "files": []}
0271                                         for iov in self._iovs}
0272         self._iov_info_validation = {iov: {"events": 0, "files": []}
0273                                          for iov in self._iovs}
0274 
0275         self._miniiovs = sorted(set(self._iovs) | set(self._args.miniiovs))
0276 
0277 
0278     def _get_iovs(self, runs, useminiiovs=False):
0279         """
0280         Return the list of IOV start runs covering the given `runs`. Runs
0281         before the first defined IOV are skipped.
0282 
0283         Arguments:
0284         - `runs`: run numbers
0285         """
0286 
0287         iovlist = self._miniiovs if useminiiovs else self._iovs
0288 
0289         iovs = []
0290         for run in runs:
0291           iov_index = bisect.bisect(iovlist, run)
0292           if iov_index > 0: iovs.append(iovlist[iov_index-1])
0293         return iovs
0294 
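         # Illustrative behaviour of the bisect-based lookup above (IOV boundaries
         # are made up):
         #   iovlist = [1, 315000, 317000]
         #   bisect.bisect(iovlist, 316000) == 2  ->  IOV start 315000 is returned
         #   bisect.bisect(iovlist, 42)     == 1  ->  IOV start 1 is returned
         # A run smaller than the first boundary would give index 0 and be skipped.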
0295 
0296     def _prepare_run_datastructures(self):
0297         """Create the needed objects for run-by-run validation file lists."""
0298 
0299         self._run_info = {}
0300 
0301 
0302     def _add_file_info(self, container, keys, fileinfo):
0303         """Add the file described by `fileinfo` to `container` under `keys`.
0304 
0305         Arguments:
0306         - `container`: dictionary holding information on files and event counts
0307         - `keys`: keys to which the info should be added; will be created if not
0308                   existing
0309         - `fileinfo`: `FileInfo` object of the dataset file to be added
0310         """
0311 
0312         for key in keys:
0313             if key not in container:
0314                 container[key] = {"events": 0,
0315                                   "files": []}
0316             container[key]["events"] += fileinfo.nevents / len(keys)
0317             if fileinfo not in container[key]["files"]:
0318                 container[key]["files"].append(fileinfo)
0319 
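         # Sketch of the `container` layout filled above (numbers are made up); a
         # file whose runs span two IOVs contributes half of its events to each key:
         #   {315000: {"events": 1200.0, "files": [<FileInfo ...>]},
         #    317000: {"events":  300.5, "files": [<FileInfo ...>, <FileInfo ...>]}}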
0320 
0321     def _remove_file_info(self, container, keys, fileinfo):
0322         """Remove the file described by `fileinfo` from `container` under `keys`.
0323 
0324         Arguments:
0325         - `container`: dictionary holding information on files and event counts
0326         - `keys`: keys from which the info should be removed
0327         - `fileinfo`: `FileInfo` object of the dataset file to be removed
0329         """
0330 
0331         for key in keys:
0332             if key not in container: continue
0333             try:
0334                 index = container[key]["files"].index(fileinfo)
0335             except ValueError:      # file not found
0336                 return
0337             del container[key]["files"][index]
0338             container[key]["events"] -= fileinfo.nevents / len(keys)
0339 
0340 
0341     def _request_dataset_information(self):
0342         """Retrieve general dataset information and create file list."""
0343 
0344         if not self._cache.empty:
0345             print_msg("Using cached information.")
0346             (self._events_in_dataset,
0347              self._files,
0348              self._file_info,
0349              self._max_run) = self._cache.get()
0350             self.rereco = any(len(fileinfo.runs)>1 for fileinfo in self._file_info)
0351             if self._args.random: random.shuffle(self._files)
0352             return
0353 
0354         # workaround to deal with KeyboardInterrupts in the worker processes:
0355         # - ignore interrupt signals in workers (see initializer)
0356         # - retrieve the results via 'get()' with a timeout so the parent stays interruptible
0357         number_of_processes = multiprocessing.cpu_count() - 1
0358         number_of_processes = (number_of_processes
0359                                if number_of_processes > 0
0360                                else 1)
0361         pool = multiprocessing.Pool(
0362             processes = number_of_processes,
0363             initializer = lambda: signal.signal(signal.SIGINT, signal.SIG_IGN))
0364 
0365         print_msg("Requesting information for the following dataset(s):")
0366         for d in self._datasets: print_msg("\t"+d)
0367         print_msg("This may take a while...")
0368 
0369         result = pool.map_async(get_events_per_dataset, self._datasets).get(3600)
0370         self._events_in_dataset = sum(result)
0371 
0372         result = pool.map_async(get_max_run, self._datasets).get(3600)
0373         self._max_run = max(result)
0374 
0375         result = sum(pool.map_async(get_file_info, self._datasets).get(3600), [])
0376         files = pool.map_async(_make_file_info, result).get(3600)
0377         self._file_info = sorted(fileinfo for fileinfo in files)
0378 
0379         self.rereco = any(len(fileinfo.runs)>1 for fileinfo in self._file_info)
0380 
0381         if self._args.test_mode:
0382             self._file_info = self._file_info[-200:] # take only last chunk of files
0383         self._files = [fileinfo.name for fileinfo in self._file_info]
0384 
0385         # write information to cache
0386         self._cache.set(self._events_in_dataset, self._files, self._file_info,
0387                         self._max_run)
0388         self._cache.dump()
0389         if self._args.random:
0390           random.shuffle(self._file_info)
0391           self._files = [fileinfo.name for fileinfo in self._file_info]
0392 
0393     def _create_file_lists(self):
0394         """Create file lists for alignment and validation."""
0395 
0396         # collect files for alignment until minimal requirements are fulfilled
0397         self._files_alignment = []
0398         self._files_validation = []
0399         self._events_for_alignment = 0
0400         self._events_for_validation = 0
0401 
0402         max_range = (0
0403                      if self._args.events <= 0
0404                      else int(math.ceil(len(self._files)*self._args.fraction)))
0405         use_for_alignment = True
0406         for i, fileinfo in enumerate(self._file_info):
0407             enough_events = self._events_for_alignment >= self._args.events
0408             fraction_exceeded = i >= max_range
0409             if enough_events or fraction_exceeded: use_for_alignment = False
0410 
0411             dataset, f, number_of_events, runs = fileinfo
0412 
0413             iovs = self._get_iovs(runs)
0414             if use_for_alignment:
0415                 if iovs:
0416                     self._events_for_alignment += number_of_events
0417                     self._files_alignment.append(fileinfo)
0418                     self._add_file_info(self._iov_info_alignment, iovs, fileinfo)
0419                 else:
0420                     max_range += 1 # not used -> discard in fraction calculation
0421             else:
0422                 if iovs:
0423                     self._events_for_validation += number_of_events
0424                     self._files_validation.append(fileinfo)
0425                     self._add_file_info(self._iov_info_validation, iovs, fileinfo)
0426                     if self._args.run_by_run:
0427                         self._add_file_info(self._run_info, runs, fileinfo)
0428 
0429         self._fulfill_iov_eventcount()
0430 
0431         self._split_hippy_jobs()
0432 
0433 
0434     def _fulfill_iov_eventcount(self):
0435         """
0436         Try to fulfill the requirement on the minimum number of events per IOV
0437         in the alignment file list by picking files from the validation list.
0438         """
0439 
0440         for iov in self._iovs:
0441             if self._iov_info_alignment[iov]["events"] >= self._args.minimum_events_in_iov: continue
0442             for fileinfo in self._files_validation[:]:
0443                 dataset, f, number_of_events, runs = fileinfo
0444                 iovs = self._get_iovs(runs)
0445                 if iov in iovs:
0446                     self._files_alignment.append(fileinfo)
0447                     self._events_for_alignment += number_of_events
0448                     self._add_file_info(self._iov_info_alignment, iovs, fileinfo)
0449 
0450                     self._events_for_validation -= number_of_events
0451                     self._remove_file_info(self._iov_info_validation, iovs, fileinfo)
0452                     if self._args.run_by_run:
0453                         self._remove_file_info(self._run_info, runs, fileinfo)
0454                     self._files_validation.remove(fileinfo)
0455 
0456                     if (self._iov_info_alignment[iov]["events"]
0457                         >= self._args.minimum_events_in_iov):
0458                         break   # break the file loop if already enough events
0459 
0460     def _split_hippy_jobs(self):
0461         hippyjobs = {}
0462         for dataset, miniiov in itertools.product(self._datasets, self._miniiovs):
0463             jobsforminiiov = []
0464             hippyjobs[dataset,miniiov] = jobsforminiiov
0465             eventsinthisjob = float("inf")
0466             for fileinfo in self._files_alignment:
0467                 if fileinfo.dataset != dataset: continue
0468                 miniiovs = set(self._get_iovs(fileinfo.runs, useminiiovs=True))
0469                 if miniiov not in miniiovs: continue
0470                 if len(miniiovs) > 1:
0471                     hippyjobs[dataset,miniiov] = []
0472                 if eventsinthisjob >= self._args.hippy_events_per_job:
0473                     currentjob = []
0474                     jobsforminiiov.append(currentjob)
0475                     eventsinthisjob = 0
0476                 currentjob.append(fileinfo)
0477                 currentjob.sort()
0478                 eventsinthisjob += fileinfo.nevents
0479 
0480         self._hippy_jobs = {
0481           (dataset, iov): sum((hippyjobs[dataset, miniiov]
0482                                for miniiov in self._miniiovs
0483                                if iov == max(_ for _ in self._iovs if _ <= miniiov)), []
0484                            )
0485           for dataset, iov in itertools.product(self._datasets, self._iovs)
0486         }
0487 
0488     def _print_eventcounts(self):
0489         """Print the event counts per file list and per IOV."""
0490 
0491         log = os.path.join(self._output_dir, FileListCreator._event_count_log)
0492 
0493         print_msg("Using {0:d} events for alignment ({1:.2f}%)."
0494                   .format(self._events_for_alignment,
0495                           100.0*
0496                           self._events_for_alignment/self._events_in_dataset),
0497                   log_file = log)
0498         for iov in sorted(self._iov_info_alignment):
0499             print_msg(("Approximate events" if self.rereco else "Events") + " for alignment in IOV since {0:f}: {1:f}"
0500                       .format(iov, self._iov_info_alignment[iov]["events"]),
0501                       log_file = log)
0502 
0503         print_msg("Using {0:d} events for validation ({1:.2f}%)."
0504                   .format(self._events_for_validation,
0505                           100.0*
0506                           self._events_for_validation/self._events_in_dataset),
0507                   log_file = log)
0508 
0509         for iov in sorted(self._iov_info_validation):
0510             msg = ("Approximate events" if self.rereco else "Events") + " for validation in IOV since {0:f}: {1:f}".format(
0511                 iov, self._iov_info_validation[iov]["events"])
0512             if (self._iov_info_validation[iov]["events"]
0513                 < self._args.minimum_events_validation):
0514                 msg += " (not enough events -> no dataset file will be created)"
0515             print_msg(msg, log_file = log)
0516 
0517         for run in sorted(self._run_info):
0518             msg = ("Approximate events" if self.rereco else "Events") + " for validation in run {0:f}: {1:f}".format(
0519                 run, self._run_info[run]["events"])
0520             if (self._run_info[run]["events"]
0521                 < self._args.minimum_events_validation):
0522                 msg += " (not enough events -> no dataset file will be created)"
0523             print_msg(msg, log_file = log)
0524 
0525         unused_events = (self._events_in_dataset
0526                          - self._events_for_validation
0527                          - self._events_for_alignment)
0528         if unused_events > 0 and self._events_in_dataset != 0:
0529             print_msg("Unused events: {0:d} ({1:.2f}%)"
0530                       .format(unused_events,
0531                               100.0*unused_events/self._events_in_dataset),
0532                       log_file = log)
0533 
0534 
0535     def _create_dataset_ini_section(self, name, collection, json_file = None):
0536         """Write dataset ini snippet.
0537 
0538         Arguments:
0539         - `name`: name of the dataset section
0540         - `collection`: track collection of this dataset
0541         - `json_file`: JSON file to be used for this dataset (optional)
0542         """
0543 
0544         if json_file:
0545             splitted = name.split("_since")
0546             file_list = "_since".join(splitted[:-1]
0547                                       if len(splitted) > 1
0548                                       else splitted)
0549         else:
0550             file_list = name
0551         output = "[dataset:{}]\n".format(name)
0552         output += "collection = {}\n".format(collection)
0553         output += "inputFileList = ${{datasetdir}}/{}.txt\n".format(file_list)
0554         output += "json = ${{datasetdir}}/{}\n".format(json_file) if json_file else ""
0555 
0556         if collection in ("ALCARECOTkAlCosmicsCTF0T",
0557                           "ALCARECOTkAlCosmicsInCollisions"):
0558             if self._first_dataset_ini:
0559                 print_msg("\tDetermined cosmics dataset, i.e. please replace "
0560                           "'DUMMY_DECO_MODE_FLAG' and 'DUMMY_ZERO_TESLA_FLAG' "
0561                           "with the correct values.")
0562                 self._first_dataset_ini = False
0563             output += "cosmicsDecoMode  = DUMMY_DECO_MODE_FLAG\n"
0564             output += "cosmicsZeroTesla = DUMMY_ZERO_TESLA_FLAG\n"
0565         output += "\n"
0566 
0567         return output
0568 
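         # Sketch of an ini section as produced above (section name, collection and
         # paths are made-up placeholders):
         #   [dataset:MinimumBias_Run2018A-TkAlMinBias-v1_ALCARECO_since315000]
         #   collection = ALCARECOTkAlMinBias
         #   inputFileList = ${datasetdir}/MinimumBias_Run2018A-TkAlMinBias-v1_ALCARECO.txt
         #   json = ${datasetdir}/MinimumBias_Run2018A-TkAlMinBias-v1_ALCARECO_since315000_JSON.txt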
0569 
0570     def _create_json_file(self, name, first, last = None):
0571         """
0572         Create JSON file with `name` covering runs from `first` to `last`.  If a
0573         global JSON is provided, the resulting file is the intersection of the
0574         file created here and the global one.
0575         Returns the name of the created JSON file.
0576 
0577         Arguments:
0578         - `name`: name of the created JSON file
0579         - `first`: first run covered by the JSON file
0580         - `last`: last run covered by the JSON file
0581 
0582         """
0583 
0584         if last is None: last = self._max_run
0585         name += "_JSON.txt"
0586         print_msg("Creating JSON file: "+name)
0587 
0588         json_file = LumiList.LumiList(runs = range(first, last+1))
0589         if self._args.json:
0590             global_json = LumiList.LumiList(filename = self._args.json)
0591             json_file = json_file & global_json
0592         json_file.writeJSON(os.path.join(self._output_dir, name))
0593 
0594         return name
0595 
0596 
0597     def _get_track_collection(self, edm_file):
0598         """Extract track collection from given `edm_file`.
0599 
0600         Arguments:
0601         - `edm_file`: CMSSW dataset file
0602         """
0603 
0604         # use global redirector to allow also files not yet at your site:
0605         cmd = ["edmDumpEventContent", r"root://cms-xrd-global.cern.ch/"+edm_file]
0606         try:
0607             event_content = subprocess.check_output(cmd).decode().split("\n")
0608         except subprocess.CalledProcessError as e:
0609             splitted = edm_file.split("/")
0610             try:
0611                 alcareco = splitted[splitted.index("ALCARECO")+1].split("-")[0]
0612                 alcareco = alcareco.replace("TkAlCosmics0T", "TkAlCosmicsCTF0T")
0613                 alcareco = "ALCARECO" + alcareco
0614                 print_msg("\tDetermined track collection as '{}'.".format(alcareco))
0615                 return alcareco
0616             except ValueError:
0617                 if "RECO" in splitted:
0618                     print_msg("\tDetermined track collection as 'generalTracks'.")
0619                     return "generalTracks"
0620                 else:
0621                     print_msg("\tCould not determine track collection "
0622                               "automatically.")
0623                     print_msg("\tPlease replace 'DUMMY_TRACK_COLLECTION' with "
0624                               "the correct value.")
0625                     return "DUMMY_TRACK_COLLECTION"
0626 
0627         track_collections = []
0628         for line in event_content:
0629             splitted = line.split()
0630             if len(splitted) > 0 and splitted[0] == r"vector<reco::Track>":
0631                 track_collections.append(splitted[1].strip().strip('"'))
0632         if len(track_collections) == 0:
0633             print_msg("No track collection found in file '{}'.".format(edm_file))
0634             sys.exit(1)
0635         elif len(track_collections) == 1:
0636             print_msg("\tDetermined track collection as "
0637                       "'{}'.".format(track_collections[0]))
0638             return track_collections[0]
0639         else:
0640             alcareco_tracks = [x for x in track_collections
0641                                if x.startswith("ALCARECO")]
0642             if len(alcareco_tracks) == 0 and "generalTracks" in track_collections:
0643                 print_msg("\tDetermined track collection as 'generalTracks'.")
0644                 return "generalTracks"
0645             elif len(alcareco_tracks) == 1:
0646                 print_msg("\tDetermined track collection as "
0647                           "'{}'.".format(alcareco_tracks[0]))
0648                 return alcareco_tracks[0]
0649             print_msg("\tCould not unambiguously determine track collection in "
0650                       "file '{}':".format(edm_file))
0651             print_msg("\tPlease replace 'DUMMY_TRACK_COLLECTION' with "
0652                       "the correct value from the following list.")
0653             for collection in track_collections:
0654                 print_msg("\t - "+collection)
0655             return "DUMMY_TRACK_COLLECTION"
0656 
0657 
0658     def _write_file_lists(self):
0659         """Write file lists to disk."""
0660 
0661         self._create_dataset_txt(self._formatted_dataset, self._files_alignment)
0662         self._create_hippy_txt(self._formatted_dataset, sum(self._hippy_jobs.values(), []))
0663         self._create_dataset_cff(
0664             "_".join(["Alignment", self._formatted_dataset]),
0665             self._files_alignment)
0666 
0667         self._create_dataset_cff(
0668             "_".join(["Validation", self._formatted_dataset]),
0669             self._files_validation)
0670 
0671 
0672         if self._args.create_ini:
0673             dataset_ini_general = "[general]\n"
0674             dataset_ini_general += "datasetdir = {}\n".format(self._output_dir)
0675             dataset_ini_general += ("json = {}\n\n".format(self._args.json)
0676                                     if self._args.json
0677                                     else "\n")
0678 
0679             ini_path = self._formatted_dataset + ".ini"
0680             print_msg("Creating dataset ini file: " + ini_path)
0681             ini_path = os.path.join(self._output_dir, ini_path)
0682 
0683             collection = self._get_track_collection(self._files[0])
0684 
0685             with open(ini_path, "w") as f:
0686                 f.write(dataset_ini_general)
0687                 f.write(self._create_dataset_ini_section(
0688                     self._formatted_dataset, collection))
0689 
0690             iov_wise_ini = dataset_ini_general
0691 
0692         for i,iov in enumerate(sorted(self._iovs)):
0693             iov_str = "since{0:d}".format(iov)
0694             iov_str = "_".join([self._formatted_dataset, iov_str])
0695 
0696             if self.rereco:
0697                 if i == len(self._iovs) - 1:
0698                     last = None
0699                 else:
0700                     last = sorted(self._iovs)[i+1] - 1
0701                 local_json = self._create_json_file(iov_str, iov, last)
0702             else:
0703                 local_json = None
0704 
0705             if self._args.create_ini:
0706                 iov_wise_ini += self._create_dataset_ini_section(iov_str,
0707                                                                  collection,
0708                                                                  local_json)
0709 
0710             self._create_dataset_txt(iov_str,
0711                                      self._iov_info_alignment[iov]["files"])
0712             self._create_hippy_txt(iov_str, sum((self._hippy_jobs[dataset,iov] for dataset in self._datasets), []))
0713             self._create_dataset_cff(
0714                 "_".join(["Alignment", iov_str]),
0715                 self._iov_info_alignment[iov]["files"],
0716                 json_file=local_json)
0717 
0718             if (self._iov_info_validation[iov]["events"]
0719                 < self._args.minimum_events_validation):
0720                 continue
0721             self._create_dataset_cff(
0722                 "_".join(["Validation", iov_str]),
0723                 self._iov_info_validation[iov]["files"],
0724                 json_file=local_json)
0725 
0726         if self._args.create_ini and iov_wise_ini != dataset_ini_general:
0727             ini_path = self._formatted_dataset + "_IOVs.ini"
0728             print_msg("Creating dataset ini file: " + ini_path)
0729             ini_path = os.path.join(self._output_dir, ini_path)
0730             with open(ini_path, "w") as f: f.write(iov_wise_ini)
0731 
0732         for run in sorted(self._run_info):
0733             if self.rereco: continue # need to implement more JSONs
0734             if (self._run_info[run]["events"]
0735                 < self._args.minimum_events_validation):
0736                 continue
0737             self._create_dataset_cff(
0738                 "_".join(["Validation", self._formatted_dataset, str(run)]),
0739                 self._run_info[run]["files"])
0740 
0741 
0742     def _create_dataset_txt(self, name, file_list):
0743         """Write alignment file list to disk.
0744 
0745         Arguments:
0746         - `name`: name of the file list
0747         - `file_list`: list of files to write to `name`
0748         """
0749 
0750         name += ".txt"
0751         print_msg("Creating dataset file list: "+name)
0752         with open(os.path.join(self._output_dir, name), "w") as f:
0753             f.write("\n".join(fileinfo.name for fileinfo in file_list))
0754 
0755 
0756     def _create_hippy_txt(self, name, job_list):
0757         name += "_hippy.txt"
0758         print_msg("Creating dataset file list for HipPy: "+name)
0759         with open(os.path.join(self._output_dir, name), "w") as f:
0760             f.write("\n".join(",".join("'"+fileinfo.name+"'" for fileinfo in job) for job in job_list)+"\n")
0761 
0762 
0763     def _create_dataset_cff(self, name, file_list, json_file = None):
0764         """
0765         Create configuration fragment to define a dataset.
0766 
0767         Arguments:
0768         - `name`: name of the configuration fragment
0769         - `file_list`: list of files to write to `name`
0770         - `json_file`: JSON file to be used for this dataset (optional)
0771         """
0772 
0773         if json_file is None: json_file = self._args.json # might still be None
0774         if json_file is not None:
0775             json_file = os.path.join(self._output_dir, json_file)
0776 
0777         name = "_".join(["Dataset",name, "cff.py"])
0778         print_msg("Creating dataset configuration fragment: "+name)
0779 
0780         file_list_str = ""
0781         for sub_list in get_chunks(file_list, 255):
0782             file_list_str += ("readFiles.extend([\n'"+
0783                               "',\n'".join(fileinfo.name for fileinfo in sub_list)+
0784                               "'\n])\n")
0785 
0786         fragment = FileListCreator._dataset_template.format(
0787             lumi_def = ("import FWCore.PythonUtilities.LumiList as LumiList\n\n"
0788                         "lumiSecs = cms.untracked.VLuminosityBlockRange()\n"
0789                         "goodLumiSecs = LumiList.LumiList(filename = "
0790                         "'{0:s}').getCMSSWString().split(',')"
0791                         .format(json_file)
0792                         if json_file else ""),
0793             lumi_arg = ("lumisToProcess = lumiSecs,\n                    "
0794                         if json_file else ""),
0795             lumi_extend = "lumiSecs.extend(goodLumiSecs)" if json_file else "",
0796             files = file_list_str)
0797 
0798         with open(os.path.join(self._output_dir, name), "w") as f:
0799             f.write(fragment)
0800 
0801 
0802     _dataset_template = """\
0803 import FWCore.ParameterSet.Config as cms
0804 {lumi_def:s}
0805 readFiles = cms.untracked.vstring()
0806 source = cms.Source("PoolSource",
0807                     {lumi_arg:s}fileNames = readFiles)
0808 {files:s}{lumi_extend:s}
0809 maxEvents = cms.untracked.PSet(input = cms.untracked.int32(-1))
0810 """
0811 
0812 
0813 class _DasCache(object):
0814     """Helper class to cache information from DAS requests."""
0815 
0816     def __init__(self, file_list_id):
0817         """Constructor of the cache.
0818 
0819         Arguments:
0820         - `file_list_id`: ID of the cached file lists
0821         """
0822 
0823         self._file_list_id = file_list_id
0824         self._cache_file_name = os.path.join(file_list_id, ".das_cache.pkl")
0825         self.reset()
0826 
0827 
0828     def reset(self):
0829         """Reset the cache contents and the 'empty' flag."""
0830 
0831         self._empty = True
0832         self._events_in_dataset = 0
0833         self._files = []
0834         self._file_info = []
0835         self._max_run = None
0836 
0837 
0838     def set(self, total_events, file_list, file_info, max_run):
0839         """Set the content of the cache.
0840 
0841         Arguments:
0842         - `total_events`: total number of events in dataset
0843         - `file_list`: list of files in dataset
0844         - `file_info`: list of `FileInfo` objects with events and runs per file
0845         - `max_run`: highest run number contained in the dataset
0846         """
0847 
0848         self._events_in_dataset = total_events
0849         self._files = file_list
0850         self._file_info = file_info
0851         self._max_run = max_run
0852         self._empty = False
0853 
0854 
0855     def get(self):
0856         """
0857         Get the content of the cache as tuple:
0858            result = (total number of events in dataset,
0859                      list of files in dataset,
0860                      list of per-file infos (events, runs), highest run number)
0861         """
0862 
0863         return self._events_in_dataset, self._files, self._file_info, self._max_run
0864 
0865 
0866     def load(self):
0867         """Loads the cached contents."""
0868 
0869         if not self.empty:
0870             print_msg("Overriding file information with cached information.")
0871         try:
0872             with open(self._cache_file_name, "rb") as f:
0873                 tmp_dict = cPickle.load(f)
0874                 self.__dict__.update(tmp_dict)
0875         except IOError as e:
0876             if e.args == (2, "No such file or directory"):
0877                 msg = "Failed to load cache for '{}'.".format(self._file_list_id)
0878                 if not self.empty:
0879                     msg += " Keeping the previous file information."
0880                 print_msg(msg)
0881             else:
0882                 raise
0883 
0884 
0885     def dump(self):
0886         """Dumps the contents to the cache file."""
0887 
0888         if self.empty:
0889             print_msg("Cache is empty. Not writing to file.")
0890             return
0891 
0892         with open(self._cache_file_name, "wb") as f:
0893             cPickle.dump(self.__dict__, f, 2)
0894 
0895 
0896     @property
0897     def empty(self):
0898         """
0899         Flag indicating whether the cache is empty or has been filled (possibly
0900         with nothing).
0901         """
0902 
0903         return self._empty
0904 
0905 
0906 
0907 ################################################################################
0908 def das_client(query, check_key = None):
0909     """
0910     Submit `query` to DAS client and handle possible errors.
0911     Further treatment of the output might be necessary.
0912 
0913     Arguments:
0914     - `query`: DAS query
0915     - `check_key`: optional key to be checked for; retriggers query if needed
0916     """
0917 
0918     error = True
0919     das_data = {'status': 'error'}
0920     for i in range(5):         # maximum of 5 tries
0921         try:
0922             das_data = cmssw_das_client.get_data(query, limit = 0)
0923         except IOError as e:
0924             if e.errno == 14: #https://stackoverflow.com/q/36397853/5228524
0925                 continue
0926         except ValueError as e:
0927             if str(e) == "No JSON object could be decoded":
0928                 das_data['reason'] = str(e)
0929                 continue
0930 
0931         if das_data["status"] == "ok":
0932             if das_data["nresults"] == 0 or check_key is None:
0933                 error = False
0934                 break
0935 
0936             result_count = 0
0937             for d in find_key(das_data["data"], [check_key]):
0938                 result_count += len(d)
0939             if result_count == 0:
0940                 das_data["status"] = "error"
0941                 das_data["reason"] = ("DAS did not return required data.")
0942                 continue
0943             else:
0944                 error = False
0945                 break
0946 
0947     if das_data["status"] == "error":
0948         print_msg("DAS query '{}' failed 5 times. "
0949                   "The last attempt failed for the following reason:".format(query))
0950         print(das_data.get("reason", "ERROR:UNKNOWN"))
0951         sys.exit(1)
0952     return das_data["data"]
0953 
0954 
0955 def find_key(collection, key_chain):
0956     """Search `collection` for the keys in `key_chain` and return the first matching value.
0957 
0958     Arguments:
0959     - `collection`: list of dictionaries
0960     - `key_chain`: chain of keys to be searched for
0961     """
0962 
0963     result = None
0964     for i,key in enumerate(key_chain):
0965         for item in collection:
0966             if key in item:
0967                 if i == len(key_chain) - 1:
0968                     result = item[key]
0969                 else:
0970                     try:
0971                         result = find_key(item[key], key_chain[i+1:])
0972                     except LookupError:
0973                         pass    # continue with next `item` in `collection`
0974             else:
0975                 pass            # continue with next `item` in `collection`
0976 
0977     if result is not None: return result
0978     raise LookupError(key_chain, collection)
0979 
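     # Illustrative call of find_key; the nested structure is made up to mimic a
     # DAS record:
     #   find_key([{"file": [{"name": "/store/data/...", "nevents": 42}]}],
     #            ["file", "nevents"])   ->   42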
0980 
0981 def print_msg(text, line_break = True, log_file = None):
0982     """Formatted printing of `text`.
0983 
0984     Arguments:
0985     - `text`: string to be printed
0986     """
0987 
0988     msg = "  >>> " + str(text)
0989     if line_break:
0990         print(msg)
0991     else:
0992         print(msg, end=' ')
0993         sys.stdout.flush()
0994     if log_file:
0995         with open(log_file, "a") as f: f.write(msg+"\n")
0996     return msg
0997 
0998 
0999 def get_runs(file_name):
1000     """
1001     Try to guess the run number from `file_name`. If the run cannot be
1002     determined, the run numbers are retrieved from DAS (slow!).
1003 
1004     Arguments:
1005     - `file_name`: name of the considered file
1006     """
1007     try:
1008         return [int("".join(file_name.split("/")[-4:-2]))]
1009     except ValueError:
1010         query = "run file="+file_name+" system=dbs3"
1011         return [int(_) for _ in find_key(das_client(query), ["run", "run_number"])]
1012 
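     # Illustrative example of the run-number guess above (LFN is made up): for a
     # path ending in ".../000/315/252/00000/some_file.root" the elements [-4:-2]
     # of the split are "315" and "252", which join to run 315252; only if this
     # fails is the (slow) DAS query issued.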
1013 
1014 def get_max_run(dataset_name):
1015     """Retrieve the maximum run number in `dataset_name`.
1016 
1017     Arguments:
1018     - `dataset_name`: name of the dataset
1019     """
1020 
1021     data = das_client("run dataset={0:s} system=dbs3".format(dataset_name))
1022     runs = [f["run"][0]["run_number"] for f in data]
1023     return max(runs)
1024 
1025 
1026 def get_files(dataset_name):
1027     """Retrieve list of files in `dataset_name`.
1028 
1029     Arguments:
1030     - `dataset_name`: name of the dataset
1031     """
1032 
1033     data = das_client(("file dataset={0:s} system=dbs3 detail=True | "+
1034                        "grep file.name, file.nevents > 0").format(dataset_name),
1035                       "file")
1036     return [find_key(f["file"], ["name"]) for f in data]
1037 
1038 
1039 def get_datasets(dataset_pattern):
1040     """Retrieve list of datasets matching `dataset_pattern`.
1041 
1042     Arguments:
1043     - `dataset_pattern`: pattern of dataset names
1044     """
1045 
1046     data = das_client("dataset dataset={0:s} system=dbs3 detail=True "
1047                       "| grep dataset.name".format(dataset_pattern), "dataset")
1048     return sorted(set([find_key(f["dataset"], ["name"]) for f in data]))
1049 
1050 
1051 def get_events_per_dataset(dataset_name):
1052     """Retrieve the number of events in `dataset_name`.
1053 
1054     Arguments:
1055     - `dataset_name`: name of a dataset
1056     """
1057 
1058     return _get_events("dataset", dataset_name)
1059 
1060 
1061 def get_events_per_file(file_name):
1062     """Retrieve the number of events in `file_name`.
1063 
1064     Arguments:
1065     - `file_name`: name of a dataset file
1066     """
1067 
1068     return _get_events("file", file_name)
1069 
1070 
1071 def _get_events(entity, name):
1072     """Retrieve the number of events from `entity` called `name`.
1073 
1074     Arguments:
1075     - `entity`: type of entity
1076     - `name`: name of entity
1077     """
1078 
1079     data = das_client("{0:s}={1:s} system=dbs3 detail=True | grep {0:s}.nevents"
1080                       .format(entity, name), entity)
1081     return int(find_key(data, [entity, "nevents"]))
1082 
1083 
1084 def _get_properties(name, entity, properties, filters = None, sub_entity = None,
1085                     aggregators = None):
1086     """Retrieve `properties` from `entity` called `name`.
1087 
1088     Arguments:
1089     - `name`: name of entity
1090     - `entity`: type of entity
1091     - `properties`: list of property names
1092     - `filters`: list of filters on properties
1093     - `sub_entity`: type of entity from which to extract the properties;
1094                     defaults to `entity`
1095     - `aggregators`: additional aggregators/filters to append to the query
1096     """
1097 
1098     if sub_entity is None: sub_entity = entity
1099     if filters is None:    filters    = []
1100     props = ["{0:s}.{1:s}".format(sub_entity,prop.split()[0])
1101              for prop in properties]
1102     conditions = ["{0:s}.{1:s}".format(sub_entity, filt)
1103                   for filt in filters]
1104     add_ons = "" if aggregators is None else " | "+" | ".join(aggregators)
1105 
1106     data = das_client("{0:s} {1:s}={2:s} system=dbs3 detail=True | grep {3:s}{4:s}"
1107                       .format(sub_entity, entity, name,
1108                               ", ".join(props+conditions), add_ons), sub_entity)
1109     return [[find_key(f[sub_entity], [prop]) for prop in properties] for f in data]
1110 
1111 def get_file_info(dataset):
1112     result = _get_properties(name=dataset,
1113                              properties = ["name", "nevents"],
1114                              filters = ["nevents > 0"],
1115                              entity = "dataset",
1116                              sub_entity = "file")
1117     return [(dataset, name, nevents) for name, nevents in result]
1118 
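     # For a made-up dataset name "/A/B/ALCARECO" the call above results in a DAS
     # query of the form (assembled by _get_properties):
     #   file dataset=/A/B/ALCARECO system=dbs3 detail=True | grep file.name, file.nevents, file.nevents > 0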
1119 
1120 
1121 FileInfo = collections.namedtuple("FileInfo", "dataset name nevents runs")
1122 
1123 def _make_file_info(dataset_name_nevents):
1124     return FileInfo(*dataset_name_nevents, runs=get_runs(dataset_name_nevents[1]))
1125 
1126 def get_chunks(long_list, chunk_size):
1127     """
1128     Generates list of sub-lists of `long_list` with a maximum size of
1129     `chunk_size`.
1130 
1131     Arguments:
1132     - `long_list`: original list
1133     - `chunk_size`: maximum size of created sub-lists
1134     """
1135 
1136     for i in range(0, len(long_list), chunk_size):
1137         yield long_list[i:i+chunk_size]
1138 
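     # Illustrative example (made-up input):
     #   list(get_chunks([1, 2, 3, 4, 5], 2))   ->   [[1, 2], [3, 4], [5]]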
1139 
1140 def merge_strings(strings):
1141     """Merge strings in `strings` into a common string.
1142 
1143     Arguments:
1144     - `strings`: list of strings
1145     """
1146 
1147     if type(strings) == str:
1148         return strings
1149     elif len(strings) == 0:
1150         return ""
1151     elif len(strings) == 1:
1152         return strings[0]
1153     elif len(strings) == 2:
1154         first = strings[0]
1155         second = strings[1]
1156     else:
1157         first = merge_strings(strings[:-1])
1158         second = strings[-1]
1159 
1160     merged_string = ""
1161     blocks = difflib.SequenceMatcher(None, first, second).get_matching_blocks()
1162 
1163     last_i, last_j, last_n = 0, 0, 0
1164     for i, j, n in blocks:
1165         merged_string += first[last_i+last_n:i]
1166         merged_string += second[last_j+last_n:j]
1167         merged_string += first[i:i+n]
1168         last_i, last_j, last_n = i, j, n
1169 
1170     return str(merged_string)
1171 
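     # Illustrative example of merge_strings (inputs are made up): matching blocks
     # are kept once, differing pieces are concatenated in order of appearance:
     #   merge_strings(["Run2018A_TkAlMinBias_v1", "Run2018B_TkAlMinBias_v1"])
     #   ->  "Run2018AB_TkAlMinBias_v1"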
1172 
1173 ################################################################################
1174 if __name__ == "__main__":
1175     try:
1176         main()
1177     except KeyboardInterrupt:
1178         pass