Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-11-25 02:29:02

0001 #!/usr/bin/env python3
0002 
0003 
0004 from builtins import range
0005 import os
0006 import re
0007 import sys
0008 import glob
0009 import json
0010 import math
0011 import bisect
0012 import random
0013 import signal
0014 if sys.version_info[0]>2:
0015   import _pickle as cPickle
0016 else:
0017   import cPickle
0018 import difflib
0019 import argparse
0020 import functools
0021 import itertools
0022 import subprocess
0023 import collections
0024 import multiprocessing
0025 import FWCore.PythonUtilities.LumiList as LumiList
0026 import Utilities.General.cmssw_das_client as cmssw_das_client
0027 import Alignment.MillePedeAlignmentAlgorithm.mpslib.tools as mps_tools
0028 
0029 
0030 ################################################################################
0031 def main(argv = None):
0032     """
0033     Main routine. Not called, if this module is loaded via `import`.
0034 
0035     Arguments:
0036     - `argv`: Command line arguments passed to the script.
0037     """
0038 
0039     if argv == None:
0040         argv = sys.argv[1:]
0041 
0042     file_list_creator = FileListCreator(argv)
0043     file_list_creator.create()
0044 
0045 
0046 ################################################################################
0047 class FileListCreator(object):
0048     """Create file lists for alignment and validation for a given dataset.
0049     """
0050 
0051     def __init__(self, argv):
0052         """Constructor taking the command line arguments.
0053 
0054         Arguments:
0055         - `args`: command line arguments
0056         """
0057 
0058         self._first_dataset_ini = True
0059         self._parser = self._define_parser()
0060         self._args = self._parser.parse_args(argv)
0061 
0062         if not mps_tools.check_proxy():
0063             print_msg(
0064                 "Please create proxy via 'voms-proxy-init -voms cms -rfc'.")
0065             sys.exit(1)
0066 
0067         self._dataset_regex = re.compile(r"^/([^/]+)/([^/]+)/([^/]+)$")
0068         self._validate_input()
0069 
0070         if self._args.test_mode:
0071             import Configuration.PyReleaseValidation.relval_steps as rvs
0072             import Configuration.PyReleaseValidation.relval_production as rvp
0073             self._args.datasets = [rvs.steps[rvp.workflows[1000][1][0]]["INPUT"].dataSet]
0074             self._validate_input() # ensure that this change is valid
0075 
0076         self._datasets = sorted([dataset
0077                                  for pattern in self._args.datasets
0078                                  for dataset in get_datasets(pattern)
0079                                  if re.search(self._args.dataset_filter, dataset)])
0080         if len(self._datasets) == 0:
0081             print_msg("Found no dataset matching the pattern(s):")
0082             for d in self._args.datasets: print_msg("\t"+d)
0083             sys.exit(1)
0084 
0085         self._formatted_dataset = merge_strings(
0086             [re.sub(self._dataset_regex, r"\1_\2_\3", dataset)
0087              for dataset in self._datasets])
0088         self._output_dir = os.path.join(self._args.output_dir,
0089                                         self._formatted_dataset)
0090         self._output_dir = os.path.abspath(self._output_dir)
0091         self._cache = _DasCache(self._output_dir)
0092         self._prepare_iov_datastructures()
0093         self._prepare_run_datastructures()
0094 
0095         try:
0096             os.makedirs(self._output_dir)
0097         except OSError as e:
0098             if e.args == (17, "File exists"):
0099                 if self._args.force:
0100                     pass        # do nothing, just clear the existing output
0101                 elif self._args.use_cache:
0102                     self._cache.load() # load cache before clearing the output
0103                 else:
0104                     print_msg("Directory '{}' already exists from previous runs"
0105                               " of the script. Use '--use-cache' if you want to"
0106                               " use the cached DAS-query results Or use "
0107                               "'--force' to remove it."
0108                               .format(self._output_dir))
0109                     sys.exit(1)
0110                 files = glob.glob(os.path.join(self._output_dir, "*"))
0111                 for f in files: os.remove(f)
0112             else:
0113                 raise
0114 
0115 
0116     def create(self):
0117         """Creates file list. To be called by user of the class."""
0118 
0119         self._request_dataset_information()
0120         self._create_file_lists()
0121         self._print_eventcounts()
0122         self._write_file_lists()
0123 
0124 
0125     _event_count_log = "event_count_info.log"
0126 
0127 
0128     def _define_parser(self):
0129         """Definition of command line argument parser."""
0130 
0131         parser = argparse.ArgumentParser(
0132             description = "Create file lists for alignment",
0133             epilog = ("The tool will create a directory containing all file "
0134                       "lists and a log file with all relevant event counts "
0135                       "('{}').".format(FileListCreator._event_count_log)))
0136         parser.add_argument("-i", "--input", dest = "datasets", required = True,
0137                             metavar = "DATASET", action = "append",
0138                             help = ("CMS dataset name; supports wildcards; "
0139                                     "use multiple times for multiple datasets"))
0140         parser.add_argument("--dataset-filter", default = "",
0141                             help = "regex to match within in the datasets matched,"
0142                                    "in case the wildcard isn't flexible enough")
0143         parser.add_argument("-j", "--json", dest = "json", metavar = "PATH",
0144                             help = "path to JSON file (optional)")
0145         parser.add_argument("-f", "--fraction", dest = "fraction",
0146                             type = float, default = 1,
0147                             help = "max. fraction of files used for alignment")
0148         parser.add_argument("--iov", dest = "iovs", metavar = "RUN", type = int,
0149                             action = "append", default = [],
0150                             help = ("define IOV by specifying first run; for "
0151                                     "multiple IOVs use this option multiple "
0152                                     "times; files from runs before the lowest "
0153                                     "IOV are discarded (default: 1)"))
0154         parser.add_argument("--miniiov", dest="miniiovs", metavar="RUN", type=int,
0155                             action="append", default=[],
0156                             help=("in addition to the standard IOVs, break up hippy jobs "
0157                                   "at these points, so that jobs from before and after "
0158                                   "these runs are not in the same job"))
0159         parser.add_argument("-r", "--random", action = "store_true",
0160                             default = False, help = "select files randomly")
0161         parser.add_argument("-n", "--events-for-alignment", "--maxevents",
0162                             dest = "events", type = int, metavar = "NUMBER",
0163                             help = ("number of events needed for alignment; the"
0164                                     " remaining events in the dataset are used "
0165                                     "for validation; if n<=0, all events are "
0166                                     "used for validation"))
0167         parser.add_argument("--all-events", action = "store_true",
0168                             help = "Use all events for alignment")
0169         parser.add_argument("--tracks-for-alignment", dest = "tracks",
0170                             type = int, metavar = "NUMBER",
0171                             help = "number of tracks needed for alignment")
0172         parser.add_argument("--track-rate", dest = "rate", type = float,
0173                             metavar = "NUMBER",
0174                             help = "number of tracks per event")
0175         parser.add_argument("--run-by-run", dest = "run_by_run",
0176                             action = "store_true", default = False,
0177                             help = "create validation file list for each run")
0178         parser.add_argument("--minimum-events-in-iov",
0179                             dest = "minimum_events_in_iov", metavar = "NUMBER",
0180                             type = int, default = 100000,
0181                             help = ("minimum number of events for alignment per"
0182                                     " IOV; this option has a higher priority "
0183                                     "than '-f/--fraction' "
0184                                     "(default: %(default)s)"))
0185         parser.add_argument("--minimum-events-validation",
0186                             dest = "minimum_events_validation",
0187                             metavar = "NUMBER", type = int, default = 1,
0188                             help = ("minimum number of events for validation; "
0189                                     "applies to IOVs; in case of --run-by-run "
0190                                     "it applies to runs runs "
0191                                     "(default: %(default)s)"))
0192         parser.add_argument("--use-cache", dest = "use_cache",
0193                             action = "store_true", default = False,
0194                             help = "use DAS-query results of previous run")
0195         parser.add_argument("-o", "--output-dir", dest = "output_dir",
0196                             metavar = "PATH", default = os.getcwd(),
0197                             help = "output base directory (default: %(default)s)")
0198         parser.add_argument("--create-ini", dest = "create_ini",
0199                             action = "store_true", default = False,
0200                             help = ("create dataset ini file based on the "
0201                                     "created file lists"))
0202         parser.add_argument("--force", action = "store_true", default = False,
0203                             help = ("remove output directory from previous "
0204                                     "runs, if existing"))
0205         parser.add_argument("--hippy-events-per-job", type = int, default = 1,
0206                             help = ("approximate number of events in each job for HipPy"))
0207         parser.add_argument("--test-mode", dest = "test_mode",
0208                             action = "store_true", default = False,
0209                             help = argparse.SUPPRESS) # hidden option
0210         return parser
0211 
0212 
0213     def _validate_input(self):
0214         """Validate command line arguments."""
0215 
0216         if self._args.events is None:
0217             if self._args.all_events:
0218                 self._args.events = float("inf")
0219                 print_msg("Using all tracks for alignment")
0220             elif (self._args.tracks is None) and (self._args.rate is None):
0221                 msg = ("either -n/--events-for-alignment, --all-events, or both of "
0222                        "--tracks-for-alignment and --track-rate are required")
0223                 self._parser.error(msg)
0224             elif (((self._args.tracks is not None) and (self._args.rate is None)) or
0225                 ((self._args.rate is not None)and (self._args.tracks is None))):
0226                 msg = ("--tracks-for-alignment and --track-rate must be used "
0227                        "together")
0228                 self._parser.error(msg)
0229             else:
0230                 self._args.events = int(math.ceil(self._args.tracks /
0231                                                   self._args.rate))
0232                 print_msg("Requested {0:d} tracks with {1:.2f} tracks/event "
0233                           "-> {2:d} events for alignment."
0234                           .format(self._args.tracks, self._args.rate,
0235                                   self._args.events))
0236         else:
0237             if (self._args.tracks is not None) or (self._args.rate is not None) or self._args.all_events:
0238                 msg = ("-n/--events-for-alignment must not be used with "
0239                        "--tracks-for-alignment, --track-rate, or --all-events")
0240                 self._parser.error(msg)
0241             print_msg("Requested {0:d} events for alignment."
0242                       .format(self._args.events))
0243 
0244         for dataset in self._args.datasets:
0245             if not re.match(self._dataset_regex, dataset):
0246                 print_msg("Dataset pattern '"+dataset+"' is not in CMS format.")
0247                 sys.exit(1)
0248 
0249         nonzero_events_per_iov = (self._args.minimum_events_in_iov > 0)
0250         if nonzero_events_per_iov and self._args.fraction <= 0:
0251             print_msg("Setting minimum number of events per IOV for alignment "
0252                       "to 0 because a non-positive fraction of alignment events"
0253                       " is chosen: {}".format(self._args.fraction))
0254             nonzero_events_per_iov = False
0255             self._args.minimum_events_in_iov = 0
0256         if nonzero_events_per_iov and self._args.events <= 0:
0257             print_msg("Setting minimum number of events per IOV for alignment "
0258                       "to 0 because a non-positive number of alignment events"
0259                       " is chosen: {}".format(self._args.events))
0260             nonzero_events_per_iov = False
0261             self._args.minimum_events_in_iov = 0
0262 
0263 
0264     def _prepare_iov_datastructures(self):
0265         """Create the needed objects for IOV handling."""
0266 
0267         self._iovs = sorted(set(self._args.iovs))
0268         if len(self._iovs) == 0: self._iovs.append(1)
0269         self._iov_info_alignment = {iov: {"events": 0, "files": []}
0270                                         for iov in self._iovs}
0271         self._iov_info_validation = {iov: {"events": 0, "files": []}
0272                                          for iov in self._iovs}
0273 
0274         self._miniiovs = sorted(set(self._iovs) | set(self._args.miniiovs))
0275 
0276 
0277     def _get_iovs(self, runs, useminiiovs=False):
0278         """
0279         Return the IOV start for `run`. Returns 'None' if the run is before any
0280         defined IOV.
0281 
0282         Arguments:
0283         - `runs`: run numbers
0284         """
0285 
0286         iovlist = self._miniiovs if useminiiovs else self._iovs
0287 
0288         iovs = []
0289         for run in runs:
0290           iov_index = bisect.bisect(iovlist, run)
0291           if iov_index > 0: iovs.append(iovlist[iov_index-1])
0292         return iovs
0293 
0294 
0295     def _prepare_run_datastructures(self):
0296         """Create the needed objects for run-by-run validation file lists."""
0297 
0298         self._run_info = {}
0299 
0300 
0301     def _add_file_info(self, container, keys, fileinfo):
0302         """Add file with `file_name` to `container` using `key`.
0303 
0304         Arguments:
0305         - `container`: dictionary holding information on files and event counts
0306         - `keys`: keys to which the info should be added; will be created if not
0307                   existing
0308         - `file_name`: name of a dataset file
0309         """
0310 
0311         for key in keys:
0312             if key not in container:
0313                 container[key] = {"events": 0,
0314                                   "files": []}
0315             container[key]["events"] += fileinfo.nevents / len(keys)
0316             if fileinfo not in container[key]["files"]:
0317                 container[key]["files"].append(fileinfo)
0318 
0319 
0320     def _remove_file_info(self, container, keys, fileinfo):
0321         """Remove file with `file_name` to `container` using `key`.
0322 
0323         Arguments:
0324         - `container`: dictionary holding information on files and event counts
0325         - `keys`: keys from which the info should be removed
0326         - `file_name`: name of a dataset file
0327         - `event_count`: number of events in `file_name`
0328         """
0329 
0330         for key in keys:
0331             if key not in container: continue
0332             try:
0333                 index = container[key]["files"].index(fileinfo)
0334             except ValueError:      # file not found
0335                 return
0336             del container[key]["files"][index]
0337             container[key]["events"] -= fileinfo.nevents / len(keys)
0338 
0339 
0340     def _request_dataset_information(self):
0341         """Retrieve general dataset information and create file list."""
0342 
0343         if not self._cache.empty:
0344             print_msg("Using cached information.")
0345             (self._events_in_dataset,
0346              self._files,
0347              self._file_info,
0348              self._max_run) = self._cache.get()
0349             self.rereco = any(len(fileinfo.runs)>1 for fileinfo in self._file_info)
0350             if self._args.random: random.shuffle(self._files)
0351             return
0352 
0353         # workaround to deal with KeyboardInterrupts in the worker processes:
0354         # - ignore interrupt signals in workers (see initializer)
0355         # - use a timeout of size sys.maxsize to avoid a bug in multiprocessing
0356         number_of_processes = multiprocessing.cpu_count() - 1
0357         number_of_processes = (number_of_processes
0358                                if number_of_processes > 0
0359                                else 1)
0360         pool = multiprocessing.Pool(
0361             processes = number_of_processes,
0362             initializer = lambda: signal.signal(signal.SIGINT, signal.SIG_IGN))
0363 
0364         print_msg("Requesting information for the following dataset(s):")
0365         for d in self._datasets: print_msg("\t"+d)
0366         print_msg("This may take a while...")
0367 
0368         result = pool.map_async(get_events_per_dataset, self._datasets).get(3600)
0369         self._events_in_dataset = sum(result)
0370 
0371         result = pool.map_async(get_max_run, self._datasets).get(3600)
0372         self._max_run = max(result)
0373 
0374         result = sum(pool.map_async(get_file_info, self._datasets).get(3600), [])
0375         files = pool.map_async(_make_file_info, result).get(3600)
0376         self._file_info = sorted(fileinfo for fileinfo in files)
0377 
0378         self.rereco = any(len(fileinfo.runs)>1 for fileinfo in self._file_info)
0379 
0380         if self._args.test_mode:
0381             self._file_info = self._file_info[-200:] # take only last chunk of files
0382         self._files = [fileinfo.name for fileinfo in self._file_info]
0383 
0384         # write information to cache
0385         self._cache.set(self._events_in_dataset, self._files, self._file_info,
0386                         self._max_run)
0387         self._cache.dump()
0388         if self._args.random:
0389           random.shuffle(self._file_info)
0390           self._files = [fileinfo.name for fileinfo in self._file_info]
0391 
0392     def _create_file_lists(self):
0393         """Create file lists for alignment and validation."""
0394 
0395         # collect files for alignment until minimal requirements are fulfilled
0396         self._files_alignment = []
0397         self._files_validation = []
0398         self._events_for_alignment = 0
0399         self._events_for_validation = 0
0400 
0401         max_range = (0
0402                      if self._args.events <= 0
0403                      else int(math.ceil(len(self._files)*self._args.fraction)))
0404         use_for_alignment = True
0405         for i, fileinfo in enumerate(self._file_info):
0406             enough_events = self._events_for_alignment >= self._args.events
0407             fraction_exceeded = i >= max_range
0408             if enough_events or fraction_exceeded: use_for_alignment = False
0409 
0410             dataset, f, number_of_events, runs = fileinfo
0411 
0412             iovs = self._get_iovs(runs)
0413             if use_for_alignment:
0414                 if iovs:
0415                     self._events_for_alignment += number_of_events
0416                     self._files_alignment.append(fileinfo)
0417                     self._add_file_info(self._iov_info_alignment, iovs, fileinfo)
0418                 else:
0419                     max_range += 1 # not used -> discard in fraction calculation
0420             else:
0421                 if iovs:
0422                     self._events_for_validation += number_of_events
0423                     self._files_validation.append(fileinfo)
0424                     self._add_file_info(self._iov_info_validation, iovs, fileinfo)
0425                     if self._args.run_by_run:
0426                         self._add_file_info(self._run_info, runs, fileinfo)
0427 
0428         self._fulfill_iov_eventcount()
0429 
0430         self._split_hippy_jobs()
0431 
0432 
0433     def _fulfill_iov_eventcount(self):
0434         """
0435         Try to fulfill the requirement on the minimum number of events per IOV
0436         in the alignment file list by picking files from the validation list.
0437         """
0438 
0439         for iov in self._iovs:
0440             if self._iov_info_alignment[iov]["events"] >= self._args.minimum_events_in_iov: continue
0441             for fileinfo in self._files_validation[:]:
0442                 dataset, f, number_of_events, runs = fileinfo
0443                 iovs = self._get_iovs(runs)
0444                 if iov in iovs:
0445                     self._files_alignment.append(fileinfo)
0446                     self._events_for_alignment += number_of_events
0447                     self._add_file_info(self._iov_info_alignment, iovs, fileinfo)
0448 
0449                     self._events_for_validation -= number_of_events
0450                     self._remove_file_info(self._iov_info_validation, iovs, fileinfo)
0451                     if self._args.run_by_run:
0452                         self._remove_file_info(self._run_info, runs, fileinfo)
0453                     self._files_validation.remove(fileinfo)
0454 
0455                     if (self._iov_info_alignment[iov]["events"]
0456                         >= self._args.minimum_events_in_iov):
0457                         break   # break the file loop if already enough events
0458 
0459     def _split_hippy_jobs(self):
0460         hippyjobs = {}
0461         for dataset, miniiov in itertools.product(self._datasets, self._miniiovs):
0462             jobsforminiiov = []
0463             hippyjobs[dataset,miniiov] = jobsforminiiov
0464             eventsinthisjob = float("inf")
0465             for fileinfo in self._files_alignment:
0466                 if fileinfo.dataset != dataset: continue
0467                 miniiovs = set(self._get_iovs(fileinfo.runs, useminiiovs=True))
0468                 if miniiov not in miniiovs: continue
0469                 if len(miniiovs) > 1:
0470                     hippyjobs[dataset,miniiov] = []
0471                 if eventsinthisjob >= self._args.hippy_events_per_job:
0472                     currentjob = []
0473                     jobsforminiiov.append(currentjob)
0474                     eventsinthisjob = 0
0475                 currentjob.append(fileinfo)
0476                 currentjob.sort()
0477                 eventsinthisjob += fileinfo.nevents
0478 
0479         self._hippy_jobs = {
0480           (dataset, iov): sum((hippyjobs[dataset, miniiov]
0481                                for miniiov in self._miniiovs
0482                                if iov == max(_ for _ in self._iovs if _ <= miniiov)), []
0483                            )
0484           for dataset, iov in itertools.product(self._datasets, self._iovs)
0485         }
0486 
0487     def _print_eventcounts(self):
0488         """Print the event counts per file list and per IOV."""
0489 
0490         log = os.path.join(self._output_dir, FileListCreator._event_count_log)
0491 
0492         print_msg("Using {0:d} events for alignment ({1:.2f}%)."
0493                   .format(self._events_for_alignment,
0494                           100.0*
0495                           self._events_for_alignment/self._events_in_dataset),
0496                   log_file = log)
0497         for iov in sorted(self._iov_info_alignment):
0498             print_msg(("Approximate events" if self.rereco else "Events") + " for alignment in IOV since {0:f}: {1:f}"
0499                       .format(iov, self._iov_info_alignment[iov]["events"]),
0500                       log_file = log)
0501 
0502         print_msg("Using {0:d} events for validation ({1:.2f}%)."
0503                   .format(self._events_for_validation,
0504                           100.0*
0505                           self._events_for_validation/self._events_in_dataset),
0506                   log_file = log)
0507 
0508         for iov in sorted(self._iov_info_validation):
0509             msg = ("Approximate events" if self.rereco else "Events") + " for validation in IOV since {0:f}: {1:f}".format(
0510                 iov, self._iov_info_validation[iov]["events"])
0511             if (self._iov_info_validation[iov]["events"]
0512                 < self._args.minimum_events_validation):
0513                 msg += " (not enough events -> no dataset file will be created)"
0514             print_msg(msg, log_file = log)
0515 
0516         for run in sorted(self._run_info):
0517             msg = ("Approximate events" if self.rereco else "Events") + " for validation in run {0:f}: {1:f}".format(
0518                 run, self._run_info[run]["events"])
0519             if (self._run_info[run]["events"]
0520                 < self._args.minimum_events_validation):
0521                 msg += " (not enough events -> no dataset file will be created)"
0522             print_msg(msg, log_file = log)
0523 
0524         unused_events = (self._events_in_dataset
0525                          - self._events_for_validation
0526                          - self._events_for_alignment)
0527         if unused_events > 0 != self._events_in_dataset:
0528             print_msg("Unused events: {0:d} ({1:.2f}%)"
0529                       .format(unused_events,
0530                               100.0*unused_events/self._events_in_dataset),
0531                       log_file = log)
0532 
0533 
0534     def _create_dataset_ini_section(self, name, collection, json_file = None):
0535         """Write dataset ini snippet.
0536 
0537         Arguments:
0538         - `name`: name of the dataset section
0539         - `collection`: track collection of this dataset
0540         - `json_file`: JSON file to be used for this dataset (optional)
0541         """
0542 
0543         if json_file:
0544             splitted = name.split("_since")
0545             file_list = "_since".join(splitted[:-1]
0546                                       if len(splitted) > 1
0547                                       else splitted)
0548         else:
0549             file_list = name
0550         output = "[dataset:{}]\n".format(name)
0551         output += "collection = {}\n".format(collection)
0552         output += "inputFileList = ${{datasetdir}}/{}.txt\n".format(file_list)
0553         output += "json = ${{datasetdir}}/{}\n".format(json_file) if json_file else ""
0554 
0555         if collection in ("ALCARECOTkAlCosmicsCTF0T",
0556                           "ALCARECOTkAlCosmicsInCollisions"):
0557             if self._first_dataset_ini:
0558                 print_msg("\tDetermined cosmics dataset, i.e. please replace "
0559                           "'DUMMY_DECO_MODE_FLAG' and 'DUMMY_ZERO_TESLA_FLAG' "
0560                           "with the correct values.")
0561                 self._first_dataset_ini = False
0562             output += "cosmicsDecoMode  = DUMMY_DECO_MODE_FLAG\n"
0563             output += "cosmicsZeroTesla = DUMMY_ZERO_TESLA_FLAG\n"
0564         output += "\n"
0565 
0566         return output
0567 
0568 
0569     def _create_json_file(self, name, first, last = None):
0570         """
0571         Create JSON file with `name` covering runs from `first` to `last`.  If a
0572         global JSON is provided, the resulting file is the intersection of the
0573         file created here and the global one.
0574         Returns the name of the created JSON file.
0575 
0576         Arguments:
0577         - `name`: name of the creted JSON file
0578         - `first`: first run covered by the JSON file
0579         - `last`: last run covered by the JSON file
0580 
0581         """
0582 
0583         if last is None: last = self._max_run
0584         name += "_JSON.txt"
0585         print_msg("Creating JSON file: "+name)
0586 
0587         json_file = LumiList.LumiList(runs = range(first, last+1))
0588         if self._args.json:
0589             global_json = LumiList.LumiList(filename = self._args.json)
0590             json_file = json_file & global_json
0591         json_file.writeJSON(os.path.join(self._output_dir, name))
0592 
0593         return name
0594 
0595 
0596     def _get_track_collection(self, edm_file):
0597         """Extract track collection from given `edm_file`.
0598 
0599         Arguments:
0600         - `edm_file`: CMSSW dataset file
0601         """
0602 
0603         # use global redirector to allow also files not yet at your site:
0604         cmd = ["edmDumpEventContent", r"root://cms-xrd-global.cern.ch/"+edm_file]
0605         try:
0606             event_content = subprocess.check_output(cmd).split("\n")
0607         except subprocess.CalledProcessError as e:
0608             splitted = edm_file.split("/")
0609             try:
0610                 alcareco = splitted[splitted.index("ALCARECO")+1].split("-")[0]
0611                 alcareco = alcareco.replace("TkAlCosmics0T", "TkAlCosmicsCTF0T")
0612                 alcareco = "ALCARECO" + alcareco
0613                 print_msg("\tDetermined track collection as '{}'.".format(alcareco))
0614                 return alcareco
0615             except ValueError:
0616                 if "RECO" in splitted:
0617                     print_msg("\tDetermined track collection as 'generalTracks'.")
0618                     return "generalTracks"
0619                 else:
0620                     print_msg("\tCould not determine track collection "
0621                               "automatically.")
0622                     print_msg("\tPlease replace 'DUMMY_TRACK_COLLECTION' with "
0623                               "the correct value.")
0624                     return "DUMMY_TRACK_COLLECTION"
0625 
0626         track_collections = []
0627         for line in event_content:
0628             splitted = line.split()
0629             if len(splitted) > 0 and splitted[0] == r"vector<reco::Track>":
0630                 track_collections.append(splitted[1].strip().strip('"'))
0631         if len(track_collections) == 0:
0632             print_msg("No track collection found in file '{}'.".format(edm_file))
0633             sys.exit(1)
0634         elif len(track_collections) == 1:
0635             print_msg("\tDetermined track collection as "
0636                       "'{}'.".format(track_collections[0]))
0637             return track_collections[0]
0638         else:
0639             alcareco_tracks = filter(lambda x: x.startswith("ALCARECO"),
0640                                      track_collections)
0641             if len(alcareco_tracks) == 0 and "generalTracks" in track_collections:
0642                 print_msg("\tDetermined track collection as 'generalTracks'.")
0643                 return "generalTracks"
0644             elif len(alcareco_tracks) == 1:
0645                 print_msg("\tDetermined track collection as "
0646                           "'{}'.".format(alcareco_tracks[0]))
0647                 return alcareco_tracks[0]
0648             print_msg("\tCould not unambiguously determine track collection in "
0649                       "file '{}':".format(edm_file))
0650             print_msg("\tPlease replace 'DUMMY_TRACK_COLLECTION' with "
0651                       "the correct value from the following list.")
0652             for collection in track_collections:
0653                 print_msg("\t - "+collection)
0654             return "DUMMY_TRACK_COLLECTION"
0655 
0656 
0657     def _write_file_lists(self):
0658         """Write file lists to disk."""
0659 
0660         self._create_dataset_txt(self._formatted_dataset, self._files_alignment)
0661         self._create_hippy_txt(self._formatted_dataset, sum(self._hippy_jobs.values(), []))
0662         self._create_dataset_cff(
0663             "_".join(["Alignment", self._formatted_dataset]),
0664             self._files_alignment)
0665 
0666         self._create_dataset_cff(
0667             "_".join(["Validation", self._formatted_dataset]),
0668             self._files_validation)
0669 
0670 
0671         if self._args.create_ini:
0672             dataset_ini_general = "[general]\n"
0673             dataset_ini_general += "datasetdir = {}\n".format(self._output_dir)
0674             dataset_ini_general += ("json = {}\n\n".format(self._args.json)
0675                                     if self._args.json
0676                                     else "\n")
0677 
0678             ini_path = self._formatted_dataset + ".ini"
0679             print_msg("Creating dataset ini file: " + ini_path)
0680             ini_path = os.path.join(self._output_dir, ini_path)
0681 
0682             collection = self._get_track_collection(self._files[0])
0683 
0684             with open(ini_path, "w") as f:
0685                 f.write(dataset_ini_general)
0686                 f.write(self._create_dataset_ini_section(
0687                     self._formatted_dataset, collection))
0688 
0689             iov_wise_ini = dataset_ini_general
0690 
0691         for i,iov in enumerate(sorted(self._iovs)):
0692             iov_str = "since{0:d}".format(iov)
0693             iov_str = "_".join([self._formatted_dataset, iov_str])
0694 
0695             if self.rereco:
0696                 if i == len(self._iovs) - 1:
0697                     last = None
0698                 else:
0699                     last = sorted(self._iovs)[i+1] - 1
0700                 local_json = self._create_json_file(iov_str, iov, last)
0701             else:
0702                 local_json = None
0703 
0704             if self._args.create_ini:
0705                 iov_wise_ini += self._create_dataset_ini_section(iov_str,
0706                                                                  collection,
0707                                                                  local_json)
0708 
0709             self._create_dataset_txt(iov_str,
0710                                      self._iov_info_alignment[iov]["files"])
0711             self._create_hippy_txt(iov_str, sum((self._hippy_jobs[dataset,iov] for dataset in self._datasets), []))
0712             self._create_dataset_cff(
0713                 "_".join(["Alignment", iov_str]),
0714                 self._iov_info_alignment[iov]["files"],
0715                 json_file=local_json)
0716 
0717             if (self._iov_info_validation[iov]["events"]
0718                 < self._args.minimum_events_validation):
0719                 continue
0720             self._create_dataset_cff(
0721                 "_".join(["Validation", iov_str]),
0722                 self._iov_info_validation[iov]["files"],
0723                 json_file=local_json)
0724 
0725         if self._args.create_ini and iov_wise_ini != dataset_ini_general:
0726             ini_path = self._formatted_dataset + "_IOVs.ini"
0727             print_msg("Creating dataset ini file: " + ini_path)
0728             ini_path = os.path.join(self._output_dir, ini_path)
0729             with open(ini_path, "w") as f: f.write(iov_wise_ini)
0730 
0731         for run in sorted(self._run_info):
0732             if args.rereco: continue #need to implement more jsons
0733             if (self._run_info[run]["events"]
0734                 < self._args.minimum_events_validation):
0735                 continue
0736             self._create_dataset_cff(
0737                 "_".join(["Validation", self._formatted_dataset, str(run)]),
0738                 self._run_info[run]["files"])
0739 
0740 
0741     def _create_dataset_txt(self, name, file_list):
0742         """Write alignment file list to disk.
0743 
0744         Arguments:
0745         - `name`: name of the file list
0746         - `file_list`: list of files to write to `name`
0747         """
0748 
0749         name += ".txt"
0750         print_msg("Creating dataset file list: "+name)
0751         with open(os.path.join(self._output_dir, name), "w") as f:
0752             f.write("\n".join(fileinfo.name for fileinfo in file_list))
0753 
0754 
0755     def _create_hippy_txt(self, name, job_list):
0756         name += "_hippy.txt"
0757         print_msg("Creating dataset file list for HipPy: "+name)
0758         with open(os.path.join(self._output_dir, name), "w") as f:
0759             f.write("\n".join(",".join("'"+fileinfo.name+"'" for fileinfo in job) for job in job_list)+"\n")
0760 
0761 
0762     def _create_dataset_cff(self, name, file_list, json_file = None):
0763         """
0764         Create configuration fragment to define a dataset.
0765 
0766         Arguments:
0767         - `name`: name of the configuration fragment
0768         - `file_list`: list of files to write to `name`
0769         - `json_file`: JSON file to be used for this dataset (optional)
0770         """
0771 
0772         if json_file is None: json_file = self._args.json # might still be None
0773         if json_file is not None:
0774             json_file = os.path.join(self._output_dir, json_file)
0775 
0776         name = "_".join(["Dataset",name, "cff.py"])
0777         print_msg("Creating dataset configuration fragment: "+name)
0778 
0779         file_list_str = ""
0780         for sub_list in get_chunks(file_list, 255):
0781             file_list_str += ("readFiles.extend([\n'"+
0782                               "',\n'".join(fileinfo.name for fileinfo in sub_list)+
0783                               "'\n])\n")
0784 
0785         fragment = FileListCreator._dataset_template.format(
0786             lumi_def = ("import FWCore.PythonUtilities.LumiList as LumiList\n\n"
0787                         "lumiSecs = cms.untracked.VLuminosityBlockRange()\n"
0788                         "goodLumiSecs = LumiList.LumiList(filename = "
0789                         "'{0:s}').getCMSSWString().split(',')"
0790                         .format(json_file)
0791                         if json_file else ""),
0792             lumi_arg = ("lumisToProcess = lumiSecs,\n                    "
0793                         if json_file else ""),
0794             lumi_extend = "lumiSecs.extend(goodLumiSecs)" if json_file else "",
0795             files = file_list_str)
0796 
0797         with open(os.path.join(self._output_dir, name), "w") as f:
0798             f.write(fragment)
0799 
0800 
0801     _dataset_template = """\
0802 import FWCore.ParameterSet.Config as cms
0803 {lumi_def:s}
0804 readFiles = cms.untracked.vstring()
0805 source = cms.Source("PoolSource",
0806                     {lumi_arg:s}fileNames = readFiles)
0807 {files:s}{lumi_extend:s}
0808 maxEvents = cms.untracked.PSet(input = cms.untracked.int32(-1))
0809 """
0810 
0811 
0812 class _DasCache(object):
0813     """Helper class to cache information from DAS requests."""
0814 
0815     def __init__(self, file_list_id):
0816         """Constructor of the cache.
0817 
0818         Arguments:
0819         - `file_list_id`: ID of the cached file lists
0820         """
0821 
0822         self._file_list_id = file_list_id
0823         self._cache_file_name = os.path.join(file_list_id, ".das_cache.pkl")
0824         self.reset()
0825 
0826 
0827     def reset(self):
0828         """Reset the cache contents and the 'empty' flag."""
0829 
0830         self._empty = True
0831         self._events_in_dataset = 0
0832         self._files = []
0833         self._file_info = []
0834         self._max_run = None
0835 
0836 
0837     def set(self, total_events, file_list, file_info, max_run):
0838         """Set the content of the cache.
0839 
0840         Arguments:
0841         - `total_events`: total number of events in dataset
0842         - `file_list`: list of files in dataset
0843         - `file_info`: dictionary with numbers of events per file
0844         - `max_run`: highest run number contained in the dataset
0845         """
0846 
0847         self._events_in_dataset = total_events
0848         self._files = file_list
0849         self._file_info = file_info
0850         self._max_run = max_run
0851         self._empty = False
0852 
0853 
0854     def get(self):
0855         """
0856         Get the content of the cache as tuple:
0857            result = (total number of events in dataset,
0858                      list of files in dataset,
0859                      dictionary with numbers of events and runs per file)
0860         """
0861 
0862         return self._events_in_dataset, self._files, self._file_info, self._max_run
0863 
0864 
0865     def load(self):
0866         """Loads the cached contents."""
0867 
0868         if not self.empty:
0869             print_msg("Overriding file information with cached information.")
0870         try:
0871             with open(self._cache_file_name, "rb") as f:
0872                 tmp_dict = cPickle.load(f)
0873                 self.__dict__.update(tmp_dict)
0874         except IOError as e:
0875             if e.args == (2, "No such file or directory"):
0876                 msg = "Failed to load cache for '{}'.".format(self._file_list_id)
0877                 if not self.empty:
0878                     msg += " Keeping the previous file information."
0879                 print_msg(msg)
0880             else:
0881                 raise
0882 
0883 
0884     def dump(self):
0885         """Dumps the contents to the cache file."""
0886 
0887         if self.empty:
0888             print_msg("Cache is empty. Not writing to file.")
0889             return
0890 
0891         with open(self._cache_file_name, "wb") as f:
0892             cPickle.dump(self.__dict__, f, 2)
0893 
0894 
0895     @property
0896     def empty(self):
0897         """
0898         Flag indicating whether the cache is empty or has been filled (possibly
0899         with nothing).
0900         """
0901 
0902         return self._empty
0903 
0904 
0905 
0906 ################################################################################
0907 def das_client(query, check_key = None):
0908     """
0909     Submit `query` to DAS client and handle possible errors.
0910     Further treatment of the output might be necessary.
0911 
0912     Arguments:
0913     - `query`: DAS query
0914     - `check_key`: optional key to be checked for; retriggers query if needed
0915     """
0916 
0917     error = True
0918     das_data = {'status': 'error'}
0919     for i in range(5):         # maximum of 5 tries
0920         try:
0921             das_data = cmssw_das_client.get_data(query, limit = 0)
0922         except IOError as e:
0923             if e.errno == 14: #https://stackoverflow.com/q/36397853/5228524
0924                 continue
0925         except ValueError as e:
0926             if str(e) == "No JSON object could be decoded":
0927                 das_data['reason'] = str(e)
0928                 continue
0929 
0930         if das_data["status"] == "ok":
0931             if das_data["nresults"] == 0 or check_key is None:
0932                 error = False
0933                 break
0934 
0935             result_count = 0
0936             for d in find_key(das_data["data"], [check_key]):
0937                 result_count += len(d)
0938             if result_count == 0:
0939                 das_data["status"] = "error"
0940                 das_data["reason"] = ("DAS did not return required data.")
0941                 continue
0942             else:
0943                 error = False
0944                 break
0945 
0946     if das_data["status"] == "error":
0947         print_msg("DAS query '{}' failed 5 times. "
0948                   "The last time for the the following reason:".format(query))
0949         print(das_data.get("reason", "ERROR:UNKNOWN"))
0950         sys.exit(1)
0951     return das_data["data"]
0952 
0953 
0954 def find_key(collection, key_chain):
0955     """Searches for `key` in `collection` and returns first corresponding value.
0956 
0957     Arguments:
0958     - `collection`: list of dictionaries
0959     - `key_chain`: chain of keys to be searched for
0960     """
0961 
0962     result = None
0963     for i,key in enumerate(key_chain):
0964         for item in collection:
0965             if key in item:
0966                 if i == len(key_chain) - 1:
0967                     result = item[key]
0968                 else:
0969                     try:
0970                         result = find_key(item[key], key_chain[i+1:])
0971                     except LookupError:
0972                         pass    # continue with next `item` in `collection`
0973             else:
0974                 pass            # continue with next `item` in `collection`
0975 
0976     if result is not None: return result
0977     raise LookupError(key_chain, collection) # put
0978 
0979 
0980 def print_msg(text, line_break = True, log_file = None):
0981     """Formatted printing of `text`.
0982 
0983     Arguments:
0984     - `text`: string to be printed
0985     """
0986 
0987     msg = "  >>> " + str(text)
0988     if line_break:
0989         print(msg)
0990     else:
0991         print(msg, end=' ')
0992         sys.stdout.flush()
0993     if log_file:
0994         with open(log_file, "a") as f: f.write(msg+"\n")
0995     return msg
0996 
0997 
0998 def get_runs(file_name):
0999     """
1000     Try to guess the run number from `file_name`. If run could not be
1001     determined, gets the run numbers from DAS (slow!)
1002 
1003     Arguments:
1004     - `file_name`: name of the considered file
1005     """
1006     try:
1007         return [int("".join(file_name.split("/")[-4:-2]))]
1008     except ValueError:
1009         query = "run file="+file_name+" system=dbs3"
1010         return [int(_) for _ in find_key(das_client(query), ["run", "run_number"])]
1011 
1012 
1013 def get_max_run(dataset_name):
1014     """Retrieve the maximum run number in `dataset_name`.
1015 
1016     Arguments:
1017     - `dataset_name`: name of the dataset
1018     """
1019 
1020     data = das_client("run dataset={0:s} system=dbs3".format(dataset_name))
1021     runs = [f["run"][0]["run_number"] for f in data]
1022     return max(runs)
1023 
1024 
1025 def get_files(dataset_name):
1026     """Retrieve list of files in `dataset_name`.
1027 
1028     Arguments:
1029     - `dataset_name`: name of the dataset
1030     """
1031 
1032     data = das_client(("file dataset={0:s} system=dbs3 detail=True | "+
1033                        "grep file.name, file.nevents > 0").format(dataset_name),
1034                       "file")
1035     return [find_key(f["file"], ["name"]) for f in data]
1036 
1037 
1038 def get_datasets(dataset_pattern):
1039     """Retrieve list of dataset matching `dataset_pattern`.
1040 
1041     Arguments:
1042     - `dataset_pattern`: pattern of dataset names
1043     """
1044 
1045     data = das_client("dataset dataset={0:s} system=dbs3 detail=True"
1046                       "| grep dataset.name".format(dataset_pattern), "dataset")
1047     return sorted(set([find_key(f["dataset"], ["name"]) for f in data]))
1048 
1049 
1050 def get_events_per_dataset(dataset_name):
1051     """Retrieve the number of a events in `dataset_name`.
1052 
1053     Arguments:
1054     - `dataset_name`: name of a dataset
1055     """
1056 
1057     return _get_events("dataset", dataset_name)
1058 
1059 
1060 def get_events_per_file(file_name):
1061     """Retrieve the number of a events in `file_name`.
1062 
1063     Arguments:
1064     - `file_name`: name of a dataset file
1065     """
1066 
1067     return _get_events("file", file_name)
1068 
1069 
1070 def _get_events(entity, name):
1071     """Retrieve the number of events from `entity` called `name`.
1072 
1073     Arguments:
1074     - `entity`: type of entity
1075     - `name`: name of entity
1076     """
1077 
1078     data = das_client("{0:s}={1:s} system=dbs3 detail=True | grep {0:s}.nevents"
1079                       .format(entity, name), entity)
1080     return int(find_key(data, [entity, "nevents"]))
1081 
1082 
1083 def _get_properties(name, entity, properties, filters = None, sub_entity = None,
1084                     aggregators = None):
1085     """Retrieve `properties` from `entity` called `name`.
1086 
1087     Arguments:
1088     - `name`: name of entity
1089     - `entity`: type of entity
1090     - `properties`: list of property names
1091     - `filters`: list of filters on properties
1092     - `sub_entity`: type of entity from which to extract the properties;
1093                     defaults to `entity`
1094     - `aggregators`: additional aggregators/filters to amend to query
1095     """
1096 
1097     if sub_entity is None: sub_entity = entity
1098     if filters is None:    filters    = []
1099     props = ["{0:s}.{1:s}".format(sub_entity,prop.split()[0])
1100              for prop in properties]
1101     conditions = ["{0:s}.{1:s}".format(sub_entity, filt)
1102                   for filt in filters]
1103     add_ons = "" if aggregators is None else " | "+" | ".join(aggregators)
1104 
1105     data = das_client("{0:s} {1:s}={2:s} system=dbs3 detail=True | grep {3:s}{4:s}"
1106                       .format(sub_entity, entity, name,
1107                               ", ".join(props+conditions), add_ons), sub_entity)
1108     return [[find_key(f[sub_entity], [prop]) for prop in properties] for f in data]
1109 
1110 def get_file_info(dataset):
1111     result = _get_properties(name=dataset,
1112                              properties = ["name", "nevents"],
1113                              filters = ["nevents > 0"],
1114                              entity = "dataset",
1115                              sub_entity = "file")
1116     return [(dataset, name, nevents) for name, nevents in result]
1117 
1118 
1119 
1120 FileInfo = collections.namedtuple("FileInfo", "dataset name nevents runs")
1121 
1122 def _make_file_info(dataset_name_nevents):
1123     return FileInfo(*dataset_name_nevents, runs=get_runs(dataset_name_nevents[1]))
1124 
1125 def get_chunks(long_list, chunk_size):
1126     """
1127     Generates list of sub-lists of `long_list` with a maximum size of
1128     `chunk_size`.
1129 
1130     Arguments:
1131     - `long_list`: original list
1132     - `chunk_size`: maximum size of created sub-lists
1133     """
1134 
1135     for i in range(0, len(long_list), chunk_size):
1136         yield long_list[i:i+chunk_size]
1137 
1138 
1139 def merge_strings(strings):
1140     """Merge strings in `strings` into a common string.
1141 
1142     Arguments:
1143     - `strings`: list of strings
1144     """
1145 
1146     if type(strings) == str:
1147         return strings
1148     elif len(strings) == 0:
1149         return ""
1150     elif len(strings) == 1:
1151         return strings[0]
1152     elif len(strings) == 2:
1153         first = strings[0]
1154         second = strings[1]
1155     else:
1156         first = merge_strings(strings[:-1])
1157         second = strings[-1]
1158 
1159     merged_string = ""
1160     blocks = difflib.SequenceMatcher(None, first, second).get_matching_blocks()
1161 
1162     last_i, last_j, last_n = 0, 0, 0
1163     for i, j, n in blocks:
1164         merged_string += first[last_i+last_n:i]
1165         merged_string += second[last_j+last_n:j]
1166         merged_string += first[i:i+n]
1167         last_i, last_j, last_n = i, j, n
1168 
1169     return str(merged_string)
1170 
1171 
1172 ################################################################################
1173 if __name__ == "__main__":
1174     try:
1175         main()
1176     except KeyboardInterrupt:
1177         pass