0001 #!/usr/bin/env python3
0002 
0003 ###########################################################################
0004 ## File       : cmsHarvest.py
0005 ## Authors    : Jeroen Hegeman (jeroen.hegeman@cern.ch)
0006 ##              Niklas Pietsch (niklas.pietsch@desy.de)
0007 ##              Francesco Costanza (francesco.costanza@desy.de)
0008 ## Last change: 20100308
0009 ##
0010 ## Purpose    : Main program to run all kinds of harvesting.
0011 ##              For more information please refer to the CMS Twiki url
0012 ##              mentioned just below here.
0013 ###########################################################################
0014 
0015 """Main program to run all kinds of harvesting.
0016 
0017 These are the basic kinds of harvesting implemented (contact me if
0018 your favourite is missing):
0019 
0020 - RelVal : Run for release validation samples. Makes heavy use of MC
0021            truth information.
0022 
0023 - RelValFS: FastSim RelVal.
0024 
0025 - MC : Run for MC samples.
0026 
0027 - DQMOffline : Run for real data (could also be run for MC).
0028 
0029 For the mappings of these harvesting types to sequence names please
0030 see the setup_harvesting_info() and option_handler_list_types()
0031 methods.
0032 
0033 """
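# For example (see setup_harvesting_info() and option_handler_list_types()
# below): in recent releases `RelVal' maps to the
# `HARVESTING:validationHarvesting+dqmHarvesting' sequence, `RelValFS' to
# `HARVESTING:validationHarvestingFS', `MC' to
# `HARVESTING:validationprodHarvesting' and `DQMOffline' to
# `HARVESTING:dqmHarvesting'.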
0034 
0035 ###########################################################################
0036 
0037 from builtins import range
0038 __version__ = "3.8.2p1" # (version jump to match release)
0039 __author__ = "Jeroen Hegeman (jeroen.hegeman@cern.ch)," \
0040              "Niklas Pietsch (niklas.pietsch@desy.de)"
0041 
0042 twiki_url = "https://twiki.cern.ch/twiki/bin/view/CMS/CmsHarvester"
0043 
0044 ###########################################################################
0045 
0046 ###########################################################################
0047 ## TODO list
0048 ##
0049 ## !!! Some code refactoring is in order. A lot of the code that loads
0050 ## and builds dataset and run lists is duplicated. !!!
0051 ##
0052 ## - SPECIAL (future):
0053 ##   After discussing all these harvesting issues yet again with Luca,
0054 ##   it looks like we'll need something special to handle harvesting
0055 ##   for (collisions) data in reprocessing. Stuff with a special DBS
0056 ##   instance in which `rolling' reports of reprocessed datasets are
0057 ##   published. In this case we will have to check (w.r.t. the parent
0058 ##   dataset) how much of a given run is ready, and submit once we're
0059 ##   satisfied (let's say 90%).
0060 ##
0061 ## - We could get rid of most of the `and dataset.status = VALID'
0062 ##   pieces in the DBS queries.
0063 ## - Change to a more efficient grid scheduler.
0064 ## - Implement incremental harvesting. Requires some changes to the
0065 ##   book keeping to store the harvested number of events for each
0066 ##   run. Also requires some changes to the dataset checking to see if
0067 ##   additional statistics have become available.
0068 ## - Emphasize the warnings in case we're running in force
0069 ##   mode. Otherwise they may get lost a bit in the output.
0070 ## - Fix the creation of the CASTOR dirs. The current approach works
0071 ##   but is a) too complicated and b) too inefficient.
0072 ## - Fully implement all harvesting types.
0073 ##   --> Discuss with Andreas what exactly should be in there. And be
0074 ##       careful with the run numbers!
0075 ## - Add the (second-step) harvesting config file to the (first-step)
0076 ##   ME extraction job to make sure it does not get lost.
0077 ## - Improve sanity checks on harvesting type vs. data type.
0078 ## - Implement reference histograms.
0079 ##   1) User-specified reference dataset.
0080 ##   2) Educated guess based on dataset name.
0081 ##   3) References from GlobalTag.
0082 ##   4) No reference at all.
0083 ## - Is this options.evt_type used anywhere?
0084 ## - Combine all these dbs_resolve_xxx into a single call to DBS(?).
0085 ## - Implement CRAB server use?
0086 ## - Add implementation of email address of user. (Only necessary for
0087 ##   CRAB server.)
0088 ###########################################################################
0089 
0090 import os
0091 import sys
0092 import subprocess
0093 import re
0094 import logging
0095 import optparse
0096 import datetime
0097 import copy
0098 from inspect import getargspec
0099 from random import choice
0100 
0101 
0102 # These we need to communicate with the global DBS (DBSAPI),
0103 from DBSAPI.dbsApi import DbsApi
0104 import DBSAPI.dbsException
0105 import DBSAPI.dbsApiException
0106 from functools import reduce
0107 # and these we need to parse the DBS output.
0108 global xml
0109 global SAXParseException
0110 import xml.sax
0111 from xml.sax import SAXParseException
0112 
0113 import Configuration.PyReleaseValidation
0114 from Configuration.PyReleaseValidation.ConfigBuilder import \
0115      ConfigBuilder, defaultOptions
0116 # from Configuration.PyReleaseValidation.cmsDriverOptions import options, python_config_filename
0117 
0118 #import FWCore.ParameterSet.Config as cms
0119 
0120 # Debugging stuff.
0121 import pdb
0122 try:
0123     import debug_hook
0124 except ImportError:
0125     pass
0126 
0127 ###########################################################################
0128 ## Helper class: Usage exception.
0129 ###########################################################################
0130 class Usage(Exception):
0131     def __init__(self, msg):
0132         self.msg = msg
0133     def __str__(self):
0134         return repr(self.msg)
0135 
0136     # End of Usage.
0137 
0138 ###########################################################################
0139 ## Helper class: Error exception.
0140 ###########################################################################
0141 class Error(Exception):
0142     def __init__(self, msg):
0143         self.msg = msg
0144     def __str__(self):
0145         return repr(self.msg)
0146 
0147 ###########################################################################
0148 ## Helper class: CMSHarvesterHelpFormatter.
0149 ###########################################################################
0150 
0151 class CMSHarvesterHelpFormatter(optparse.IndentedHelpFormatter):
0152     """Helper class to add some customised help output to cmsHarvester.
0153 
0154     We want to add some instructions, as well as a pointer to the CMS
0155     Twiki.
0156 
0157     """
0158 
0159     def format_usage(self, usage):
0160 
0161         usage_lines = []
0162 
0163         sep_line = "-" * 60
0164         usage_lines.append(sep_line)
0165         usage_lines.append("Welcome to the CMS harvester, a (hopefully useful)")
0166         usage_lines.append("tool to create harvesting configurations.")
0167         usage_lines.append("For more information please have a look at the CMS Twiki:")
0168         usage_lines.append("  %s" % twiki_url)
0169         usage_lines.append(sep_line)
0170         usage_lines.append("")
0171 
0172         # Since we only add to the output, we now just append the
0173         # original output from IndentedHelpFormatter.
0174         usage_lines.append(optparse.IndentedHelpFormatter. \
0175                            format_usage(self, usage))
0176 
0177         formatted_usage = "\n".join(usage_lines)
0178         return formatted_usage
0179 
0180     # End of CMSHarvesterHelpFormatter.
0181 
0182 ###########################################################################
0183 ## Helper class: DBSXMLHandler.
0184 ###########################################################################
0185 
0186 class DBSXMLHandler(xml.sax.handler.ContentHandler):
0187     """XML handler class to parse DBS results.
0188 
0189     The tricky thing here is that older DBS versions (2.0.5 and
0190     earlier) return results in a different XML format than newer
0191     versions. Previously the result values were returned as attributes
0192     to the `result' element. The new approach returns result values as
0193     contents of named elements.
0194 
0195     The old approach is handled directly in startElement(), the new
0196     approach in characters().
0197 
0198     NOTE: All results are returned in the form of string values of
0199           course!
0200 
0201     """
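    # Purely illustrative sketch of the difference (names are assumptions,
    # not verbatim DBS output): older servers returned a value such as the
    # dataset path as an attribute of the `result' element, e.g.
    #   <result PATH='/Prim/Proc/TIER'/>,
    # whereas newer servers return it as the text content of a named
    # element. startElement() handles the former, characters()/endElement()
    # the latter.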
0202 
0203     # This is the required mapping from the name of the variable we
0204     # ask for to what we call it ourselves. (Effectively this is the
0205     # mapping between the old attribute key name and the new element
0206     # name.)
0207     mapping = {
0208         "dataset"        : "PATH",
0209         "dataset.tag"    : "PROCESSEDDATASET_GLOBALTAG",
0210         "datatype.type"  : "PRIMARYDSTYPE_TYPE",
0211         "run"            : "RUNS_RUNNUMBER",
0212         "run.number"     : "RUNS_RUNNUMBER",
0213         "file.name"      : "FILES_LOGICALFILENAME",
0214         "file.numevents" : "FILES_NUMBEROFEVENTS",
0215         "algo.version"   : "APPVERSION_VERSION",
0216         "site"           : "STORAGEELEMENT_SENAME",
0217         }
0218 
0219     def __init__(self, tag_names):
0220         # This is a list used as stack to keep track of where we are
0221         # in the element tree.
0222         self.element_position = []
0223         self.tag_names = tag_names
0224         self.results = {}
0225 
0226     def startElement(self, name, attrs):
0227         self.element_position.append(name)
0228 
0229         self.current_value = []
0230 
0231         #----------
0232 
0233         # This is to catch results from DBS 2.0.5 and earlier.
0234         if name == "result":
0235             for name in self.tag_names:
0236                 key = DBSXMLHandler.mapping[name]
0237                 value = str(attrs[key])
0238                 try:
0239                     self.results[name].append(value)
0240                 except KeyError:
0241                     self.results[name] = [value]
0242 
0243         #----------
0244 
0245     def endElement(self, name):
0246         assert self.current_element() == name, \
0247                "closing unopened element `%s'" % name
0248 
0249         if self.current_element() in self.tag_names:
0250             contents = "".join(self.current_value)
0251             if self.current_element() in self.results:
0252                 self.results[self.current_element()].append(contents)
0253             else:
0254                 self.results[self.current_element()] = [contents]
0255 
0256         self.element_position.pop()
0257 
0258     def characters(self, content):
0259         # NOTE: It is possible that the actual contents of the tag
0260         # gets split into multiple pieces. This method will be called
0261         # for each of the pieces. This means we have to concatenate
0262         # everything ourselves.
0263         if self.current_element() in self.tag_names:
0264             self.current_value.append(content)
0265 
0266     def current_element(self):
0267         return self.element_position[-1]
0268 
0269     def check_results_validity(self):
0270         """Make sure that all results arrays have equal length.
0271 
0272         We should have received complete rows from DBS. I.e. all
0273         results arrays in the handler should be of equal length.
0274 
0275         """
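        # Example (values made up): {"run": ["1", "2"], "file.name":
        # ["a", "b"]} is a valid result set (all columns have length 2),
        # whereas {"run": ["1", "2"], "file.name": ["a"]} is not.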
0276 
0277         results_valid = True
0278 
0279         res_names = list(self.results.keys())  # list() so it can be indexed below (Python 3).
0280         if len(res_names) > 1:
0281             for res_name in res_names[1:]:
0282                 res_tmp = self.results[res_name]
0283                 if len(res_tmp) != len(self.results[res_names[0]]):
0284                     results_valid = False
0285 
0286         return results_valid
0287 
0288     # End of DBSXMLHandler.
0289 
0290 ###########################################################################
0291 ## CMSHarvester class.
0292 ###########################################################################
0293 
0294 class CMSHarvester(object):
0295     """Class to perform CMS harvesting.
0296 
0297     More documentation `obviously' to follow.
0298 
0299     """
0300 
0301     ##########
0302 
0303     def __init__(self, cmd_line_opts=None):
0304         "Initialize class and process command line options."
0305 
0306         self.version = __version__
0307 
0308         # These are the harvesting types allowed. See the head of this
0309         # file for more information.
0310         self.harvesting_types = [
0311             "RelVal",
0312             "RelValFS",
0313             "MC",
0314             "DQMOffline",
0315             ]
0316 
0317         # These are the possible harvesting modes:
0318         #   - Single-step: harvesting takes place on-site in a single
0319         #   step. For each sample only a single ROOT file containing
0320         #   the harvesting results is returned.
0321         #   - Single-step-allow-partial: special hack to allow
0322         #   harvesting of partial statistics using single-step
0323         #   harvesting on spread-out samples.
0324         #   - Two-step: harvesting takes place in two steps. The first
0325         #   step returns a series of monitoring element summaries for
0326         #   each sample. The second step then merges these summaries
0327         #   locally and does the real harvesting. This second step
0328         #   produces the ROOT file containing the harvesting results.
0329         self.harvesting_modes = [
0330             "single-step",
0331             "single-step-allow-partial",
0332             "two-step"
0333             ]
0334 
0335         # It is possible to specify a GlobalTag that will override any
0336         # choices (regarding GlobalTags) made by the cmsHarvester.
0337         # BUG BUG BUG
0338         # For the moment, until I figure out a way to obtain the
0339         # GlobalTag with which a given data (!) dataset was created,
0340         # it is necessary to specify a GlobalTag for harvesting of
0341         # data.
0342         # BUG BUG BUG end
0343         self.globaltag = None
0344 
0345         # It's also possible to switch off the use of reference
0346         # histograms altogether.
0347         self.use_ref_hists = True
0348 
0349         # The database name and account are hard-coded. They are not
0350         # likely to change before the end-of-life of this tool. But of
0351         # course there is a way to override this from the command
0352         # line. One can even override the Frontier connection used for
0353         # the GlobalTag and for the reference histograms
0354         # independently. Please only use this for testing purposes.
0355         self.frontier_connection_name = {}
0356         self.frontier_connection_name["globaltag"] = "frontier://" \
0357                                                      "FrontierProd/"
0358         self.frontier_connection_name["refhists"] = "frontier://" \
0359                                                     "FrontierProd/"
0360         self.frontier_connection_overridden = {}
0361         for key in self.frontier_connection_name.keys():
0362             self.frontier_connection_overridden[key] = False
0363 
0364         # This contains information specific to each of the harvesting
0365         # types. Used to create the harvesting configuration. It is
0366         # filled by setup_harvesting_info().
0367         self.harvesting_info = None
0368 
0369         ###
0370 
0371         # These are default `unused' values that will be filled in
0372         # depending on the command line options.
0373 
0374         # The type of harvesting we're doing. See
0375         # self.harvesting_types for allowed types.
0376         self.harvesting_type = None
0377 
0378         # The harvesting mode, popularly known as single-step
0379         # vs. two-step. The thing to remember at this point is that
0380         # single-step is only possible for samples located completely
0381         # at a single site (i.e. SE).
0382         self.harvesting_mode = None
0383         # BUG BUG BUG
0384         # Default temporarily set to two-step until we can get staged
0385         # jobs working with CRAB.
0386         self.harvesting_mode_default = "single-step"
0387         # BUG BUG BUG end
0388 
0389         # The input method: are we reading a dataset name (or regexp)
0390         # directly from the command line or are we reading a file
0391         # containing a list of dataset specifications. Actually we
0392         # keep one of each for both datasets and runs.
0393         self.input_method = {}
0394         self.input_method["datasets"] = {}
0395         self.input_method["datasets"]["use"] = None
0396         self.input_method["datasets"]["ignore"] = None
0397         self.input_method["runs"] = {}
0398         self.input_method["runs"]["use"] = None
0399         self.input_method["runs"]["ignore"] = None
0401         # The name of whatever input we're using.
0402         self.input_name = {}
0403         self.input_name["datasets"] = {}
0404         self.input_name["datasets"]["use"] = None
0405         self.input_name["datasets"]["ignore"] = None
0406         self.input_name["runs"] = {}
0407         self.input_name["runs"]["use"] = None
0408         self.input_name["runs"]["ignore"] = None
0409 
0410         self.Jsonlumi = False
0411         self.Jsonfilename = "YourJSON.txt"
0412         self.Jsonrunfilename = "YourJSON.txt"
0413         self.todofile = "YourToDofile.txt"
0414 
0415         # If this is true, we're running in `force mode'. In this case
0416         # the sanity checks are performed but failure will not halt
0417         # everything.
0418         self.force_running = None
0419 
0420         # The base path of the output dir in CASTOR.
0421         self.castor_base_dir = None
0422         self.castor_base_dir_default = "/castor/cern.ch/" \
0423                                        "cms/store/temp/" \
0424                                        "dqm/offline/harvesting_output/"
0425 
0426         # The name of the file to be used for book keeping: which
0427         # datasets, runs, etc. we have already processed.
0428         self.book_keeping_file_name = None
0429         self.book_keeping_file_name_default = "harvesting_accounting.txt"
0430 
0431         # The dataset name to reference histogram name mapping is read
0432         # from a text file. The name of this file is kept in the
0433         # following variable.
0434         self.ref_hist_mappings_file_name = None
0435         # And this is the default value.
0436         self.ref_hist_mappings_file_name_default = "harvesting_ref_hist_mappings.txt"
0437 
0438         # Hmmm, hard-coded prefix of the CERN CASTOR area. This is the
0439         # only supported CASTOR area.
0440         # NOTE: Make sure this one starts with a `/'.
0441         self.castor_prefix = "/castor/cern.ch"
0442 
0443         # Normally the central harvesting should be done using the
0444         # `t1access' grid role. To be able to run without T1 access
0445         # the --no-t1access flag can be used. This variable keeps
0446         # track of that special mode.
0447         self.non_t1access = False
0448         self.caf_access = False
0449         self.saveByLumiSection = False
0450         self.crab_submission = False
0451         self.nr_max_sites = 1
0452 
0453         self.preferred_site = "no preference"
0454 
0455         # This will become the list of datasets and runs to consider
0456         self.datasets_to_use = {}
0457         # and this will become the list of datasets and runs to skip.
0458         self.datasets_to_ignore = {}
0459         # This, in turn, will hold all book keeping information.
0460         self.book_keeping_information = {}
0461         # And this is where the dataset name to reference histogram
0462         # name mapping is stored.
0463         self.ref_hist_mappings = {}
0464 
0465         # We're now also allowing run selection. This means we also
0466         # have to keep list of runs requested and vetoed by the user.
0467         self.runs_to_use = {}
0468         self.runs_to_ignore = {}
0469 
0470         # Cache for CMSSW version availability at different sites.
0471         self.sites_and_versions_cache = {}
0472 
0473         # Cache for checked GlobalTags.
0474         self.globaltag_check_cache = []
0475 
0476         # Global flag to see if there were any jobs for which we could
0477         # not find a matching site.
0478         self.all_sites_found = True
0479 
0480         # Helper string centrally defined.
0481         self.no_matching_site_found_str = "no_matching_site_found"
0482 
0483         # Store command line options for later use.
0484         if cmd_line_opts is None:
0485             cmd_line_opts = sys.argv[1:]
0486         self.cmd_line_opts = cmd_line_opts
0487 
0488         # Set up the logger.
0489         log_handler = logging.StreamHandler()
0490         # This is the default log formatter, the debug option switches
0491         # on some more information.
0492         log_formatter = logging.Formatter("%(message)s")
0493         log_handler.setFormatter(log_formatter)
0494         logger = logging.getLogger()
0495         logger.name = "main"
0496         logger.addHandler(log_handler)
0497         self.logger = logger
0498         # The default output mode is quite verbose.
0499         self.set_output_level("NORMAL")
0500 
0501         #logger.debug("Initialized successfully")
0502 
0503         # End of __init__.
0504 
0505     ##########
0506 
0507     def cleanup(self):
0508         "Clean up after ourselves."
0509 
0510         # NOTE: This is the safe replacement of __del__.
0511 
0512         #self.logger.debug("All done -> shutting down")
0513         logging.shutdown()
0514 
0515         # End of cleanup.
0516 
0517     ##########
0518 
0519     def time_stamp(self):
0520         "Create a timestamp to use in the created config files."
0521 
0522         time_now = datetime.datetime.utcnow()
0523         # We don't care about the microseconds.
0524         time_now = time_now.replace(microsecond = 0)
0525         time_stamp = "%sUTC" % datetime.datetime.isoformat(time_now)
0526 
0527         # End of time_stamp.
0528         return time_stamp
0529 
0530     ##########
0531 
0532     def ident_string(self):
0533         "Spit out an identification string for cmsHarvester.py."
0534 
0535         ident_str = "`cmsHarvester.py " \
0536                     "version %s': cmsHarvester.py %s" % \
0537                     (__version__,
0538                      reduce(lambda x, y: x+' '+y, sys.argv[1:]))
0539 
0540         return ident_str
0541 
0542     ##########
0543 
0544     def format_conditions_string(self, globaltag):
0545         """Create the conditions string needed for `cmsDriver'.
0546 
0547         Just glueing the FrontierConditions bit in front of it really.
0548 
0549         """
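        # Example (illustrative GlobalTag name): `GR09_R_V4::All' becomes
        # `FrontierConditions_GlobalTag,GR09_R_V4::All', whereas anything
        # already containing `conditions' is passed through unchanged.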
0550 
0551         # Not very robust but okay. The idea is that if the user
0552         # specified (since this cannot happen with GlobalTags coming
0553         # from DBS) something containing `conditions', they probably
0554         # know what they're doing and we should not muck things up. In
0555         # all other cases we just assume we only received the
0556         # GlobalTag part and we build the usual conditions string from
0557         # that.
0558         if globaltag.lower().find("conditions") > -1:
0559             conditions_string = globaltag
0560         else:
0561             conditions_string = "FrontierConditions_GlobalTag,%s" % \
0562                                 globaltag
0563 
0564         # End of format_conditions_string.
0565         return conditions_string
0566 
0567     ##########
0568 
0569     def db_account_name_cms_cond_globaltag(self):
0570         """Return the database account name used to store the GlobalTag.
0571 
0572         The name of the database account depends (albeit weakly) on
0573         the CMSSW release version.
0574 
0575         """
0576 
0577         # This never changed, unlike the cms_cond_31X_DQM_SUMMARY ->
0578         # cms_cond_34X_DQM transition.
0579         account_name = "CMS_COND_31X_GLOBALTAG"
0580 
0581         # End of db_account_name_cms_cond_globaltag.
0582         return account_name
0583 
0584     ##########
0585 
0586     def db_account_name_cms_cond_dqm_summary(self):
0587         """See db_account_name_cms_cond_globaltag."""
0588 
0589         account_name = None
0590         version = self.cmssw_version[6:11]
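        # e.g. `CMSSW_3_3_1' -> `3_3_1'. (NOTE: this slice and the string
        # comparison below assume the usual `X_Y_Z' release numbering.)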
0591         if version < "3_4_0":
0592             account_name = "CMS_COND_31X_DQM_SUMMARY"
0593         else:
0594             account_name = "CMS_COND_34X"
0595 
0596         # End of db_account_name_cms_cond_dqm_summary.
0597         return account_name
0598 
0599     ##########
0600 
0601     def config_file_header(self):
0602         "Create a nice header to be used to mark the generated files."
0603 
0604         tmp = []
0605 
0606         time_stamp = self.time_stamp()
0607         ident_str = self.ident_string()
0608         tmp.append("# %s" % time_stamp)
0609         tmp.append("# WARNING: This file was created automatically!")
0610         tmp.append("")
0611         tmp.append("# Created by %s" % ident_str)
0612 
0613         header = "\n".join(tmp)
0614 
0615         # End of config_file_header.
0616         return header
0617 
0618     ##########
0619 
0620     def set_output_level(self, output_level):
0621         """Adjust the level of output generated.
0622 
0623         Choices are:
0624           - normal  : default level of output
0625           - quiet   : less output than the default
0626           - verbose : some additional information
0627           - debug   : lots more information, may be overwhelming
0628 
0629         NOTE: The debug option is a bit special in the sense that it
0630               also modifies the output format.
0631 
0632         """
0633 
0634         # NOTE: These levels are hooked up to the ones used in the
0635         #       logging module.
0636         output_levels = {
0637             "NORMAL"  : logging.INFO,
0638             "QUIET"   : logging.WARNING,
0639             "VERBOSE" : logging.INFO,
0640             "DEBUG"   : logging.DEBUG
0641             }
0642 
0643         output_level = output_level.upper()
0644 
0645         try:
0646             # Update the logger.
0647             self.log_level = output_levels[output_level]
0648             self.logger.setLevel(self.log_level)
0649         except KeyError:
0650             # Show a complaint
0651             self.logger.fatal("Unknown output level `%s'" % output_level)
0652             # and re-raise an exception.
0653             raise Exception
0654 
0655         # End of set_output_level.
0656 
0657     ##########
0658 
0659     def option_handler_debug(self, option, opt_str, value, parser):
0660         """Switch to debug mode.
0661 
0662         This both increases the amount of output generated, as well as
0663         changes the format used (more detailed information is given).
0664 
0665         """
0666 
0667         # Switch to a more informative log formatter for debugging.
0668         log_formatter_debug = logging.Formatter("[%(levelname)s] " \
0669                                                 # NOTE: funcName was
0670                                                 # only implemented
0671                                                 # starting with python
0672                                                 # 2.5.
0673                                                 #"%(funcName)s() " \
0674                                                 #"@%(filename)s:%(lineno)d " \
0675                                                 "%(message)s")
0676         # Hmmm, not very nice. This assumes there's only a single
0677         # handler associated with the current logger.
0678         log_handler = self.logger.handlers[0]
0679         log_handler.setFormatter(log_formatter_debug)
0680         self.set_output_level("DEBUG")
0681 
0682         # End of option_handler_debug.
0683 
0684     ##########
0685 
0686     def option_handler_quiet(self, option, opt_str, value, parser):
0687         "Switch to quiet mode: less verbose."
0688 
0689         self.set_output_level("QUIET")
0690 
0691         # End of option_handler_quiet.
0692 
0693     ##########
0694 
0695     def option_handler_force(self, option, opt_str, value, parser):
0696         """Switch on `force mode' in which case we don't brake for nobody.
0697 
0698         In so-called `force mode' all sanity checks are performed but
0699         we don't halt on failure. Of course this requires some care
0700         from the user.
0701 
0702         """
0703 
0704         self.logger.debug("Switching on `force mode'.")
0705         self.force_running = True
0706 
0707         # End of option_handler_force.
0708 
0709     ##########
0710 
0711     def option_handler_harvesting_type(self, option, opt_str, value, parser):
0712         """Set the harvesting type to be used.
0713 
0714         This checks that no harvesting type is already set, and sets
0715         the harvesting type to be used to the one specified. If a
0716         harvesting type is already set an exception is thrown. The
0717         same happens when an unknown type is specified.
0718 
0719         """
0720 
0721         # Check for (in)valid harvesting types.
0722         # NOTE: The matching is done in a bit of a complicated
0723         # way. This allows the specification of the type to be
0724         # case-insensitive while still ending up with the properly
0725         # `cased' version afterwards.
0726         value = value.lower()
0727         harvesting_types_lowered = [i.lower() for i in self.harvesting_types]
0728         try:
0729             type_index = harvesting_types_lowered.index(value)
0730             # If this works, we now have the index to the `properly
0731             # cased' version of the harvesting type.
0732         except ValueError:
0733             self.logger.fatal("Unknown harvesting type `%s'" % \
0734                               value)
0735             self.logger.fatal("  possible types are: %s" %
0736                               ", ".join(self.harvesting_types))
0737             raise Usage("Unknown harvesting type `%s'" % \
0738                         value)
0739 
0740         # Check if multiple (by definition conflicting) harvesting
0741         # types are being specified.
0742         if not self.harvesting_type is None:
0743             msg = "Only one harvesting type should be specified"
0744             self.logger.fatal(msg)
0745             raise Usage(msg)
0746         self.harvesting_type = self.harvesting_types[type_index]
0747 
0748         self.logger.info("Harvesting type to be used: `%s'" % \
0749                          self.harvesting_type)
0750 
0751         # End of option_handler_harvesting_type.
0752 
0753     ##########
0754 
0755     def option_handler_harvesting_mode(self, option, opt_str, value, parser):
0756         """Set the harvesting mode to be used.
0757 
0758         Single-step harvesting can be used for samples that are
0759         located completely at a single site (= SE). Otherwise use
0760         two-step mode.
0761 
0762         """
0763 
0764         # Check for valid mode.
0765         harvesting_mode = value.lower()
0766         if not harvesting_mode in self.harvesting_modes:
0767             msg = "Unknown harvesting mode `%s'" % harvesting_mode
0768             self.logger.fatal(msg)
0769             self.logger.fatal("  possible modes are: %s" % \
0770                               ", ".join(self.harvesting_modes))
0771             raise Usage(msg)
0772 
0773         # Check if we've been given only a single mode, otherwise
0774         # complain.
0775         if not self.harvesting_mode is None:
0776             msg = "Only one harvesting mode should be specified"
0777             self.logger.fatal(msg)
0778             raise Usage(msg)
0779         self.harvesting_mode = harvesting_mode
0780 
0781         self.logger.info("Harvesting mode to be used: `%s'" % \
0782                          self.harvesting_mode)
0783 
0784         # End of option_handler_harvesting_mode.
0785 
0786     ##########
0787 
0788     def option_handler_globaltag(self, option, opt_str, value, parser):
0789         """Set the GlobalTag to be used, overriding our own choices.
0790 
0791         By default the cmsHarvester will use the GlobalTag with which
0792         a given dataset was created also for the harvesting. The
0793         --globaltag option is the way to override this behaviour.
0794 
0795         """
0796 
0797         # Make sure that this flag only occurred once.
0798         if not self.globaltag is None:
0799             msg = "Only one GlobalTag should be specified"
0800             self.logger.fatal(msg)
0801             raise Usage(msg)
0802         self.globaltag = value
0803 
0804         self.logger.info("GlobalTag to be used: `%s'" % \
0805                          self.globaltag)
0806 
0807         # End of option_handler_globaltag.
0808 
0809     ##########
0810 
0811     def option_handler_no_ref_hists(self, option, opt_str, value, parser):
0812         "Switch use of all reference histograms off."
0813 
0814         self.use_ref_hists = False
0815 
0816         self.logger.warning("Switching off all use of reference histograms")
0817 
0818         # End of option_handler_no_ref_hists.
0819 
0820     ##########
0821 
0822     def option_handler_frontier_connection(self, option, opt_str,
0823                                            value, parser):
0824         """Override the default Frontier connection string.
0825 
0826         Please only use this for testing (e.g. when a test payload has
0827         been inserted into cms_orc_off instead of cms_orc_on).
0828 
0829         This method gets called for three different command line
0830         options:
0831         - --frontier-connection,
0832         - --frontier-connection-for-globaltag,
0833         - --frontier-connection-for-refhists.
0834         Appropriate care has to be taken to make sure things are only
0835         specified once.
0836 
0837         """
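        # Example (illustrative value): `--frontier-connection-for-globaltag
        # frontier://FrontierPrep' overrides only the GlobalTag connection;
        # a trailing `/' is appended below if it is missing.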
0838 
0839         # Figure out with which command line option we've been called.
0840         frontier_type = opt_str.split("-")[-1]
0841         if frontier_type == "connection":
0842             # Main option: change all connection strings.
0843             frontier_types = self.frontier_connection_name.keys()
0844         else:
0845             frontier_types = [frontier_type]
0846 
0847         # Make sure that each Frontier connection is specified only
0848         # once. (Okay, in a bit of a dodgy way...)
0849         for connection_name in frontier_types:
0850             if self.frontier_connection_overridden[connection_name] == True:
0851                 msg = "Please specify either:\n" \
0852                       "  `--frontier-connection' to change the " \
0853                       "Frontier connection used for everything, or\n" \
0854                       "either one or both of\n" \
0855                       "  `--frontier-connection-for-globaltag' to " \
0856                       "change the Frontier connection used for the " \
0857                       "GlobalTag and\n" \
0858                       "  `--frontier-connection-for-refhists' to change " \
0859                       "the Frontier connection used for the " \
0860                       "reference histograms."
0861                 self.logger.fatal(msg)
0862                 raise Usage(msg)
0863 
0864         frontier_prefix = "frontier://"
0865         if not value.startswith(frontier_prefix):
0866             msg = "Expecting Frontier connections to start with " \
0867                   "`%s'. You specified `%s'." % \
0868                   (frontier_prefix, value)
0869             self.logger.fatal(msg)
0870             raise Usage(msg)
0871         # We also kind of expect this to be either FrontierPrep or
0872         # FrontierProd (but this is just a warning).
0873         if value.find("FrontierProd") < 0 and \
0874                value.find("FrontierPrep") < 0:
0875             msg = "Expecting Frontier connections to contain either " \
0876                   "`FrontierProd' or `FrontierPrep'. You specified " \
0877                   "`%s'. Are you sure?" % \
0878                   value
0879             self.logger.warning(msg)
0880 
0881         if not value.endswith("/"):
0882             value += "/"
0883 
0884         for connection_name in frontier_types:
0885             self.frontier_connection_name[connection_name] = value
0886             self.frontier_connection_overridden[connection_name] = True
0887 
0888             frontier_type_str = "unknown"
0889             if connection_name == "globaltag":
0890                 frontier_type_str = "the GlobalTag"
0891             elif connection_name == "refhists":
0892                 frontier_type_str = "the reference histograms"
0893 
0894             self.logger.warning("Overriding default Frontier " \
0895                                 "connection for %s " \
0896                                 "with `%s'" % \
0897                                 (frontier_type_str,
0898                                  self.frontier_connection_name[connection_name]))
0899 
0900         # End of option_handler_frontier_connection
0901 
0902     ##########
0903 
0904     def option_handler_input_todofile(self, option, opt_str, value, parser):
0905 
0906         self.todofile = value
0907         # End of option_handler_input_todofile.
0908 
0909     ##########
0910 
0911     def option_handler_input_Jsonfile(self, option, opt_str, value, parser):
0912 
0913         self.Jsonfilename = value
0914         # End of option_handler_input_Jsonfile.
0915 
0916     ##########
0917 
0918     def option_handler_input_Jsonrunfile(self, option, opt_str, value, parser):
0919 
0920         self.Jsonrunfilename = value
0921         # End of option_handler_input_Jsonrunfile.
0922 
0923     ##########
0924 
0925     def option_handler_input_spec(self, option, opt_str, value, parser):
0926         """Store the input method and name for dataset/run selection.
0927 
0928         Handles both the `use these'/`ignore these' and the dataset/run variants.
0929         """
0930 
0931         # Figure out if we were called for the `use these' or the
0932         # `ignore these' case.
0933         if opt_str.lower().find("ignore") > -1:
0934             spec_type = "ignore"
0935         else:
0936             spec_type = "use"
0937 
0938         # Similar: are we being called for datasets or for runs?
0939         if opt_str.lower().find("dataset") > -1:
0940             select_type = "datasets"
0941         else:
0942             select_type = "runs"
0943 
0944         if not self.input_method[select_type][spec_type] is None:
0945             msg = "Please only specify one input method " \
0946                   "(for the `%s' case)" % opt_str
0947             self.logger.fatal(msg)
0948             raise Usage(msg)
0949 
0950         input_method = opt_str.replace("-", "").replace("ignore", "")
0951         self.input_method[select_type][spec_type] = input_method
0952         self.input_name[select_type][spec_type] = value
0953 
0954         self.logger.debug("Input method for the `%s' case: %s" % \
0955                           (spec_type, input_method))
0956 
0957         # End of option_handler_input_spec
0958 
0959     ##########
0960 
0961     def option_handler_book_keeping_file(self, option, opt_str, value, parser):
0962         """Store the name of the file to be used for book keeping.
0963 
0964         The only check done here is that only a single book keeping
0965         file is specified.
0966 
0967         """
0968 
0969         file_name = value
0970 
0971         if not self.book_keeping_file_name is None:
0972             msg = "Only one book keeping file should be specified"
0973             self.logger.fatal(msg)
0974             raise Usage(msg)
0975         self.book_keeping_file_name = file_name
0976 
0977         self.logger.info("Book keeping file to be used: `%s'" % \
0978                          self.book_keeping_file_name)
0979 
0980         # End of option_handler_book_keeping_file.
0981 
0982     ##########
0983 
0984     def option_handler_ref_hist_mapping_file(self, option, opt_str, value, parser):
0985         """Store the name of the file for the ref. histogram mapping.
0986 
0987         """
0988 
0989         file_name = value
0990 
0991         if not self.ref_hist_mappings_file_name is None:
0992             msg = "Only one reference histogram mapping file " \
0993                   "should be specified"
0994             self.logger.fatal(msg)
0995             raise Usage(msg)
0996         self.ref_hist_mappings_file_name = file_name
0997 
0998         self.logger.info("Reference histogram mapping file " \
0999                          "to be used: `%s'" % \
1000                          self.ref_hist_mappings_file_name)
1001 
1002         # End of option_handler_ref_hist_mapping_file.
1003 
1004     ##########
1005 
1006     # OBSOLETE OBSOLETE OBSOLETE
1007 
1008 ##    def option_handler_dataset_name(self, option, opt_str, value, parser):
1009 ##        """Specify the name(s) of the dataset(s) to be processed.
1010 
1011 ##        It is checked to make sure that no dataset name or listfile
1012 ##        names are given yet. If all is well (i.e. we still have a
1013 ##        clean slate) the dataset name is stored for later use,
1014 ##        otherwise a Usage exception is raised.
1015 
1016 ##        """
1017 
1018 ##        if not self.input_method is None:
1019 ##            if self.input_method == "dataset":
1020 ##                raise Usage("Please only feed me one dataset specification")
1021 ##            elif self.input_method == "listfile":
1022 ##                raise Usage("Cannot specify both dataset and input list file")
1023 ##            else:
1024 ##                assert False, "Unknown input method `%s'" % self.input_method
1025 ##        self.input_method = "dataset"
1026 ##        self.input_name = value
1027 ##        self.logger.info("Input method used: %s" % self.input_method)
1028 
1029 ##        # End of option_handler_dataset_name.
1030 
1031 ##    ##########
1032 
1033 ##    def option_handler_listfile_name(self, option, opt_str, value, parser):
1034 ##        """Specify the input list file containing datasets to be processed.
1035 
1036 ##        It is checked to make sure that no dataset name or listfile
1037 ##        names are given yet. If all is well (i.e. we still have a
1038 ##        clean slate) the listfile name is stored for later use,
1039 ##        otherwise a Usage exception is raised.
1040 
1041 ##        """
1042 
1043 ##        if not self.input_method is None:
1044 ##            if self.input_method == "listfile":
1045 ##                raise Usage("Please only feed me one list file")
1046 ##            elif self.input_method == "dataset":
1047 ##                raise Usage("Cannot specify both dataset and input list file")
1048 ##            else:
1049 ##                assert False, "Unknown input method `%s'" % self.input_method
1050 ##        self.input_method = "listfile"
1051 ##        self.input_name = value
1052 ##        self.logger.info("Input method used: %s" % self.input_method)
1053 
1054 ##        # End of option_handler_listfile_name.
1055 
1056     # OBSOLETE OBSOLETE OBSOLETE end
1057 
1058     ##########
1059 
1060     def option_handler_castor_dir(self, option, opt_str, value, parser):
1061         """Specify where on CASTOR the output should go.
1062 
1063         At the moment only output to CERN CASTOR is
1064         supported. Eventually the harvested results should go into the
1065         central place for DQM on CASTOR anyway.
1066 
1067         """
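        # Example: a value like `castor/cern.ch/user/j/jdoe/harvesting'
        # (no leading slash, made-up path) is normalised below to
        # `/castor/cern.ch/user/j/jdoe/harvesting'.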
1068 
1069         # Check format of specified CASTOR area.
1070         castor_dir = value
1071         #castor_dir = castor_dir.lstrip(os.path.sep)
1072         castor_prefix = self.castor_prefix
1073 
1074         # Add a leading slash if necessary and clean up the path.
1075         castor_dir = os.path.join(os.path.sep, castor_dir)
1076         self.castor_base_dir = os.path.normpath(castor_dir)
1077 
1078         self.logger.info("CASTOR (base) area to be used: `%s'" % \
1079                          self.castor_base_dir)
1080 
1081         # End of option_handler_castor_dir.
1082 
1083     ##########
1084 
1085     def option_handler_no_t1access(self, option, opt_str, value, parser):
1086         """Set the self.non_t1access flag to try and create jobs that
1087         run without special `t1access' role.
1088 
1089         """
1090 
1091         self.non_t1access = True
1092 
1093         self.logger.warning("Running in `non-t1access' mode. " \
1094                             "Will try to create jobs that run " \
1095                             "without special rights but no " \
1096                             "further promises...")
1097 
1098         # End of option_handler_no_t1access.
1099 
1100     ##########
1101 
1102     def option_handler_caf_access(self, option, opt_str, value, parser):
1103         """Set the self.caf_access flag to try and create jobs that
1104         run on the CAF.
1105 
1106         """
1107         self.caf_access = True
1108 
1109         self.logger.warning("Running in `caf_access' mode. " \
1110                             "Will try to create jobs that run " \
1111                             "on CAF but no " \
1112                             "further promises...")
1113 
1114         # End of option_handler_caf_access.
1115 
1116     ##########
1117 
1118     def option_handler_saveByLumiSection(self, option, opt_str, value, parser):
1119         """Set process.dqmSaver.saveByLumiSection=1 in the harvesting config file.
1120         """
1121         self.saveByLumiSection = True
1122 
1123         self.logger.warning("Saving DQM output by lumi section enabled.")
1124 
1125         # End of option_handler_saveByLumiSection.
1126 
1127 
1128     ##########
1129 
1130     def option_handler_crab_submission(self, option, opt_str, value, parser):
1131         """By default CRAB jobs are not created and submitted
1132         automatically; this flag switches automatic submission on.
1133         """
1134         self.crab_submission = True
1135 
1136         # End of option_handler_crab_submission.
1137 
1138     ##########
1139 
1140     def option_handler_sites(self, option, opt_str, value, parser):
1141 
1142         self.nr_max_sites = value
1143 
1144     ##########
1145 
1146     def option_handler_preferred_site(self, option, opt_str, value, parser):
1147 
1148         self.preferred_site = value
1149 
1150     ##########
1151 
1152     def option_handler_list_types(self, option, opt_str, value, parser):
1153         """List all harvesting types and their mappings.
1154 
1155         This lists all implemented harvesting types with their
1156         corresponding mappings to sequence names. This had to be
1157         separated out from the help since it depends on the CMSSW
1158         version and was making things a bit of a mess.
1159 
1160         NOTE: There is no way (at least not that I could come up with)
1161         to code this in a neat generic way that can be read both by
1162         this method and by setup_harvesting_info(). Please try hard to
1163         keep these two methods in sync!
1164 
1165         """
1166 
1167         sep_line = "-" * 50
1168         sep_line_short = "-" * 20
1169 
1170         print(sep_line)
1171         print("The following harvesting types are available:")
1172         print(sep_line)
1173 
1174         print("`RelVal' maps to:")
1175         print("  pre-3_3_0           : HARVESTING:validationHarvesting")
1176         print("  3_4_0_pre2 and later: HARVESTING:validationHarvesting+dqmHarvesting")
1177         print("  Exceptions:")
1178         print("    3_3_0_pre1-4        : HARVESTING:validationHarvesting")
1179         print("    3_3_0_pre6          : HARVESTING:validationHarvesting")
1180         print("    3_4_0_pre1          : HARVESTING:validationHarvesting")
1181 
1182         print(sep_line_short)
1183 
1184         print("`RelValFS' maps to:")
1185         print("  always              : HARVESTING:validationHarvestingFS")
1186 
1187         print(sep_line_short)
1188 
1189         print("`MC' maps to:")
1190         print("  always              : HARVESTING:validationprodHarvesting")
1191 
1192         print(sep_line_short)
1193 
1194         print("`DQMOffline' maps to:")
1195         print("  always              : HARVESTING:dqmHarvesting")
1196 
1197         print(sep_line)
1198 
1199         # We're done, let's quit. (This is the same thing optparse
1200         # does after printing the help.)
1201         raise SystemExit
1202 
1203         # End of option_handler_list_types.
1204 
1205     ##########
1206 
1207     def setup_harvesting_info(self):
1208         """Fill our dictionary with all info needed to understand
1209         harvesting.
1210 
1211         This depends on the CMSSW version since at some point the
1212         names and sequences were modified.
1213 
1214         NOTE: There is no way (at least not that I could come up with)
1215         to code this in a neat generic way that can be read both by
1216         this method and by option_handler_list_types(). Please try
1217         hard to keep these two methods in sync!
1218 
1219         """
1220 
1221         assert not self.cmssw_version is None, \
1222                "ERROR setup_harvesting_info() requires " \
1223                "self.cmssw_version to be set!!!"
1224 
1225         harvesting_info = {}
1226 
1227         # This is the version-independent part.
1228         harvesting_info["DQMOffline"] = {}
1229         harvesting_info["DQMOffline"]["beamspot"] = None
1230         harvesting_info["DQMOffline"]["eventcontent"] = None
1231         harvesting_info["DQMOffline"]["harvesting"] = "AtRunEnd"
1232 
1233         harvesting_info["RelVal"] = {}
1234         harvesting_info["RelVal"]["beamspot"] = None
1235         harvesting_info["RelVal"]["eventcontent"] = None
1236         harvesting_info["RelVal"]["harvesting"] = "AtRunEnd"
1237 
1238         harvesting_info["RelValFS"] = {}
1239         harvesting_info["RelValFS"]["beamspot"] = None
1240         harvesting_info["RelValFS"]["eventcontent"] = None
1241         harvesting_info["RelValFS"]["harvesting"] = "AtRunEnd"
1242 
1243         harvesting_info["MC"] = {}
1244         harvesting_info["MC"]["beamspot"] = None
1245         harvesting_info["MC"]["eventcontent"] = None
1246         harvesting_info["MC"]["harvesting"] = "AtRunEnd"
1247 
1248         # This is the version-dependent part. And I know, strictly
1249         # speaking it's not necessary to fill in all three types since
1250         # in a single run we'll only use one type anyway. This does
1251         # look more readable, however, and required less thought from
1252         # my side when I put this together.
1253 
1254         # DEBUG DEBUG DEBUG
1255         # Check that we understand our own version naming.
1256         assert self.cmssw_version.startswith("CMSSW_")
1257         # DEBUG DEBUG DEBUG end
1258 
1259         version = self.cmssw_version[6:]
1260 
1261         #----------
1262 
1263         # RelVal
1264         step_string = None
1265         if version < "3_3_0":
1266             step_string = "validationHarvesting"
1267         elif version in ["3_3_0_pre1", "3_3_0_pre2",
1268                          "3_3_0_pre3", "3_3_0_pre4",
1269                          "3_3_0_pre6", "3_4_0_pre1"]:
1270             step_string = "validationHarvesting"
1271         else:
1272             step_string = "validationHarvesting+dqmHarvesting"
1273 
1274         harvesting_info["RelVal"]["step_string"] = step_string
1275 
1276         # DEBUG DEBUG DEBUG
1277         # Let's make sure we found something.
1278         assert not step_string is None, \
1279                "ERROR Could not decide a RelVal harvesting sequence " \
1280                "for CMSSW version %s" % self.cmssw_version
1281         # DEBUG DEBUG DEBUG end
1282 
1283         #----------
1284 
1285         # RelValFS
1286         step_string = "validationHarvestingFS"
1287 
1288         harvesting_info["RelValFS"]["step_string"] = step_string
1289 
1290         #----------
1291 
1292         # MC
1293         step_string = "validationprodHarvesting"
1294 
1295         harvesting_info["MC"]["step_string"] = step_string
1296 
1297         # DEBUG DEBUG DEBUG
1298         # Let's make sure we found something.
1299         assert not step_string is None, \
1300                "ERROR Could not decide a MC harvesting " \
1301                "sequence for CMSSW version %s" % self.cmssw_version
1302         # DEBUG DEBUG DEBUG end
1303 
1304         #----------
1305 
1306         # DQMOffline
1307         step_string = "dqmHarvesting"
1308 
1309         harvesting_info["DQMOffline"]["step_string"] = step_string
1310 
1311         #----------
1312 
1313         self.harvesting_info = harvesting_info
1314 
1315         self.logger.info("Based on the CMSSW version (%s) " \
1316                          "I decided to use the `HARVESTING:%s' " \
1317                          "sequence for %s harvesting" % \
1318                          (self.cmssw_version,
1319                           self.harvesting_info[self.harvesting_type]["step_string"],
1320                           self.harvesting_type))
1321 
1322         # End of setup_harvesting_info.
1323 
1324     ##########
1325 
1326     def create_castor_path_name_common(self, dataset_name):
1327         """Build the common part of the output path to be used on
1328         CASTOR.
1329 
1330         This consists of the CASTOR area base path specified by the
1331         user and a piece depending on the data type (data vs. MC), the
1332         harvesting type and the dataset name followed by a piece
1333         containing the run number and event count. (See comments in
1334         create_castor_path_name_special for details.) This method
1335         creates the common part, without run number and event count.
1336 
1337         """
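        # Illustrative example: with the default base dir, an MC RelVal
        # harvesting job done with CMSSW_3_8_2 ends up under something like
        #   /castor/cern.ch/cms/store/temp/dqm/offline/harvesting_output/
        #     mc/relval/3_8_2/<escaped dataset name>
        # (the escaping is done by escape_dataset_name()).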
1338 
1339         castor_path = self.castor_base_dir
1340 
1341         ###
1342 
1343         # The data type: data vs. mc.
1344         datatype = self.datasets_information[dataset_name]["datatype"]
1345         datatype = datatype.lower()
1346         castor_path = os.path.join(castor_path, datatype)
1347 
1348         # The harvesting type.
1349         harvesting_type = self.harvesting_type
1350         harvesting_type = harvesting_type.lower()
1351         castor_path = os.path.join(castor_path, harvesting_type)
1352 
1353         # The CMSSW release version (only the `digits'). Note that the
1354         # CMSSW version used here is the version used for harvesting,
1355         # not the one from the dataset. This does make the results
1356         # slightly harder to find. On the other hand it solves
1357         # problems in case one re-harvests a given dataset with a
1358         # different CMSSW version, which would lead to ambiguous path
1359         # names. (Of course for many cases the harvesting is done with
1360         # the same CMSSW version the dataset was created with.)
1361         release_version = self.cmssw_version
1362         release_version = release_version.lower(). \
1363                           replace("cmssw", ""). \
1364                           strip("_")
1365         castor_path = os.path.join(castor_path, release_version)
1366 
1367         # The dataset name.
1368         dataset_name_escaped = self.escape_dataset_name(dataset_name)
1369         castor_path = os.path.join(castor_path, dataset_name_escaped)
1370 
1371         ###
1372 
1373         castor_path = os.path.normpath(castor_path)
1374 
1375         # End of create_castor_path_name_common.
1376         return castor_path
1377 
1378     ##########
1379 
1380     def create_castor_path_name_special(self,
1381                                         dataset_name, run_number,
1382                                         castor_path_common):
1383         """Create the specialised part of the CASTOR output dir name.
1384 
1385         NOTE: To avoid clashes with `incremental harvesting'
1386         (re-harvesting when a dataset grows) we have to include the
1387         event count in the path name. The underlying `problem' is that
1388         CRAB does not overwrite existing output files so if the output
1389         file already exists CRAB will fail to copy back the output.
1390 
1391         NOTE: It's not possible to create different kinds of
1392         harvesting jobs in a single call to this tool. However, in
1393         principle it could be possible to create both data and MC jobs
1394         in a single go.
1395 
1396         NOTE: The number of events used in the path name is the
1397         _total_ number of events in the dataset/run at the time of
1398         harvesting. If we're doing partial harvesting the final
1399         results will reflect lower statistics. This is a) the easiest
1400         to code and b) the least likely to lead to confusion if
1401         someone ever decides to swap/copy around file blocks between
1402         sites.
1403 
1404         """
1405 
1406         castor_path = castor_path_common
1407 
1408         ###
1409 
1410         # The run number part.
1411         castor_path = os.path.join(castor_path, "run_%d" % run_number)
1412 
1413         ###
1414 
1415         # The event count (i.e. the number of events we currently see
1416         # for this dataset).
1417         #nevents = self.datasets_information[dataset_name] \
1418         #          ["num_events"][run_number]
1419         castor_path = os.path.join(castor_path, "nevents")
1420 
1421         ###
1422 
1423         castor_path = os.path.normpath(castor_path)
1424 
1425         # End of create_castor_path_name_special.
1426         return castor_path
1427 
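         # Illustrative only: for a hypothetical run 123456 the piece built
         # above looks like
         #
         #   <castor_path_common>/run_123456/nevents
         #
         # where the literal `nevents' directory is what the current code
         # appends (the commented-out event count is not used).
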
1428     ##########
1429 
1430     def create_and_check_castor_dirs(self):
1431         """Make sure all required CASTOR output dirs exist.
1432 
1433         This checks the CASTOR base dir specified by the user as well
1434         as all the subdirs required by the current set of jobs.
1435 
1436         """
1437 
1438         self.logger.info("Checking (and if necessary creating) CASTOR " \
1439                          "output area(s)...")
1440 
1441         # Call the real checker method for the base dir.
1442         self.create_and_check_castor_dir(self.castor_base_dir)
1443 
1444         # Now call the checker for all (unique) subdirs.
1445         castor_dirs = []
1446         for (dataset_name, runs) in self.datasets_to_use.items():
1447 
1448             for run in runs:
1449                 castor_dirs.append(self.datasets_information[dataset_name] \
1450                                    ["castor_path"][run])
1451         castor_dirs_unique = sorted(set(castor_dirs))
1452         # This can take some time. E.g. CRAFT08 has > 300 runs, each
1453         # of which will get a new directory. So we show some (rough)
1454         # info in between.
1455         ndirs = len(castor_dirs_unique)
1456         step = max(ndirs // 10, 1)
1457         for (i, castor_dir) in enumerate(castor_dirs_unique):
1458             if (i + 1) % step == 0 or \
1459                    (i + 1) == ndirs:
1460                 self.logger.info("  %d/%d" % \
1461                                  (i + 1, ndirs))
1462             self.create_and_check_castor_dir(castor_dir)
1463 
1464             # Now check if the directory is empty. If (an old version
1465             # of) the output file already exists CRAB will run new
1466             # jobs but never copy the results back. We assume the user
1467             # knows what they are doing and only issue a warning in
1468             # case the directory is not empty.
1469             self.logger.debug("Checking if path `%s' is empty" % \
1470                               castor_dir)
1471             cmd = "rfdir %s" % castor_dir
1472             (status, output) = subprocess.getstatusoutput(cmd)
1473             if status != 0:
1474                 msg = "Could not access directory `%s'" \
1475                       " !!! This is bad since I should have just" \
1476                       " created it !!!" % castor_dir
1477                 self.logger.fatal(msg)
1478                 raise Error(msg)
1479             if len(output) > 0:
1480                 self.logger.warning("Output directory `%s' is not empty:" \
1481                                     " new jobs will fail to" \
1482                                     " copy back output" % \
1483                                     castor_dir)
1484 
1485         # End of create_and_check_castor_dirs.
1486 
1487     ##########
1488 
1489     def create_and_check_castor_dir(self, castor_dir):
1490         """Check the existence of the given CASTOR dir and, if
1491         necessary, create it.
1492 
1493         Some special care has to be taken with several things like
1494         setting the correct permissions such that CRAB can store the
1495         output results. Of course this means that things like
1496         /castor/cern.ch/ and user/j/ have to be recognised and treated
1497         properly.
1498 
1499         NOTE: Only CERN CASTOR area (/castor/cern.ch/) supported for
1500         the moment.
1501 
1502         NOTE: This method uses some slightly tricky caching to make
1503         sure we don't keep checking the same base paths over and over.
1504 
1505         """
1506 
1507         ###
1508 
1509         # Local helper function to fully split a path into pieces.
1510         def split_completely(path):
1511             (parent_path, name) = os.path.split(path)
1512             if name == "":
1513                 return (parent_path, )
1514             else:
1515                 return split_completely(parent_path) + (name, )
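             # Illustrative example (not executed here): calling
             #   split_completely("/castor/cern.ch/user")
             # returns the tuple ("/", "castor", "cern.ch", "user").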
1516 
1517         ###
1518 
1519         # Local helper function to check rfio (i.e. CASTOR)
1520         # directories.
1521         def extract_permissions(rfstat_output):
1522             """Parse the output from rfstat and return the
1523             5-digit permissions string."""
1524 
1525             permissions_line = [i for i in rfstat_output.split("\n") \
1526                                 if i.lower().find("protection") > -1]
1527             regexp = re.compile(r".*\(([0-9]{5})\).*")
1528             match = regexp.search(rfstat_output)
1529             if not match or len(match.groups()) != 1:
1530                 msg = "Could not extract permissions " \
1531                       "from output: %s" % rfstat_output
1532                 self.logger.fatal(msg)
1533                 raise Error(msg)
1534             permissions = match.group(1)
1535 
1536             # End of extract_permissions.
1537             return permissions
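             # Illustrative only: the rfstat output is assumed to contain a
             # line of the form
             #   Protection              : drwxrwxr-x (40775)
             # from which the helper above extracts the string "40775".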
1538 
1539         ###
1540 
1541         # These are the pieces of CASTOR directories that we do not
1542         # want to touch when modifying permissions.
1543 
1544         # NOTE: This is all a bit involved, basically driven by the
1545         # fact that one wants to treat the `j' directory of
1546         # `/castor/cern.ch/user/j/jhegeman/' specially.
1547         # BUG BUG BUG
1548         # This should be simplified, for example by comparing to the
1549         # CASTOR prefix or something like that.
1550         # BUG BUG BUG end
1551         castor_paths_dont_touch = {
1552             0: ["/", "castor", "cern.ch", "cms", "store", "temp",
1553                 "dqm", "offline", "user"],
1554             -1: ["user", "store"]
1555             }
1556 
1557         self.logger.debug("Checking CASTOR path `%s'" % castor_dir)
1558 
1559         ###
1560 
1561         # First we take the full CASTOR path apart.
1562         castor_path_pieces = split_completely(castor_dir)
1563 
1564         # Now slowly rebuild the CASTOR path and see if a) all
1565         # permissions are set correctly and b) the final destination
1566         # exists.
1567         path = ""
1568         check_sizes = sorted(castor_paths_dont_touch.keys())
1569         len_castor_path_pieces = len(castor_path_pieces)
1570         for piece_index in range(len_castor_path_pieces):
1571             skip_this_path_piece = False
1572             piece = castor_path_pieces[piece_index]
1573 ##            self.logger.debug("Checking CASTOR path piece `%s'" % \
1574 ##                              piece)
1575             for check_size in check_sizes:
1576                 # Do we need to do anything with this?
1577                 if (piece_index + check_size) > -1:
1578 ##                    self.logger.debug("Checking `%s' against `%s'" % \
1579 ##                                      (castor_path_pieces[piece_index + check_size],
1580 ##                                       castor_paths_dont_touch[check_size]))
1581                     if castor_path_pieces[piece_index + check_size] in castor_paths_dont_touch[check_size]:
1582 ##                        self.logger.debug("  skipping")
1583                         skip_this_path_piece = True
1584 ##                    else:
1585 ##                        # Piece not in the list, fine.
1586 ##                        self.logger.debug("  accepting")
1587             # Add piece to the path we're building.
1588 ##            self.logger.debug("!!! Skip path piece `%s'? %s" % \
1589 ##                              (piece, str(skip_this_path_piece)))
1590 ##            self.logger.debug("Adding piece to path...")
1591             path = os.path.join(path, piece)
1592 ##            self.logger.debug("Path is now `%s'" % \
1593 ##                              path)
1594 
1595             # Hmmmm, only at this point can we do some caching. Not
1596             # ideal, but okay.
1597             try:
1598                 if path in self.castor_path_checks_cache:
1599                     continue
1600             except AttributeError:
1601                 # This only happens the first time around.
1602                 self.castor_path_checks_cache = []
1603             self.castor_path_checks_cache.append(path)
1604 
1605             # Now, unless we're supposed to skip this piece of the
1606             # path, let's make sure it exists and set the permissions
1607             # correctly for use by CRAB. This means that:
1608             # - the final output directory should (at least) have
1609             #   permissions 775
1610             # - all directories above that should (at least) have
1611             #   permissions 755.
1612 
1613             # BUT: Even though the above permissions are the usual
1614             # ones to use when setting up CASTOR areas for grid job
1615             # output, there is one caveat in case multiple people are
1616             # working in the same CASTOR area. If user X creates
1617             # /a/b/c/ and user Y wants to create /a/b/d/ he/she does
1618             # not have sufficient rights. So: we set all dir
1619             # permissions to 775 to avoid this.
1620 
1621             if not skip_this_path_piece:
1622 
1623                 # Ok, first thing: let's make sure this directory
1624                 # exists.
1625                 # NOTE: The nice complication is of course that the
1626                 # usual os.path.isdir() etc. methods don't work for an
1627                 # rfio filesystem. So we call rfstat and interpret an
1628                 # error as meaning that the path does not exist.
1629                 self.logger.debug("Checking if path `%s' exists" % \
1630                                   path)
1631                 cmd = "rfstat %s" % path
1632                 (status, output) = subprocess.getstatusoutput(cmd)
1633                 if status != 0:
1634                     # Path does not exist, let's try and create it.
1635                     self.logger.debug("Creating path `%s'" % path)
1636                     cmd = "nsmkdir -m 775 %s" % path
1637                     (status, output) = subprocess.getstatusoutput(cmd)
1638                     if status != 0:
1639                         msg = "Could not create directory `%s'" % path
1640                         self.logger.fatal(msg)
1641                         raise Error(msg)
1642                     cmd = "rfstat %s" % path
1643                     (status, output) = subprocess.getstatusoutput(cmd)
1644                 # Now check that it looks like a directory. If I'm not
1645                 # mistaken one can deduce this from the fact that the
1646                 # (octal) permissions string starts with `40' (instead
1647                 # of `100').
1648                 permissions = extract_permissions(output)
1649                 if not permissions.startswith("40"):
1650                     msg = "Path `%s' is not a directory(?)" % path
1651                     self.logger.fatal(msg)
1652                     raise Error(msg)
1653 
1654                 # Figure out the current permissions for this
1655                 # (partial) path.
1656                 self.logger.debug("Checking permissions for path `%s'" % path)
1657                 cmd = "rfstat %s" % path
1658                 (status, output) = subprocess.getstatusoutput(cmd)
1659                 if status != 0:
1660                     msg = "Could not obtain permissions for directory `%s'" % \
1661                           path
1662                     self.logger.fatal(msg)
1663                     raise Error(msg)
1664                 # Take the last three digits of the permissions.
1665                 permissions = extract_permissions(output)[-3:]
1666 
1667                 # Now if necessary fix permissions.
1668                 # NOTE: Be careful never to `downgrade' permissions.
1669                 if piece_index == (len_castor_path_pieces - 1):
1670                     # This means we're looking at the final
1671                     # destination directory.
1672                     permissions_target = "775"
1673                 else:
1674                     # `Only' an intermediate directory.
1675                     permissions_target = "775"
1676 
1677                 # Compare permissions.
1678                 permissions_new = []
1679                 for (i, j) in zip(permissions, permissions_target):
1680                     permissions_new.append(str(max(int(i), int(j))))
1681                 permissions_new = "".join(permissions_new)
1682                 self.logger.debug("  current permissions: %s" % \
1683                                   permissions)
1684                 self.logger.debug("  target permissions : %s" % \
1685                                   permissions_target)
1686                 if permissions_new != permissions:
1687                     # We have to modify the permissions.
1688                     self.logger.debug("Changing permissions of `%s' " \
1689                                       "to %s (were %s)" % \
1690                                       (path, permissions_new, permissions))
1691                     cmd = "rfchmod %s %s" % (permissions_new, path)
1692                     (status, output) = subprocess.getstatusoutput(cmd)
1693                     if status != 0:
1694                         msg = "Could not change permissions for path `%s' " \
1695                               "to %s" % (path, permissions_new)
1696                         self.logger.fatal(msg)
1697                         raise Error(msg)
1698 
1699                 self.logger.debug("  Permissions ok (%s)" % permissions_new)
1700 
1701         # End of create_and_check_castor_dir.
1702 
1703     ##########
1704 
1705     def pick_a_site(self, sites, cmssw_version):
1706 
1707         # Create list of forbidden sites
1708         sites_forbidden = []
1709 
1710         if (self.preferred_site == "CAF") or (self.preferred_site == "caf.cern.ch"):
1711             self.caf_access = True
1712 
1713         if self.caf_access == False:
1714             sites_forbidden.append("caf.cern.ch")
1715 
1716         # These are the T1 sites. These are only forbidden if we're
1717         # running in non-T1 mode.
1718         # Source:
1719         # https://cmsweb.cern.ch/sitedb/sitelist/?naming_scheme=ce
1720         # Hard-coded, yes. Not nice, no.
1721 
1722         all_t1 = [
1723                 "srm-cms.cern.ch",
1724                 "ccsrm.in2p3.fr",
1725                 "cmssrm-fzk.gridka.de",
1726                 "cmssrm.fnal.gov",
1727                 "gridka-dCache.fzk.de",
1728                 "srm-cms.gridpp.rl.ac.uk",
1729                 "srm.grid.sinica.edu.tw",
1730                 "srm2.grid.sinica.edu.tw",
1731                 "srmcms.pic.es",
1732                 "storm-fe-cms.cr.cnaf.infn.it"
1733                 ]
1734 
1735         country_codes = {
1736               "CAF" : "caf.cern.ch",
1737               "CH" : "srm-cms.cern.ch",
1738               "FR" : "ccsrm.in2p3.fr",
1739               "DE" : "cmssrm-fzk.gridka.de",
1740               "GOV" : "cmssrm.fnal.gov",
1741               "DE2" : "gridka-dCache.fzk.de",
1742               "UK" : "srm-cms.gridpp.rl.ac.uk",
1743               "TW" : "srm.grid.sinica.edu.tw",
1744               "TW2" : "srm2.grid.sinica.edu.tw",
1745               "ES" : "srmcms.pic.es",
1746               "IT" : "storm-fe-cms.cr.cnaf.infn.it"
1747                 }
1748 
1749         if self.non_t1access: 
1750             sites_forbidden.extend(all_t1)
1751 
1752         for site in sites_forbidden:
1753             if site in sites:
1754                 sites.remove(site)
1755 
1756         if self.preferred_site in country_codes:
1757             self.preferred_site = country_codes[self.preferred_site]
1758 
1759         if self.preferred_site != "no preference":
1760             if self.preferred_site in sites:
1761                 sites = [self.preferred_site]
1762             else:
1763                 sites = []
1764 
1765         #print sites
1766 
1767         # Looks like we have to do some caching here, otherwise things
1768         # become waaaay toooo sloooooow. So that's what the
1769         # sites_and_versions_cache does.
1770 
1771         # NOTE: Keep this set to None!
1772         site_name = None
1773         cmd = None
1774         while len(sites) > 0 and \
1775               site_name is None:
1776 
1777             # Create list of t1_sites
1778             t1_sites = []
1779             for site in sites:
1780                 if site in all_t1:
1781                     t1_sites.append(site)
1782                 if site == "caf.cern.ch":
1783                     t1_sites.append(site)
1784 
1785             # If available, pick the preferred site
1786             #if self.preferred_site in sites:
1787             #  se_name = self.preferred_site
1788             # Else, if available pick t1 site
1789 
1790             if len(t1_sites) > 0:
1791                 se_name = choice(t1_sites)
1792             # Else pick any site
1793             else:
1794                 se_name = choice(sites)
1795 
1796             # But check that it hosts the CMSSW version we want.
1797 
1798             if se_name in self.sites_and_versions_cache and \
1799                    cmssw_version in self.sites_and_versions_cache[se_name]:
1800                 if self.sites_and_versions_cache[se_name][cmssw_version]:
1801                     site_name = se_name
1802                     break
1803                 else:
1804                     self.logger.debug("  --> rejecting site `%s'" % se_name)
1805                     sites.remove(se_name)
1806 
1807             else:
1808                 self.logger.info("Checking if site `%s' " \
1809                                  "has CMSSW version `%s'" % \
1810                                  (se_name, cmssw_version))
1811                 self.sites_and_versions_cache[se_name] = {}
1812 
1813                 # TODO TODO TODO
1814                 # Test for SCRAM architecture removed as per request
1815                 # from Andreas.
1816                 # scram_arch = os.getenv("SCRAM_ARCH")
1817                 # cmd = "lcg-info --list-ce " \
1818                 #       "--query '" \
1819                 #       "Tag=VO-cms-%s," \
1820                 #       "Tag=VO-cms-%s," \
1821                 #       "CEStatus=Production," \
1822                 #       "CloseSE=%s'" % \
1823                 #       (cmssw_version, scram_arch, se_name)
1824                 # TODO TODO TODO end
1825 
1826                 cmd = "lcg-info --list-ce " \
1827                       "--query '" \
1828                       "Tag=VO-cms-%s," \
1829                       "CEStatus=Production," \
1830                       "CloseSE=%s'" % \
1831                       (cmssw_version, se_name)
1832                 (status, output) = subprocess.getstatusoutput(cmd)
1833                 if status != 0:
1834                     self.logger.error("Could not check site information " \
1835                                       "for site `%s'" % se_name)
1836                 else:
1837                     if (len(output) > 0) or (se_name == "caf.cern.ch"):
1838                         self.sites_and_versions_cache[se_name][cmssw_version] = True
1839                         site_name = se_name
1840                         break
1841                     else:
1842                         self.sites_and_versions_cache[se_name][cmssw_version] = False
1843                         self.logger.debug("  --> rejecting site `%s'" % se_name)
1844                         sites.remove(se_name)
1845 
1846         if site_name is None:
1847             self.logger.error("  --> no matching site found")
1848             self.logger.error("    --> Your release or SCRAM " \
1849                               "architecture may not be available " \
1850                               "anywhere on the (LCG) grid.")
1851             if not cmd is None:
1852                 self.logger.debug("      (command used: `%s')" % cmd)
1853         else:
1854             self.logger.debug("  --> selected site `%s'" % site_name)
1855 
1856         # Return something more descriptive (than `None') in case we
1857         # found nothing.
1858         if site_name is None:
1859             site_name = self.no_matching_site_found_str
1860             # Keep track of our global flag signifying that this
1861             # happened.
1862             self.all_sites_found = False
1863 
1864         # End of pick_a_site.
1865         return site_name
1866 
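         # Illustrative layout (an assumption based on the code above, not
         # an official interface) of the per-site CMSSW-version cache used
         # by pick_a_site():
         #
         #   self.sites_and_versions_cache = {
         #       "srm-cms.cern.ch": {"CMSSW_3_8_2": True},
         #       "srmcms.pic.es":   {"CMSSW_3_8_2": False},
         #   }
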
1867     ##########
1868 
1869     def parse_cmd_line_options(self):
1870 
1871         # Set up the command line parser. Note that we fix up the help
1872         # formatter so that we can add some text pointing people to
1873         # the Twiki etc.
1874         parser = optparse.OptionParser(version="%s %s" % \
1875                                        ("%prog", self.version),
1876                                        formatter=CMSHarvesterHelpFormatter())
1877 
1878         self.option_parser = parser
1879 
1880         # The debug switch.
1881         parser.add_option("-d", "--debug",
1882                           help="Switch on debug mode",
1883                           action="callback",
1884                           callback=self.option_handler_debug)
1885 
1886         # The quiet switch.
1887         parser.add_option("-q", "--quiet",
1888                           help="Be less verbose",
1889                           action="callback",
1890                           callback=self.option_handler_quiet)
1891 
1892         # The force switch. If this switch is used sanity checks are
1893         # performed but failures do not lead to aborts. Use with care.
1894         parser.add_option("", "--force",
1895                           help="Force mode. Do not abort on sanity check "
1896                           "failures",
1897                           action="callback",
1898                           callback=self.option_handler_force)
1899 
1900         # Choose between the different kinds of harvesting.
1901         parser.add_option("", "--harvesting_type",
1902                           help="Harvesting type: %s" % \
1903                           ", ".join(self.harvesting_types),
1904                           action="callback",
1905                           callback=self.option_handler_harvesting_type,
1906                           type="string",
1907                           metavar="HARVESTING_TYPE")
1908 
1909         # Choose between single-step and two-step mode.
1910         parser.add_option("", "--harvesting_mode",
1911                           help="Harvesting mode: %s (default = %s)" % \
1912                           (", ".join(self.harvesting_modes),
1913                           self.harvesting_mode_default),
1914                           action="callback",
1915                           callback=self.option_handler_harvesting_mode,
1916                           type="string",
1917                           metavar="HARVESTING_MODE")
1918 
1919         # Override the GlobalTag chosen by the cmsHarvester.
1920         parser.add_option("", "--globaltag",
1921                           help="GlobalTag to use. Default is the one " \
1922                           "the dataset was created with (MC only); for " \
1923                           "data a GlobalTag has to be specified.",
1924                           action="callback",
1925                           callback=self.option_handler_globaltag,
1926                           type="string",
1927                           metavar="GLOBALTAG")
1928 
1929         # Allow switching off of reference histograms.
1930         parser.add_option("", "--no-ref-hists",
1931                           help="Don't use any reference histograms",
1932                           action="callback",
1933                           callback=self.option_handler_no_ref_hists)
1934 
1935         # Allow the default (i.e. the one that should be used)
1936         # Frontier connection to be overridden.
1937         parser.add_option("", "--frontier-connection",
1938                           help="Use this Frontier connection to find " \
1939                           "GlobalTags and LocalTags (for reference " \
1940                           "histograms).\nPlease only use this for " \
1941                           "testing.",
1942                           action="callback",
1943                           callback=self.option_handler_frontier_connection,
1944                           type="string",
1945                           metavar="FRONTIER")
1946 
1947         # Similar to the above but specific to the Frontier connection
1948         # to be used for the GlobalTag.
1949         parser.add_option("", "--frontier-connection-for-globaltag",
1950                           help="Use this Frontier connection to find " \
1951                           "GlobalTags.\nPlease only use this for " \
1952                           "testing.",
1953                           action="callback",
1954                           callback=self.option_handler_frontier_connection,
1955                           type="string",
1956                           metavar="FRONTIER")
1957 
1958         # Similar to the above but specific to the Frontier connection
1959         # to be used for the reference histograms.
1960         parser.add_option("", "--frontier-connection-for-refhists",
1961                           help="Use this Frontier connection to find " \
1962                           "LocalTags (for reference " \
1963                           "histograms).\nPlease only use this for " \
1964                           "testing.",
1965                           action="callback",
1966                           callback=self.option_handler_frontier_connection,
1967                           type="string",
1968                           metavar="FRONTIER")
1969 
1970         # Option to specify the name (or a regexp) of the dataset(s)
1971         # to be used.
1972         parser.add_option("", "--dataset",
1973                           help="Name (or regexp) of dataset(s) to process",
1974                           action="callback",
1975                           #callback=self.option_handler_dataset_name,
1976                           callback=self.option_handler_input_spec,
1977                           type="string",
1978                           #dest="self.input_name",
1979                           metavar="DATASET")
1980 
1981         # Option to specify the name (or a regexp) of the dataset(s)
1982         # to be ignored.
1983         parser.add_option("", "--dataset-ignore",
1984                           help="Name (or regexp) of dataset(s) to ignore",
1985                           action="callback",
1986                           callback=self.option_handler_input_spec,
1987                           type="string",
1988                           metavar="DATASET-IGNORE")
1989 
1990         # Option to specify the name (or a regexp) of the run(s)
1991         # to be used.
1992         parser.add_option("", "--runs",
1993                           help="Run number(s) to process",
1994                           action="callback",
1995                           callback=self.option_handler_input_spec,
1996                           type="string",
1997                           metavar="RUNS")                   
1998 
1999         # Option to specify the name (or a regexp) of the run(s)
2000         # to be ignored.
2001         parser.add_option("", "--runs-ignore",
2002                           help="Run number(s) to ignore",
2003                           action="callback",
2004                           callback=self.option_handler_input_spec,
2005                           type="string",
2006                           metavar="RUNS-IGNORE")
2007 
2008         # Option to specify a file containing a list of dataset names
2009         # (or regexps) to be used.
2010         parser.add_option("", "--datasetfile",
2011                           help="File containing list of dataset names " \
2012                           "(or regexps) to process",
2013                           action="callback",
2014                           #callback=self.option_handler_listfile_name,
2015                           callback=self.option_handler_input_spec,
2016                           type="string",
2017                           #dest="self.input_name",
2018                           metavar="DATASETFILE")
2019 
2020         # Option to specify a file containing a list of dataset names
2021         # (or regexps) to be ignored.
2022         parser.add_option("", "--datasetfile-ignore",
2023                           help="File containing list of dataset names " \
2024                           "(or regexps) to ignore",
2025                           action="callback",
2026                           callback=self.option_handler_input_spec,
2027                           type="string",
2028                           metavar="DATASETFILE-IGNORE")
2029 
2030         # Option to specify a file containing a list of runs to be
2031         # used.
2032         parser.add_option("", "--runslistfile",
2033                           help="File containing list of run numbers " \
2034                           "to process",
2035                           action="callback",
2036                           callback=self.option_handler_input_spec,
2037                           type="string",
2038                           metavar="RUNSLISTFILE")
2039 
2040         # Option to specify a file containing a list of runs
2041         # to be ignored.
2042         parser.add_option("", "--runslistfile-ignore",
2043                           help="File containing list of run numbers " \
2044                           "to ignore",
2045                           action="callback",
2046                           callback=self.option_handler_input_spec,
2047                           type="string",
2048                           metavar="RUNSLISTFILE-IGNORE")
2049 
2050         # Option to specify a Jsonfile containing a list of runs
2051         # to be used.
2052         parser.add_option("", "--Jsonrunfile",
2053                           help="Jsonfile containing dictionary of run/lumisections pairs. " \
2054                           "All lumisections of runs contained in dictionary are processed.",
2055                           action="callback",
2056                           callback=self.option_handler_input_Jsonrunfile,
2057                           type="string",
2058                           metavar="JSONRUNFILE")
2059 
2060         # Option to specify a Jsonfile containing a dictionary of run/lumisections pairs
2061         # to be used.
2062         parser.add_option("", "--Jsonfile",
2063                           help="Jsonfile containing dictionary of run/lumisections pairs. " \
2064                           "Only specified lumisections of runs contained in dictionary are processed.",
2065                           action="callback",
2066                           callback=self.option_handler_input_Jsonfile,
2067                           type="string",
2068                           metavar="JSONFILE")
2069 
2070         # Option to specify a ToDo file containing a list of runs
2071         # to be used.
2072         parser.add_option("", "--todo-file",
2073                           help="Todo file containing a list of runs to process.",
2074                           action="callback",
2075                           callback=self.option_handler_input_todofile,
2076                           type="string",
2077                           metavar="TODO-FILE")
2078 
2079         # Option to specify which file to use for the dataset name to
2080         # reference histogram name mappings.
2081         parser.add_option("", "--refhistmappingfile",
2082                           help="File to be used for the reference " \
2083                           "histogram mappings. Default: `%s'." % \
2084                           self.ref_hist_mappings_file_name_default,
2085                           action="callback",
2086                           callback=self.option_handler_ref_hist_mapping_file,
2087                           type="string",
2088                           metavar="REFHISTMAPPING-FILE")
2089 
2090         # Specify the place in CASTOR where the output should go.
2091         # NOTE: Only output to CASTOR is supported for the moment,
2092         # since the central DQM results place is on CASTOR anyway.
2093         parser.add_option("", "--castordir",
2094                           help="Place on CASTOR to store results. " \
2095                           "Default: `%s'." % \
2096                           self.castor_base_dir_default,
2097                           action="callback",
2098                           callback=self.option_handler_castor_dir,
2099                           type="string",
2100                           metavar="CASTORDIR")
2101 
2102         # Use this to try and create jobs that will run without
2103         # special `t1access' role.
2104         parser.add_option("", "--no-t1access",
2105                           help="Try to create jobs that will run " \
2106                           "without special `t1access' role",
2107                           action="callback",
2108                           callback=self.option_handler_no_t1access)
2109 
2110         # Use this to create jobs that may run on CAF
2111         parser.add_option("", "--caf-access",
2112                           help="Crab jobs may run " \
2113                           "on CAF",
2114                           action="callback",
2115                           callback=self.option_handler_caf_access)
2116 
2117         # set process.dqmSaver.saveByLumiSection=1 in harvesting cfg file
2118         parser.add_option("", "--saveByLumiSection",
2119                           help="set saveByLumiSection=1 in harvesting cfg file",
2120                           action="callback",
2121                           callback=self.option_handler_saveByLumiSection)
2122 
2123         # Use this to enable automatic creation and submission of crab jobs
2124         parser.add_option("", "--automatic-crab-submission",
2125                           help="Crab jobs are created and " \
2126                           "submitted automatically",
2127                           action="callback",
2128                           callback=self.option_handler_crab_submission)
2129 
2130         # Option to set the maximum number of sites each
2131         # job is submitted to.
2132         parser.add_option("", "--max-sites",
2133                           help="Max. number of sites each job is submitted to",
2134                           action="callback",
2135                           callback=self.option_handler_sites,
2136                           type="int") 
2137 
2138         # Option to set the preferred site
2139         parser.add_option("", "--site",
2140                           help="Crab jobs are submitted to specified site. T1 sites may be shortened by the following (country) codes: \
2141                           srm-cms.cern.ch : CH \
2142                           ccsrm.in2p3.fr : FR \
2143                           cmssrm-fzk.gridka.de : DE \
2144                           cmssrm.fnal.gov : GOV \
2145                           gridka-dCache.fzk.de : DE2 \
2146                           srm-cms.gridpp.rl.ac.uk : UK \
2147                           srm.grid.sinica.edu.tw : TW \
2148                           srm2.grid.sinica.edu.tw : TW2 \
2149                           srmcms.pic.es : ES \
2150                           storm-fe-cms.cr.cnaf.infn.it : IT",
2151                           action="callback",
2152                           callback=self.option_handler_preferred_site,
2153                           type="string") 
2154 
2155         # This is the command line flag to list all harvesting
2156         # type-to-sequence mappings.
2157         parser.add_option("-l", "--list",
2158                           help="List all harvesting types and their " \
2159                           "corresponding sequence names",
2160                           action="callback",
2161                           callback=self.option_handler_list_types)
2162 
2163         # If nothing was specified: tell the user how to do things the
2164         # next time and exit.
2165         # NOTE: We just use the OptParse standard way of doing this by
2166         #       acting as if a '--help' was specified.
2167         if len(self.cmd_line_opts) < 1:
2168             self.cmd_line_opts = ["--help"]
2169 
2170         # Some trickery with the options. Why? Well, since these
2171         # options change the output level immediately from the option
2172         # handlers, the results differ depending on where they are on
2173         # the command line. Let's just make sure they are at the
2174         # front.
2175         # NOTE: Not very efficient or sophisticated, but it works and
2176         # it only has to be done once anyway.
2177         for i in ["-d", "--debug",
2178                   "-q", "--quiet"]:
2179             if i in self.cmd_line_opts:
2180                 self.cmd_line_opts.remove(i)
2181                 self.cmd_line_opts.insert(0, i)
2182 
2183         # Everything is set up, now parse what we were given.
2184         parser.set_defaults()
2185         (self.options, self.args) = parser.parse_args(self.cmd_line_opts)
2186 
2187         # End of parse_cmd_line_options.
2188 
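         # Illustrative invocation only; the option values below are
         # placeholders, not recommendations:
         #
         #   cmsHarvester.py --harvesting_type=<TYPE> \
         #                   --harvesting_mode=<MODE> \
         #                   --dataset='/SomePrimaryDataset/SomeEra-SomeReco/RECO' \
         #                   --globaltag=<SOME_TAG>::All \
         #                   --castordir=/castor/cern.ch/user/x/xyz/harvesting
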
2189     ##########
2190 
2191     def check_input_status(self):
2192         """Check completeness and correctness of input information.
2193 
2194         Check that all required information has been specified and
2195         that, at least as far as can be easily checked, it makes
2196         sense.
2197 
2198         NOTE: This is also where any default values are applied.
2199 
2200         """
2201 
2202         self.logger.info("Checking completeness/correctness of input...")
2203 
2204         # The cmsHarvester does not take (i.e. understand) any
2205         # arguments so there should not be any.
2206         if len(self.args) > 0:
2207             msg = "Sorry but I don't understand `%s'" % \
2208                   (" ".join(self.args))
2209             self.logger.fatal(msg)
2210             raise Usage(msg)
2211 
2212         # BUG BUG BUG
2213         # While we wait for some bugs left and right to get fixed, we
2214         # disable two-step.
2215         if self.harvesting_mode == "two-step":
2216             msg = "--------------------\n" \
2217                   "  Sorry, but for the moment (well, till it works)" \
2218                   "  the two-step mode has been disabled.\n" \
2219                   "--------------------\n"
2220             self.logger.fatal(msg)
2221             raise Error(msg)
2222         # BUG BUG BUG end
2223 
2224         # We need a harvesting method to be specified
2225         if self.harvesting_type is None:
2226             msg = "Please specify a harvesting type"
2227             self.logger.fatal(msg)
2228             raise Usage(msg)
2229         # as well as a harvesting mode.
2230         if self.harvesting_mode is None:
2231             self.harvesting_mode = self.harvesting_mode_default
2232             msg = "No harvesting mode specified --> using default `%s'" % \
2233                   self.harvesting_mode
2234             self.logger.warning(msg)
2235             #raise Usage(msg)
2236 
2237         ###
2238 
2239         # We need an input method so we can find the dataset name(s).
2240         if self.input_method["datasets"]["use"] is None:
2241             msg = "Please specify an input dataset name " \
2242                   "or a list file name"
2243             self.logger.fatal(msg)
2244             raise Usage(msg)
2245 
2246         # DEBUG DEBUG DEBUG
2247         # If we get here, we should also have an input name.
2248         assert not self.input_name["datasets"]["use"] is None
2249         # DEBUG DEBUG DEBUG end
2250 
2251         ###
2252 
2253         # The same holds for the reference histogram mapping file (if
2254         # we're using references).
2255         if self.use_ref_hists:
2256             if self.ref_hist_mappings_file_name is None:
2257                 self.ref_hist_mappings_file_name = self.ref_hist_mappings_file_name_default
2258                 msg = "No reference histogram mapping file specified --> " \
2259                       "using default `%s'" % \
2260                       self.ref_hist_mappings_file_name
2261                 self.logger.warning(msg)
2262 
2263         ###
2264 
2265         # We need to know where to put the stuff (okay, the results)
2266         # on CASTOR.
2267         if self.castor_base_dir is None:
2268             self.castor_base_dir = self.castor_base_dir_default
2269             msg = "No CASTOR area specified -> using default `%s'" % \
2270                   self.castor_base_dir
2271             self.logger.warning(msg)
2272             #raise Usage(msg)
2273 
2274         # Only the CERN CASTOR area is supported.
2275         if not self.castor_base_dir.startswith(self.castor_prefix):
2276             msg = "CASTOR area does not start with `%s'" % \
2277                   self.castor_prefix
2278             self.logger.fatal(msg)
2279             if self.castor_base_dir.find("castor") > -1 and \
2280                not self.castor_base_dir.find("cern.ch") > -1:
2281                 self.logger.fatal("Only CERN CASTOR is supported")
2282             raise Usage(msg)
2283 
2284         ###
2285 
2286         # TODO TODO TODO
2287         # This should be removed in the future, once I find out how to
2288         # get the config file used to create a given dataset from DBS.
2289 
2290         # For data we need to have a GlobalTag. (For MC we can figure
2291         # it out by ourselves.)
2292         if self.globaltag is None:
2293             self.logger.warning("No GlobalTag specified. This means I cannot")
2294             self.logger.warning("run on data, only on MC.")
2295             self.logger.warning("I will skip all data datasets.")
2296 
2297         # TODO TODO TODO end
2298 
2299         # Make sure the GlobalTag ends with `::All'.
2300         if not self.globaltag is None:
2301             if not self.globaltag.endswith("::All"):
2302                 self.logger.warning("Specified GlobalTag `%s' does " \
2303                                     "not end in `::All' --> " \
2304                                     "appending this missing piece" % \
2305                                     self.globaltag)
2306                 self.globaltag = "%s::All" % self.globaltag
2307 
2308         ###
2309 
2310         # Dump some info about the Frontier connections used.
2311         for (key, value) in self.frontier_connection_name.items():
2312             frontier_type_str = "unknown"
2313             if key == "globaltag":
2314                 frontier_type_str = "the GlobalTag"
2315             elif key == "refhists":
2316                 frontier_type_str = "the reference histograms"
2317             non_str = None
2318             if self.frontier_connection_overridden[key] == True:
2319                 non_str = "non-"
2320             else:
2321                 non_str = ""
2322             self.logger.info("Using %sdefault Frontier " \
2323                              "connection for %s: `%s'" % \
2324                              (non_str, frontier_type_str, value))
2325 
2326         ###
2327 
2328         # End of check_input_status.
2329 
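         # Illustrative only: a (hypothetical) GlobalTag passed as
         # `GR_R_38X_V13' would be extended by the check above to
         # `GR_R_38X_V13::All'.
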
2330     ##########
2331 
2332     def check_cmssw(self):
2333         """Check if CMSSW is setup.
2334 
2335         """
2336 
2337         # Try to access the CMSSW_VERSION environment variable. If
2338         # it's something useful we consider CMSSW to be set up
2339         # properly. Otherwise we raise an error.
2340         cmssw_version = os.getenv("CMSSW_VERSION")
2341         if cmssw_version is None:
2342             self.logger.fatal("It seems CMSSW is not setup...")
2343             self.logger.fatal("($CMSSW_VERSION is empty)")
2344             raise Error("ERROR: CMSSW needs to be setup first!")
2345 
2346         self.cmssw_version = cmssw_version
2347         self.logger.info("Found CMSSW version %s properly set up" % \
2348                           self.cmssw_version)
2349 
2350         # End of check_cmssw.
2351         return True
2352 
2353     ##########
2354 
2355     def check_dbs(self):
2356         """Check if DBS is setup.
2357 
2358         """
2359 
2360         # Try to access the DBSCMD_HOME environment variable. If this
2361         # looks useful we consider DBS to be set up
2362         # properly. Otherwise we raise an error.
2363         dbs_home = os.getenv("DBSCMD_HOME")
2364         if dbs_home is None:
2365             self.logger.fatal("It seems DBS is not setup...")
2366             self.logger.fatal("  $DBSCMD_HOME is empty")
2367             raise Error("ERROR: DBS needs to be setup first!")
2368 
2369 ##        # Now we try to do a very simple DBS search. If that works
2370 ##        # instead of giving us the `Unsupported API call' crap, we
2371 ##        # should be good to go.
2372 ##        # NOTE: Not ideal, I know, but it reduces the amount of
2373 ##        #       complaints I get...
2374 ##        cmd = "dbs search --query=\"find dataset where dataset = impossible\""
2375 ##        (status, output) = subprocess.getstatusoutput(cmd)
2376 ##        pdb.set_trace()
2377 ##        if status != 0 or \
2378 ##           output.lower().find("unsupported api call") > -1:
2379 ##            self.logger.fatal("It seems DBS is not setup...")
2380 ##            self.logger.fatal("  %s returns crap:" % cmd)
2381 ##            for line in output.split("\n"):
2382 ##                self.logger.fatal("  %s" % line)
2383 ##            raise Error("ERROR: DBS needs to be setup first!")
2384 
2385         self.logger.debug("Found DBS properly set up")
2386 
2387         # End of check_dbs.
2388         return True
2389 
2390     ##########
2391 
2392     def setup_dbs(self):
2393         """Setup the Python side of DBS.
2394 
2395         For more information see the DBS Python API documentation:
2396         https://twiki.cern.ch/twiki/bin/view/CMS/DBSApiDocumentation
2397 
2398         """
2399 
2400         try:
2401             args={}
2402             args["url"]= "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/" \
2403                          "servlet/DBSServlet"
2404             api = DbsApi(args)
2405             self.dbs_api = api
2406 
2407         except DBSAPI.dbsApiException.DbsApiException as ex:
2408             self.logger.fatal("Caught DBS API exception %s: %s "  % \
2409                               (ex.getClassName(), ex.getErrorMessage()))
2410             if ex.getErrorCode() not in (None, ""):
2411                 self.logger.debug("DBS exception error code: %s" % ex.getErrorCode())
2412             raise
2413 
2414         # End of setup_dbs.
2415 
2416     ##########
2417 
2418     def dbs_resolve_dataset_name(self, dataset_name):
2419         """Use DBS to resolve a wildcarded dataset name.
2420 
2421         """
2422 
2423         # DEBUG DEBUG DEBUG
2424         # If we get here DBS should have been set up already.
2425         assert not self.dbs_api is None
2426         # DEBUG DEBUG DEBUG end
2427 
2428         # Some minor checking to make sure that whatever we've been
2429         # given as dataset name actually sounds like a dataset name.
2430         if not (dataset_name.startswith("/") and \
2431                 dataset_name.endswith("RECO")):
2432             self.logger.warning("Dataset name `%s' does not sound " \
2433                                  "like a valid dataset name!" % \
2434                                 dataset_name)
2435 
2436         #----------
2437 
2438         api = self.dbs_api
2439         dbs_query = "find dataset where dataset like %s " \
2440                     "and dataset.status = VALID" % \
2441                     dataset_name
2442         try:
2443             api_result = api.executeQuery(dbs_query)
2444         except DBSAPI.dbsApiException.DbsApiException:
2445             msg = "ERROR: Could not execute DBS query"
2446             self.logger.fatal(msg)
2447             raise Error(msg)
2448 
2449         # Setup parsing.
2450         handler = DBSXMLHandler(["dataset"])
2451         parser = xml.sax.make_parser()
2452         parser.setContentHandler(handler)
2453 
2454         # Parse.
2455         try:
2456             xml.sax.parseString(api_result, handler)
2457         except SAXParseException:
2458             msg = "ERROR: Could not parse DBS server output"
2459             self.logger.fatal(msg)
2460             raise Error(msg)
2461 
2462         # DEBUG DEBUG DEBUG
2463         assert(handler.check_results_validity()), "ERROR The DBSXMLHandler screwed something up!"
2464         # DEBUG DEBUG DEBUG end
2465 
2466         # Extract the results.
2467         datasets = list(handler.results.values())[0]
2468 
2469         # End of dbs_resolve_dataset_name.
2470         return datasets
2471 
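         # Illustrative only (hypothetical dataset pattern): a call like
         #   self.dbs_resolve_dataset_name("/Cosmics/*/RAW-RECO")
         # would issue the DBS query
         #   find dataset where dataset like /Cosmics/*/RAW-RECO and dataset.status = VALID
         # and return the matching dataset names as a list of strings.
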
2472     ##########
2473 
2474     def dbs_resolve_cmssw_version(self, dataset_name):
2475         """Ask DBS for the CMSSW version used to create this dataset.
2476 
2477         """
2478 
2479         # DEBUG DEBUG DEBUG
2480         # If we get here DBS should have been set up already.
2481         assert not self.dbs_api is None
2482         # DEBUG DEBUG DEBUG end
2483 
2484         api = self.dbs_api
2485         dbs_query = "find algo.version where dataset = %s " \
2486                     "and dataset.status = VALID" % \
2487                     dataset_name
2488         try:
2489             api_result = api.executeQuery(dbs_query)
2490         except DBSAPI.dbsApiException.DbsApiException:
2491             msg = "ERROR: Could not execute DBS query"
2492             self.logger.fatal(msg)
2493             raise Error(msg)
2494 
2495         handler = DBSXMLHandler(["algo.version"])
2496         parser = xml.sax.make_parser()
2497         parser.setContentHandler(handler)
2498 
2499         try:
2500             xml.sax.parseString(api_result, handler)
2501         except SAXParseException:
2502             msg = "ERROR: Could not parse DBS server output"
2503             self.logger.fatal(msg)
2504             raise Error(msg)
2505 
2506         # DEBUG DEBUG DEBUG
2507         assert(handler.check_results_validity()), "ERROR The DBSXMLHandler screwed something up!"
2508         # DEBUG DEBUG DEBUG end
2509 
2510         cmssw_version = list(handler.results.values())[0]
2511 
2512         # DEBUG DEBUG DEBUG
2513         assert len(cmssw_version) == 1
2514         # DEBUG DEBUG DEBUG end
2515 
2516         cmssw_version = cmssw_version[0]
2517 
2518         # End of dbs_resolve_cmssw_version.
2519         return cmssw_version
2520 
2521     ##########
2522 
2523 ##    def dbs_resolve_dataset_number_of_events(self, dataset_name):
2524 ##        """Ask DBS across how many events this dataset has been spread
2525 ##        out.
2526 
2527 ##        This is especially useful to check that we do not submit a job
2528 ##        supposed to run on a complete sample that is not contained at
2529 ##        a single site.
2530 
2531 ##        """
2532 
2533 ##        # DEBUG DEBUG DEBUG
2534 ##        # If we get here DBS should have been set up already.
2535 ##        assert not self.dbs_api is None
2536 ##        # DEBUG DEBUG DEBUG end
2537 
2538 ##        api = self.dbs_api
2539 ##        dbs_query = "find count(site) where dataset = %s " \
2540 ##                    "and dataset.status = VALID" % \
2541 ##                    dataset_name
2542 ##        try:
2543 ##            api_result = api.executeQuery(dbs_query)
2544 ##        except DbsApiException:
2545 ##            raise Error("ERROR: Could not execute DBS query")
2546 
2547 ##        try:
2548 ##            num_events = []
2549 ##            class Handler(xml.sax.handler.ContentHandler):
2550 ##                def startElement(self, name, attrs):
2551 ##                    if name == "result":
2552 ##                        num_events.append(str(attrs["COUNT_STORAGEELEMENT"]))
2553 ##            xml.sax.parseString(api_result, Handler())
2554 ##        except SAXParseException:
2555 ##            raise Error("ERROR: Could not parse DBS server output")
2556 
2557 ##        # DEBUG DEBUG DEBUG
2558 ##        assert len(num_events) == 1
2559 ##        # DEBUG DEBUG DEBUG end
2560 
2561 ##        num_events = int(num_events[0])
2562 
2563 ##        # End of dbs_resolve_dataset_number_of_events.
2564 ##        return num_events
2565 
2566     ##########
2567 
2568     def dbs_resolve_runs(self, dataset_name):
2569         """Ask DBS for the list of runs in a given dataset.
2570 
2571         # NOTE: This does not (yet?) skip/remove empty runs. There is
2572         # a bug in the DBS entry run.numevents (i.e. it always returns
2573         # zero) which should be fixed in the `next DBS release'.
2574         # See also:
2575         #   https://savannah.cern.ch/bugs/?53452
2576         #   https://savannah.cern.ch/bugs/?53711
2577 
2578         """
2579 
2580         # TODO TODO TODO
2581         # We should remove empty runs as soon as the above mentioned
2582         # bug is fixed.
2583         # TODO TODO TODO end
2584 
2585         # DEBUG DEBUG DEBUG
2586         # If we get here DBS should have been set up already.
2587         assert not self.dbs_api is None
2588         # DEBUG DEBUG DEBUG end
2589 
2590         api = self.dbs_api
2591         dbs_query = "find run where dataset = %s " \
2592                     "and dataset.status = VALID" % \
2593                     dataset_name
2594         try:
2595             api_result = api.executeQuery(dbs_query)
2596         except DBSAPI.dbsApiException.DbsApiException:
2597             msg = "ERROR: Could not execute DBS query"
2598             self.logger.fatal(msg)
2599             raise Error(msg)
2600 
2601         handler = DBSXMLHandler(["run"])
2602         parser = xml.sax.make_parser()
2603         parser.setContentHandler(handler)
2604 
2605         try:
2606             xml.sax.parseString(api_result, handler)
2607         except SAXParseException:
2608             msg = "ERROR: Could not parse DBS server output"
2609             self.logger.fatal(msg)
2610             raise Error(msg)
2611 
2612         # DEBUG DEBUG DEBUG
2613         assert(handler.check_results_validity()), "ERROR The DBSXMLHandler screwed something up!"
2614         # DEBUG DEBUG DEBUG end
2615 
2616         runs = list(handler.results.values())[0]
2617         # Turn strings into integers.
2618         runs = sorted([int(i) for i in runs])
2619 
2620         # End of dbs_resolve_runs.
2621         return runs
2622 
2623     ##########
2624 
2625     def dbs_resolve_globaltag(self, dataset_name):
2626         """Ask DBS for the globaltag corresponding to a given dataset.
2627 
2628         # BUG BUG BUG
2629         # This does not seem to work for data datasets? E.g. for
2630         # /Cosmics/Commissioning08_CRAFT0831X_V1_311_ReReco_FromSuperPointing_v1/RAW-RECO
2631         # Probably due to the fact that the GlobalTag changed during
2632         # data taking...
2633         # BUG BUG BUG end
2634 
2635         """
2636 
2637         # DEBUG DEBUG DEBUG
2638         # If we get here DBS should have been set up already.
2639         assert not self.dbs_api is None
2640         # DEBUG DEBUG DEBUG end
2641 
2642         api = self.dbs_api
2643         dbs_query = "find dataset.tag where dataset = %s " \
2644                     "and dataset.status = VALID" % \
2645                     dataset_name
2646         try:
2647             api_result = api.executeQuery(dbs_query)
2648         except DBSAPI.dbsApiException.DbsApiException:
2649             msg = "ERROR: Could not execute DBS query"
2650             self.logger.fatal(msg)
2651             raise Error(msg)
2652 
2653         handler = DBSXMLHandler(["dataset.tag"])
2654         parser = xml.sax.make_parser()
2655         parser.setContentHandler(handler)
2656 
2657         try:
2658             xml.sax.parseString(api_result, handler)
2659         except SAXParseException:
2660             msg = "ERROR: Could not parse DBS server output"
2661             self.logger.fatal(msg)
2662             raise Error(msg)
2663 
2664         # DEBUG DEBUG DEBUG
2665         assert(handler.check_results_validity()), "ERROR The DBSXMLHandler screwed something up!"
2666         # DEBUG DEBUG DEBUG end
2667 
2668         globaltag = list(handler.results.values())[0]
2669 
2670         # DEBUG DEBUG DEBUG
2671         assert len(globaltag) == 1
2672         # DEBUG DEBUG DEBUG end
2673 
2674         globaltag = globaltag[0]
2675 
2676         # End of dbs_resolve_globaltag.
2677         return globaltag
2678 
2679     ##########
2680 
2681     def dbs_resolve_datatype(self, dataset_name):
2682         """Ask DBS for the data type (data or mc) of a given
2683         dataset.
2684 
2685         """
2686 
2687         # DEBUG DEBUG DEBUG
2688         # If we get here DBS should have been set up already.
2689         assert not self.dbs_api is None
2690         # DEBUG DEBUG DEBUG end
2691 
2692         api = self.dbs_api
2693         dbs_query = "find datatype.type where dataset = %s " \
2694                     "and dataset.status = VALID" % \
2695                     dataset_name
2696         try:
2697             api_result = api.executeQuery(dbs_query)
2698         except DBSAPI.dbsApiException.DbsApiException:
2699             msg = "ERROR: Could not execute DBS query"
2700             self.logger.fatal(msg)
2701             raise Error(msg)
2702 
2703         handler = DBSXMLHandler(["datatype.type"])
2704         parser = xml.sax.make_parser()
2705         parser.setContentHandler(handler)
2706 
2707         try:
2708             xml.sax.parseString(api_result, handler)
2709         except SAXParseException:
2710             msg = "ERROR: Could not parse DBS server output"
2711             self.logger.fatal(msg)
2712             raise Error(msg)
2713 
2714         # DEBUG DEBUG DEBUG
2715         assert(handler.check_results_validity()), "ERROR The DBSXMLHandler screwed something up!"
2716         # DEBUG DEBUG DEBUG end
2717 
2718         datatype = list(handler.results.values())[0]
2719 
2720         # DEBUG DEBUG DEBUG
2721         assert len(datatype) == 1
2722         # DEBUG DEBUG DEBUG end
2723 
2724         datatype = datatype[0]
2725 
2726         # End of dbs_resolve_datatype.
2727         return datatype
2728 
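    # A hedged sketch of how the resolve helpers above could be combined
    # for a single dataset; `harvester' and the dataset name are assumed,
    # purely for illustration:
    #
    #   name = "/MinimumBias/Commissioning10-v4/RECO"
    #   globaltag = harvester.dbs_resolve_globaltag(name)
    #   datatype = harvester.dbs_resolve_datatype(name)
    #   print("%s: datatype %s, globaltag %s" % (name, datatype, globaltag))
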
2729     ##########
2730 
2731     # OBSOLETE OBSOLETE OBSOLETE
2732     # This method is no longer used.
2733 
2734     def dbs_resolve_number_of_events(self, dataset_name, run_number=None):
2735         """Determine the number of events in a given dataset (and run).
2736 
2737         Ask DBS for the number of events in a dataset. If a run number
2738         is specified the number of events returned is that in that run
2739         of that dataset. If problems occur we throw an exception.
2740 
2741         # BUG BUG BUG
2742         # Since DBS does not return the number of events correctly,
2743         # neither for runs nor for whole datasets, we have to work
2744         # around that a bit...
2745         # BUG BUG BUG end
2746 
2747         """
2748 
2749         # DEBUG DEBUG DEBUG
2750         # If we get here DBS should have been set up already.
2751         assert not self.dbs_api is None
2752         # DEBUG DEBUG DEBUG end
2753 
2754         api = self.dbs_api
2755         dbs_query = "find file.name, file.numevents where dataset = %s " \
2756                     "and dataset.status = VALID" % \
2757                     dataset_name
2758         if not run_number is None:
2759             dbs_query = dbs_query + (" and run = %d" % run_number)
2760         try:
2761             api_result = api.executeQuery(dbs_query)
2762         except DBSAPI.dbsApiException.DbsApiException:
2763             msg = "ERROR: Could not execute DBS query"
2764             self.logger.fatal(msg)
2765             raise Error(msg)
2766 
2767         handler = DBSXMLHandler(["file.name", "file.numevents"])
2768         parser = xml.sax.make_parser()
2769         parser.setContentHandler(handler)
2770 
2771         try:
2772             xml.sax.parseString(api_result, handler)
2773         except SAXParseException:
2774             msg = "ERROR: Could not parse DBS server output"
2775             self.logger.fatal(msg)
2776             raise Error(msg)
2777 
2778         # DEBUG DEBUG DEBUG
2779         assert(handler.check_results_validity()), "ERROR The DBSXMLHandler screwed something up!"
2780         # DEBUG DEBUG DEBUG end
2781 
2782         num_events = sum([int(i) for i in handler.results["file.numevents"]])
2783 
2784         # End of dbs_resolve_number_of_events.
2785         return num_events
2786 
2787     # OBSOLETE OBSOLETE OBSOLETE end
2788 
2789     ##########
2790 
2791 ##    def dbs_resolve_dataset_number_of_sites(self, dataset_name):
2792 ##        """Ask DBS across how many sites this dataset has been spread
2793 ##        out.
2794 
2795 ##        This is especially useful to check that we do not submit a job
2796 ##        supposed to run on a complete sample that is not contained at
2797 ##        a single site.
2798 
2799 ##        """
2800 
2801 ##        # DEBUG DEBUG DEBUG
2802 ##        # If we get here DBS should have been set up already.
2803 ##        assert not self.dbs_api is None
2804 ##        # DEBUG DEBUG DEBUG end
2805 
2806 ##        api = self.dbs_api
2807 ##        dbs_query = "find count(site) where dataset = %s " \
2808 ##                    "and dataset.status = VALID" % \
2809 ##                    dataset_name
2810 ##        try:
2811 ##            api_result = api.executeQuery(dbs_query)
2812 ##        except DbsApiException:
2813 ##            raise Error("ERROR: Could not execute DBS query")
2814 
2815 ##        try:
2816 ##            num_sites = []
2817 ##            class Handler(xml.sax.handler.ContentHandler):
2818 ##                def startElement(self, name, attrs):
2819 ##                    if name == "result":
2820 ##                        num_sites.append(str(attrs["COUNT_STORAGEELEMENT"]))
2821 ##            xml.sax.parseString(api_result, Handler())
2822 ##        except SAXParseException:
2823 ##            raise Error("ERROR: Could not parse DBS server output")
2824 
2825 ##        # DEBUG DEBUG DEBUG
2826 ##        assert len(num_sites) == 1
2827 ##        # DEBUG DEBUG DEBUG end
2828 
2829 ##        num_sites = int(num_sites[0])
2830 
2831 ##        # End of dbs_resolve_dataset_number_of_sites.
2832 ##        return num_sites
2833 
2834     ##########
2835 
2836 ##    def dbs_check_dataset_spread(self, dataset_name):
2837 ##        """Figure out across how many sites this dataset is spread.
2838 
2839 ##        NOTE: This is something we need to figure out per run, since
2840 ##        we want to submit harvesting jobs per run.
2841 
2842 ##        Basically three things can happen with a given dataset:
2843 ##        - the whole dataset is available on a single site,
2844 ##        - the whole dataset is available (mirrored) at multiple sites,
2845 ##        - the dataset is spread across multiple sites and there is no
2846 ##          single site containing the full dataset in one place.
2847 
2848 ##        NOTE: If all goes well, it should not be possible that
2849 ##        anything but a _full_ dataset is mirrored. So we ignore the
2850 ##        possibility in which for example one site contains the full
2851 ##        dataset and two others mirror half of it.
2852 ##        ANOTHER NOTE: According to some people this last case _could_
2853 ##        actually happen. I will not design for it, but make sure it
2854 ##        ends up as a false negative, in which case we just lose some
2855 ##        efficiency and treat the dataset (unnecessarily) as
2856 ##        spread-out.
2857 
2858 ##        We don't really care about the first two possibilities, but in
2859 ##        the third case we need to make sure to run the harvesting in
2860 ##        two-step mode.
2861 
2862 ##        This method checks with DBS which of the above cases is true
2863 ##        for the dataset name given, and returns a 1 for the first two
2864 ##        cases, and the number of sites across which the dataset is
2865 ##        spread for the third case.
2866 
2867 ##        The way in which this is done is by asking how many files each
2868 ##        site has for the dataset. In the first case there is only one
2869 ##        site, in the second case all sites should have the same number
2870 ##        of files (i.e. the total number of files in the dataset) and
2871 ##        in the third case the file counts from all sites should add up
2872 ##        to the total file count for the dataset.
2873 
2874 ##        """
2875 
2876 ##        # DEBUG DEBUG DEBUG
2877 ##        # If we get here DBS should have been set up already.
2878 ##        assert not self.dbs_api is None
2879 ##        # DEBUG DEBUG DEBUG end
2880 
2881 ##        api = self.dbs_api
2882 ##        dbs_query = "find run, run.numevents, site, file.count " \
2883 ##                    "where dataset = %s " \
2884 ##                    "and dataset.status = VALID" % \
2885 ##                    dataset_name
2886 ##        try:
2887 ##            api_result = api.executeQuery(dbs_query)
2888 ##        except DbsApiException:
2889 ##            msg = "ERROR: Could not execute DBS query"
2890 ##            self.logger.fatal(msg)
2891 ##            raise Error(msg)
2892 
2893 ##        # Index things by run number. No cross-check is done to make
2894 ##        # sure we get results for each and every run in the
2895 ##        # dataset. I'm not sure this would make sense since we'd be
2896 ##        # cross-checking DBS info with DBS info anyway. Note that we
2897 ##        # use the file count per site to see if we're dealing with an
2898 ##        # incomplete vs. a mirrored dataset.
2899 ##        sample_info = {}
2900 ##        try:
2901 ##            class Handler(xml.sax.handler.ContentHandler):
2902 ##                def startElement(self, name, attrs):
2903 ##                    if name == "result":
2904 ##                        run_number = int(attrs["RUNS_RUNNUMBER"])
2905 ##                        site_name = str(attrs["STORAGEELEMENT_SENAME"])
2906 ##                        file_count = int(attrs["COUNT_FILES"])
2907 ##                        # BUG BUG BUG
2908 ##                        # Doh! For some reason DBS never returns any other
2909 ##                        # event count than zero.
2910 ##                        event_count = int(attrs["RUNS_NUMBEROFEVENTS"])
2911 ##                        # BUG BUG BUG end
2912 ##                        info = (site_name, file_count, event_count)
2913 ##                        try:
2914 ##                            sample_info[run_number].append(info)
2915 ##                        except KeyError:
2916 ##                            sample_info[run_number] = [info]
2917 ##            xml.sax.parseString(api_result, Handler())
2918 ##        except SAXParseException:
2919 ##            msg = "ERROR: Could not parse DBS server output"
2920 ##            self.logger.fatal(msg)
2921 ##            raise Error(msg)
2922 
2923 ##        # Now translate this into a slightly more usable mapping.
2924 ##        sites = {}
2925 ##        for (run_number, site_info) in sample_info.items():
2926 ##            # Quick-n-dirty trick to see if all file counts are the
2927 ##            # same.
2928 ##            unique_file_counts = set([i[1] for i in site_info])
2929 ##            if len(unique_file_counts) == 1:
2930 ##                # Okay, so this must be a mirrored dataset.
2931 ##                # We have to pick one but we have to be careful. We
2932 ##                # cannot submit to things like a T0, a T1, or CAF.
2933 ##                site_names = [self.pick_a_site([i[0] for i in site_info])]
2934 ##                nevents = [site_info[0][2]]
2935 ##            else:
2936 ##                # Looks like this is a spread-out sample.
2937 ##                site_names = [i[0] for i in site_info]
2938 ##                nevents = [i[2] for i in site_info]
2939 ##            sites[run_number] = zip(site_names, nevents)
2940 
2941 ##        self.logger.debug("Sample `%s' spread is:" % dataset_name)
2942 ##        run_numbers = sites.keys()
2943 ##        run_numbers.sort()
2944 ##        for run_number in run_numbers:
2945 ##            self.logger.debug("  run # %6d: %d sites (%s)" % \
2946 ##                              (run_number,
2947 ##                               len(sites[run_number]),
2948 ##                               ", ".join([i[0] for i in sites[run_number]])))
2949 
2950 ##        # End of dbs_check_dataset_spread.
2951 ##        return sites
2952 
2953 ##    # DEBUG DEBUG DEBUG
2954 ##    # Just kept for debugging now.
2955 ##    def dbs_check_dataset_spread_old(self, dataset_name):
2956 ##        """Figure out across how many sites this dataset is spread.
2957 
2958 ##        NOTE: This is something we need to figure out per run, since
2959 ##        we want to submit harvesting jobs per run.
2960 
2961 ##        Basically three things can happen with a given dataset:
2962 ##        - the whole dataset is available on a single site,
2963 ##        - the whole dataset is available (mirrored) at multiple sites,
2964 ##        - the dataset is spread across multiple sites and there is no
2965 ##          single site containing the full dataset in one place.
2966 
2967 ##        NOTE: If all goes well, it should not be possible that
2968 ##        anything but a _full_ dataset is mirrored. So we ignore the
2969 ##        possibility in which for example one site contains the full
2970 ##        dataset and two others mirror half of it.
2971 ##        ANOTHER NOTE: According to some people this last case _could_
2972 ##        actually happen. I will not design for it, but make sure it
2973 ##        ends up as a false negative, in which case we just lose some
2974 ##        efficiency and treat the dataset (unnecessarily) as
2975 ##        spread-out.
2976 
2977 ##        We don't really care about the first two possibilities, but in
2978 ##        the third case we need to make sure to run the harvesting in
2979 ##        two-step mode.
2980 
2981 ##        This method checks with DBS which of the above cases is true
2982 ##        for the dataset name given, and returns a 1 for the first two
2983 ##        cases, and the number of sites across which the dataset is
2984 ##        spread for the third case.
2985 
2986 ##        The way in which this is done is by asking how many files each
2987 ##        site has for the dataset. In the first case there is only one
2988 ##        site, in the second case all sites should have the same number
2989 ##        of files (i.e. the total number of files in the dataset) and
2990 ##        in the third case the file counts from all sites should add up
2991 ##        to the total file count for the dataset.
2992 
2993 ##        """
2994 
2995 ##        # DEBUG DEBUG DEBUG
2996 ##        # If we get here DBS should have been set up already.
2997 ##        assert not self.dbs_api is None
2998 ##        # DEBUG DEBUG DEBUG end
2999 
3000 ##        api = self.dbs_api
3001 ##        dbs_query = "find run, run.numevents, site, file.count " \
3002 ##                    "where dataset = %s " \
3003 ##                    "and dataset.status = VALID" % \
3004 ##                    dataset_name
3005 ##        try:
3006 ##            api_result = api.executeQuery(dbs_query)
3007 ##        except DbsApiException:
3008 ##            msg = "ERROR: Could not execute DBS query"
3009 ##            self.logger.fatal(msg)
3010 ##            raise Error(msg)
3011 
3012 ##        # Index things by run number. No cross-check is done to make
3013 ##        # sure we get results for each and every run in the
3014 ##        # dataset. I'm not sure this would make sense since we'd be
3015 ##        # cross-checking DBS info with DBS info anyway. Note that we
3016 ##        # use the file count per site to see if we're dealing with an
3017 ##        # incomplete vs. a mirrored dataset.
3018 ##        sample_info = {}
3019 ##        try:
3020 ##            class Handler(xml.sax.handler.ContentHandler):
3021 ##                def startElement(self, name, attrs):
3022 ##                    if name == "result":
3023 ##                        run_number = int(attrs["RUNS_RUNNUMBER"])
3024 ##                        site_name = str(attrs["STORAGEELEMENT_SENAME"])
3025 ##                        file_count = int(attrs["COUNT_FILES"])
3026 ##                        # BUG BUG BUG
3027 ##                        # Doh! For some reason DBS never returns any other
3028 ##                        # event count than zero.
3029 ##                        event_count = int(attrs["RUNS_NUMBEROFEVENTS"])
3030 ##                        # BUG BUG BUG end
3031 ##                        info = (site_name, file_count, event_count)
3032 ##                        try:
3033 ##                            sample_info[run_number].append(info)
3034 ##                        except KeyError:
3035 ##                            sample_info[run_number] = [info]
3036 ##            xml.sax.parseString(api_result, Handler())
3037 ##        except SAXParseException:
3038 ##            msg = "ERROR: Could not parse DBS server output"
3039 ##            self.logger.fatal(msg)
3040 ##            raise Error(msg)
3041 
3042 ##        # Now translate this into a slightly more usable mapping.
3043 ##        sites = {}
3044 ##        for (run_number, site_info) in sample_info.items():
3045 ##            # Quick-n-dirty trick to see if all file counts are the
3046 ##            # same.
3047 ##            unique_file_counts = set([i[1] for i in site_info])
3048 ##            if len(unique_file_counts) == 1:
3049 ##                # Okay, so this must be a mirrored dataset.
3050 ##                # We have to pick one but we have to be careful. We
3051 ##                # cannot submit to things like a T0, a T1, or CAF.
3052 ##                site_names = [self.pick_a_site([i[0] for i in site_info])]
3053 ##                nevents = [site_info[0][2]]
3054 ##            else:
3055 ##                # Looks like this is a spread-out sample.
3056 ##                site_names = [i[0] for i in site_info]
3057 ##                nevents = [i[2] for i in site_info]
3058 ##            sites[run_number] = zip(site_names, nevents)
3059 
3060 ##        self.logger.debug("Sample `%s' spread is:" % dataset_name)
3061 ##        run_numbers = sites.keys()
3062 ##        run_numbers.sort()
3063 ##        for run_number in run_numbers:
3064 ##            self.logger.debug("  run # %6d: %d site(s) (%s)" % \
3065 ##                              (run_number,
3066 ##                               len(sites[run_number]),
3067 ##                               ", ".join([i[0] for i in sites[run_number]])))
3068 
3069 ##        # End of dbs_check_dataset_spread_old.
3070 ##        return sites
3071 ##    # DEBUG DEBUG DEBUG end
3072 
3073     ##########
3074 
3075     def dbs_check_dataset_spread(self, dataset_name):
3076         """Figure out the number of events in each run of this dataset.
3077 
3078         This also records at which sites each run is available, and it
3079         is more efficient than calling dbs_resolve_number_of_events per run.
3080 
3081         """
3082 
3083         self.logger.debug("Checking spread of dataset `%s'" % dataset_name)
3084 
3085         # DEBUG DEBUG DEBUG
3086         # If we get here DBS should have been set up already.
3087         assert not self.dbs_api is None
3088         # DEBUG DEBUG DEBUG end
3089 
3090         api = self.dbs_api
3091         dbs_query = "find run.number, site, file.name, file.numevents " \
3092                     "where dataset = %s " \
3093                     "and dataset.status = VALID" % \
3094                     dataset_name
3095         try:
3096             api_result = api.executeQuery(dbs_query)
3097         except DBSAPI.dbsApiException.DbsApiException:
3098             msg = "ERROR: Could not execute DBS query"
3099             self.logger.fatal(msg)
3100             raise Error(msg)
3101 
3102         handler = DBSXMLHandler(["run.number", "site", "file.name", "file.numevents"])
3103         parser = xml.sax.make_parser()
3104         parser.setContentHandler(handler)
3105 
3106         try:
3107             # OBSOLETE OBSOLETE OBSOLETE
3108 ##            class Handler(xml.sax.handler.ContentHandler):
3109 ##                def startElement(self, name, attrs):
3110 ##                    if name == "result":
3111 ##                        site_name = str(attrs["STORAGEELEMENT_SENAME"])
3112 ##                        # TODO TODO TODO
3113 ##                        # Ugly hack to get around cases like this:
3114 ##                        #   $ dbs search --query="find dataset, site, file.count where dataset=/RelValQCD_Pt_3000_3500/CMSSW_3_3_0_pre1-STARTUP31X_V4-v1/GEN-SIM-RECO"
3115 ##                        #   Using DBS instance at: http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet
3116 ##                        #   Processing ... \
3117 ##                        #   PATH    STORAGEELEMENT_SENAME   COUNT_FILES
3118 ##                        #   _________________________________________________________________________________
3119 ##                        #   /RelValQCD_Pt_3000_3500/CMSSW_3_3_0_pre1-STARTUP31X_V4-v1/GEN-SIM-RECO          1
3120 ##                        #   /RelValQCD_Pt_3000_3500/CMSSW_3_3_0_pre1-STARTUP31X_V4-v1/GEN-SIM-RECO  cmssrm.fnal.gov 12
3121 ##                        #   /RelValQCD_Pt_3000_3500/CMSSW_3_3_0_pre1-STARTUP31X_V4-v1/GEN-SIM-RECO  srm-cms.cern.ch 12
3122 ##                        if len(site_name) < 1:
3123 ##                            return
3124 ##                        # TODO TODO TODO end
3125 ##                        run_number = int(attrs["RUNS_RUNNUMBER"])
3126 ##                        file_name = str(attrs["FILES_LOGICALFILENAME"])
3127 ##                        nevents = int(attrs["FILES_NUMBEROFEVENTS"])
3128 ##                        # I know, this is a bit of a kludge.
3129 ##                        if not files_info.has_key(run_number):
3130 ##                            # New run.
3131 ##                            files_info[run_number] = {}
3132 ##                            files_info[run_number][file_name] = (nevents,
3133 ##                                                                 [site_name])
3134 ##                        elif not files_info[run_number].has_key(file_name):
3135 ##                            # New file for a known run.
3136 ##                            files_info[run_number][file_name] = (nevents,
3137 ##                                                                 [site_name])
3138 ##                        else:
3139 ##                            # New entry for a known file for a known run.
3140 ##                            # DEBUG DEBUG DEBUG
3141 ##                            # Each file should have the same number of
3142 ##                            # events independent of the site it's at.
3143 ##                            assert nevents == files_info[run_number][file_name][0]
3144 ##                            # DEBUG DEBUG DEBUG end
3145 ##                            files_info[run_number][file_name][1].append(site_name)
3146             # OBSOLETE OBSOLETE OBSOLETE end
3147             xml.sax.parseString(api_result, handler)
3148         except SAXParseException:
3149             msg = "ERROR: Could not parse DBS server output"
3150             self.logger.fatal(msg)
3151             raise Error(msg)
3152 
3153         # DEBUG DEBUG DEBUG
3154         assert(handler.check_results_validity()), "ERROR The DBSXMLHandler screwed something up!"
3155         # DEBUG DEBUG DEBUG end
3156 
3157         # Now reshuffle all results a bit so we can more easily use
3158         # them later on. (Remember that all arrays in the results
3159         # should have equal length.)
3160         files_info = {}
3161         for (index, site_name) in enumerate(handler.results["site"]):
3162             # Ugly hack to get around cases like this:
3163             #   $ dbs search --query="find dataset, site, file.count where dataset=/RelValQCD_Pt_3000_3500/CMSSW_3_3_0_pre1-STARTUP31X_V4-v1/GEN-SIM-RECO"
3164             #   Using DBS instance at: http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet
3165             #   Processing ... \
3166             #   PATH    STORAGEELEMENT_SENAME   COUNT_FILES
3167             #   _________________________________________________________________________________
3168             #   /RelValQCD_Pt_3000_3500/CMSSW_3_3_0_pre1-STARTUP31X_V4-v1/GEN-SIM-RECO          1
3169             #   /RelValQCD_Pt_3000_3500/CMSSW_3_3_0_pre1-STARTUP31X_V4-v1/GEN-SIM-RECO  cmssrm.fnal.gov 12
3170             #   /RelValQCD_Pt_3000_3500/CMSSW_3_3_0_pre1-STARTUP31X_V4-v1/GEN-SIM-RECO  srm-cms.cern.ch 12
3171             if len(site_name) < 1:
3172                 continue
3173             run_number = int(handler.results["run.number"][index])
3174             file_name = handler.results["file.name"][index]
3175             nevents = int(handler.results["file.numevents"][index])
3176 
3177             # I know, this is a bit of a kludge.
3178             if run_number not in files_info:
3179                 # New run.
3180                 files_info[run_number] = {}
3181                 files_info[run_number][file_name] = (nevents,
3182                                                      [site_name])
3183             elif file_name not in files_info[run_number]:
3184                 # New file for a known run.
3185                 files_info[run_number][file_name] = (nevents,
3186                                                      [site_name])
3187             else:
3188                 # New entry for a known file for a known run.
3189                 # DEBUG DEBUG DEBUG
3190                 # Each file should have the same number of
3191                 # events independent of the site it's at.
3192                 assert nevents == files_info[run_number][file_name][0]
3193                 # DEBUG DEBUG DEBUG end
3194                 files_info[run_number][file_name][1].append(site_name)
3195 
3196         # Remove any information for files that are not available
3197         # anywhere. NOTE: After introducing the ugly hack above, this
3198         # is a bit redundant, but let's keep it for the moment.
3199         for run_number in files_info.keys():
3200             files_without_sites = [i for (i, j) in \
3201                                    files_info[run_number].items() \
3202                                    if len(j[1]) < 1]
3203             if len(files_without_sites) > 0:
3204                 self.logger.warning("Removing %d file(s)" \
3205                                     " with empty site names" % \
3206                                     len(files_without_sites))
3207                 for file_name in files_without_sites:
3208                     del files_info[run_number][file_name]
3209                     # files_info[run_number][file_name] = (files_info \
3210                     #                                     [run_number] \
3211                     #                                      [file_name][0], [])
3212 
3213         # And another bit of a kludge.
3214         num_events_catalog = {}
3215         for run_number in files_info.keys():
3216             site_names = list(set([j for i in files_info[run_number].values() for j in i[1]]))
3217 
3218             # NOTE: The term `mirrored' does not have the usual
3219             # meaning here. It basically means that we can apply
3220             # single-step harvesting.
3221             mirrored = None
3222             if len(site_names) > 1:
3223                 # Now we somehow need to figure out if we're dealing
3224                 # with a mirrored or a spread-out dataset. The rule we
3225                 # use here is that we're dealing with a spread-out
3226                 # dataset unless we can find at least one site
3227                 # containing exactly the full list of files for this
3228                 # dataset that DBS knows about. In that case we just
3229                 # use only that site.
3230                 all_file_names = files_info[run_number].keys()
3231                 all_file_names = set(all_file_names)
3232                 sites_with_complete_copies = []
3233                 for site_name in site_names:
3234                     files_at_site = [i for (i, (j, k)) \
3235                                      in files_info[run_number].items() \
3236                                      if site_name in k]
3237                     files_at_site = set(files_at_site)
3238                     if files_at_site == all_file_names:
3239                         sites_with_complete_copies.append(site_name)
3240                 if len(sites_with_complete_copies) < 1:
3241                     # This dataset/run is available at more than one
3242                     # site, but no one has a complete copy. So this is
3243                     # a spread-out sample.
3244                     mirrored = False
3245                 else:
3246                     if len(sites_with_complete_copies) > 1:
3247                         # This sample is available (and complete) at
3248                         # more than one site. Definitely mirrored.
3249                         mirrored = True
3250                     else:
3251                         # This dataset/run is available at more than
3252                         # one site and at least one of them has a
3253                         # complete copy. Even if this is only a single
3254                         # site, let's call this `mirrored' and run the
3255                         # single-step harvesting.
3256                         mirrored = True
3257 
3258 ##                site_names_ref = set(files_info[run_number].values()[0][1])
3259 ##                for site_names_tmp in files_info[run_number].values()[1:]:
3260 ##                    if set(site_names_tmp[1]) != site_names_ref:
3261 ##                        mirrored = False
3262 ##                        break
3263 
3264                 if mirrored:
3265                     self.logger.debug("    -> run appears to be `mirrored'")
3266                 else:
3267                     self.logger.debug("    -> run appears to be spread-out")
3268 
3269                 if mirrored and \
3270                        len(sites_with_complete_copies) != len(site_names):
3271                     # Remove any references to incomplete sites if we
3272                     # have at least one complete site (and if there
3273                     # are incomplete sites).
3274                     for (file_name, (i, sites)) in files_info[run_number].items():
3275                         complete_sites = [site for site in sites \
3276                                           if site in sites_with_complete_copies]
3277                         files_info[run_number][file_name] = (i, complete_sites)
3278                     site_names = sites_with_complete_copies
3279 
3280             self.logger.debug("  for run #%d:" % run_number)
3281             num_events_catalog[run_number] = {}
3282             num_events_catalog[run_number]["all_sites"] = sum([i[0] for i in files_info[run_number].values()])
3283             if len(site_names) < 1:
3284                 self.logger.debug("    run is not available at any site")
3285                 self.logger.debug("      (but should contain %d events)" % \
3286                                   num_events_catalog[run_number]["all_sites"])
3287             else:
3288                 self.logger.debug("    at all sites combined there are %d events" % \
3289                                   num_events_catalog[run_number]["all_sites"])
3290                 for site_name in site_names:
3291                     num_events_catalog[run_number][site_name] = sum([i[0] for i in files_info[run_number].values() if site_name in i[1]])
3292                     self.logger.debug("    at site `%s' there are %d events" % \
3293                                       (site_name, num_events_catalog[run_number][site_name]))
3294             num_events_catalog[run_number]["mirrored"] = mirrored
3295 
3296         # End of dbs_check_dataset_spread.
3297         return num_events_catalog
3298 
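    # A small worked example (hedged, with made-up site names and event
    # counts) of the classification logic above. Suppose DBS reports, for
    # run 1, two files of 100 events each with these site lists:
    #
    #   files_info[1] = {"fileA.root": (100, ["T2_A", "T2_B"]),
    #                    "fileB.root": (100, ["T2_A"])}
    #
    # Site T2_A holds the complete file list, so the run is treated as
    # `mirrored', the incomplete site T2_B is dropped, and the catalog
    # entry would come out as
    #
    #   num_events_catalog[1] == {"all_sites": 200, "T2_A": 200,
    #                             "mirrored": True}
    #
    # If instead fileB.root were only at T2_B, no single site would hold a
    # complete copy and the run would be flagged as spread-out
    # (mirrored == False), with per-site counts of 100 events each.
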
3299     # Beginning of old version.
3300 ##    def dbs_check_dataset_num_events(self, dataset_name):
3301 ##        """Figure out the number of events in each run of this dataset.
3302 
3303 ##        This is a more efficient way of doing this than calling
3304 ##        dbs_resolve_number_of_events for each run.
3305 
3306 ##        # BUG BUG BUG
3307 ##        # This might very well not work at all for spread-out samples. (?)
3308 ##        # BUG BUG BUG end
3309 
3310 ##        """
3311 
3312 ##        # DEBUG DEBUG DEBUG
3313 ##        # If we get here DBS should have been set up already.
3314 ##        assert not self.dbs_api is None
3315 ##        # DEBUG DEBUG DEBUG end
3316 
3317 ##        api = self.dbs_api
3318 ##        dbs_query = "find run.number, file.name, file.numevents where dataset = %s " \
3319 ##                    "and dataset.status = VALID" % \
3320 ##                    dataset_name
3321 ##        try:
3322 ##            api_result = api.executeQuery(dbs_query)
3323 ##        except DbsApiException:
3324 ##            msg = "ERROR: Could not execute DBS query"
3325 ##            self.logger.fatal(msg)
3326 ##            raise Error(msg)
3327 
3328 ##        try:
3329 ##            files_info = {}
3330 ##            class Handler(xml.sax.handler.ContentHandler):
3331 ##                def startElement(self, name, attrs):
3332 ##                    if name == "result":
3333 ##                        run_number = int(attrs["RUNS_RUNNUMBER"])
3334 ##                        file_name = str(attrs["FILES_LOGICALFILENAME"])
3335 ##                        nevents = int(attrs["FILES_NUMBEROFEVENTS"])
3336 ##                        try:
3337 ##                            files_info[run_number][file_name] = nevents
3338 ##                        except KeyError:
3339 ##                            files_info[run_number] = {file_name: nevents}
3340 ##            xml.sax.parseString(api_result, Handler())
3341 ##        except SAXParseException:
3342 ##            msg = "ERROR: Could not parse DBS server output"
3343 ##            self.logger.fatal(msg)
3344 ##            raise Error(msg)
3345 
3346 ##        num_events_catalog = {}
3347 ##        for run_number in files_info.keys():
3348 ##            num_events_catalog[run_number] = sum(files_info[run_number].values())
3349 
3350 ##        # End of dbs_check_dataset_num_events.
3351 ##        return num_events_catalog
3352     # End of old version.
3353 
3354     ##########
3355 
3356     def build_dataset_list(self, input_method, input_name):
3357         """Build a list of all datasets to be processed.
3358 
3359         """
3360 
3361         dataset_names = []
3362 
3363         # It may be, but only for the list of datasets to ignore, that
3364         # the input method and name are None because nothing was
3365         # specified. In that case just an empty list is returned.
3366         if input_method is None:
3367             pass
3368         elif input_method == "dataset":
3369             # Input comes from a dataset name directly on the command
3370             # line. But, this can also contain wildcards so we need
3371             # DBS to translate it conclusively into a list of explicit
3372             # dataset names.
3373             self.logger.info("Asking DBS for dataset names")
3374             dataset_names = self.dbs_resolve_dataset_name(input_name)
3375         elif input_method == "datasetfile":
3376             # In this case a file containing a list of dataset names
3377             # is specified. Still, each line may contain wildcards so
3378             # this step also needs help from DBS.
3379             # NOTE: Lines starting with a `#' are ignored.
3380             self.logger.info("Reading input from list file `%s'" % \
3381                              input_name)
3382             try:
3383                 listfile = open("/afs/cern.ch/cms/CAF/CMSCOMM/COMM_DQM/harvesting/bin/%s" %input_name, "r")
3384                 print("open listfile")
3385                 for dataset in listfile:
3386                     # Skip empty lines.
3387                     dataset_stripped = dataset.strip()
3388                     if len(dataset_stripped) < 1:
3389                         continue
3390                     # Skip lines starting with a `#'.
3391                     if dataset_stripped[0] != "#":
3392                         dataset_names.extend(self. \
3393                                              dbs_resolve_dataset_name(dataset_stripped))
3394                 listfile.close()
3395             except IOError:
3396                 msg = "ERROR: Could not open input list file `%s'" % \
3397                       input_name
3398                 self.logger.fatal(msg)
3399                 raise Error(msg)
3400         else:
3401             # DEBUG DEBUG DEBUG
3402             # We should never get here.
3403             assert False, "Unknown input method `%s'" % input_method
3404             # DEBUG DEBUG DEBUG end
3405 
3406         # Remove duplicates from the dataset list.
3407         # NOTE: There should not be any duplicates in any list coming
3408         # from DBS, but maybe the user provided a list file with less
3409         # care.
3410         # Store for later use.
3411         dataset_names = sorted(set(dataset_names))
3412 
3413 
3414         # End of build_dataset_list.
3415         return dataset_names
3416 
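    # For the `datasetfile' input method above, the list file is plain
    # text: one dataset name (possibly containing DBS wildcards) per line,
    # with empty lines skipped and lines starting with `#' treated as
    # comments. A hedged, purely illustrative example file:
    #
    #   # RelVal samples to harvest
    #   /RelValMinBias/CMSSW_3_8_2-*/GEN-SIM-RECO
    #   /RelValTTbar/CMSSW_3_8_2-*/GEN-SIM-RECO
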
3417     ##########
3418 
3419     def build_dataset_use_list(self):
3420         """Build a list of datasets to process.
3421 
3422         """
3423 
3424         self.logger.info("Building list of datasets to consider...")
3425 
3426         input_method = self.input_method["datasets"]["use"]
3427         input_name = self.input_name["datasets"]["use"]
3428         dataset_names = self.build_dataset_list(input_method,
3429                                                 input_name)
3430         self.datasets_to_use = dict(list(zip(dataset_names,
3431                                         [None] * len(dataset_names))))
3432 
3433         self.logger.info("  found %d dataset(s) to process:" % \
3434                          len(dataset_names))
3435         for dataset in dataset_names:
3436             self.logger.info("  `%s'" % dataset)
3437 
3438         # End of build_dataset_use_list.
3439 
3440     ##########
3441 
3442     def build_dataset_ignore_list(self):
3443         """Build a list of datasets to ignore.
3444 
3445         NOTE: We should always have a list of datasets to process, but
3446         it may be that we don't have a list of datasets to ignore.
3447 
3448         """
3449 
3450         self.logger.info("Building list of datasets to ignore...")
3451 
3452         input_method = self.input_method["datasets"]["ignore"]
3453         input_name = self.input_name["datasets"]["ignore"]
3454         dataset_names = self.build_dataset_list(input_method,
3455                                                 input_name)
3456         self.datasets_to_ignore = dict(list(zip(dataset_names,
3457                                            [None] * len(dataset_names))))
3458 
3459         self.logger.info("  found %d dataset(s) to ignore:" % \
3460                          len(dataset_names))
3461         for dataset in dataset_names:
3462             self.logger.info("  `%s'" % dataset)
3463 
3464         # End of build_dataset_ignore_list.
3465 
3466     ##########
3467 
3468     def build_runs_list(self, input_method, input_name):
3469 
3470         runs = []
3471 
3472         # A list of runs (either to use or to ignore) is not
3473         # required. This protects against `empty cases.'
3474         if input_method is None:
3475             pass
3476         elif input_method == "runs":
3477             # A list of runs was specified directly from the command
3478             # line.
3479             self.logger.info("Reading list of runs from the " \
3480                              "command line")
3481             runs.extend([int(i.strip()) \
3482                          for i in input_name.split(",") \
3483                          if len(i.strip()) > 0])
3484         elif input_method == "runslistfile":
3485             # We were passed a file containing a list of runs.
3486             self.logger.info("Reading list of runs from file `%s'" % \
3487                              input_name)
3488             try:
3489                 listfile = open(input_name, "r")
3490                 for run in listfile:
3491                     # Skip empty lines.
3492                     run_stripped = run.strip()
3493                     if len(run_stripped) < 1:
3494                         continue
3495                     # Skip lines starting with a `#'.
3496                     if run_stripped[0] != "#":
3497                         runs.append(int(run_stripped))
3498                 listfile.close()
3499             except IOError:
3500                 msg = "ERROR: Could not open input list file `%s'" % \
3501                       input_name
3502                 self.logger.fatal(msg)
3503                 raise Error(msg)
3504 
3505         else:
3506             # DEBUG DEBUG DEBUG
3507             # We should never get here.
3508             assert False, "Unknown input method `%s'" % input_method
3509             # DEBUG DEBUG DEBUG end
3510 
3511         # Remove duplicates, sort and done.
3512         runs = sorted(set(runs))
3513 
3514         # End of build_runs_list().
3515         return runs
3516 
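    # The two input methods above accept, for example (run numbers below
    # are made up, shown for illustration only):
    #
    #   - `runs'         : a comma-separated string on the command line,
    #                      e.g. "123456, 123789,124000"
    #   - `runslistfile' : a text file with one run number per line, where
    #                      empty lines and lines starting with `#' are
    #                      ignored, e.g.
    #                        # cosmics runs
    #                        123456
    #                        123789
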
3517     ##########
3518 
3519     def build_runs_use_list(self):
3520         """Build a list of runs to process.
3521 
3522         """
3523 
3524         self.logger.info("Building list of runs to consider...")
3525 
3526         input_method = self.input_method["runs"]["use"]
3527         input_name = self.input_name["runs"]["use"]
3528         runs = self.build_runs_list(input_method, input_name)
3529         self.runs_to_use = dict(list(zip(runs, [None] * len(runs))))
3530 
3531         self.logger.info("  found %d run(s) to process:" % \
3532                          len(runs))
3533         if len(runs) > 0:
3534             self.logger.info("  %s" % ", ".join([str(i) for i in runs]))
3535 
3536         # End of build_runs_use_list().
3537 
3538     ##########
3539 
3540     def build_runs_ignore_list(self):
3541         """Build a list of runs to ignore.
3542 
3543         NOTE: We should always have a list of runs to process, but
3544         it may be that we don't have a list of runs to ignore.
3545 
3546         """
3547 
3548         self.logger.info("Building list of runs to ignore...")
3549 
3550         input_method = self.input_method["runs"]["ignore"]
3551         input_name = self.input_name["runs"]["ignore"]
3552         runs = self.build_runs_list(input_method, input_name)
3553         self.runs_to_ignore = dict(list(zip(runs, [None] * len(runs))))
3554 
3555         self.logger.info("  found %d run(s) to ignore:" % \
3556                          len(runs))
3557         if len(runs) > 0:
3558             self.logger.info("  %s" % ", ".join([str(i) for i in runs]))
3559 
3560         # End of build_runs_ignore_list().
3561 
3562     ##########
3563 
3564     def process_dataset_ignore_list(self):
3565         """Update the list of datasets taking into account the ones to
3566         ignore.
3567 
3568         Both lists have been generated before from DBS and both are
3569         assumed to be unique.
3570 
3571         NOTE: The advantage of creating the ignore list from DBS (in
3572         case a regexp is given) and matching that instead of directly
3573         matching the ignore criterion against the list of datasets (to
3574         consider) built from DBS is that in the former case we're sure
3575         that all regexps are treated exactly as DBS would have done
3576         without the cmsHarvester.
3577 
3578         NOTE: This only removes complete samples. Exclusion of single
3579         runs is done by the book keeping. So the assumption is that a
3580         user never wants to harvest just part (i.e. n out of N runs)
3581         of a sample.
3582 
3583         """
3584 
3585         self.logger.info("Processing list of datasets to ignore...")
3586 
3587         self.logger.debug("Before processing ignore list there are %d " \
3588                           "datasets in the list to be processed" % \
3589                           len(self.datasets_to_use))
3590 
3591         # Simple approach: just loop and search.
3592         dataset_names_filtered = copy.deepcopy(self.datasets_to_use)
3593         for dataset_name in self.datasets_to_use.keys():
3594             if dataset_name in self.datasets_to_ignore.keys():
3595                 del dataset_names_filtered[dataset_name]
3596 
3597         self.logger.info("  --> Removed %d dataset(s)" % \
3598                          (len(self.datasets_to_use) -
3599                           len(dataset_names_filtered)))
3600 
3601         self.datasets_to_use = dataset_names_filtered
3602 
3603         self.logger.debug("After processing ignore list there are %d " \
3604                           "datasets in the list to be processed" % \
3605                           len(self.datasets_to_use))
3606 
3607         # End of process_dataset_ignore_list.
3608 
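    # The loop above amounts to a set difference on the dataset names. A
    # hedged, roughly equivalent formulation (it preserves the stored
    # values but skips the deep copy) would be:
    #
    #   self.datasets_to_use = dict(
    #       (name, val) for (name, val) in self.datasets_to_use.items()
    #       if name not in self.datasets_to_ignore)
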
3609     ##########
3610 
3611     def process_runs_use_and_ignore_lists(self):
3612 
3613         self.logger.info("Processing list of runs to use and ignore...")
3614 
3615         # This basically adds all runs in a dataset to be processed,
3616         # except for any runs that are not specified in the `to use'
3617         # list and any runs that are specified in the `to ignore'
3618         # list.
3619 
3620         # NOTE: It is assumed that those lists make sense. The input
3621         # should be checked against e.g. overlapping `use' and
3622         # `ignore' lists.
3623 
3624         runs_to_use = self.runs_to_use
3625         runs_to_ignore = self.runs_to_ignore
3626 
3627         for dataset_name in self.datasets_to_use:
3628             runs_in_dataset = self.datasets_information[dataset_name]["runs"]
3629 
3630             # First some sanity checks.
3631             runs_to_use_tmp = []
3632             for run in runs_to_use:
3633                 if not run in runs_in_dataset:
3634                     self.logger.warning("Dataset `%s' does not contain " \
3635                                         "requested run %d " \
3636                                         "--> ignoring `use' of this run" % \
3637                                         (dataset_name, run))
3638                 else:
3639                     runs_to_use_tmp.append(run)
3640 
3641             if len(runs_to_use) > 0:
3642                 runs = runs_to_use_tmp
3643                 self.logger.info("Using %d out of %d runs " \
3644                                  "of dataset `%s'" % \
3645                                  (len(runs), len(runs_in_dataset),
3646                                   dataset_name))
3647             else:
3648                 runs = runs_in_dataset
3649 
3650             if len(runs_to_ignore) > 0:
3651                 runs_tmp = []
3652                 for run in runs:
3653                     if not run in runs_to_ignore:
3654                         runs_tmp.append(run)
3655                 self.logger.info("Ignoring %d out of %d runs " \
3656                                  "of dataset `%s'" % \
3657                                  (len(runs)- len(runs_tmp),
3658                                   len(runs_in_dataset),
3659                                   dataset_name))
3660                 runs = runs_tmp
3661 
3662             if self.todofile != "YourToDofile.txt":
3663                 runs_todo = []
3664                 print("Reading runs from file /afs/cern.ch/cms/CAF/CMSCOMM/COMM_DQM/harvesting/%s" %self.todofile)
3665                 cmd="grep %s /afs/cern.ch/cms/CAF/CMSCOMM/COMM_DQM/harvesting/%s | cut -f5 -d' '" %(dataset_name,self.todofile)
3666                 (status, output)=subprocess.getstatusoutput(cmd)
3667                 for run in runs:
3668                     run_str="%s" %run
3669                     if run_str in output:
3670                         runs_todo.append(run)
3671                 self.logger.info("Using %d runs " \
3672                                  "of dataset `%s'" % \
3673                                  (len(runs_todo),
3674                                   dataset_name))
3675                 runs=runs_todo
3676 
3677             Json_runs = []
3678             if self.Jsonfilename != "YourJSON.txt":
3679                 good_runs = []
3680                 self.Jsonlumi = True
3681                 # We were passed a Jsonfile containing a dictionary of
3682                 # run/lumisection pairs
3683                 self.logger.info("Reading runs and lumisections from file `%s'" % \
3684                                 self.Jsonfilename)
3685                 try:
3686                     Jsonfile = open(self.Jsonfilename, "r")
3687                     for names in Jsonfile:
3688                         dictNames= eval(str(names))
3689                         for key in dictNames:
3690                             intkey=int(key)
3691                             Json_runs.append(intkey)
3692                     Jsonfile.close()
3693                 except IOError:
3694                     msg = "ERROR: Could not open Jsonfile `%s'" % \
3695                           self.Jsonfilename
3696                     self.logger.fatal(msg)
3697                     raise Error(msg)
3698                 for run in runs:
3699                     if run in Json_runs:
3700                         good_runs.append(run)
3701                 self.logger.info("Using %d runs " \
3702                                  "of dataset `%s'" % \
3703                                  (len(good_runs),
3704                                   dataset_name))
3705                 runs=good_runs
3706             if (self.Jsonrunfilename != "YourJSON.txt") and (self.Jsonfilename == "YourJSON.txt"):
3707                 good_runs = []
3708                 # We were passed a Jsonfile containing a dictionary of
3709                 # run/lumisection pairs
3710                 self.logger.info("Reading runs from file `%s'" % \
3711                                 self.Jsonrunfilename)
3712                 try:
3713                     Jsonfile = open(self.Jsonrunfilename, "r")
3714                     for names in Jsonfile:
3715                         dictNames= eval(str(names))
3716                         for key in dictNames:
3717                             intkey=int(key)
3718                             Json_runs.append(intkey)
3719                     Jsonfile.close()
3720                 except IOError:
3721                     msg = "ERROR: Could not open Jsonfile `%s'" % \
3722                           self.Jsonrunfilename
3723                     self.logger.fatal(msg)
3724                     raise Error(msg)
3725                 for run in runs:
3726                     if run in Json_runs: 
3727                         good_runs.append(run)
3728                 self.logger.info("Using %d runs " \
3729                                  "of dataset `%s'" % \
3730                                  (len(good_runs),
3731                                   dataset_name))
3732                 runs=good_runs
3733 
3734             self.datasets_to_use[dataset_name] = runs
3735 
3736         # End of process_runs_use_and_ignore_lists().
3737 
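    # A hedged illustration of the JSON selection files consumed above
    # (run numbers and lumisection ranges are made up). Each line is
    # evaluated as a dictionary whose keys are run numbers; in the
    # lumisection-aware case the values are lists of [first, last]
    # lumisection ranges, as in the usual CMS certification JSON files:
    #
    #   {"132440": [[157, 378]], "132596": [[382, 447], [453, 470]]}
    #
    # Only the run numbers (the keys) are extracted in this method; the
    # `self.Jsonlumi' flag merely records that lumisection information
    # was supplied.
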
3738     ##########
3739 
3740     def singlify_datasets(self):
3741         """Remove all but the largest part of all datasets.
3742 
3743         This allows us to harvest at least part of these datasets
3744         using single-step harvesting until the two-step approach
3745         works.
3746 
3747         """
3748 
3749         # DEBUG DEBUG DEBUG
3750         assert self.harvesting_mode == "single-step-allow-partial"
3751         # DEBUG DEBUG DEBUG end
3752 
3753         for dataset_name in self.datasets_to_use:
3754             for run_number in self.datasets_information[dataset_name]["runs"]:
3755                 max_events = max(self.datasets_information[dataset_name]["sites"][run_number].values())
3756                 sites_with_max_events = [i[0] for i in self.datasets_information[dataset_name]["sites"][run_number].items() if i[1] == max_events]
3757                 self.logger.warning("Singlifying dataset `%s', " \
3758                                     "run %d" % \
3759                                     (dataset_name, run_number))
3760                 cmssw_version = self.datasets_information[dataset_name] \
3761                                 ["cmssw_version"]
3762                 selected_site = self.pick_a_site(sites_with_max_events,
3763                                                  cmssw_version)
3764 
3765                 # Let's tell the user that we're manhandling this dataset.
3766                 nevents_old = self.datasets_information[dataset_name]["num_events"][run_number]
3767                 self.logger.warning("  --> " \
3768                                     "only harvesting partial statistics: " \
3769                                     "%d out of %d events (%5.1f%%) " \
3770                                     "at site `%s'" % \
3771                                     (max_events,
3772                                      nevents_old,
3773                                      100. * max_events / nevents_old,
3774                                      selected_site))
3775                 self.logger.warning("!!! Please note that the number of " \
3776                                     "events in the output path name will " \
3777                                     "NOT reflect the actual statistics in " \
3778                                     "the harvested results !!!")
3779 
3780                 # We found the site with the highest statistics and
3781                 # the corresponding number of events. (CRAB gets upset
3782                 # if we ask for more events than there are at a given
3783                 # site.) Now update this information in our main
3784                 # datasets_information variable.
3785                 self.datasets_information[dataset_name]["sites"][run_number] = {selected_site: max_events}
3786                 self.datasets_information[dataset_name]["num_events"][run_number] = max_events
3787                 #self.datasets_information[dataset_name]["sites"][run_number] = [selected_site]
3788 
3789         # End of singlify_datasets.
3790 
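    # A hedged before/after sketch of what singlify_datasets does to the
    # bookkeeping for one run (site names and event counts are made up;
    # `sites' and `num_events' abbreviate the corresponding entries in
    # self.datasets_information[dataset_name]):
    #
    #   before: sites[run]      == {"T2_A": 80000, "T2_B": 120000}
    #           num_events[run] == 200000
    #   after:  sites[run]      == {"T2_B": 120000}
    #           num_events[run] == 120000
    #
    # i.e. only the site with the largest partial copy is kept and the
    # event count is reduced accordingly, so CRAB is never asked for more
    # events than that site actually hosts.
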
3791     ##########
3792 
3793     def check_dataset_list(self):
3794         """Check list of dataset names for impossible ones.
3795 
3796         Two kinds of checks are done:
3797         - Checks for things that do not make sense. These lead to
3798           errors and skipped datasets.
3799         - Sanity checks. For these warnings are issued but the user is
3800           considered to be the authoritative expert.
3801 
3802         Checks performed:
3803         - The CMSSW version encoded in the dataset name should match
3804           self.cmssw_version. This is critical.
3805         - There should be some events in the dataset/run. This is
3806           critical in the sense that CRAB refuses to create jobs for
3807           zero events. And yes, this does happen in practice. E.g. the
3808           reprocessed CRAFT08 datasets contain runs with zero events.
3809         - A cursory check is performed to see if the harvesting type
3810           makes sense for the data type. This should prevent the user
3811           from inadvertently running RelVal for data.
3812         - It is not possible to run single-step harvesting jobs on
3813           samples that are not fully contained at a single site.
3814         - Each dataset/run has to be available at at least one site.
3815 
3816         """
3817 
3818         self.logger.info("Performing sanity checks on dataset list...")
3819 
3820         dataset_names_after_checks = copy.deepcopy(self.datasets_to_use)
3821 
3822         for dataset_name in self.datasets_to_use.keys():
3823 
3824             # Check CMSSW version.
3825             version_from_dataset = self.datasets_information[dataset_name] \
3826                                    ["cmssw_version"]
3827             if version_from_dataset != self.cmssw_version:
3828                 msg = "  CMSSW version mismatch for dataset `%s' " \
3829                       "(%s vs. %s)" % \
3830                       (dataset_name,
3831                        self.cmssw_version, version_from_dataset)
3832                 if self.force_running:
3833                     # Expert mode: just warn, then continue.
3834                     self.logger.warning("%s " \
3835                                         "--> `force mode' active: " \
3836                                         "run anyway" % msg)
3837                 else:
3838                     del dataset_names_after_checks[dataset_name]
3839                     self.logger.warning("%s " \
3840                                         "--> skipping" % msg)
3841                     continue
3842 
3843             ###
3844 
3845             # Check that the harvesting type makes sense for the
3846             # sample. E.g. normally one would not run the DQMOffline
3847             # harvesting on Monte Carlo.
3848             # TODO TODO TODO
3849             # This should be further refined.
3850             suspicious = False
3851             datatype = self.datasets_information[dataset_name]["datatype"]
3852             if datatype == "data":
3853                 # Normally only DQM harvesting is run on data.
3854                 if self.harvesting_type != "DQMOffline":
3855                     suspicious = True
3856             elif datatype == "mc":
3857                 if self.harvesting_type == "DQMOffline":
3858                     suspicious = True
3859             else:
3860                 # Doh!
3861                 assert False, "ERROR Impossible data type `%s' " \
3862                        "for dataset `%s'" % \
3863                        (datatype, dataset_name)
3864             if suspicious:
3865                 msg = "  Normally one does not run `%s' harvesting " \
3866                       "on %s samples, are you sure?" % \
3867                       (self.harvesting_type, datatype)
3868                 if self.force_running:
3869                     self.logger.warning("%s " \
3870                                         "--> `force mode' active: " \
3871                                         "run anyway" % msg)
3872                 else:
3873                     del dataset_names_after_checks[dataset_name]
3874                     self.logger.warning("%s " \
3875                                         "--> skipping" % msg)
3876                     continue
3877 
3878             # TODO TODO TODO end
3879 
3880             ###
3881 
3882             # BUG BUG BUG
3883             # For the moment, due to a problem with DBS, I cannot
3884             # figure out the GlobalTag for data by myself. (For MC
3885             # it's no problem.) This means that unless a GlobalTag was
3886             # specified from the command line, we will have to skip
3887             # any data datasets.
3888 
3889             if datatype == "data":
3890                 if self.globaltag is None:
3891                     msg = "For data datasets (like `%s') " \
3892                           "we need a GlobalTag" % \
3893                           dataset_name
3894                     del dataset_names_after_checks[dataset_name]
3895                     self.logger.warning("%s " \
3896                                         "--> skipping" % msg)
3897                     continue
3898 
3899             # BUG BUG BUG end
3900 
3901             ###
3902 
3903             # Check if the GlobalTag exists and (if we're using
3904             # reference histograms) if it's ready to be used with
3905             # reference histograms.
3906             globaltag = self.datasets_information[dataset_name]["globaltag"]
3907             if globaltag not in self.globaltag_check_cache:
3908                 if self.check_globaltag(globaltag):
3909                     self.globaltag_check_cache.append(globaltag)
3910                 else:
3911                     msg = "Something is wrong with GlobalTag `%s' " \
3912                           "used by dataset `%s'!" % \
3913                           (globaltag, dataset_name)
3914                     if self.use_ref_hists:
3915                         msg += "\n(Either it does not exist or it " \
3916                                "does not contain the required key to " \
3917                                "be used with reference histograms.)"
3918                     else:
3919                         msg += "\n(It probably just does not exist.)"
3920                     self.logger.fatal(msg)
3921                     raise Usage(msg)
3922 
3923             ###
3924 
3925             # Require that each run is available at least somewhere.
3926             runs_without_sites = [i for (i, j) in \
3927                                   self.datasets_information[dataset_name] \
3928                                   ["sites"].items() \
3929                                   if len(j) < 1 and \
3930                                   i in self.datasets_to_use[dataset_name]]
3931             if len(runs_without_sites) > 0:
3932                 for run_without_sites in runs_without_sites:
3933                     try:
3934                         dataset_names_after_checks[dataset_name].remove(run_without_sites)
3935                     except KeyError:
3936                         pass
3937                 self.logger.warning("  removed %d unavailable run(s) " \
3938                                     "from dataset `%s'" % \
3939                                     (len(runs_without_sites), dataset_name))
3940                 self.logger.debug("    (%s)" % \
3941                                   ", ".join([str(i) for i in \
3942                                              runs_without_sites]))
3943 
3944             ###
3945 
3946             # Unless we're running two-step harvesting: only allow
3947             # samples located on a single site.
3948             if not self.harvesting_mode == "two-step":
3949                 for run_number in self.datasets_to_use[dataset_name]:
3950                     # DEBUG DEBUG DEBUG
3951 ##                    if self.datasets_information[dataset_name]["num_events"][run_number] != 0:
3952 ##                        pdb.set_trace()
3953                     # DEBUG DEBUG DEBUG end
3954                     num_sites = len(self.datasets_information[dataset_name] \
3955                                 ["sites"][run_number])
3956                     if num_sites > 1 and \
3957                            not self.datasets_information[dataset_name] \
3958                            ["mirrored"][run_number]:
3959                         # Cannot do this with a single-step job, not
3960                         # even in force mode. It just does not make
3961                         # sense.
3962                         msg = "  Dataset `%s', run %d is spread across more " \
3963                               "than one site.\n" \
3964                               "  Cannot run single-step harvesting on " \
3965                               "samples spread across multiple sites" % \
3966                               (dataset_name, run_number)
3967                         try:
3968                             dataset_names_after_checks[dataset_name].remove(run_number)
3969                         except KeyError:
3970                             pass
3971                         self.logger.warning("%s " \
3972                                             "--> skipping" % msg)
3973 
3974             ###
3975 
3976             # Require that the dataset/run is non-empty.
3977             # NOTE: To avoid reconsidering empty runs/datasets next
3978             # time around, we do include them in the book keeping.
3979             # BUG BUG BUG
3980             # This should sum only over the runs that we use!
3981             tmp = [j for (i, j) in self.datasets_information \
3982                    [dataset_name]["num_events"].items() \
3983                    if i in self.datasets_to_use[dataset_name]]
3984             num_events_dataset = sum(tmp)
3985             # BUG BUG BUG end
3986             if num_events_dataset < 1:
3987                 msg = "  dataset `%s' is empty" % dataset_name
3988                 del dataset_names_after_checks[dataset_name]
3989                 self.logger.warning("%s " \
3990                                     "--> skipping" % msg)
3991                 # Update the book keeping with all the runs in the dataset.
3992                 # DEBUG DEBUG DEBUG
3993                 #assert set([j for (i, j) in self.datasets_information \
3994                 #            [dataset_name]["num_events"].items() \
3995                 #            if i in self.datasets_to_use[dataset_name]]) == \
3996                 #           set([0])
3997                 # DEBUG DEBUG DEBUG end
3998                 #self.book_keeping_information[dataset_name] = self.datasets_information \
3999                 #                                              [dataset_name]["num_events"]
4000                 continue
4001 
4002             tmp = [i for i in \
4003                    self.datasets_information[dataset_name] \
4004                    ["num_events"].items() if i[1] < 1]
4005             tmp = [i for i in tmp if i[0] in self.datasets_to_use[dataset_name]]
4006             empty_runs = dict(tmp)
4007             if len(empty_runs) > 0:
4008                 for empty_run in empty_runs:
4009                     try:
4010                         dataset_names_after_checks[dataset_name].remove(empty_run)
4011                     except KeyError:
4012                         pass
4013                 self.logger.info("  removed %d empty run(s) from dataset `%s'" % \
4014                                  (len(empty_runs), dataset_name))
4015                 self.logger.debug("    (%s)" % \
4016                                   ", ".join([str(i) for i in empty_runs]))
4017 
4018         ###
4019 
4020         # If we emptied out a complete dataset, remove the whole
4021         # thing.
4022         dataset_names_after_checks_tmp = copy.deepcopy(dataset_names_after_checks)
4023         for (dataset_name, runs) in dataset_names_after_checks.items():
4024             if len(runs) < 1:
4025                 self.logger.warning("  Removing dataset without any runs " \
4026                                     "(left) `%s'" % \
4027                                     dataset_name)
4028                 del dataset_names_after_checks_tmp[dataset_name]
4029         dataset_names_after_checks = dataset_names_after_checks_tmp
4030 
4031         ###
4032 
4033         self.logger.warning("  --> Removed %d dataset(s)" % \
4034                             (len(self.datasets_to_use) -
4035                              len(dataset_names_after_checks)))
4036 
4037         # Now store the modified version of the dataset list.
4038         self.datasets_to_use = dataset_names_after_checks
4039 
4040         # End of check_dataset_list.
4041 
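    # A minimal illustrative sketch (not part of the original code) of the
    # `runs without sites' filter applied in check_dataset_list() above, using a
    # hypothetical run -> {site: nevents} mapping:
    #
    #   sites_per_run = {140331: {"site_a": 1000}, 140332: {}}
    #   runs_to_use = [140331, 140332]
    #   runs_without_sites = [run for (run, sites) in sites_per_run.items()
    #                         if len(sites) < 1 and run in runs_to_use]
    #   # -> [140332]; this run is dropped from the harvesting list.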
4042     ##########
4043 
4044     def escape_dataset_name(self, dataset_name):
4045         """Escape a DBS dataset name.
4046 
4047         Escape a DBS dataset name such that it does not cause trouble
4048         with the file system. This means turning each `/' into `__',
4049         except for the first one which is just removed.
4050 
4051         """
4052 
4053         escaped_dataset_name = dataset_name
4054         escaped_dataset_name = escaped_dataset_name.strip("/")
4055         escaped_dataset_name = escaped_dataset_name.replace("/", "__")
4056 
4057         return escaped_dataset_name
4058 
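    # Illustrative example (not part of the original code) of what
    # escape_dataset_name() produces for a hypothetical DBS dataset path:
    #
    #   escape_dataset_name("/RelValTTbar/CMSSW_3_8_2-START38_V9-v1/GEN-SIM-RECO")
    #   # -> "RelValTTbar__CMSSW_3_8_2-START38_V9-v1__GEN-SIM-RECO"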
4059     ##########
4060 
4061     # BUG BUG BUG
4062     # This is a bit of a redundant method, isn't it?
4063     def create_config_file_name(self, dataset_name, run_number):
4064         """Generate the name of the configuration file to be run by
4065         CRAB.
4066 
4067         Depending on the harvesting mode (single-step or two-step)
4068         this is the name of the real harvesting configuration or the
4069         name of the first-step ME summary extraction configuration.
4070 
4071         """
4072 
4073         if self.harvesting_mode == "single-step":
4074             config_file_name = self.create_harvesting_config_file_name(dataset_name)
4075         elif self.harvesting_mode == "single-step-allow-partial":
4076             config_file_name = self.create_harvesting_config_file_name(dataset_name)
4077 ##            # Only add the alarming piece to the file name if this is
4078 ##            # a spread-out dataset.
4079 ##            pdb.set_trace()
4080 ##            if self.datasets_information[dataset_name] \
4081 ##                   ["mirrored"][run_number] == False:
4082 ##                config_file_name = config_file_name.replace(".py", "_partial.py")
4083         elif self.harvesting_mode == "two-step":
4084             config_file_name = self.create_me_summary_config_file_name(dataset_name)
4085         else:
4086             assert False, "ERROR Unknown harvesting mode `%s'" % \
4087                    self.harvesting_mode
4088 
4089         # End of create_config_file_name.
4090         return config_file_name
4091     # BUG BUG BUG end
4092 
4093     ##########
4094 
4095     def create_harvesting_config_file_name(self, dataset_name):
4096         "Generate the name to be used for the harvesting config file."
4097 
4098         file_name_base = "harvesting.py"
4099         dataset_name_escaped = self.escape_dataset_name(dataset_name)
4100         config_file_name = file_name_base.replace(".py",
4101                                                   "_%s.py" % \
4102                                                   dataset_name_escaped)
4103 
4104         # End of create_harvesting_config_file_name.
4105         return config_file_name
4106 
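    # Illustrative example (not part of the original code), for the same
    # hypothetical dataset path as above:
    #
    #   create_harvesting_config_file_name(
    #       "/RelValTTbar/CMSSW_3_8_2-START38_V9-v1/GEN-SIM-RECO")
    #   # -> "harvesting_RelValTTbar__CMSSW_3_8_2-START38_V9-v1__GEN-SIM-RECO.py"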
4107     ##########
4108 
4109     def create_me_summary_config_file_name(self, dataset_name):
4110         "Generate the name of the ME summary extraction config file."
4111 
4112         file_name_base = "me_extraction.py"
4113         dataset_name_escaped = self.escape_dataset_name(dataset_name)
4114         config_file_name = file_name_base.replace(".py",
4115                                                   "_%s.py" % \
4116                                                   dataset_name_escaped)
4117 
4118         # End of create_me_summary_config_file_name.
4119         return config_file_name
4120 
4121     ##########
4122 
4123     def create_output_file_name(self, dataset_name, run_number=None):
4124         """Create the name of the output file to be used.
4125 
4126         This is the name of the output file of the `first step'. In
4127         the case of single-step harvesting this is already the final
4128         harvesting output ROOT file. In the case of two-step
4129         harvesting it is the name of the intermediary ME summary
4130         file.
4131 
4132         """
4133 
4134         # BUG BUG BUG
4135         # This method has become a bit of a mess. Originally it was
4136         # nice to have one entry point for both single- and two-step
4137         # output file names. However, now the former needs the run
4138         # number, while the latter does not even know about run
4139         # numbers. This should be fixed up a bit.
4140         # BUG BUG BUG end
4141 
4142         if self.harvesting_mode == "single-step":
4143             # DEBUG DEBUG DEBUG
4144             assert run_number is not None
4145             # DEBUG DEBUG DEBUG end
4146             output_file_name = self.create_harvesting_output_file_name(dataset_name, run_number)
4147         elif self.harvesting_mode == "single-step-allow-partial":
4148             # DEBUG DEBUG DEBUG
4149             assert run_number is not None
4150             # DEBUG DEBUG DEBUG end
4151             output_file_name = self.create_harvesting_output_file_name(dataset_name, run_number)
4152         elif self.harvesting_mode == "two-step":
4153             # DEBUG DEBUG DEBUG
4154             assert run_number is None
4155             # DEBUG DEBUG DEBUG end
4156             output_file_name = self.create_me_summary_output_file_name(dataset_name)
4157         else:
4158             # This should not be possible, but hey...
4159             assert False, "ERROR Unknown harvesting mode `%s'" % \
4160                    self.harvesting_mode
4161 
4162         # End of create_output_file_name.
4163         return output_file_name
4164 
4165     ##########
4166 
4167     def create_harvesting_output_file_name(self, dataset_name, run_number):
4168         """Generate the name to be used for the harvesting output file.
4169 
4170         This harvesting output file is the _final_ ROOT output file
4171         containing the harvesting results. In case of two-step
4172         harvesting there is an intermediate ME output file as well.
4173 
4174         """
4175 
4176         dataset_name_escaped = self.escape_dataset_name(dataset_name)
4177 
4178         # Hmmm, looking at the code for the DQMFileSaver this might
4179         # actually be the place where the first part of this file
4180         # naming scheme comes from.
4181         # NOTE: It looks like the `V0001' comes from the DQM
4182         # version. This is something that cannot be looked up from
4183         # here, so let's hope it does not change too often.
4184         output_file_name = "DQM_V0001_R%09d__%s.root" % \
4185                            (run_number, dataset_name_escaped)
4186         if self.harvesting_mode.find("partial") > -1:
4187             # Only add the alarming piece to the file name if this is
4188             # a spread-out dataset.
4189             if self.datasets_information[dataset_name] \
4190                    ["mirrored"][run_number] == False:
4191                 output_file_name = output_file_name.replace(".root", \
4192                                                             "_partial.root")
4193 
4194         # End of create_harvesting_output_file_name.
4195         return output_file_name
4196 
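    # Illustrative example (not part of the original code), for a hypothetical
    # dataset and run number:
    #
    #   create_harvesting_output_file_name(
    #       "/RelValTTbar/CMSSW_3_8_2-START38_V9-v1/GEN-SIM-RECO", 140331)
    #   # -> "DQM_V0001_R000140331__RelValTTbar__CMSSW_3_8_2-START38_V9-v1__GEN-SIM-RECO.root"
    #   # (non-mirrored runs in `single-step-allow-partial' mode get
    #   #  `_partial.root' instead of `.root')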
4197     ##########
4198 
4199     def create_me_summary_output_file_name(self, dataset_name):
4200         """Generate the name of the intermediate ME file to be
4201         used in two-step harvesting.
4202 
4203         """
4204 
4205         dataset_name_escaped = self.escape_dataset_name(dataset_name)
4206         output_file_name = "me_summary_%s.root" % \
4207                            dataset_name_escaped
4208 
4209         # End of create_me_summary_output_file_name.
4210         return output_file_name
4211 
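    # Illustrative example (not part of the original code), for the same
    # hypothetical dataset path as above:
    #
    #   create_me_summary_output_file_name(
    #       "/RelValTTbar/CMSSW_3_8_2-START38_V9-v1/GEN-SIM-RECO")
    #   # -> "me_summary_RelValTTbar__CMSSW_3_8_2-START38_V9-v1__GEN-SIM-RECO.root"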
4212     ##########
4213 
4214     def create_multicrab_block_name(self, dataset_name, run_number, index):
4215         """Create the block name to use for this dataset/run number.
4216 
4217         This is what appears in the brackets `[]' in multicrab.cfg. It
4218         is used as the name of the job and to create output
4219         directories.
4220 
4221         """
4222 
4223         dataset_name_escaped = self.escape_dataset_name(dataset_name)
4224         block_name = "%s_%09d_%s" % (dataset_name_escaped, run_number, index)
4225 
4226         # End of create_multicrab_block_name.
4227         return block_name
4228 
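    # Illustrative example (not part of the original code), for a hypothetical
    # dataset, run number and site index:
    #
    #   create_multicrab_block_name(
    #       "/RelValTTbar/CMSSW_3_8_2-START38_V9-v1/GEN-SIM-RECO", 140331, "site_01")
    #   # -> "RelValTTbar__CMSSW_3_8_2-START38_V9-v1__GEN-SIM-RECO_000140331_site_01"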
4229     ##########
4230 
4231     def create_crab_config(self):
4232         """Create a CRAB configuration for a given job.
4233 
4234         NOTE: This is _not_ a complete (as in: submittable) CRAB
4235         configuration. It is used to store the common settings for the
4236         multicrab configuration.
4237 
4238         NOTE: Only CERN CASTOR area (/castor/cern.ch/) is supported.
4239 
4240         NOTE: According to CRAB, you `Must define exactly two of
4241         total_number_of_events, events_per_job, or
4242         number_of_jobs.'. For single-step harvesting we force one job,
4243         for the rest we don't really care.
4244 
4245         # BUG BUG BUG
4246         # With the current version of CRAB (2.6.1), in which Daniele
4247         # fixed the behaviour of no_block_boundary for me, one _has to
4248         # specify_ the total_number_of_events and one single site in
4249         # the se_white_list.
4250         # BUG BUG BUG end
4251 
4252         """
4253 
4254         tmp = []
4255 
4256         # This is the stuff we will need to fill in.
4257         castor_prefix = self.castor_prefix
4258 
4259         tmp.append(self.config_file_header())
4260         tmp.append("")
4261 
4262         ## CRAB
4263         ##------
4264         tmp.append("[CRAB]")
4265         tmp.append("jobtype = cmssw")
4266         tmp.append("")
4267 
4268         ## GRID
4269         ##------
4270         tmp.append("[GRID]")
4271         tmp.append("virtual_organization=cms")
4272         tmp.append("")
4273 
4274         ## USER
4275         ##------
4276         tmp.append("[USER]")
4277         tmp.append("copy_data = 1")
4278         tmp.append("")
4279 
4280         ## CMSSW
4281         ##-------
4282         tmp.append("[CMSSW]")
4283         tmp.append("# This reveals data hosted on T1 sites,")
4284         tmp.append("# which is normally hidden by CRAB.")
4285         tmp.append("show_prod = 1")
4286         tmp.append("number_of_jobs = 1")
4287         if self.Jsonlumi:
4288             tmp.append("lumi_mask = %s" % self.Jsonfilename)
4289             tmp.append("total_number_of_lumis = -1")
4290         else:
4291             if self.harvesting_type == "DQMOffline":
4292                 tmp.append("total_number_of_lumis = -1")
4293             else:
4294                 tmp.append("total_number_of_events = -1")
4295         if self.harvesting_mode.find("single-step") > -1:
4296             tmp.append("# Force everything to run in one job.")
4297             tmp.append("no_block_boundary = 1")
4298         tmp.append("")
4299 
4300         ## CAF
4301         ##-----
4302         tmp.append("[CAF]")
4303 
4304         crab_config = "\n".join(tmp)
4305 
4306         # End of create_crab_config.
4307         return crab_config
4308 
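    # Illustrative sketch (not part of the original code) of the [CMSSW] block
    # that create_crab_config() emits for a single-step DQMOffline job without a
    # JSON lumi mask:
    #
    #   [CMSSW]
    #   show_prod = 1
    #   number_of_jobs = 1
    #   total_number_of_lumis = -1
    #   no_block_boundary = 1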
4309     ##########
4310 
4311     def create_multicrab_config(self):
4312         """Create a multicrab.cfg file for all samples.
4313 
4314         This creates the contents for a multicrab.cfg file that uses
4315         the crab.cfg file (generated elsewhere) for the basic settings
4316         and contains blocks for each run of each dataset.
4317 
4318         # BUG BUG BUG
4319         # The fact that it's necessary to specify the se_white_list
4320         # and the total_number_of_events is due to our use of CRAB
4321         # version 2.6.1. This should no longer be necessary in the
4322         # future.
4323         # BUG BUG BUG end
4324 
4325         """
4326 
4327         cmd = "who am i | cut -f1 -d' '"
4328         (status, output) = subprocess.getstatusoutput(cmd)
4329         UserName = output
4330 
4331         if self.caf_access:
4332             print("Extracting %s as user name" % UserName)
4333 
4334         number_max_sites = self.nr_max_sites + 1
4335 
4336         multicrab_config_lines = []
4337         multicrab_config_lines.append(self.config_file_header())
4338         multicrab_config_lines.append("")
4339         multicrab_config_lines.append("[MULTICRAB]")
4340         multicrab_config_lines.append("cfg = crab.cfg")
4341         multicrab_config_lines.append("")
4342 
4343         dataset_names = sorted(self.datasets_to_use.keys())
4344 
4345         for dataset_name in dataset_names:
4346             runs = self.datasets_to_use[dataset_name]
4347             dataset_name_escaped = self.escape_dataset_name(dataset_name)
4348             castor_prefix = self.castor_prefix
4349 
4350             for run in runs:
4351 
4352                 # CASTOR output dir.
4353                 castor_dir = self.datasets_information[dataset_name] \
4354                                  ["castor_path"][run]
4355 
4356                 cmd = "rfdir %s" % castor_dir
4357                 (status, output) = subprocess.getstatusoutput(cmd)
4358 
4359                 if len(output) <= 0:
4360 
4361                     # DEBUG DEBUG DEBUG
4362                     # We should only get here if we're treating a
4363                     # dataset/run that is fully contained at a single
4364                     # site.
4365                     assert (len(self.datasets_information[dataset_name] \
4366                             ["sites"][run]) == 1) or \
4367                             self.datasets_information[dataset_name]["mirrored"][run]
4368                     # DEBUG DEBUG DEBUG end
4369 
4370                     site_names = list(self.datasets_information[dataset_name] \
4371                              ["sites"][run].keys())
4372 
4373                     for i in range(1, number_max_sites, 1):
4374                         if len(site_names) > 0: 
4375                             index = "site_%02d" % (i)
4376 
4377                             config_file_name = self. \
4378                                        create_config_file_name(dataset_name, run)
4379                             output_file_name = self. \
4380                                        create_output_file_name(dataset_name, run)
4381 
4382 
4383                             # If we're looking at a mirrored dataset we just pick
4384                             # one of the sites. Otherwise there is nothing to
4385                             # choose.
4386 
4387                             # Loop variable
4388                             loop = 0
4389 
4390                             if len(site_names) > 1:
4391                                 cmssw_version = self.datasets_information[dataset_name] \
4392                                             ["cmssw_version"]
4393                                 self.logger.info("Picking site for mirrored dataset " \
4394                                              "`%s', run %d" % \
4395                                              (dataset_name, run))
4396                                 site_name = self.pick_a_site(site_names, cmssw_version)
4397                                 if site_name in site_names:
4398                                     site_names.remove(site_name)
4399 
4400                             else:
4401                                 site_name = site_names[0]
4402                                 site_names.remove(site_name)
4403 
4404                             if site_name == self.no_matching_site_found_str:
4405                                 if loop < 1:
4406                                     break
4407 
4408                             nevents = self.datasets_information[dataset_name]["num_events"][run]
4409 
4410                             # The block name.
4411                             multicrab_block_name = self.create_multicrab_block_name( \
4412                                 dataset_name, run, index)
4413                             multicrab_config_lines.append("[%s]" % \
4414                                                       multicrab_block_name)
4415 
4416                             ## CRAB
4417                             ##------
4418                             if site_name == "caf.cern.ch":
4419                                 multicrab_config_lines.append("CRAB.use_server=0")
4420                                 multicrab_config_lines.append("CRAB.scheduler=caf")
4421                             else:
4422                                 multicrab_config_lines.append("scheduler = glite")
4423 
4424                             ## GRID
4425                             ##------
4426                             if site_name == "caf.cern.ch":
4427                                 pass
4428                             else:
4429                                 multicrab_config_lines.append("GRID.se_white_list = %s" % \
4430                                                       site_name)
4431                                 multicrab_config_lines.append("# This removes the default blacklisting of T1 sites.")
4432                                 multicrab_config_lines.append("GRID.remove_default_blacklist = 1")
4433                                 multicrab_config_lines.append("GRID.rb = CERN")
4434                                 if not self.non_t1access:
4435                                     multicrab_config_lines.append("GRID.role = t1access")
4436 
4437                             ## USER
4438                             ##------
4439 
4440                             castor_dir = castor_dir.replace(castor_prefix, "")
4441                             multicrab_config_lines.append("USER.storage_element=srm-cms.cern.ch")
4442                             multicrab_config_lines.append("USER.user_remote_dir = %s" % \
4443                                                       castor_dir)
4444                             multicrab_config_lines.append("USER.check_user_remote_dir=0")
4445 
4446                             if site_name == "caf.cern.ch":
4447                                 multicrab_config_lines.append("USER.storage_path=%s" % castor_prefix)
4448                                 #multicrab_config_lines.append("USER.storage_element=T2_CH_CAF")
4449                                 #castor_dir = castor_dir.replace("/cms/store/caf/user/%s" %UserName, "")
4450                                 #multicrab_config_lines.append("USER.user_remote_dir = %s" % \
4451                                 #             castor_dir)
4452                             else:
4453                                 multicrab_config_lines.append("USER.storage_path=/srm/managerv2?SFN=%s" % castor_prefix)
4454                                 #multicrab_config_lines.append("USER.user_remote_dir = %s" % \
4455                                 #             castor_dir)
4456                                 #multicrab_config_lines.append("USER.storage_element=srm-cms.cern.ch")
4457 
4458                             ## CMSSW
4459                             ##-------
4460                             multicrab_config_lines.append("CMSSW.pset = %s" % \
4461                                                        config_file_name)
4462                             multicrab_config_lines.append("CMSSW.datasetpath = %s" % \
4463                                                       dataset_name)
4464                             multicrab_config_lines.append("CMSSW.runselection = %d" % \
4465                                                   run)
4466 
4467                             if self.Jsonlumi == True:
4468                                 pass
4469                             else:
4470                                 if self.harvesting_type == "DQMOffline":
4471                                     pass
4472                                 else:
4473                                     multicrab_config_lines.append("CMSSW.total_number_of_events = %d" % \
4474                                                                   nevents)
4475                             # The output file name.
4476                             multicrab_config_lines.append("CMSSW.output_file = %s" % \
4477                                                       output_file_name)
4478 
4479                             ## CAF
4480                             ##-----
4481                             if site_name == "caf.cern.ch":
4482                                 multicrab_config_lines.append("CAF.queue=cmscaf1nd")
4483 
4484 
4485                             # End of block.
4486                             multicrab_config_lines.append("")
4487 
4488                             loop = loop + 1
4489 
4490                             self.all_sites_found = True
4491 
4492         multicrab_config = "\n".join(multicrab_config_lines)
4493 
4494         # End of create_multicrab_config.
4495         return multicrab_config
4496 
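    # Illustrative sketch (not part of the original code) of a single multicrab.cfg
    # block as generated above for a hypothetical non-CAF site; placeholders in
    # angle brackets stand for values filled in at run time, and the GRID.role
    # line is only added when T1 access is requested:
    #
    #   [RelValTTbar__CMSSW_3_8_2-START38_V9-v1__GEN-SIM-RECO_000140331_site_01]
    #   scheduler = glite
    #   GRID.se_white_list = <site name from DBS>
    #   GRID.remove_default_blacklist = 1
    #   GRID.rb = CERN
    #   GRID.role = t1access
    #   USER.storage_element=srm-cms.cern.ch
    #   USER.user_remote_dir = <CASTOR dir with the CASTOR prefix stripped>
    #   USER.check_user_remote_dir=0
    #   USER.storage_path=/srm/managerv2?SFN=<CASTOR prefix>
    #   CMSSW.pset = harvesting_RelValTTbar__CMSSW_3_8_2-START38_V9-v1__GEN-SIM-RECO.py
    #   CMSSW.datasetpath = /RelValTTbar/CMSSW_3_8_2-START38_V9-v1/GEN-SIM-RECO
    #   CMSSW.runselection = 140331
    #   CMSSW.output_file = DQM_V0001_R000140331__RelValTTbar__CMSSW_3_8_2-START38_V9-v1__GEN-SIM-RECO.root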
4497     ##########
4498 
4499     def check_globaltag(self, globaltag=None):
4500         """Check if globaltag exists.
4501 
4502         Check if globaltag exists as GlobalTag in the database given
4503         by self.frontier_connection_name['globaltag']. If globaltag is
4504         None, self.globaltag is used instead.
4505 
4506         If we're going to use reference histograms this method also
4507         checks for the existence of the required key in the GlobalTag.
4508 
4509         """
4510 
4511         if globaltag is None:
4512             globaltag = self.globaltag
4513 
4514         # All GlobalTags should end in `::All', right?
4515         if globaltag.endswith("::All"):
4516             globaltag = globaltag[:-5]
4517 
4518         connect_name = self.frontier_connection_name["globaltag"]
4519         # BUG BUG BUG
4520         # There is a bug in cmscond_tagtree_list: some magic is
4521         # missing from the implementation requiring one to specify
4522         # explicitly the name of the squid to connect to. Since the
4523         # cmsHarvester can only be run from the CERN network anyway,
4524         # cmsfrontier:8000 is hard-coded in here. Not nice but it
4525         # works.
4526         connect_name = connect_name.replace("frontier://",
4527                                             "frontier://cmsfrontier:8000/")
4528         # BUG BUG BUG end
4529         connect_name += self.db_account_name_cms_cond_globaltag()
4530 
4531         tag_exists = self.check_globaltag_exists(globaltag, connect_name)
4532 
4533         #----------
4534 
4535         tag_contains_ref_hist_key = False
4536         if self.use_ref_hists and tag_exists:
4537             # Check for the key required to use reference histograms.
4538             tag_contains_ref_hist_key = self.check_globaltag_contains_ref_hist_key(globaltag, connect_name)
4539 
4540         #----------
4541 
4542         if self.use_ref_hists:
4543             ret_val = tag_exists and tag_contains_ref_hist_key
4544         else:
4545             ret_val = tag_exists
4546 
4547         #----------
4548 
4549         # End of check_globaltag.
4550         return ret_val
4551 
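    # Illustrative example (not part of the original code) of the GlobalTag name
    # normalisation done in check_globaltag(), for a hypothetical tag:
    #
    #   globaltag = "GR10_P_V4::All"
    #   if globaltag.endswith("::All"):
    #       globaltag = globaltag[:-5]    # -> "GR10_P_V4"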
4552     ##########
4553 
4554     def check_globaltag_exists(self, globaltag, connect_name):
4555         """Check if globaltag exists.
4556 
4557         """
4558 
4559         self.logger.info("Checking existence of GlobalTag `%s'" % \
4560                          globaltag)
4561         self.logger.debug("  (Using database connection `%s')" % \
4562                           connect_name)
4563 
4564         cmd = "cmscond_tagtree_list -c %s -T %s" % \
4565               (connect_name, globaltag)
4566         (status, output) = subprocess.getstatusoutput(cmd)
4567         if status != 0 or \
4568                output.find("error") > -1:
4569             msg = "Could not check existence of GlobalTag `%s' in `%s'" % \
4570                   (globaltag, connect_name)
4571             if output.find(".ALL_TABLES not found") > -1:
4572                 msg = "%s\n" \
4573                       "Missing database account `%s'" % \
4574                       (msg, output.split(".ALL_TABLES")[0].split()[-1])
4575             self.logger.fatal(msg)
4576             self.logger.debug("Command used:")
4577             self.logger.debug("  %s" % cmd)
4578             self.logger.debug("Output received:")
4579             self.logger.debug(output)
4580             raise Error(msg)
4581         if output.find("does not exist") > -1:
4582             self.logger.debug("GlobalTag `%s' does not exist in `%s':" % \
4583                               (globaltag, connect_name))
4584             self.logger.debug("Output received:")
4585             self.logger.debug(output)
4586             tag_exists = False
4587         else:
4588             tag_exists = True
4589         self.logger.info("  GlobalTag exists? -> %s" % tag_exists)
4590 
4591         # End of check_globaltag_exists.
4592         return tag_exists
4593 
4594     ##########
4595 
4596     def check_globaltag_contains_ref_hist_key(self, globaltag, connect_name):
4597         """Check if globaltag contains the required RefHistos key.
4598 
4599         """
4600 
4601         # Check for the key required to use reference histograms.
4602         tag_contains_key = None
4603         ref_hist_key = "RefHistos"
4604         self.logger.info("Checking existence of reference " \
4605                          "histogram key `%s' in GlobalTag `%s'" % \
4606                          (ref_hist_key, globaltag))
4607         self.logger.debug("  (Using database connection `%s')" % \
4608                               connect_name)
4609         cmd = "cmscond_tagtree_list -c %s -T %s -n %s" % \
4610               (connect_name, globaltag, ref_hist_key)
4611         (status, output) = subprocess.getstatusoutput(cmd)
4612         if status != 0 or \
4613                output.find("error") > -1:
4614             msg = "Could not check existence of key `%s' in `%s'" % \
4615                   (ref_hist_key, connect_name)
4616             self.logger.fatal(msg)
4617             self.logger.debug("Command used:")
4618             self.logger.debug("  %s" % cmd)
4619             self.logger.debug("Output received:")
4620             self.logger.debug("  %s" % output)
4621             raise Error(msg)
4622         if len(output) < 1:
4623             self.logger.debug("Required key for use of reference " \
4624                               "histograms `%s' does not exist " \
4625                               "in GlobalTag `%s':" % \
4626                               (ref_hist_key, globaltag))
4627             self.logger.debug("Output received:")
4628             self.logger.debug(output)
4629             tag_contains_key = False
4630         else:
4631             tag_contains_key = True
4632 
4633         self.logger.info("  GlobalTag contains `%s' key? -> %s" % \
4634                          (ref_hist_key, tag_contains_key))
4635 
4636         # End of check_globaltag_contains_ref_hist_key.
4637         return tag_contains_key
4638 
4639     ##########
4640 
4641     def check_ref_hist_tag(self, tag_name):
4642         """Check the existence of tag_name in database connect_name.
4643 
4644         Check if tag_name exists as a reference histogram tag in the
4645         database given by self.frontier_connection_name['refhists'].
4646 
4647         """
4648 
4649         connect_name = self.frontier_connection_name["refhists"]
4650         connect_name += self.db_account_name_cms_cond_dqm_summary()
4651 
4652         self.logger.debug("Checking existence of reference " \
4653                           "histogram tag `%s'" % \
4654                           tag_name)
4655         self.logger.debug("  (Using database connection `%s')" % \
4656                           connect_name)
4657 
4658         cmd = "cmscond_list_iov -c %s" % \
4659               connect_name
4660         (status, output) = subprocess.getstatusoutput(cmd)
4661         if status != 0:
4662             msg = "Could not check existence of tag `%s' in `%s'" % \
4663                   (tag_name, connect_name)
4664             self.logger.fatal(msg)
4665             self.logger.debug("Command used:")
4666             self.logger.debug("  %s" % cmd)
4667             self.logger.debug("Output received:")
4668             self.logger.debug(output)
4669             raise Error(msg)
4670         if tag_name not in output.split():
4671             self.logger.debug("Reference histogram tag `%s' " \
4672                               "does not exist in `%s'" % \
4673                               (tag_name, connect_name))
4674             self.logger.debug(" Existing tags: `%s'" % \
4675                               "', `".join(output.split()))
4676             tag_exists = False
4677         else:
4678             tag_exists = True
4679         self.logger.debug("  Reference histogram tag exists? " \
4680                           "-> %s" % tag_exists)
4681 
4682         # End of check_ref_hist_tag.
4683         return tag_exists
4684 
4685     ##########
4686 
4687     def create_harvesting_config(self, dataset_name):
4688         """Create the Python harvesting configuration for harvesting.
4689 
4690         The basic configuration is created by
4691         Configuration.PyReleaseValidation.ConfigBuilder. (This mimics
4692         what cmsDriver.py does.) After that we add some specials
4693         ourselves.
4694 
4695         NOTE: On one hand it may not be nice to circumvent
4696         cmsDriver.py, on the other hand cmsDriver.py does not really
4697         do anything itself. All the real work is done by the
4698         ConfigBuilder so there is not much risk that we miss out on
4699         essential developments of cmsDriver in the future.
4700 
4701         """
4702 
4703         # Setup some options needed by the ConfigBuilder.
4704         config_options = defaultOptions
4705 
4706         # These are fixed for all kinds of harvesting jobs. Some of
4707         # them are not needed for the harvesting config, but to keep
4708         # the ConfigBuilder happy.
4709         config_options.name = "harvesting"
4710         config_options.scenario = "pp"
4711         config_options.number = 1
4712         config_options.arguments = self.ident_string()
4713         config_options.evt_type = config_options.name
4714         config_options.customisation_file = None
4715         config_options.filein = "dummy_value"
4716         config_options.filetype = "EDM"
4717         # This seems to be new in CMSSW 3.3.X, no clue what it does.
4718         config_options.gflash = "dummy_value"
4719         # This seems to be new in CMSSW 3.3.0.pre6, no clue what it
4720         # does.
4721         #config_options.himix = "dummy_value"
4722         config_options.dbsquery = ""
4723 
4724         ###
4725 
4726         # These options depend on the type of harvesting we're doing
4727         # and are stored in self.harvesting_info.
4728 
4729         config_options.step = "HARVESTING:%s" % \
4730                               self.harvesting_info[self.harvesting_type] \
4731                               ["step_string"]
4732         config_options.beamspot = self.harvesting_info[self.harvesting_type] \
4733                                   ["beamspot"]
4734         config_options.eventcontent = self.harvesting_info \
4735                                       [self.harvesting_type] \
4736                                       ["eventcontent"]
4737         config_options.harvesting = self.harvesting_info \
4738                                     [self.harvesting_type] \
4739                                     ["harvesting"]
4740 
4741         ###
4742 
4743         # This one is required (see also above) for each dataset.
4744 
4745         datatype = self.datasets_information[dataset_name]["datatype"]
4746         config_options.isMC = (datatype.lower() == "mc")
4747         config_options.isData = (datatype.lower() == "data")
4748         globaltag = self.datasets_information[dataset_name]["globaltag"]
4749 
4750         config_options.conditions = self.format_conditions_string(globaltag)
4751 
4752         ###
4753 
4754         if "with_input" in getargspec(ConfigBuilder.__init__)[0]:
4755             # This is the case for 3.3.X.
4756             config_builder = ConfigBuilder(config_options, with_input=True)
4757         else:
4758             # This is the case in older CMSSW versions.
4759             config_builder = ConfigBuilder(config_options)
4760         config_builder.prepare(True)
4761         config_contents = config_builder.pythonCfgCode
4762 
4763         ###
4764 
4765         # Add our signature to the top of the configuration and add
4766         # some markers to the head and the tail of the Python code
4767         # generated by the ConfigBuilder.
4768         marker_lines = []
4769         sep = "#" * 30
4770         marker_lines.append(sep)
4771         marker_lines.append("# Code between these markers was generated by")
4772         marker_lines.append("# Configuration.PyReleaseValidation." \
4773                             "ConfigBuilder")
4774 
4775         marker_lines.append(sep)
4776         marker = "\n".join(marker_lines)
4777 
4778         tmp = [self.config_file_header()]
4779         tmp.append("")
4780         tmp.append(marker)
4781         tmp.append("")
4782         tmp.append(config_contents)
4783         tmp.append("")
4784         tmp.append(marker)
4785         tmp.append("")
4786         config_contents = "\n".join(tmp)
4787 
4788         ###
4789 
4790         # Now we add some stuff of our own.
4791         customisations = [""]
4792 
4793         customisations.append("# Now follow some customisations")
4794         customisations.append("")
4795         connect_name = self.frontier_connection_name["globaltag"]
4796         connect_name += self.db_account_name_cms_cond_globaltag()
4797         customisations.append("process.GlobalTag.connect = \"%s\"" % \
4798                               connect_name)
4799 
4800 
4801         if self.saveByLumiSection:
4802             customisations.append("process.dqmSaver.saveByLumiSection = 1")
4803         ##
4804         ##
4805 
4806         customisations.append("")
4807 
4808         # About the reference histograms... For data there is only one
4809         # set of references and those are picked up automatically
4810         # based on the GlobalTag. For MC we have to do some more work
4811         # since the reference histograms to be used depend on the MC
4812         # sample at hand. In this case we glue in an es_prefer snippet
4813         # to pick up the references. We do this only for RelVals since
4814         # for MC there are no meaningful references so far.
4815 
4816         # NOTE: Due to the lack of meaningful references for
4817         # MC samples reference histograms are explicitly
4818         # switched off in this case.
4819 
4820         use_es_prefer = (self.harvesting_type == "RelVal")
4821         use_refs = use_es_prefer or \
4822                    (not self.harvesting_type == "MC")
4823         # Allow global override.
4824         use_refs = use_refs and self.use_ref_hists
4825 
4826         if not use_refs:
4827             # Disable reference histograms explicitly. The histograms
4828             # are loaded by the dqmRefHistoRootFileGetter
4829             # EDAnalyzer. This analyzer can be run from several
4830             # sequences. Here we remove it from each sequence that
4831             # exists.
4832             customisations.append("print(\"Not using reference histograms\")")
4833             customisations.append("if hasattr(process, \"dqmRefHistoRootFileGetter\"):")
4834             customisations.append("    for (sequence_name, sequence) in process.sequences.items():")
4835             customisations.append("        if sequence.remove(process.dqmRefHistoRootFileGetter):")
4836             customisations.append("            print(\"Removed process.dqmRefHistoRootFileGetter from sequence `%s'\" % \\")
4837             customisations.append("                  sequence_name)")
4838             customisations.append("process.dqmSaver.referenceHandling = \"skip\"")
4839         else:
4840             # This makes sure all reference histograms are saved to
4841             # the output ROOT file.
4842             customisations.append("process.dqmSaver.referenceHandling = \"all\"")
4843             if use_es_prefer:
4844                 es_prefer_snippet = self.create_es_prefer_snippet(dataset_name)
4845                 customisations.append(es_prefer_snippet)
4846 
4847         # Make sure we get the `workflow' correct. As far as I can see
4848         # this is only important for the output file name.
4849         workflow_name = dataset_name
4850         if self.harvesting_mode == "single-step-allow-partial":
4851             workflow_name += "_partial"
4852         customisations.append("process.dqmSaver.workflow = \"%s\"" % \
4853                               workflow_name)
4854 
4855         # BUG BUG BUG
4856         # This still does not work. The current two-step harvesting
4857         # efforts are on hold waiting for the solution to come from
4858         # elsewhere. (In this case the elsewhere is Daniele Spiga.)
4859 
4860 ##        # In case this file is the second step (the real harvesting
4861 ##        # step) of the two-step harvesting we have to tell it to use
4862 ##        # our local files.
4863 ##        if self.harvesting_mode == "two-step":
4864 ##            castor_dir = self.datasets_information[dataset_name] \
4865 ##                         ["castor_path"][run]
4866 ##            customisations.append("")
4867 ##            customisations.append("# This is the second step (the real")
4868 ##            customisations.append("# harvesting step) of a two-step")
4869 ##            customisations.append("# harvesting procedure.")
4870 ##            # BUG BUG BUG
4871 ##            # To be removed in production version.
4872 ##            customisations.append("import pdb")
4873 ##            # BUG BUG BUG end
4874 ##            customisations.append("import subprocess")
4875 ##            customisations.append("import os")
4876 ##            customisations.append("castor_dir = \"%s\"" % castor_dir)
4877 ##            customisations.append("cmd = \"rfdir %s\" % castor_dir")
4878 ##            customisations.append("(status, output) = subprocess.getstatusoutput(cmd)")
4879 ##            customisations.append("if status != 0:")
4880 ##            customisations.append("    print \"ERROR\"")
4881 ##            customisations.append("    raise Exception, \"ERROR\"")
4882 ##            customisations.append("file_names = [os.path.join(\"rfio:%s\" % path, i) for i in output.split() if i.startswith(\"EDM_summary\") and i.endswith(\".root\")]")
4883 ##            #customisations.append("pdb.set_trace()")
4884 ##            customisations.append("process.source.fileNames = cms.untracked.vstring(*file_names)")
4885 ##            customisations.append("")
4886 
4887         # BUG BUG BUG end
4888 
4889         config_contents = config_contents + "\n".join(customisations)
4890 
4891         ###
4892 
4893         # End of create_harvesting_config.
4894         return config_contents
4895 
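    # Illustrative sketch (not part of the original code) of the customisation
    # lines appended by create_harvesting_config() when reference histograms are
    # switched off; placeholders in angle brackets stand for values filled in at
    # run time:
    #
    #   process.GlobalTag.connect = "<frontier connection>/<DB account name>"
    #   print("Not using reference histograms")
    #   if hasattr(process, "dqmRefHistoRootFileGetter"):
    #       ...
    #   process.dqmSaver.referenceHandling = "skip"
    #   process.dqmSaver.workflow = "<dataset name>"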
4896 ##    ##########
4897 
4898 ##    def create_harvesting_config_two_step(self, dataset_name):
4899 ##        """Create the Python harvesting configuration for two-step
4900 ##        harvesting.
4901 
4902 ##        """
4903 
4904 ##        # BUG BUG BUG
4905 ##        config_contents = self.create_harvesting_config_single_step(dataset_name)
4906 ##        # BUG BUG BUG end
4907 
4908 ##        # End of create_harvesting_config_two_step.
4909 ##        return config_contents
4910 
4911     ##########
4912 
4913     def create_me_extraction_config(self, dataset_name):
4914         """Create the Python config for the first-step ME summary extraction.
4915 
4916         """
4917 
4918         # Big chunk of hard-coded Python. Not such a big deal since
4919         # this does not do much and is not likely to break.
4920         tmp = []
4921         tmp.append(self.config_file_header())
4922         tmp.append("")
4923         tmp.append("import FWCore.ParameterSet.Config as cms")
4924         tmp.append("")
4925         tmp.append("process = cms.Process(\"ME2EDM\")")
4926         tmp.append("")
4927         tmp.append("# Import of standard configurations")
4928         tmp.append("process.load(\"Configuration/EventContent/EventContent_cff\")")
4929         tmp.append("")
4930         tmp.append("# We don't really process any events, just keep this set to one to")
4931         tmp.append("# make sure things work.")
4932         tmp.append("process.maxEvents = cms.untracked.PSet(")
4933         tmp.append("    input = cms.untracked.int32(1)")
4934         tmp.append("    )")
4935         tmp.append("")
4936         tmp.append("process.options = cms.untracked.PSet(")
4937         tmp.append("    Rethrow = cms.untracked.vstring(\"ProductNotFound\")")
4938         tmp.append("    )")
4939         tmp.append("")
4940         tmp.append("process.source = cms.Source(\"PoolSource\",")
4941         tmp.append("                            processingMode = \\")
4942         tmp.append("                            cms.untracked.string(\"RunsAndLumis\"),")
4943         tmp.append("                            fileNames = \\")
4944         tmp.append("                            cms.untracked.vstring(\"no_file_specified\")")
4945         tmp.append("                            )")
4946         tmp.append("")
4947         tmp.append("# Output definition: drop everything except for the monitoring.")
4948         tmp.append("process.output = cms.OutputModule(")
4949         tmp.append("    \"PoolOutputModule\",")
4950         tmp.append("    outputCommands = \\")
4951         tmp.append("    cms.untracked.vstring(\"drop *\", \\")
4952         tmp.append("                          \"keep *_MEtoEDMConverter_*_*\"),")
4953         output_file_name = self. \
4954                            create_output_file_name(dataset_name)
4955         tmp.append("    fileName = \\")
4956         tmp.append("    cms.untracked.string(\"%s\")," % output_file_name)
4957         tmp.append("    dataset = cms.untracked.PSet(")
4958         tmp.append("    dataTier = cms.untracked.string(\"RECO\"),")
4959         tmp.append("    filterName = cms.untracked.string(\"\")")
4960         tmp.append("    )")
4961         tmp.append("    )")
4962         tmp.append("")
4963         tmp.append("# Additional output definition")
4964         tmp.append("process.out_step = cms.EndPath(process.output)")
4965         tmp.append("")
4966         tmp.append("# Schedule definition")
4967         tmp.append("process.schedule = cms.Schedule(process.out_step)")
4968         tmp.append("")
4969 
4970         config_contents = "\n".join(tmp)
4971 
4972         # End of create_me_extraction_config.
4973         return config_contents
4974 
4975     ##########
4976 
4977 ##    def create_harvesting_config(self, dataset_name):
4978 ##        """Create the Python harvesting configuration for a given job.
4979 
4980 ##        NOTE: The reason to have a single harvesting configuration per
4981 ##        sample is to be able to specify the GlobalTag corresponding to
4982 ##        each sample. Since it has been decided that (apart from the
4983 ##        prompt reco) datasets cannot contain runs with different
4984 ##        GlobalTags, we don't need a harvesting config per run.
4985 
4986 ##        NOTE: This is the place where we distinguish between
4987 ##        single-step and two-step harvesting modes (at least for the
4988 ##        Python job configuration).
4989 
4990 ##        """
4991 
4992 ##        ###
4993 
4994 ##        if self.harvesting_mode == "single-step":
4995 ##            config_contents = self.create_harvesting_config_single_step(dataset_name)
4996 ##        elif self.harvesting_mode == "two-step":
4997 ##            config_contents = self.create_harvesting_config_two_step(dataset_name)
4998 ##        else:
4999 ##            # Impossible harvesting mode, we should never get here.
5000 ##            assert False, "ERROR: unknown harvesting mode `%s'" % \
5001 ##                   self.harvesting_mode
5002 
5003 ##        ###
5004 
5005 ##        # End of create_harvesting_config.
5006 ##        return config_contents
5007 
5008     ##########
5009 
5010     def write_crab_config(self):
5011         """Write a CRAB job configuration Python file.
5012 
5013         """
5014 
5015         self.logger.info("Writing CRAB configuration...")
5016 
5017         file_name_base = "crab.cfg"
5018 
5019         # Create CRAB configuration.
5020         crab_contents = self.create_crab_config()
5021 
5022         # Write configuration to file.
5023         crab_file_name = file_name_base
5024         try:
5025             crab_file = open(crab_file_name, "w")
5026             crab_file.write(crab_contents)
5027             crab_file.close()
5028         except IOError:
5029             self.logger.fatal("Could not write " \
5030                               "CRAB configuration to file `%s'" % \
5031                               crab_file_name)
5032             raise Error("ERROR: Could not write to file `%s'!" % \
5033                         crab_file_name)
5034 
5035         # End of write_crab_config.
5036 
5037     ##########
5038 
5039     def write_multicrab_config(self):
5040         """Write a multi-CRAB job configuration Python file.
5041 
5042         """
5043 
5044         self.logger.info("Writing multi-CRAB configuration...")
5045 
5046         file_name_base = "multicrab.cfg"
5047 
5048         # Create multi-CRAB configuration.
5049         multicrab_contents = self.create_multicrab_config()
5050 
5051         # Write configuration to file.
5052         multicrab_file_name = file_name_base
5053         try:
5054             multicrab_file = open(multicrab_file_name, "w")
5055             multicrab_file.write(multicrab_contents)
5056             multicrab_file.close()
5057         except IOError:
5058             self.logger.fatal("Could not write " \
5059                               "multi-CRAB configuration to file `%s'" % \
5060                               multicrab_file_name)
5061             raise Error("ERROR: Could not write to file `%s'!" % \
5062                         multicrab_file_name)
5063 
5064         # End of write_multicrab_config.
5065 
5066     ##########
5067 
5068     def write_harvesting_config(self, dataset_name):
5069         """Write a harvesting job configuration Python file.
5070 
5071         NOTE: This knows nothing about single-step or two-step
5072         harvesting. That's all taken care of by
5073         create_harvesting_config.
5074 
5075         """
5076 
5077         self.logger.debug("Writing harvesting configuration for `%s'..." % \
5078                           dataset_name)
5079 
5080         # Create Python configuration.
5081         config_contents = self.create_harvesting_config(dataset_name)
5082 
5083         # Write configuration to file.
5084         config_file_name = self. \
5085                            create_harvesting_config_file_name(dataset_name)
5086         try:
5087             config_file = open(config_file_name, "w")
5088             config_file.write(config_contents)
5089             config_file.close()
5090         except IOError:
5091             self.logger.fatal("Could not write " \
5092                               "harvesting configuration to file `%s'" % \
5093                               config_file_name)
5094             raise Error("ERROR: Could not write to file `%s'!" % \
5095                         config_file_name)
5096 
5097         # End of write_harvesting_config.
5098 
5099     ##########
5100 
5101     def write_me_extraction_config(self, dataset_name):
5102         """Write an ME-extraction configuration Python file.
5103 
5104         This `ME-extraction' (ME = Monitoring Element) is the first
5105         step of the two-step harvesting.
5106 
5107         """
5108 
5109         self.logger.debug("Writing ME-extraction configuration for `%s'..." % \
5110                           dataset_name)
5111 
5112         # Create Python configuration.
5113         config_contents = self.create_me_extraction_config(dataset_name)
5114 
5115         # Write configuration to file.
5116         config_file_name = self. \
5117                            create_me_summary_config_file_name(dataset_name)
5118         try:
5119             config_file = open(config_file_name, "w")
5120             config_file.write(config_contents)
5121             config_file.close()
5122         except IOError:
5123             self.logger.fatal("Could not write " \
5124                               "ME-extraction configuration to file `%s'" % \
5125                               config_file_name)
5126             raise Error("ERROR: Could not write to file `%s'!" % \
5127                         config_file_name)
5128 
5129         # End of write_me_extraction_config.
5130 
5131     ##########
5132 
5133 
5134     def ref_hist_mappings_needed(self, dataset_name=None):
5135         """Check if we need to load and check the reference mappings.
5136 
5137         For data the reference histograms should be taken
5138         automatically from the GlobalTag, so we don't need any
5139         mappings. For RelVals we need to know a mapping to be used in
5140         the es_prefer code snippet (different references for each of
5141         the datasets.)
5142         the datasets).
5143         WARNING: This implementation is a bit convoluted.
5144 
5145         """
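        # In short: a reference histogram mapping is needed if and
        # only if at least one of the datasets involved is MC.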
5146 
5147         # If no dataset name given, do everything, otherwise check
5148         # only this one dataset.
5149         if dataset_name is not None:
5150             data_type = self.datasets_information[dataset_name] \
5151                         ["datatype"]
5152             mappings_needed = (data_type == "mc")
5153             # DEBUG DEBUG DEBUG
5154             if not mappings_needed:
5155                 assert data_type == "data"
5156             # DEBUG DEBUG DEBUG end
5157         else:
5158             tmp = [self.ref_hist_mappings_needed(dataset_name) \
5159                    for dataset_name in \
5160                    self.datasets_information.keys()]
5161             mappings_needed = any(tmp)
5162 
5163         # End of ref_hist_mappings_needed.
5164         return mappings_needed
5165 
5166     ##########
5167 
5168     def load_ref_hist_mappings(self):
5169         """Load the reference histogram mappings from file.
5170 
5171         The dataset name to reference histogram name mappings are read
5172         from a text file specified in self.ref_hist_mappings_file_name.
5173 
5174         """
5175 
5176         # DEBUG DEBUG DEBUG
5177         assert len(self.ref_hist_mappings) < 1, \
5178                "ERROR Should not be RE-loading " \
5179                "reference histogram mappings!"
5180         # DEBUG DEBUG DEBUG end
5181 
5182         self.logger.info("Loading reference histogram mappings " \
5183                          "from file `%s'" % \
5184                          self.ref_hist_mappings_file_name)
5185 
5186         mappings_lines = None
5187         try:
5188             mappings_file = open(self.ref_hist_mappings_file_name, "r")
5189             mappings_lines = mappings_file.readlines()
5190             mappings_file.close()
5191         except IOError:
5192             msg = "ERROR: Could not open reference histogram mapping "\
5193                   "file `%s'" % self.ref_hist_mappings_file_name
5194             self.logger.fatal(msg)
5195             raise Error(msg)
5196 
5197         ##########
5198 
5199         # The format we expect is: two white-space separated pieces
5200         # per line. The first the dataset name for which the reference
5201         # should be used, the second one the name of the reference
5202         # histogram in the database.
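        # For example, a valid mapping line could look like this
        # (both the dataset name and the reference histogram name
        # below are purely hypothetical):
        #   /RelValTTbar/CMSSW_3_8_2-MC_38Y_V9-v1/GEN-SIM-RECO  ref_hists_RelValTTbar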
5203 
5204         for mapping in mappings_lines:
5205             # Skip comment lines.
5206             if not mapping.startswith("#"):
5207                 mapping = mapping.strip()
5208                 if len(mapping) > 0:
5209                     mapping_pieces = mapping.split()
5210                     if len(mapping_pieces) != 2:
5211                         msg = "ERROR: The reference histogram mapping " \
5212                               "file contains a line I don't " \
5213                               "understand:\n  %s" % mapping
5214                         self.logger.fatal(msg)
5215                         raise Error(msg)
5216                     dataset_name = mapping_pieces[0].strip()
5217                     ref_hist_name = mapping_pieces[1].strip()
5218                     # We don't want people to accidentally specify
5219                     # multiple mappings for the same dataset. Just
5220                     # don't accept those cases.
5221                     if dataset_name in self.ref_hist_mappings:
5222                         msg = "ERROR: The reference histogram mapping " \
5223                               "file contains multiple mappings for " \
5224                               "dataset `%s'." % dataset_name
5225                         self.logger.fatal(msg)
5226                         raise Error(msg)
5227 
5228                     # All is well that ends well.
5229                     self.ref_hist_mappings[dataset_name] = ref_hist_name
5230 
5231         ##########
5232 
5233         self.logger.info("  Successfully loaded %d mapping(s)" % \
5234                          len(self.ref_hist_mappings))
5235         max_len = max([len(i) for i in self.ref_hist_mappings.keys()], default=0)
5236         for (map_from, map_to) in self.ref_hist_mappings.items():
5237             self.logger.info("    %-*s -> %s" % \
5238                               (max_len, map_from, map_to))
5239 
5240         # End of load_ref_hist_mappings.
5241 
5242     ##########
5243 
5244     def check_ref_hist_mappings(self):
5245         """Make sure all necessary reference histograms exist.
5246 
5247         Check that for each of the datasets to be processed a
5248         reference histogram is specified and that that histogram
5249         exists in the database.
5250 
5251         NOTE: There's a little complication here. Since this whole
5252         thing was designed to allow (in principle) harvesting of both
5253         data and MC datasets in one go, we need to be careful to check
5254         the availability of reference mappings only for those
5255         datasets that need one.
5256 
5257         """
5258 
5259         self.logger.info("Checking reference histogram mappings")
5260 
5261         for dataset_name in self.datasets_to_use:
5262             try:
5263                 ref_hist_name = self.ref_hist_mappings[dataset_name]
5264             except KeyError:
5265                 msg = "ERROR: No reference histogram mapping found " \
5266                       "for dataset `%s'" % \
5267                       dataset_name
5268                 self.logger.fatal(msg)
5269                 raise Error(msg)
5270 
5271             if not self.check_ref_hist_tag(ref_hist_name):
5272                 msg = "Reference histogram tag `%s' " \
5273                       "(used for dataset `%s') does not exist!" % \
5274                       (ref_hist_name, dataset_name)
5275                 self.logger.fatal(msg)
5276                 raise Usage(msg)
5277 
5278         self.logger.info("  Done checking reference histogram mappings.")
5279 
5280         # End of check_ref_hist_mappings.
5281 
5282     ##########
5283 
5284     def build_datasets_information(self):
5285         """Obtain all information on the datasets that we need to run.
5286 
5287         Use DBS to figure out all required information on our
5288         datasets, like the run numbers and the GlobalTag. All
5289         information is stored in the datasets_information member
5290         variable.
5291 
5292         """
5293 
5294         # Get a list of runs in the dataset.
5295         # NOTE: The harvesting has to be done run-by-run, so we
5296         # split up datasets based on the run numbers. Strictly
5297         # speaking this is not (yet?) necessary for Monte Carlo
5298         # since all those samples use run number 1. Still, this
5299         # general approach should work for all samples.
5300 
5301         # Now loop over all datasets in the list and process them.
5302         # NOTE: This processing has been split into several loops
5303         # to be easier to follow, sacrificing a bit of efficiency.
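        # The resulting self.datasets_information structure has one
        # entry per dataset name; each entry is itself a dictionary
        # with the keys "runs", "cmssw_version", "globaltag",
        # "datatype", "num_events", "mirrored", "sites" and
        # "castor_path" (all filled in below).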
5304         self.datasets_information = {}
5305         self.logger.info("Collecting information for all datasets to process")
5306         dataset_names = sorted(self.datasets_to_use.keys())
5307         for dataset_name in dataset_names:
5308 
5309             # Tell the user which dataset we are on: helpful when there are many datasets.
5310             sep_line = "-" * 30
5311             self.logger.info(sep_line)
5312             self.logger.info("  `%s'" % dataset_name)
5313             self.logger.info(sep_line)
5314 
5315             runs = self.dbs_resolve_runs(dataset_name)
5316             self.logger.info("    found %d run(s)" % len(runs))
5317             if len(runs) > 0:
5318                 self.logger.debug("      run number(s): %s" % \
5319                                   ", ".join([str(i) for i in runs]))
5320             else:
5321                 # DEBUG DEBUG DEBUG
5322                 # This should never happen after the DBS checks.
5323                 self.logger.warning("  --> skipping dataset "
5324                                     "without any runs")
5325                 assert False, "Panic: found a dataset without runs " \
5326                        "after DBS checks!"
5327                 # DEBUG DEBUG DEBUG end
5328 
5329             cmssw_version = self.dbs_resolve_cmssw_version(dataset_name)
5330             self.logger.info("    found CMSSW version `%s'" % cmssw_version)
5331 
5332             # Figure out if this is data or MC.
5333             datatype = self.dbs_resolve_datatype(dataset_name)
5334             self.logger.info("    sample is data or MC? --> %s" % \
5335                              datatype)
5336 
5337             ###
5338 
5339             # Try and figure out the GlobalTag to be used.
5340             if self.globaltag is None:
5341                 globaltag = self.dbs_resolve_globaltag(dataset_name)
5342             else:
5343                 globaltag = self.globaltag
5344 
5345             self.logger.info("    found GlobalTag `%s'" % globaltag)
5346 
5347             # DEBUG DEBUG DEBUG
5348             if globaltag == "":
5349                 # Actually we should not even reach this point, after
5350                 # our dataset sanity checks.
5351                 assert datatype == "data", \
5352                        "ERROR Empty GlobalTag for MC dataset!!!"
5353             # DEBUG DEBUG DEBUG end
5354 
5355             ###
5356 
5357             # DEBUG DEBUG DEBUG
5358             #tmp = self.dbs_check_dataset_spread_old(dataset_name)
5359             # DEBUG DEBUG DEBUG end
5360             sites_catalog = self.dbs_check_dataset_spread(dataset_name)
5361 
5362             # Extract the total event counts.
5363             num_events = {}
5364             for run_number in sites_catalog.keys():
5365                 num_events[run_number] = sites_catalog \
5366                                          [run_number]["all_sites"]
5367                 del sites_catalog[run_number]["all_sites"]
5368 
5369             # Extract the information about whether or not datasets
5370             # are mirrored.
5371             mirror_catalog = {}
5372             for run_number in sites_catalog.keys():
5373                 mirror_catalog[run_number] = sites_catalog \
5374                                              [run_number]["mirrored"]
5375                 del sites_catalog[run_number]["mirrored"]
5376 
5377             # BUG BUG BUG
5378             # I think I could now get rid of that and just fill the
5379             # "sites" entry with the `inverse' of this
5380             # num_events_catalog(?).
5381             #num_sites = self.dbs_resolve_dataset_number_of_sites(dataset_name)
5382             #sites_catalog = self.dbs_check_dataset_spread(dataset_name)
5383             #sites_catalog = dict(zip(num_events_catalog.keys(),
5384             #                         [[j for i in num_events_catalog.values() for j in i.keys()]]))
5385             # BUG BUG BUG end
5386 
5387 ##            # DEBUG DEBUG DEBUG
5388 ##            # This is probably only useful to make sure we don't muck
5389 ##            # things up, right?
5390 ##            # Figure out across how many sites this sample has been spread.
5391 ##            if num_sites == 1:
5392 ##                self.logger.info("    sample is contained at a single site")
5393 ##            else:
5394 ##                self.logger.info("    sample is spread across %d sites" % \
5395 ##                                 num_sites)
5396 ##            if num_sites < 1:
5397 ##                # NOTE: This _should not_ happen with any valid dataset.
5398 ##                self.logger.warning("  --> skipping dataset which is not " \
5399 ##                                    "hosted anywhere")
5400 ##            # DEBUG DEBUG DEBUG end
5401 
5402             # Now put everything in a place where we can find it again
5403             # if we need it.
5404             self.datasets_information[dataset_name] = {}
5405             self.datasets_information[dataset_name]["runs"] = runs
5406             self.datasets_information[dataset_name]["cmssw_version"] = \
5407                                                                      cmssw_version
5408             self.datasets_information[dataset_name]["globaltag"] = globaltag
5409             self.datasets_information[dataset_name]["datatype"] = datatype
5410             self.datasets_information[dataset_name]["num_events"] = num_events
5411             self.datasets_information[dataset_name]["mirrored"] = mirror_catalog
5412             self.datasets_information[dataset_name]["sites"] = sites_catalog
5413 
5414             # Each run of each dataset has a different CASTOR output
5415             # path.
5416             castor_path_common = self.create_castor_path_name_common(dataset_name)
5417             self.logger.info("    output will go into `%s'" % \
5418                              castor_path_common)
5419 
5420             castor_paths = dict(list(zip(runs,
5421                                     [self.create_castor_path_name_special(dataset_name, i, castor_path_common) \
5422                                      for i in runs])))
5423             for path_name in castor_paths.values():
5424                 self.logger.debug("      %s" % path_name)
5425             self.datasets_information[dataset_name]["castor_path"] = \
5426                                                                    castor_paths
5427 
5428         # End of build_datasets_information.
5429 
5430     ##########
5431 
5432     def show_exit_message(self):
5433         """Tell the user what to do now, after this part is done.
5434 
5435         This should provide the user with some (preferably
5436         copy-pasteable) instructions on what to do now with the setups
5437         and files that have been created.
5438 
5439         """
5440 
5441         # TODO TODO TODO
5442         # This could be improved a bit.
5443         # TODO TODO TODO end
5444 
5445         sep_line = "-" * 60
5446 
5447         self.logger.info("")
5448         self.logger.info(sep_line)
5449         self.logger.info("  Configuration files have been created.")
5450         self.logger.info("  From here on please follow the usual CRAB instructions.")
5451         self.logger.info("  Quick copy-paste instructions are shown below.")
5452         self.logger.info(sep_line)
5453 
5454         self.logger.info("")
5455         self.logger.info("    Create all CRAB jobs:")
5456         self.logger.info("      multicrab -create")
5457         self.logger.info("")
5458         self.logger.info("    Submit all CRAB jobs:")
5459         self.logger.info("      multicrab -submit")
5460         self.logger.info("")
5461         self.logger.info("    Check CRAB status:")
5462         self.logger.info("      multicrab -status")
5463         self.logger.info("")
5464 
5465         self.logger.info("")
5466         self.logger.info("  For more information please see the CMS Twiki:")
5467         self.logger.info("    %s" % twiki_url)
5468         self.logger.info(sep_line)
5469 
5470         # If there were any jobs for which we could not find a
5471         # matching site show a warning message about that.
5472         if not self.all_sites_found:
5473             self.logger.warning("  For some of the jobs no matching " \
5474                                 "site could be found")
5475             self.logger.warning("  --> please scan your multicrab.cfg " \
5476                                 "for occurrences of `%s'." % \
5477                                 self.no_matching_site_found_str)
5478             self.logger.warning("      You will have to fix those " \
5479                                 "by hand, sorry.")
5480 
5481         # End of show_exit_message.
5482 
5483     ##########
5484 
5485     def run(self):
5486         "Main entry point of the CMS harvester."
5487 
5488         # Start with a positive thought.
5489         exit_code = 0
5490 
5491         try:
5492 
5493             try:
5494 
5495                 # Parse all command line options and arguments
5496                 self.parse_cmd_line_options()
5497                 # and check that they make sense.
5498                 self.check_input_status()
5499 
5500                 # Check if CMSSW is set up.
5501                 self.check_cmssw()
5502 
5503                 # Check if DBS is set up,
5504                 self.check_dbs()
5505                 # and if all is fine, set up the Python side.
5506                 self.setup_dbs()
5507 
5508                 # Fill our dictionary with all the required info we
5509                 # need to understand harvesting jobs. This needs to be
5510                 # done after the CMSSW version is known.
5511                 self.setup_harvesting_info()
5512 
5513                 # Obtain list of dataset names to consider
5514                 self.build_dataset_use_list()
5515                 # and the list of dataset names to ignore.
5516                 self.build_dataset_ignore_list()
5517 
5518                 # The same for the runs lists (if specified).
5519                 self.build_runs_use_list()
5520                 self.build_runs_ignore_list()
5521 
5522                 # Process the list of datasets to ignore and fold that
5523                 # into the list of datasets to consider.
5524                 # NOTE: The run-based selection is done later since
5525                 # right now we don't know yet which runs a dataset
5526                 # contains.
5527                 self.process_dataset_ignore_list()
5528 
5529                 # Obtain all required information on the datasets,
5530                 # like run numbers and GlobalTags.
5531                 self.build_datasets_information()
5532 
5533                 if self.use_ref_hists and \
5534                        self.ref_hist_mappings_needed():
5535                     # Load the dataset name to reference histogram
5536                     # name mappings from file.
5537                     self.load_ref_hist_mappings()
5538                     # Now make sure that for all datasets we want to
5539                     # process there is a reference defined. Otherwise
5540                     # just bomb out before wasting any more time.
5541                     self.check_ref_hist_mappings()
5542                 else:
5543                     self.logger.info("No need to load reference " \
5544                                      "histogram mappings file")
5545 
5546                 # OBSOLETE OBSOLETE OBSOLETE
5547 ##                # TODO TODO TODO
5548 ##                # Need to think about where this should go, but
5549 ##                # somewhere we have to move over the fact that we want
5550 ##                # to process all runs for each dataset that we're
5551 ##                # considering. This basically means copying over the
5552 ##                # information from self.datasets_information[]["runs"]
5553 ##                # to self.datasets_to_use[].
5554 ##                for dataset_name in self.datasets_to_use.keys():
5555 ##                    self.datasets_to_use[dataset_name] = self.datasets_information[dataset_name]["runs"]
5556 ##                # TODO TODO TODO end
5557                 # OBSOLETE OBSOLETE OBSOLETE end
5558 
5559                 self.process_runs_use_and_ignore_lists()
5560 
5561                 # If we've been asked to sacrifice some parts of
5562                 # spread-out samples in order to be able to partially
5563                 # harvest them, we'll do that here.
5564                 if self.harvesting_mode == "single-step-allow-partial":
5565                     self.singlify_datasets()
5566 
5567                 # Check dataset name(s)
5568                 self.check_dataset_list()
5569                 # and see if there is anything left to do.
5570                 if len(self.datasets_to_use) < 1:
5571                     self.logger.info("After all checks etc. " \
5572                                      "there are no datasets (left?) " \
5573                                      "to process")
5574                 else:
5575 
5576                     self.logger.info("After all checks etc. we are left " \
5577                                      "with %d dataset(s) to process " \
5578                                      "for a total of %d runs" % \
5579                                      (len(self.datasets_to_use),
5580                                       sum([len(i) for i in \
5581                                            self.datasets_to_use.values()])))
5582 
5583                     # NOTE: The order in which things are done here is
5584                     # important. At the end of the job, independently of
5585                     # how it ends (exception, CTRL-C, normal end) the
5586                     # book keeping is written to file. At that time it
5587                     # should be clear which jobs are done and can be
5588                     # submitted. This means we first create the
5589                     # general files, and then the per-job config
5590                     # files.
5591 
5592                     # TODO TODO TODO
5593                     # It would be good to modify the book keeping a
5594                     # bit. Now we write the crab.cfg (which is the
5595                     # same for all samples and runs) and the
5596                     # multicrab.cfg (which contains blocks for all
5597                     # runs of all samples) without updating our book
5598                     # keeping. The only place we update the book
5599                     # keeping is after writing the harvesting config
5600                     # file for a given dataset. Since there is only
5601                     # one single harvesting configuration for each
5602                     # dataset, we have no book keeping information on
5603                     # a per-run basis.
5604                     # TODO TODO TODO end
5605 
5606                     # Check if the CASTOR output area exists. If
5607                     # necessary create it.
5608                     self.create_and_check_castor_dirs()
5609 
5610                     # Create one crab and one multicrab configuration
5611                     # for all jobs together.
5612                     self.write_crab_config()
5613                     self.write_multicrab_config()
5614 
5615                     # Loop over all datasets and create harvesting
5616                     # config files for all of them. One harvesting
5617                     # config per dataset is enough. The same file will
5618                     # be re-used by CRAB for each run.
5619                     # NOTE: We always need a harvesting
5620                     # configuration. For the two-step harvesting we
5621                     # also need a configuration file for the first
5622                     # step: the monitoring element extraction.
5623                     for dataset_name in self.datasets_to_use.keys():
5624                         try:
5625                             self.write_harvesting_config(dataset_name)
5626                             if self.harvesting_mode == "two-step":
5627                                 self.write_me_extraction_config(dataset_name)
5628                         except:
5629                             # Doh! Just re-raise the damn thing.
5630                             raise
5631                         else:
5632 ##                            tmp = self.datasets_information[dataset_name] \
5633 ##                                  ["num_events"]
5634                             tmp = {}
5635                             for run_number in self.datasets_to_use[dataset_name]:
5636                                 tmp[run_number] = self.datasets_information \
5637                                                   [dataset_name]["num_events"][run_number]
5638                             if dataset_name in self.book_keeping_information:
5639                                 self.book_keeping_information[dataset_name].update(tmp)
5640                             else:
5641                                 self.book_keeping_information[dataset_name] = tmp
5642 
5643                     # Explain to the user what to do now.
5644                     self.show_exit_message()
5645 
5646             except Usage as err:
5647                 # self.logger.fatal(err.msg)
5648                 # self.option_parser.print_help()
5649                 pass
5650 
5651             except Error as err:
5652                 # self.logger.fatal(err.msg)
5653                 exit_code = 1
5654 
5655             except Exception as err:
5656                 # Hmmm, ignore keyboard interrupts from the
5657                 # user. These are not a `serious problem'. We also
5658                 # skip SystemExit, which is the exception thrown when
5659                 # one calls sys.exit(). This, for example, is done by
5660                 # the option parser after calling print_help(). We
5661                 # also have to catch all `no such option'
5662                 # complaints. Everything else we catch here is a
5663                 # `serious problem'.
5664                 if isinstance(err, SystemExit):
5665                     self.logger.fatal(err.code)
5666                 elif not isinstance(err, KeyboardInterrupt):
5667                     self.logger.fatal("!" * 50)
5668                     self.logger.fatal("  This looks like a serious problem.")
5669                     self.logger.fatal("  If you are sure you followed all " \
5670                                       "instructions")
5671                     self.logger.fatal("  please copy the below stack trace together")
5672                     self.logger.fatal("  with a description of what you were doing to")
5673                     self.logger.fatal("  jeroen.hegeman@cern.ch.")
5674                     self.logger.fatal("  %s" % self.ident_string())
5675                     self.logger.fatal("!" * 50)
5676                     self.logger.fatal(str(err))
5677                     import traceback
5678                     traceback_string = traceback.format_exc()
5679                     for line in traceback_string.split("\n"):
5680                         self.logger.fatal(line)
5681                     self.logger.fatal("!" * 50)
5682                     exit_code = 2
5683 
5684         # This is the stuff that we should really do, no matter
5685         # what. Of course cleaning up after ourselves is also done
5686         # from this place. This also keeps track of the book keeping
5687         # so far. (This means that if half of the configuration files
5688         # were created before e.g. the disk was full, we should still
5689         # have a consistent book keeping file.)
5690         finally:
5691 
5692             self.cleanup()
5693 
5694         ###
5695 
5696         if self.crab_submission:
5697             os.system("multicrab -create")
5698             os.system("multicrab -submit")
5699 
5700         # End of run.
5701         return exit_code
5702 
5703     # End of CMSHarvester.
5704 
5705 ###########################################################################
5706 ## Main entry point.
5707 ###########################################################################
5708 
5709 if __name__ == "__main__":
5710     "Main entry point for harvesting."
5711 
5712     CMSHarvester().run()
5713 
5714     # Done.
5715 
5716 ###########################################################################