Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2021-10-29 04:41:17

0001 #!/usr/bin/env python3
0002 
0003 ###########################################################################
0004 ## File       : cmsHarvest.py
0005 ## Authors    : Jeroen Hegeman (jeroen.hegeman@cern.ch)
0006 ##              Niklas Pietsch (niklas.pietsch@desy.de)
0007 ##              Francesco Costanza (francesco.costanza@desy.de)
0008 ## Last change: 20100308
0009 ##
0010 ## Purpose    : Main program to run all kinds of harvesting.
0011 ##              For more information please refer to the CMS Twiki url
0012 ##              mentioned just below here.
0013 ###########################################################################
0014 
0015 """Main program to run all kinds of harvesting.
0016 
0017 These are the basic kinds of harvesting implemented (contact me if
0018 your favourite is missing):
0019 
0020 - RelVal : Run for release validation samples. Makes heavy use of MC
0021            truth information.
0022 
0023 - RelValFS: FastSim RelVal.
0024 
0025 - MC : Run for MC samples.
0026 
0027 - DQMOffline : Run for real data (could also be run for MC).
0028 
0029 For the mappings of these harvesting types to sequence names please
0030 see the setup_harvesting_info() and option_handler_list_types()
0031 methods.
0032 
0033 """
0034 from __future__ import print_function
0035 
0036 ###########################################################################
0037 
0038 from builtins import range
# Release metadata. (The version number jumped to match the release.)
__version__ = "3.8.2p1"
__author__ = ("Jeroen Hegeman (jeroen.hegeman@cern.ch),"
              "Niklas Pietsch (niklas.pietsch@desy.de)")

# Where to find the documentation of this tool on the CMS Twiki.
twiki_url = "https://twiki.cern.ch/twiki/bin/view/CMS/CmsHarvester"
0044 
0045 ###########################################################################
0046 
0047 ###########################################################################
0048 ## TODO list
0049 ##
0050 ## !!! Some code refactoring is in order. A lot of the code that loads
0051 ## and builds dataset and run lists is duplicated. !!!
0052 ##
0053 ## - SPECIAL (future):
0054 ##   After discussing all these harvesting issues yet again with Luca,
0055 ##   it looks like we'll need something special to handle harvesting
0056 ##   for (collisions) data in reprocessing. Stuff with a special DBS
0057 ##   instance in which `rolling' reports of reprocessed datasets are
0058 ##   published. In this case we will have to check (w.r.t. the parent
0059 ##   dataset) how much of a given run is ready, and submit once we're
0060 ##   satisfied (let's say 90%).
0061 ##
0062 ## - We could get rid of most of the `and dataset.status = VALID'
0063 ##   pieces in the DBS queries.
0064 ## - Change to a more efficient grid scheduler.
0065 ## - Implement incremental harvesting. Requires some changes to the
0066 ##   book keeping to store the harvested number of events for each
0067 ##   run. Also requires some changes to the dataset checking to see if
0068 ##   additional statistics have become available.
0069 ## - Emphasize the warnings in case we're running in force
0070 ##   mode. Otherwise they may get lost a bit in the output.
0071 ## - Fix the creation of the CASTOR dirs. The current approach works
0072 ##   but is a) too complicated and b) too inefficient.
0073 ## - Fully implement all harvesting types.
0074 ##   --> Discuss with Andreas what exactly should be in there. And be
0075 ##       careful with the run numbers!
0076 ## - Add the (second-step) harvesting config file to the (first-step)
0077 ##   ME extraction job to make sure it does not get lost.
0078 ## - Improve sanity checks on harvesting type vs. data type.
0079 ## - Implement reference histograms.
0080 ##   1) User-specified reference dataset.
0081 ##   2) Educated guess based on dataset name.
0082 ##   3) References from GlobalTag.
0083 ##   4) No reference at all.
0084 ## - Is this options.evt_type used anywhere?
0085 ## - Combine all these dbs_resolve_xxx into a single call to DBS(?).
0086 ## - Implement CRAB server use?
0087 ## - Add implementation of email address of user. (Only necessary for
0088 ##   CRAB server.)
0089 ###########################################################################
0090 
0091 import os
0092 import sys
0093 import subprocess
0094 import re
0095 import logging
0096 import optparse
0097 import datetime
0098 import copy
0099 from inspect import getargspec
0100 from random import choice
0101 
0102 
0103 # These we need to communicate with DBS global DBSAPI
0104 from DBSAPI.dbsApi import DbsApi
0105 import DBSAPI.dbsException
0106 import DBSAPI.dbsApiException
0107 from functools import reduce
0108 # and these we need to parse the DBS output.
0109 global xml
0110 global SAXParseException
0111 import xml.sax
0112 from xml.sax import SAXParseException
0113 
0114 import Configuration.PyReleaseValidation
0115 from Configuration.PyReleaseValidation.ConfigBuilder import \
0116      ConfigBuilder, defaultOptions
0117 # from Configuration.PyReleaseValidation.cmsDriverOptions import options, python_config_filename
0118 
0119 #import FWCore.ParameterSet.Config as cms
0120 
0121 # Debugging stuff.
0122 import pdb
0123 try:
0124     import debug_hook
0125 except ImportError:
0126     pass
0127 
0128 ###########################################################################
0129 ## Helper class: Usage exception.
0130 ###########################################################################
class Usage(Exception):
    """Exception used to signal incorrect usage of the tool.

    The message handed to the constructor is kept in the `msg'
    attribute.

    """

    def __init__(self, msg):
        self.msg = msg

    def __str__(self):
        # Render the message like repr() does, i.e. plain strings are
        # shown including their quotes.
        return "%r" % (self.msg,)

    # End of Usage.
0138 
0139 ###########################################################################
0140 ## Helper class: Error exception.
0141 ###########################################################################
class Error(Exception):
    """Exception used to signal a generic error.

    The message handed to the constructor is kept in the `msg'
    attribute.

    """

    def __init__(self, msg):
        self.msg = msg

    def __str__(self):
        # Render the message like repr() does, i.e. plain strings are
        # shown including their quotes.
        return "%r" % (self.msg,)

    # End of Error.
0147 
0148 ###########################################################################
0149 ## Helper class: CMSHarvesterHelpFormatter.
0150 ###########################################################################
0151 
class CMSHarvesterHelpFormatter(optparse.IndentedHelpFormatter):
    """Help formatter that puts a cmsHarvester banner above the usage.

    The banner consists of a short welcome message and a pointer to
    the CMS Twiki (taken from the module-level `twiki_url').

    """

    def format_usage(self, usage):
        """Return the banner followed by the standard usage text."""

        separator = "-" * 60
        banner = [
            separator,
            "Welcome to the CMS harvester, a (hopefully useful)",
            "tool to create harvesting configurations.",
            "For more information please have a look at the CMS Twiki:",
            "  %s" % twiki_url,
            separator,
            "",
            ]

        # We only add to the output, so the unmodified output of
        # IndentedHelpFormatter simply goes last.
        standard_usage = optparse.IndentedHelpFormatter.format_usage(self,
                                                                     usage)

        return "\n".join(banner + [standard_usage])

    # End of CMSHarvesterHelpFormatter.
0182 
0183 ###########################################################################
0184 ## Helper class: DBSXMLHandler.
0185 ###########################################################################
0186 
class DBSXMLHandler(xml.sax.handler.ContentHandler):
    """XML handler class to parse DBS results.

    The tricky thing here is that older DBS versions (2.0.5 and
    earlier) return results in a different XML format than newer
    versions. Previously the result values were returned as attributes
    to the `result' element. The new approach returns result values as
    contents of named elements.

    The old approach is handled directly in startElement(), the new
    approach in characters() and endElement().

    After parsing, `results' maps each requested tag name to the list
    of values found for it, in document order.

    NOTE: All results are returned in the form of string values of
          course!

    """

    # This is the required mapping from the name of the variable we
    # ask for to what we call it ourselves. (Effectively this is the
    # mapping between the old attribute key name and the new element
    # name.)
    mapping = {
        "dataset"        : "PATH",
        "dataset.tag"    : "PROCESSEDDATASET_GLOBALTAG",
        "datatype.type"  : "PRIMARYDSTYPE_TYPE",
        "run"            : "RUNS_RUNNUMBER",
        "run.number"     : "RUNS_RUNNUMBER",
        "file.name"      : "FILES_LOGICALFILENAME",
        "file.numevents" : "FILES_NUMBEROFEVENTS",
        "algo.version"   : "APPVERSION_VERSION",
        "site"           : "STORAGEELEMENT_SENAME",
        }

    def __init__(self, tag_names):
        """Prepare to collect the values of the tags in `tag_names'."""
        xml.sax.handler.ContentHandler.__init__(self)
        # This is a list used as stack to keep track of where we are
        # in the element tree.
        self.element_position = []
        self.tag_names = tag_names
        self.results = {}
        # The pieces of text seen so far for the current element. It
        # is reset at the start of every element.
        self.current_value = []

    def startElement(self, name, attrs):
        """Register a newly opened element.

        For the old-style `result' element the values are read
        straight from its attributes. A KeyError is raised if a
        requested tag name is unknown to `mapping' or if the
        corresponding attribute is missing.

        """
        self.element_position.append(name)
        self.current_value = []

        # This is to catch results from DBS 2.0.5 and earlier.
        if name == "result":
            # NOTE: Use a separate loop variable so the `name'
            # argument is not clobbered.
            for tag_name in self.tag_names:
                key = DBSXMLHandler.mapping[tag_name]
                value = str(attrs[key])
                self.results.setdefault(tag_name, []).append(value)

    def endElement(self, name):
        """Store the collected text if this was a requested element."""
        assert self.current_element() == name, \
               "closing unopenend element `%s'" % name

        if name in self.tag_names:
            contents = "".join(self.current_value)
            self.results.setdefault(name, []).append(contents)

        self.element_position.pop()

    def characters(self, content):
        """Collect (a piece of) the text of a requested element."""
        # NOTE: It is possible that the actual contents of the tag
        # gets split into multiple pieces. This method will be called
        # for each of the pieces. This means we have to concatenate
        # everything ourselves.
        if self.current_element() in self.tag_names:
            self.current_value.append(content)

    def current_element(self):
        """Return the name of the innermost element currently open."""
        return self.element_position[-1]

    def check_results_validity(self):
        """Make sure that all results arrays have equal length.

        We should have received complete rows from DBS. I.e. all
        results arrays in the handler should be of equal length.

        Returns True if that is the case (also when there are no
        results at all), False otherwise.

        """

        # NOTE: In Python 3 dict.keys() returns a view that cannot be
        # indexed or sliced, so just compare the set of lengths.
        lengths = set(len(values) for values in self.results.values())
        results_valid = (len(lengths) <= 1)

        return results_valid

    # End of DBSXMLHandler.
0290 
0291 ###########################################################################
0292 ## CMSHarvester class.
0293 ###########################################################################
0294 
0295 class CMSHarvester(object):
0296     """Class to perform CMS harvesting.
0297 
0298     More documentation `obviously' to follow.
0299 
0300     """
0301 
0302     ##########
0303 
0304     def __init__(self, cmd_line_opts=None):
0305         "Initialize class and process command line options."
0306 
0307         self.version = __version__
0308 
0309         # These are the harvesting types allowed. See the head of this
0310         # file for more information.
0311         self.harvesting_types = [
0312             "RelVal",
0313             "RelValFS",
0314             "MC",
0315             "DQMOffline",
0316             ]
0317 
0318         # These are the possible harvesting modes:
0319         #   - Single-step: harvesting takes place on-site in a single
0320         #   step. For each samples only a single ROOT file containing
0321         #   the harvesting results is returned.
0322         #   - Single-step-allow-partial: special hack to allow
0323         #   harvesting of partial statistics using single-step
0324         #   harvesting on spread-out samples.
0325         #   - Two-step: harvesting takes place in two steps. The first
0326         #   step returns a series of monitoring elenent summaries for
0327         #   each sample. The second step then merges these summaries
0328         #   locally and does the real harvesting. This second step
0329         #   produces the ROOT file containing the harvesting results.
0330         self.harvesting_modes = [
0331             "single-step",
0332             "single-step-allow-partial",
0333             "two-step"
0334             ]
0335 
0336         # It is possible to specify a GlobalTag that will override any
0337         # choices (regarding GlobalTags) made by the cmsHarvester.
0338         # BUG BUG BUG
0339         # For the moment, until I figure out a way to obtain the
0340         # GlobalTag with which a given data (!) dataset was created,
0341         # it is necessary to specify a GlobalTag for harvesting of
0342         # data.
0343         # BUG BUG BUG end
0344         self.globaltag = None
0345 
0346         # It's also possible to switch off the use of reference
0347         # histograms altogether.
0348         self.use_ref_hists = True
0349 
0350         # The database name and account are hard-coded. They are not
0351         # likely to change before the end-of-life of this tool. But of
0352         # course there is a way to override this from the command
0353         # line. One can even override the Frontier connection used for
0354         # the GlobalTag and for the reference histograms
0355         # independently. Please only use this for testing purposes.
0356         self.frontier_connection_name = {}
0357         self.frontier_connection_name["globaltag"] = "frontier://" \
0358                                                      "FrontierProd/"
0359         self.frontier_connection_name["refhists"] = "frontier://" \
0360                                                     "FrontierProd/"
0361         self.frontier_connection_overridden = {}
0362         for key in self.frontier_connection_name.keys():
0363             self.frontier_connection_overridden[key] = False
0364 
0365         # This contains information specific to each of the harvesting
0366         # types. Used to create the harvesting configuration. It is
0367         # filled by setup_harvesting_info().
0368         self.harvesting_info = None
0369 
0370         ###
0371 
0372         # These are default `unused' values that will be filled in
0373         # depending on the command line options.
0374 
0375         # The type of harvesting we're doing. See
0376         # self.harvesting_types for allowed types.
0377         self.harvesting_type = None
0378 
0379         # The harvesting mode, popularly known as single-step
0380         # vs. two-step. The thing to remember at this point is that
0381         # single-step is only possible for samples located completely
0382         # at a single site (i.e. SE).
0383         self.harvesting_mode = None
0384         # BUG BUG BUG
0385         # Default temporarily set to two-step until we can get staged
0386         # jobs working with CRAB.
0387         self.harvesting_mode_default = "single-step"
0388         # BUG BUG BUG end
0389 
0390         # The input method: are we reading a dataset name (or regexp)
0391         # directly from the command line or are we reading a file
0392         # containing a list of dataset specifications. Actually we
0393         # keep one of each for both datasets and runs.
0394         self.input_method = {}
0395         self.input_method["datasets"] = {}
0396         self.input_method["datasets"]["use"] = None
0397         self.input_method["datasets"]["ignore"] = None
0398         self.input_method["runs"] = {}
0399         self.input_method["runs"]["use"] = None
0400         self.input_method["runs"]["ignore"] = None
0401         self.input_method["runs"]["ignore"] = None
0402         # The name of whatever input we're using.
0403         self.input_name = {}
0404         self.input_name["datasets"] = {}
0405         self.input_name["datasets"]["use"] = None
0406         self.input_name["datasets"]["ignore"] = None
0407         self.input_name["runs"] = {}
0408         self.input_name["runs"]["use"] = None
0409         self.input_name["runs"]["ignore"] = None
0410 
0411         self.Jsonlumi = False
0412         self.Jsonfilename = "YourJSON.txt"
0413         self.Jsonrunfilename = "YourJSON.txt"
0414         self.todofile = "YourToDofile.txt"
0415 
0416         # If this is true, we're running in `force mode'. In this case
0417         # the sanity checks are performed but failure will not halt
0418         # everything.
0419         self.force_running = None
0420 
0421         # The base path of the output dir in CASTOR.
0422         self.castor_base_dir = None
0423         self.castor_base_dir_default = "/castor/cern.ch/" \
0424                                        "cms/store/temp/" \
0425                                        "dqm/offline/harvesting_output/"
0426 
0427         # The name of the file to be used for book keeping: which
0428         # datasets, runs, etc. we have already processed.
0429         self.book_keeping_file_name = None
0430         self.book_keeping_file_name_default = "harvesting_accounting.txt"
0431 
0432         # The dataset name to reference histogram name mapping is read
0433         # from a text file. The name of this file is kept in the
0434         # following variable.
0435         self.ref_hist_mappings_file_name = None
0436         # And this is the default value.
0437         self.ref_hist_mappings_file_name_default = "harvesting_ref_hist_mappings.txt"
0438 
0439         # Hmmm, hard-coded prefix of the CERN CASTOR area. This is the
0440         # only supported CASTOR area.
0441         # NOTE: Make sure this one starts with a `/'.
0442         self.castor_prefix = "/castor/cern.ch"
0443 
0444         # Normally the central harvesting should be done using the
0445         # `t1access' grid role. To be able to run without T1 access
0446         # the --no-t1access flag can be used. This variable keeps
0447         # track of that special mode.
0448         self.non_t1access = False
0449         self.caf_access = False
0450         self.saveByLumiSection = False
0451         self.crab_submission = False
0452         self.nr_max_sites = 1
0453 
0454         self.preferred_site = "no preference"
0455 
0456         # This will become the list of datasets and runs to consider
0457         self.datasets_to_use = {}
0458         # and this will become the list of datasets and runs to skip.
0459         self.datasets_to_ignore = {}
0460         # This, in turn, will hold all book keeping information.
0461         self.book_keeping_information = {}
0462         # And this is where the dataset name to reference histogram
0463         # name mapping is stored.
0464         self.ref_hist_mappings = {}
0465 
0466         # We're now also allowing run selection. This means we also
0467         # have to keep list of runs requested and vetoed by the user.
0468         self.runs_to_use = {}
0469         self.runs_to_ignore = {}
0470 
0471         # Cache for CMSSW version availability at different sites.
0472         self.sites_and_versions_cache = {}
0473 
0474         # Cache for checked GlobalTags.
0475         self.globaltag_check_cache = []
0476 
0477         # Global flag to see if there were any jobs for which we could
0478         # not find a matching site.
0479         self.all_sites_found = True
0480 
0481         # Helper string centrally defined.
0482         self.no_matching_site_found_str = "no_matching_site_found"
0483 
0484         # Store command line options for later use.
0485         if cmd_line_opts is None:
0486             cmd_line_opts = sys.argv[1:]
0487         self.cmd_line_opts = cmd_line_opts
0488 
0489         # Set up the logger.
0490         log_handler = logging.StreamHandler()
0491         # This is the default log formatter, the debug option switches
0492         # on some more information.
0493         log_formatter = logging.Formatter("%(message)s")
0494         log_handler.setFormatter(log_formatter)
0495         logger = logging.getLogger()
0496         logger.name = "main"
0497         logger.addHandler(log_handler)
0498         self.logger = logger
0499         # The default output mode is quite verbose.
0500         self.set_output_level("NORMAL")
0501 
0502         #logger.debug("Initialized successfully")
0503 
0504         # End of __init__.
0505 
0506     ##########
0507 
0508     def cleanup(self):
0509         "Clean up after ourselves."
0510 
0511         # NOTE: This is the safe replacement of __del__.
0512 
0513         #self.logger.debug("All done -> shutting down")
0514         logging.shutdown()
0515 
0516         # End of cleanup.
0517 
0518     ##########
0519 
0520     def time_stamp(self):
0521         "Create a timestamp to use in the created config files."
0522 
0523         time_now = datetime.datetime.utcnow()
0524         # We don't care about the microseconds.
0525         time_now = time_now.replace(microsecond = 0)
0526         time_stamp = "%sUTC" % datetime.datetime.isoformat(time_now)
0527 
0528         # End of time_stamp.
0529         return time_stamp
0530 
0531     ##########
0532 
0533     def ident_string(self):
0534         "Spit out an identification string for cmsHarvester.py."
0535 
0536         ident_str = "`cmsHarvester.py " \
0537                     "version %s': cmsHarvester.py %s" % \
0538                     (__version__,
0539                      reduce(lambda x, y: x+' '+y, sys.argv[1:]))
0540 
0541         return ident_str
0542 
0543     ##########
0544 
0545     def format_conditions_string(self, globaltag):
0546         """Create the conditions string needed for `cmsDriver'.
0547 
0548         Just glueing the FrontierConditions bit in front of it really.
0549 
0550         """
0551 
0552         # Not very robust but okay. The idea is that if the user
0553         # specified (since this cannot happen with GlobalTags coming
0554         # from DBS) something containing `conditions', they probably
0555         # know what they're doing and we should not muck things up. In
0556         # all other cases we just assume we only received the
0557         # GlobalTag part and we built the usual conditions string from
0558         # that .
0559         if globaltag.lower().find("conditions") > -1:
0560             conditions_string = globaltag
0561         else:
0562             conditions_string = "FrontierConditions_GlobalTag,%s" % \
0563                                 globaltag
0564 
0565         # End of format_conditions_string.
0566         return conditions_string
0567 
0568     ##########
0569 
0570     def db_account_name_cms_cond_globaltag(self):
0571         """Return the database account name used to store the GlobalTag.
0572 
0573         The name of the database account depends (albeit weakly) on
0574         the CMSSW release version.
0575 
0576         """
0577 
0578         # This never changed, unlike the cms_cond_31X_DQM_SUMMARY ->
0579         # cms_cond_34X_DQM transition.
0580         account_name = "CMS_COND_31X_GLOBALTAG"
0581 
0582         # End of db_account_name_cms_cond_globaltag.
0583         return account_name
0584 
0585     ##########
0586 
0587     def db_account_name_cms_cond_dqm_summary(self):
0588         """See db_account_name_cms_cond_globaltag."""
0589 
0590         account_name = None
0591         version = self.cmssw_version[6:11]
0592         if version < "3_4_0":
0593             account_name = "CMS_COND_31X_DQM_SUMMARY"
0594         else:
0595             account_name = "CMS_COND_34X"
0596 
0597         # End of db_account_name_cms_cond_dqm_summary.
0598         return account_name
0599 
0600     ##########
0601 
0602     def config_file_header(self):
0603         "Create a nice header to be used to mark the generated files."
0604 
0605         tmp = []
0606 
0607         time_stamp = self.time_stamp()
0608         ident_str = self.ident_string()
0609         tmp.append("# %s" % time_stamp)
0610         tmp.append("# WARNING: This file was created automatically!")
0611         tmp.append("")
0612         tmp.append("# Created by %s" % ident_str)
0613 
0614         header = "\n".join(tmp)
0615 
0616         # End of config_file_header.
0617         return header
0618 
0619     ##########
0620 
0621     def set_output_level(self, output_level):
0622         """Adjust the level of output generated.
0623 
0624         Choices are:
0625           - normal  : default level of output
0626           - quiet   : less output than the default
0627           - verbose : some additional information
0628           - debug   : lots more information, may be overwhelming
0629 
0630         NOTE: The debug option is a bit special in the sense that it
0631               also modifies the output format.
0632 
0633         """
0634 
0635         # NOTE: These levels are hooked up to the ones used in the
0636         #       logging module.
0637         output_levels = {
0638             "NORMAL"  : logging.INFO,
0639             "QUIET"   : logging.WARNING,
0640             "VERBOSE" : logging.INFO,
0641             "DEBUG"   : logging.DEBUG
0642             }
0643 
0644         output_level = output_level.upper()
0645 
0646         try:
0647             # Update the logger.
0648             self.log_level = output_levels[output_level]
0649             self.logger.setLevel(self.log_level)
0650         except KeyError:
0651             # Show a complaint
0652             self.logger.fatal("Unknown output level `%s'" % ouput_level)
0653             # and re-raise an exception.
0654             raise Exception
0655 
0656         # End of set_output_level.
0657 
0658     ##########
0659 
0660     def option_handler_debug(self, option, opt_str, value, parser):
0661         """Switch to debug mode.
0662 
0663         This both increases the amount of output generated, as well as
0664         changes the format used (more detailed information is given).
0665 
0666         """
0667 
0668         # Switch to a more informative log formatter for debugging.
0669         log_formatter_debug = logging.Formatter("[%(levelname)s] " \
0670                                                 # NOTE: funcName was
0671                                                 # only implemented
0672                                                 # starting with python
0673                                                 # 2.5.
0674                                                 #"%(funcName)s() " \
0675                                                 #"@%(filename)s:%(lineno)d " \
0676                                                 "%(message)s")
0677         # Hmmm, not very nice. This assumes there's only a single
0678         # handler associated with the current logger.
0679         log_handler = self.logger.handlers[0]
0680         log_handler.setFormatter(log_formatter_debug)
0681         self.set_output_level("DEBUG")
0682 
0683         # End of option_handler_debug.
0684 
0685     ##########
0686 
0687     def option_handler_quiet(self, option, opt_str, value, parser):
0688         "Switch to quiet mode: less verbose."
0689 
0690         self.set_output_level("QUIET")
0691 
0692         # End of option_handler_quiet.
0693 
0694     ##########
0695 
0696     def option_handler_force(self, option, opt_str, value, parser):
0697         """Switch on `force mode' in which case we don't brake for nobody.
0698 
0699         In so-called `force mode' all sanity checks are performed but
0700         we don't halt on failure. Of course this requires some care
0701         from the user.
0702 
0703         """
0704 
0705         self.logger.debug("Switching on `force mode'.")
0706         self.force_running = True
0707 
0708         # End of option_handler_force.
0709 
0710     ##########
0711 
0712     def option_handler_harvesting_type(self, option, opt_str, value, parser):
0713         """Set the harvesting type to be used.
0714 
0715         This checks that no harvesting type is already set, and sets
0716         the harvesting type to be used to the one specified. If a
0717         harvesting type is already set an exception is thrown. The
0718         same happens when an unknown type is specified.
0719 
0720         """
0721 
0722         # Check for (in)valid harvesting types.
0723         # NOTE: The matching is done in a bit of a complicated
0724         # way. This allows the specification of the type to be
0725         # case-insensitive while still ending up with the properly
0726         # `cased' version afterwards.
0727         value = value.lower()
0728         harvesting_types_lowered = [i.lower() for i in self.harvesting_types]
0729         try:
0730             type_index = harvesting_types_lowered.index(value)
0731             # If this works, we now have the index to the `properly
0732             # cased' version of the harvesting type.
0733         except ValueError:
0734             self.logger.fatal("Unknown harvesting type `%s'" % \
0735                               value)
0736             self.logger.fatal("  possible types are: %s" %
0737                               ", ".join(self.harvesting_types))
0738             raise Usage("Unknown harvesting type `%s'" % \
0739                         value)
0740 
0741         # Check if multiple (by definition conflicting) harvesting
0742         # types are being specified.
0743         if not self.harvesting_type is None:
0744             msg = "Only one harvesting type should be specified"
0745             self.logger.fatal(msg)
0746             raise Usage(msg)
0747         self.harvesting_type = self.harvesting_types[type_index]
0748 
0749         self.logger.info("Harvesting type to be used: `%s'" % \
0750                          self.harvesting_type)
0751 
0752         # End of option_handler_harvesting_type.
0753 
0754     ##########
0755 
0756     def option_handler_harvesting_mode(self, option, opt_str, value, parser):
0757         """Set the harvesting mode to be used.
0758 
0759         Single-step harvesting can be used for samples that are
0760         located completely at a single site (= SE). Otherwise use
0761         two-step mode.
0762 
0763         """
0764 
0765         # Check for valid mode.
0766         harvesting_mode = value.lower()
0767         if not harvesting_mode in self.harvesting_modes:
0768             msg = "Unknown harvesting mode `%s'" % harvesting_mode
0769             self.logger.fatal(msg)
0770             self.logger.fatal("  possible modes are: %s" % \
0771                               ", ".join(self.harvesting_modes))
0772             raise Usage(msg)
0773 
0774         # Check if we've been given only a single mode, otherwise
0775         # complain.
0776         if not self.harvesting_mode is None:
0777             msg = "Only one harvesting mode should be specified"
0778             self.logger.fatal(msg)
0779             raise Usage(msg)
0780         self.harvesting_mode = harvesting_mode
0781 
0782         self.logger.info("Harvesting mode to be used: `%s'" % \
0783                          self.harvesting_mode)
0784 
0785         # End of option_handler_harvesting_mode.
0786 
0787     ##########
0788 
0789     def option_handler_globaltag(self, option, opt_str, value, parser):
0790         """Set the GlobalTag to be used, overriding our own choices.
0791 
0792         By default the cmsHarvester will use the GlobalTag with which
0793         a given dataset was created also for the harvesting. The
0794         --globaltag option is the way to override this behaviour.
0795 
0796         """
0797 
0798         # Make sure that this flag only occurred once.
0799         if not self.globaltag is None:
0800             msg = "Only one GlobalTag should be specified"
0801             self.logger.fatal(msg)
0802             raise Usage(msg)
0803         self.globaltag = value
0804 
0805         self.logger.info("GlobalTag to be used: `%s'" % \
0806                          self.globaltag)
0807 
0808         # End of option_handler_globaltag.
0809 
0810     ##########
0811 
0812     def option_handler_no_ref_hists(self, option, opt_str, value, parser):
0813         "Switch use of all reference histograms off."
0814 
0815         self.use_ref_hists = False
0816 
0817         self.logger.warning("Switching off all use of reference histograms")
0818 
0819         # End of option_handler_no_ref_hists.
0820 
0821      ##########
0822 
0823     def option_handler_frontier_connection(self, option, opt_str,
0824                                            value, parser):
0825         """Override the default Frontier connection string.
0826 
0827         Please only use this for testing (e.g. when a test payload has
0828         been inserted into cms_orc_off instead of cms_orc_on).
0829 
0830         This method gets called for three different command line
0831         options:
0832         - --frontier-connection,
0833         - --frontier-connection-for-globaltag,
0834         - --frontier-connection-for-refhists.
0835         Appropriate care has to be taken to make sure things are only
0836         specified once.
0837 
0838         """
0839 
0840         # Figure out with which command line option we've been called.
0841         frontier_type = opt_str.split("-")[-1]
0842         if frontier_type == "connection":
0843             # Main option: change all connection strings.
0844             frontier_types = self.frontier_connection_name.keys()
0845         else:
0846             frontier_types = [frontier_type]
0847 
0848         # Make sure that each Frontier connection is specified only
0849         # once. (Okay, in a bit of a dodgy way...)
0850         for connection_name in frontier_types:
0851             if self.frontier_connection_overridden[connection_name] == True:
0852                 msg = "Please specify either:\n" \
0853                       "  `--frontier-connection' to change the " \
0854                       "Frontier connection used for everything, or\n" \
0855                       "either one or both of\n" \
0856                       "  `--frontier-connection-for-globaltag' to " \
0857                       "change the Frontier connection used for the " \
0858                       "GlobalTag and\n" \
0859                       "  `--frontier-connection-for-refhists' to change " \
0860                       "the Frontier connection used for the " \
0861                       "reference histograms."
0862                 self.logger.fatal(msg)
0863                 raise Usage(msg)
0864 
0865         frontier_prefix = "frontier://"
0866         if not value.startswith(frontier_prefix):
0867             msg = "Expecting Frontier connections to start with " \
0868                   "`%s'. You specified `%s'." % \
0869                   (frontier_prefix, value)
0870             self.logger.fatal(msg)
0871             raise Usage(msg)
0872         # We also kind of expect this to be either FrontierPrep or
0873         # FrontierProd (but this is just a warning).
0874         if value.find("FrontierProd") < 0 and \
0875                value.find("FrontierProd") < 0:
0876             msg = "Expecting Frontier connections to contain either " \
0877                   "`FrontierProd' or `FrontierPrep'. You specified " \
0878                   "`%s'. Are you sure?" % \
0879                   value
0880             self.logger.warning(msg)
0881 
0882         if not value.endswith("/"):
0883             value += "/"
0884 
0885         for connection_name in frontier_types:
0886             self.frontier_connection_name[connection_name] = value
0887             self.frontier_connection_overridden[connection_name] = True
0888 
0889             frontier_type_str = "unknown"
0890             if connection_name == "globaltag":
0891                 frontier_type_str = "the GlobalTag"
0892             elif connection_name == "refhists":
0893                 frontier_type_str = "the reference histograms"
0894 
0895             self.logger.warning("Overriding default Frontier " \
0896                                 "connection for %s " \
0897                                 "with `%s'" % \
0898                                 (frontier_type_str,
0899                                  self.frontier_connection_name[connection_name]))
0900 
0901         # End of option_handler_frontier_connection
0902 
0903     ##########
0904 
0905     def option_handler_input_todofile(self, option, opt_str, value, parser):
0906 
0907         self.todofile = value
0908         # End of option_handler_input_todofile.
0909 
0910     ##########
0911 
0912     def option_handler_input_Jsonfile(self, option, opt_str, value, parser):
0913 
0914         self.Jsonfilename = value
0915         # End of option_handler_input_Jsonfile.
0916 
0917     ##########
0918 
0919     def option_handler_input_Jsonrunfile(self, option, opt_str, value, parser):
0920 
0921         self.Jsonrunfilename = value
0922         # End of option_handler_input_Jsonrunfile.
0923 
0924     ##########
0925 
0926     def option_handler_input_spec(self, option, opt_str, value, parser):
0927         """TODO TODO TODO
0928         Document this...
0929 
0930         """
0931 
0932         # Figure out if we were called for the `use these' or the
0933         # `ignore these' case.
0934         if opt_str.lower().find("ignore") > -1:
0935             spec_type = "ignore"
0936         else:
0937             spec_type = "use"
0938 
0939         # Similar: are we being called for datasets or for runs?
0940         if opt_str.lower().find("dataset") > -1:
0941             select_type = "datasets"
0942         else:
0943             select_type = "runs"
0944 
0945         if not self.input_method[select_type][spec_type] is None:
0946             msg = "Please only specify one input method " \
0947                   "(for the `%s' case)" % opt_str
0948             self.logger.fatal(msg)
0949             raise Usage(msg)
0950 
0951         input_method = opt_str.replace("-", "").replace("ignore", "")
0952         self.input_method[select_type][spec_type] = input_method
0953         self.input_name[select_type][spec_type] = value
0954 
0955         self.logger.debug("Input method for the `%s' case: %s" % \
0956                           (spec_type, input_method))
0957 
0958         # End of option_handler_input_spec
0959 
0960     ##########
0961 
0962     def option_handler_book_keeping_file(self, option, opt_str, value, parser):
0963         """Store the name of the file to be used for book keeping.
0964 
0965         The only check done here is that only a single book keeping
0966         file is specified.
0967 
0968         """
0969 
0970         file_name = value
0971 
0972         if not self.book_keeping_file_name is None:
0973             msg = "Only one book keeping file should be specified"
0974             self.logger.fatal(msg)
0975             raise Usage(msg)
0976         self.book_keeping_file_name = file_name
0977 
0978         self.logger.info("Book keeping file to be used: `%s'" % \
0979                          self.book_keeping_file_name)
0980 
0981         # End of option_handler_book_keeping_file.
0982 
0983     ##########
0984 
0985     def option_handler_ref_hist_mapping_file(self, option, opt_str, value, parser):
0986         """Store the name of the file for the ref. histogram mapping.
0987 
0988         """
0989 
0990         file_name = value
0991 
0992         if not self.ref_hist_mappings_file_name is None:
0993             msg = "Only one reference histogram mapping file " \
0994                   "should be specified"
0995             self.logger.fatal(msg)
0996             raise Usage(msg)
0997         self.ref_hist_mappings_file_name = file_name
0998 
0999         self.logger.info("Reference histogram mapping file " \
1000                          "to be used: `%s'" % \
1001                          self.ref_hist_mappings_file_name)
1002 
1003         # End of option_handler_ref_hist_mapping_file.
1004 
1005     ##########
1006 
1007     # OBSOLETE OBSOLETE OBSOLETE
1008 
1009 ##    def option_handler_dataset_name(self, option, opt_str, value, parser):
1010 ##        """Specify the name(s) of the dataset(s) to be processed.
1011 
1012 ##        It is checked to make sure that no dataset name or listfile
1013 ##        names are given yet. If all is well (i.e. we still have a
1014 ##        clean slate) the dataset name is stored for later use,
1015 ##        otherwise a Usage exception is raised.
1016 
1017 ##        """
1018 
1019 ##        if not self.input_method is None:
1020 ##            if self.input_method == "dataset":
1021 ##                raise Usage("Please only feed me one dataset specification")
1022 ##            elif self.input_method == "listfile":
1023 ##                raise Usage("Cannot specify both dataset and input list file")
1024 ##            else:
1025 ##                assert False, "Unknown input method `%s'" % self.input_method
1026 ##        self.input_method = "dataset"
1027 ##        self.input_name = value
1028 ##        self.logger.info("Input method used: %s" % self.input_method)
1029 
1030 ##        # End of option_handler_dataset_name.
1031 
1032 ##    ##########
1033 
1034 ##    def option_handler_listfile_name(self, option, opt_str, value, parser):
1035 ##        """Specify the input list file containing datasets to be processed.
1036 
1037 ##        It is checked to make sure that no dataset name or listfile
1038 ##        names are given yet. If all is well (i.e. we still have a
1039 ##        clean slate) the listfile name is stored for later use,
1040 ##        otherwise a Usage exception is raised.
1041 
1042 ##        """
1043 
1044 ##        if not self.input_method is None:
1045 ##            if self.input_method == "listfile":
1046 ##                raise Usage("Please only feed me one list file")
1047 ##            elif self.input_method == "dataset":
1048 ##                raise Usage("Cannot specify both dataset and input list file")
1049 ##            else:
1050 ##                assert False, "Unknown input method `%s'" % self.input_method
1051 ##        self.input_method = "listfile"
1052 ##        self.input_name = value
1053 ##        self.logger.info("Input method used: %s" % self.input_method)
1054 
1055 ##        # End of option_handler_listfile_name.
1056 
1057     # OBSOLETE OBSOLETE OBSOLETE end
1058 
1059     ##########
1060 
1061     def option_handler_castor_dir(self, option, opt_str, value, parser):
1062         """Specify where on CASTOR the output should go.
1063 
1064         At the moment only output to CERN CASTOR is
1065         supported. Eventually the harvested results should go into the
1066         central place for DQM on CASTOR anyway.
1067 
1068         """
1069 
1070         # Check format of specified CASTOR area.
1071         castor_dir = value
1072         #castor_dir = castor_dir.lstrip(os.path.sep)
1073         castor_prefix = self.castor_prefix
1074 
1075         # Add a leading slash if necessary and clean up the path.
1076         castor_dir = os.path.join(os.path.sep, castor_dir)
1077         self.castor_base_dir = os.path.normpath(castor_dir)
1078 
1079         self.logger.info("CASTOR (base) area to be used: `%s'" % \
1080                          self.castor_base_dir)
1081 
1082         # End of option_handler_castor_dir.
1083 
1084     ##########
1085 
1086     def option_handler_no_t1access(self, option, opt_str, value, parser):
1087         """Set the self.no_t1access flag to try and create jobs that
1088         run without special `t1access' role.
1089 
1090         """
1091 
1092         self.non_t1access = True
1093 
1094         self.logger.warning("Running in `non-t1access' mode. " \
1095                             "Will try to create jobs that run " \
1096                             "without special rights but no " \
1097                             "further promises...")
1098 
1099         # End of option_handler_no_t1access.
1100 
1101     ##########
1102 
1103     def option_handler_caf_access(self, option, opt_str, value, parser):
1104         """Set the self.caf_access flag to try and create jobs that
1105         run on the CAF.
1106 
1107         """
1108         self.caf_access = True
1109 
1110         self.logger.warning("Running in `caf_access' mode. " \
1111                             "Will try to create jobs that run " \
1112                             "on CAF but no" \
1113                             "further promises...")
1114 
1115         # End of option_handler_caf_access.
1116 
1117    ##########
1118 
1119     def option_handler_saveByLumiSection(self, option, opt_str, value, parser):
1120         """Set process.dqmSaver.saveByLumiSectiont=1 in cfg harvesting file
1121         """
1122         self.saveByLumiSection = True
1123 
1124         self.logger.warning("waning concerning saveByLumiSection option")
1125 
1126         # End of option_handler_saveByLumiSection.
1127 
1128 
1129     ##########
1130 
1131     def option_handler_crab_submission(self, option, opt_str, value, parser):
1132         """Crab jobs are not created and
1133         "submitted automatically",
1134         """
1135         self.crab_submission = True
1136 
1137         # End of option_handler_crab_submission.
1138 
1139     ##########
1140 
1141     def option_handler_sites(self, option, opt_str, value, parser):
1142 
1143         self.nr_max_sites = value
1144 
1145     ##########
1146 
1147     def option_handler_preferred_site(self, option, opt_str, value, parser):
1148 
1149         self.preferred_site = value
1150 
1151     ##########
1152 
1153     def option_handler_list_types(self, option, opt_str, value, parser):
1154         """List all harvesting types and their mappings.
1155 
1156         This lists all implemented harvesting types with their
1157         corresponding mappings to sequence names. This had to be
1158         separated out from the help since it depends on the CMSSW
1159         version and was making things a bit of a mess.
1160 
1161         NOTE: There is no way (at least not that I could come up with)
1162         to code this in a neat generic way that can be read both by
1163         this method and by setup_harvesting_info(). Please try hard to
1164         keep these two methods in sync!
1165 
1166         """
1167 
1168         sep_line = "-" * 50
1169         sep_line_short = "-" * 20
1170 
1171         print(sep_line)
1172         print("The following harvesting types are available:")
1173         print(sep_line)
1174 
1175         print("`RelVal' maps to:")
1176         print("  pre-3_3_0           : HARVESTING:validationHarvesting")
1177         print("  3_4_0_pre2 and later: HARVESTING:validationHarvesting+dqmHarvesting")
1178         print("  Exceptions:")
1179         print("    3_3_0_pre1-4        : HARVESTING:validationHarvesting")
1180         print("    3_3_0_pre6          : HARVESTING:validationHarvesting")
1181         print("    3_4_0_pre1          : HARVESTING:validationHarvesting")
1182 
1183         print(sep_line_short)
1184 
1185         print("`RelValFS' maps to:")
1186         print("  always              : HARVESTING:validationHarvestingFS")
1187 
1188         print(sep_line_short)
1189 
1190         print("`MC' maps to:")
1191         print("    always          : HARVESTING:validationprodHarvesting")
1192 
1193         print(sep_line_short)
1194 
1195         print("`DQMOffline' maps to:")
1196         print("  always              : HARVESTING:dqmHarvesting")
1197 
1198         print(sep_line)
1199 
1200         # We're done, let's quit. (This is the same thing optparse
1201         # does after printing the help.)
1202         raise SystemExit
1203 
1204         # End of option_handler_list_types.
1205 
1206     ##########
1207 
1208     def setup_harvesting_info(self):
1209         """Fill our dictionary with all info needed to understand
1210         harvesting.
1211 
1212         This depends on the CMSSW version since at some point the
1213         names and sequences were modified.
1214 
1215         NOTE: There is no way (at least not that I could come up with)
1216         to code this in a neat generic way that can be read both by
1217         this method and by option_handler_list_types(). Please try
1218         hard to keep these two methods in sync!
1219 
1220         """
1221 
1222         assert not self.cmssw_version is None, \
1223                "ERROR setup_harvesting() requires " \
1224                "self.cmssw_version to be set!!!"
1225 
1226         harvesting_info = {}
1227 
1228         # This is the version-independent part.
1229         harvesting_info["DQMOffline"] = {}
1230         harvesting_info["DQMOffline"]["beamspot"] = None
1231         harvesting_info["DQMOffline"]["eventcontent"] = None
1232         harvesting_info["DQMOffline"]["harvesting"] = "AtRunEnd"
1233 
1234         harvesting_info["RelVal"] = {}
1235         harvesting_info["RelVal"]["beamspot"] = None
1236         harvesting_info["RelVal"]["eventcontent"] = None
1237         harvesting_info["RelVal"]["harvesting"] = "AtRunEnd"
1238 
1239         harvesting_info["RelValFS"] = {}
1240         harvesting_info["RelValFS"]["beamspot"] = None
1241         harvesting_info["RelValFS"]["eventcontent"] = None
1242         harvesting_info["RelValFS"]["harvesting"] = "AtRunEnd"
1243 
1244         harvesting_info["MC"] = {}
1245         harvesting_info["MC"]["beamspot"] = None
1246         harvesting_info["MC"]["eventcontent"] = None
1247         harvesting_info["MC"]["harvesting"] = "AtRunEnd"
1248 
1249         # This is the version-dependent part. And I know, strictly
1250         # speaking it's not necessary to fill in all three types since
1251         # in a single run we'll only use one type anyway. This does
1252         # look more readable, however, and required less thought from
1253         # my side when I put this together.
1254 
1255         # DEBUG DEBUG DEBUG
1256         # Check that we understand our own version naming.
1257         assert self.cmssw_version.startswith("CMSSW_")
1258         # DEBUG DEBUG DEBUG end
1259 
1260         version = self.cmssw_version[6:]
1261 
1262         #----------
1263 
1264         # RelVal
1265         step_string = None
1266         if version < "3_3_0":
1267             step_string = "validationHarvesting"
1268         elif version in ["3_3_0_pre1", "3_3_0_pre2",
1269                          "3_3_0_pre3", "3_3_0_pre4",
1270                          "3_3_0_pre6", "3_4_0_pre1"]:
1271             step_string = "validationHarvesting"
1272         else:
1273             step_string = "validationHarvesting+dqmHarvesting"
1274 
1275         harvesting_info["RelVal"]["step_string"] = step_string
1276 
1277         # DEBUG DEBUG DEBUG
1278         # Let's make sure we found something.
1279         assert not step_string is None, \
1280                "ERROR Could not decide a RelVal harvesting sequence " \
1281                "for CMSSW version %s" % self.cmssw_version
1282         # DEBUG DEBUG DEBUG end
1283 
1284         #----------
1285 
1286         # RelVal
1287         step_string = "validationHarvestingFS"
1288 
1289         harvesting_info["RelValFS"]["step_string"] = step_string
1290 
1291         #----------
1292 
1293         # MC
1294         step_string = "validationprodHarvesting"
1295 
1296         harvesting_info["MC"]["step_string"] = step_string
1297 
1298         # DEBUG DEBUG DEBUG
1299         # Let's make sure we found something.
1300         assert not step_string is None, \
1301                "ERROR Could not decide a MC harvesting " \
1302                "sequence for CMSSW version %s" % self.cmssw_version
1303         # DEBUG DEBUG DEBUG end
1304 
1305         #----------
1306 
1307         # DQMOffline
1308         step_string = "dqmHarvesting"
1309 
1310         harvesting_info["DQMOffline"]["step_string"] = step_string
1311 
1312         #----------
1313 
1314         self.harvesting_info = harvesting_info
1315 
1316         self.logger.info("Based on the CMSSW version (%s) " \
1317                          "I decided to use the `HARVESTING:%s' " \
1318                          "sequence for %s harvesting" % \
1319                          (self.cmssw_version,
1320                           self.harvesting_info[self.harvesting_type]["step_string"],
1321                           self.harvesting_type))
1322 
1323         # End of setup_harvesting_info.
1324 
1325     ##########
1326 
1327     def create_castor_path_name_common(self, dataset_name):
1328         """Build the common part of the output path to be used on
1329         CASTOR.
1330 
1331         This consists of the CASTOR area base path specified by the
1332         user and a piece depending on the data type (data vs. MC), the
1333         harvesting type and the dataset name followed by a piece
1334         containing the run number and event count. (See comments in
1335         create_castor_path_name_special for details.) This method
1336         creates the common part, without run number and event count.
1337 
1338         """
1339 
1340         castor_path = self.castor_base_dir
1341 
1342         ###
1343 
1344         # The data type: data vs. mc.
1345         datatype = self.datasets_information[dataset_name]["datatype"]
1346         datatype = datatype.lower()
1347         castor_path = os.path.join(castor_path, datatype)
1348 
1349         # The harvesting type.
1350         harvesting_type = self.harvesting_type
1351         harvesting_type = harvesting_type.lower()
1352         castor_path = os.path.join(castor_path, harvesting_type)
1353 
1354         # The CMSSW release version (only the `digits'). Note that the
1355         # CMSSW version used here is the version used for harvesting,
1356         # not the one from the dataset. This does make the results
1357         # slightly harder to find. On the other hand it solves
1358         # problems in case one re-harvests a given dataset with a
1359         # different CMSSW version, which would lead to ambiguous path
1360         # names. (Of course for many cases the harvesting is done with
1361         # the same CMSSW version the dataset was created with.)
1362         release_version = self.cmssw_version
1363         release_version = release_version.lower(). \
1364                           replace("cmssw", ""). \
1365                           strip("_")
1366         castor_path = os.path.join(castor_path, release_version)
1367 
1368         # The dataset name.
1369         dataset_name_escaped = self.escape_dataset_name(dataset_name)
1370         castor_path = os.path.join(castor_path, dataset_name_escaped)
1371 
1372         ###
1373 
1374         castor_path = os.path.normpath(castor_path)
1375 
1376         # End of create_castor_path_name_common.
1377         return castor_path
1378 
1379     ##########
1380 
1381     def create_castor_path_name_special(self,
1382                                         dataset_name, run_number,
1383                                         castor_path_common):
1384         """Create the specialised part of the CASTOR output dir name.
1385 
1386         NOTE: To avoid clashes with `incremental harvesting'
1387         (re-harvesting when a dataset grows) we have to include the
1388         event count in the path name. The underlying `problem' is that
1389         CRAB does not overwrite existing output files so if the output
1390         file already exists CRAB will fail to copy back the output.
1391 
1392         NOTE: It's not possible to create different kinds of
1393         harvesting jobs in a single call to this tool. However, in
1394         principle it could be possible to create both data and MC jobs
1395         in a single go.
1396 
1397         NOTE: The number of events used in the path name is the
1398         _total_ number of events in the dataset/run at the time of
1399         harvesting. If we're doing partial harvesting the final
1400         results will reflect lower statistics. This is a) the easiest
1401         to code and b) the least likely to lead to confusion if
1402         someone ever decides to swap/copy around file blocks between
1403         sites.
1404 
1405         """
1406 
1407         castor_path = castor_path_common
1408 
1409         ###
1410 
1411         # The run number part.
1412         castor_path = os.path.join(castor_path, "run_%d" % run_number)
1413 
1414         ###
1415 
1416         # The event count (i.e. the number of events we currently see
1417         # for this dataset).
1418         #nevents = self.datasets_information[dataset_name] \
1419         #          ["num_events"][run_number]
1420         castor_path = os.path.join(castor_path, "nevents")
1421 
1422         ###
1423 
1424         castor_path = os.path.normpath(castor_path)
1425 
1426         # End of create_castor_path_name_special.
1427         return castor_path
1428 
1429     ##########
1430 
1431     def create_and_check_castor_dirs(self):
1432         """Make sure all required CASTOR output dirs exist.
1433 
1434         This checks the CASTOR base dir specified by the user as well
1435         as all the subdirs required by the current set of jobs.
1436 
1437         """
1438 
1439         self.logger.info("Checking (and if necessary creating) CASTOR " \
1440                          "output area(s)...")
1441 
1442         # Call the real checker method for the base dir.
1443         self.create_and_check_castor_dir(self.castor_base_dir)
1444 
1445         # Now call the checker for all (unique) subdirs.
1446         castor_dirs = []
1447         for (dataset_name, runs) in self.datasets_to_use.items():
1448 
1449             for run in runs:
1450                 castor_dirs.append(self.datasets_information[dataset_name] \
1451                                    ["castor_path"][run])
1452         castor_dirs_unique = sorted(set(castor_dirs))
1453         # This can take some time. E.g. CRAFT08 has > 300 runs, each
1454         # of which will get a new directory. So we show some (rough)
1455         # info in between.
1456         ndirs = len(castor_dirs_unique)
1457         step = max(ndirs / 10, 1)
1458         for (i, castor_dir) in enumerate(castor_dirs_unique):
1459             if (i + 1) % step == 0 or \
1460                    (i + 1) == ndirs:
1461                 self.logger.info("  %d/%d" % \
1462                                  (i + 1, ndirs))
1463             self.create_and_check_castor_dir(castor_dir)
1464 
1465             # Now check if the directory is empty. If (an old version
1466             # of) the output file already exists CRAB will run new
1467             # jobs but never copy the results back. We assume the user
1468             # knows what they are doing and only issue a warning in
1469             # case the directory is not empty.
1470             self.logger.debug("Checking if path `%s' is empty" % \
1471                               castor_dir)
1472             cmd = "rfdir %s" % castor_dir
1473             (status, output) = subprocess.getstatusoutput(cmd)
1474             if status != 0:
1475                 msg = "Could not access directory `%s'" \
1476                       " !!! This is bad since I should have just" \
1477                       " created it !!!" % castor_dir
1478                 self.logger.fatal(msg)
1479                 raise Error(msg)
1480             if len(output) > 0:
1481                 self.logger.warning("Output directory `%s' is not empty:" \
1482                                     " new jobs will fail to" \
1483                                     " copy back output" % \
1484                                     castor_dir)
1485 
1486         # End of create_and_check_castor_dirs.
1487 
1488     ##########
1489 
1490     def create_and_check_castor_dir(self, castor_dir):
1491         """Check existence of the give CASTOR dir, if necessary create
1492         it.
1493 
1494         Some special care has to be taken with several things like
1495         setting the correct permissions such that CRAB can store the
1496         output results. Of course this means that things like
1497         /castor/cern.ch/ and user/j/ have to be recognised and treated
1498         properly.
1499 
1500         NOTE: Only CERN CASTOR area (/castor/cern.ch/) supported for
1501         the moment.
1502 
1503         NOTE: This method uses some slightly tricky caching to make
1504         sure we don't keep over and over checking the same base paths.
1505 
1506         """
1507 
1508         ###
1509 
1510         # Local helper function to fully split a path into pieces.
1511         def split_completely(path):
1512             (parent_path, name) = os.path.split(path)
1513             if name == "":
1514                 return (parent_path, )
1515             else:
1516                 return split_completely(parent_path) + (name, )
1517 
1518         ###
1519 
1520         # Local helper function to check rfio (i.e. CASTOR)
1521         # directories.
1522         def extract_permissions(rfstat_output):
1523             """Parse the output from rfstat and return the
1524             5-digit permissions string."""
1525 
1526             permissions_line = [i for i in output.split("\n") \
1527                                 if i.lower().find("protection") > -1]
1528             regexp = re.compile(".*\(([0123456789]{5})\).*")
1529             match = regexp.search(rfstat_output)
1530             if not match or len(match.groups()) != 1:
1531                 msg = "Could not extract permissions " \
1532                       "from output: %s" % rfstat_output
1533                 self.logger.fatal(msg)
1534                 raise Error(msg)
1535             permissions = match.group(1)
1536 
1537             # End of extract_permissions.
1538             return permissions
1539 
1540         ###
1541 
1542         # These are the pieces of CASTOR directories that we do not
1543         # want to touch when modifying permissions.
1544 
1545         # NOTE: This is all a bit involved, basically driven by the
1546         # fact that one wants to treat the `j' directory of
1547         # `/castor/cern.ch/user/j/jhegeman/' specially.
1548         # BUG BUG BUG
1549         # This should be simplified, for example by comparing to the
1550         # CASTOR prefix or something like that.
1551         # BUG BUG BUG end
1552         castor_paths_dont_touch = {
1553             0: ["/", "castor", "cern.ch", "cms", "store", "temp",
1554                 "dqm", "offline", "user"],
1555             -1: ["user", "store"]
1556             }
1557 
1558         self.logger.debug("Checking CASTOR path `%s'" % castor_dir)
1559 
1560         ###
1561 
1562         # First we take the full CASTOR path apart.
1563         castor_path_pieces = split_completely(castor_dir)
1564 
1565         # Now slowly rebuild the CASTOR path and see if a) all
1566         # permissions are set correctly and b) the final destination
1567         # exists.
1568         path = ""
1569         check_sizes = sorted(castor_paths_dont_touch.keys())
1570         len_castor_path_pieces = len(castor_path_pieces)
1571         for piece_index in range (len_castor_path_pieces):
1572             skip_this_path_piece = False
1573             piece = castor_path_pieces[piece_index]
1574 ##            self.logger.debug("Checking CASTOR path piece `%s'" % \
1575 ##                              piece)
1576             for check_size in check_sizes:
1577                 # Do we need to do anything with this?
1578                 if (piece_index + check_size) > -1:
1579 ##                    self.logger.debug("Checking `%s' against `%s'" % \
1580 ##                                      (castor_path_pieces[piece_index + check_size],
1581 ##                                       castor_paths_dont_touch[check_size]))
1582                     if castor_path_pieces[piece_index + check_size] in castor_paths_dont_touch[check_size]:
1583 ##                        self.logger.debug("  skipping")
1584                         skip_this_path_piece = True
1585 ##                    else:
1586 ##                        # Piece not in the list, fine.
1587 ##                        self.logger.debug("  accepting")
1588             # Add piece to the path we're building.
1589 ##            self.logger.debug("!!! Skip path piece `%s'? %s" % \
1590 ##                              (piece, str(skip_this_path_piece)))
1591 ##            self.logger.debug("Adding piece to path...")
1592             path = os.path.join(path, piece)
1593 ##            self.logger.debug("Path is now `%s'" % \
1594 ##                              path)
1595 
1596             # Hmmmm, only at this point can we do some caching. Not
1597             # ideal, but okay.
1598             try:
1599                 if path in self.castor_path_checks_cache:
1600                     continue
1601             except AttributeError:
1602                 # This only happens the first time around.
1603                 self.castor_path_checks_cache = []
1604             self.castor_path_checks_cache.append(path)
1605 
1606             # Now, unless we're supposed to skip this piece of the
1607             # path, let's make sure it exists and set the permissions
1608             # correctly for use by CRAB. This means that:
1609             # - the final output directory should (at least) have
1610             #   permissions 775
1611             # - all directories above that should (at least) have
1612             #   permissions 755.
1613 
1614             # BUT: Even though the above permissions are the usual
1615             # ones to used when setting up CASTOR areas for grid job
1616             # output, there is one caveat in case multiple people are
1617             # working in the same CASTOR area. If user X creates
1618             # /a/b/c/ and user Y wants to create /a/b/d/ he/she does
1619             # not have sufficient rights. So: we set all dir
1620             # permissions to 775 to avoid this.
1621 
1622             if not skip_this_path_piece:
1623 
1624                 # Ok, first thing: let's make sure this directory
1625                 # exists.
1626                 # NOTE: The nice complication is of course that the
1627                 # usual os.path.isdir() etc. methods don't work for an
1628                 # rfio filesystem. So we call rfstat and interpret an
1629                 # error as meaning that the path does not exist.
1630                 self.logger.debug("Checking if path `%s' exists" % \
1631                                   path)
1632                 cmd = "rfstat %s" % path
1633                 (status, output) = subprocess.getstatusoutput(cmd)
1634                 if status != 0:
1635                     # Path does not exist, let's try and create it.
1636                     self.logger.debug("Creating path `%s'" % path)
1637                     cmd = "nsmkdir -m 775 %s" % path
1638                     (status, output) = subprocess.getstatusoutput(cmd)
1639                     if status != 0:
1640                         msg = "Could not create directory `%s'" % path
1641                         self.logger.fatal(msg)
1642                         raise Error(msg)
1643                     cmd = "rfstat %s" % path
1644                     (status, output) = subprocess.getstatusoutput(cmd)
1645                 # Now check that it looks like a directory. If I'm not
1646                 # mistaken one can deduce this from the fact that the
1647                 # (octal) permissions string starts with `40' (instead
1648                 # of `100').
1649                 permissions = extract_permissions(output)
1650                 if not permissions.startswith("40"):
1651                     msg = "Path `%s' is not a directory(?)" % path
1652                     self.logger.fatal(msg)
1653                     raise Error(msg)
1654 
1655                 # Figure out the current permissions for this
1656                 # (partial) path.
1657                 self.logger.debug("Checking permissions for path `%s'" % path)
1658                 cmd = "rfstat %s" % path
1659                 (status, output) = subprocess.getstatusoutput(cmd)
1660                 if status != 0:
1661                     msg = "Could not obtain permissions for directory `%s'" % \
1662                           path
1663                     self.logger.fatal(msg)
1664                     raise Error(msg)
1665                 # Take the last three digits of the permissions.
1666                 permissions = extract_permissions(output)[-3:]
1667 
1668                 # Now if necessary fix permissions.
1669                 # NOTE: Be careful never to `downgrade' permissions.
1670                 if piece_index == (len_castor_path_pieces - 1):
1671                     # This means we're looking at the final
1672                     # destination directory.
1673                     permissions_target = "775"
1674                 else:
1675                     # `Only' an intermediate directory.
1676                     permissions_target = "775"
1677 
1678                 # Compare permissions.
1679                 permissions_new = []
1680                 for (i, j) in zip(permissions, permissions_target):
1681                     permissions_new.append(str(max(int(i), int(j))))
1682                 permissions_new = "".join(permissions_new)
1683                 self.logger.debug("  current permissions: %s" % \
1684                                   permissions)
1685                 self.logger.debug("  target permissions : %s" % \
1686                                   permissions_target)
1687                 if permissions_new != permissions:
1688                     # We have to modify the permissions.
1689                     self.logger.debug("Changing permissions of `%s' " \
1690                                       "to %s (were %s)" % \
1691                                       (path, permissions_new, permissions))
1692                     cmd = "rfchmod %s %s" % (permissions_new, path)
1693                     (status, output) = subprocess.getstatusoutput(cmd)
1694                     if status != 0:
1695                         msg = "Could not change permissions for path `%s' " \
1696                               "to %s" % (path, permissions_new)
1697                         self.logger.fatal(msg)
1698                         raise Error(msg)
1699 
1700                 self.logger.debug("  Permissions ok (%s)" % permissions_new)
1701 
1702         # End of create_and_check_castor_dir.
1703 
1704     ##########
1705 
1706     def pick_a_site(self, sites, cmssw_version):
1707 
1708         # Create list of forbidden sites
1709         sites_forbidden = []
1710 
1711         if (self.preferred_site == "CAF") or (self.preferred_site == "caf.cern.ch"):
1712             self.caf_access = True
1713 
1714         if self.caf_access == False:
1715             sites_forbidden.append("caf.cern.ch")
1716 
1717         # These are the T1 sites. These are only forbidden if we're
1718         # running in non-T1 mode.
1719         # Source:
1720         # https://cmsweb.cern.ch/sitedb/sitelist/?naming_scheme=ce
1721         # Hard-coded, yes. Not nice, no.
1722 
1723         all_t1 = [
1724                 "srm-cms.cern.ch",
1725                 "ccsrm.in2p3.fr",
1726                 "cmssrm-fzk.gridka.de",
1727                 "cmssrm.fnal.gov",
1728                 "gridka-dCache.fzk.de",
1729                 "srm-cms.gridpp.rl.ac.uk",
1730                 "srm.grid.sinica.edu.tw",
1731                 "srm2.grid.sinica.edu.tw",
1732                 "srmcms.pic.es",
1733                 "storm-fe-cms.cr.cnaf.infn.it"
1734                 ]
1735 
1736         country_codes = {
1737               "CAF" : "caf.cern.ch",
1738               "CH" : "srm-cms.cern.ch",
1739               "FR" : "ccsrm.in2p3.fr",
1740               "DE" : "cmssrm-fzk.gridka.de",
1741               "GOV" : "cmssrm.fnal.gov",
1742               "DE2" : "gridka-dCache.fzk.de",
1743               "UK" : "srm-cms.gridpp.rl.ac.uk",
1744               "TW" : "srm.grid.sinica.edu.tw",
1745               "TW2" : "srm2.grid.sinica.edu.tw",
1746               "ES" : "srmcms.pic.es",
1747               "IT" : "storm-fe-cms.cr.cnaf.infn.it"
1748                 }
1749 
1750         if self.non_t1access: 
1751             sites_forbidden.extend(all_t1)
1752 
1753         for site in sites_forbidden:
1754             if site in sites:
1755                 sites.remove(site)
1756 
1757         if self.preferred_site in country_codes:
1758             self.preferred_site = country_codes[self.preferred_site]
1759 
1760         if self.preferred_site != "no preference":
1761             if self.preferred_site in sites:
1762                 sites = [self.preferred_site]
1763             else:
1764                 sites= []
1765 
1766         #print sites
1767 
1768         # Looks like we have to do some caching here, otherwise things
1769         # become waaaay toooo sloooooow. So that's what the
1770         # sites_and_versions_cache does.
1771 
1772         # NOTE: Keep this set to None!
1773         site_name = None
1774         cmd = None
1775         while len(sites) > 0 and \
1776               site_name is None:
1777 
1778             # Create list of t1_sites
1779             t1_sites = []
1780             for site in sites:
1781                 if site in all_t1:
1782                     t1_sites.append(site)
1783                 if site == "caf.cern.ch":
1784                     t1_sites.append(site)
1785 
1786             # If avilable pick preferred site
1787             #if self.preferred_site in sites:
1788             #  se_name = self.preferred_site
1789             # Else, if available pick t1 site
1790 
1791             if len(t1_sites) > 0:
1792                 se_name = choice(t1_sites)
1793             # Else pick any site
1794             else:
1795                 se_name = choice(sites)
1796 
1797             # But check that it hosts the CMSSW version we want.
1798 
1799             if se_name in self.sites_and_versions_cache and \
1800                    cmssw_version in self.sites_and_versions_cache[se_name]:
1801                 if self.sites_and_versions_cache[se_name][cmssw_version]:
1802                     site_name = se_name
1803                     break
1804                 else:
1805                     self.logger.debug("  --> rejecting site `%s'" % se_name)
1806                     sites.remove(se_name)
1807 
1808             else:
1809                 self.logger.info("Checking if site `%s' " \
1810                                  "has CMSSW version `%s'" % \
1811                                  (se_name, cmssw_version))
1812                 self.sites_and_versions_cache[se_name] = {}
1813 
1814                 # TODO TODO TODO
1815                 # Test for SCRAM architecture removed as per request
1816                 # from Andreas.
1817                 # scram_arch = os.getenv("SCRAM_ARCH")
1818                 # cmd = "lcg-info --list-ce " \
1819                 #       "--query '" \
1820                 #       "Tag=VO-cms-%s," \
1821                 #       "Tag=VO-cms-%s," \
1822                 #       "CEStatus=Production," \
1823                 #       "CloseSE=%s'" % \
1824                 #       (cmssw_version, scram_arch, se_name)
1825                 # TODO TODO TODO end
1826 
1827                 cmd = "lcg-info --list-ce " \
1828                       "--query '" \
1829                       "Tag=VO-cms-%s," \
1830                       "CEStatus=Production," \
1831                       "CloseSE=%s'" % \
1832                       (cmssw_version, se_name)
1833                 (status, output) = subprocess.getstatusoutput(cmd)
1834                 if status != 0:
1835                     self.logger.error("Could not check site information " \
1836                                       "for site `%s'" % se_name)
1837                 else:
1838                     if (len(output) > 0) or (se_name == "caf.cern.ch"):
1839                         self.sites_and_versions_cache[se_name][cmssw_version] = True
1840                         site_name = se_name
1841                         break
1842                     else:
1843                         self.sites_and_versions_cache[se_name][cmssw_version] = False
1844                         self.logger.debug("  --> rejecting site `%s'" % se_name)
1845                         sites.remove(se_name)
1846 
1847         if site_name is self.no_matching_site_found_str:
1848             self.logger.error("  --> no matching site found")
1849             self.logger.error("    --> Your release or SCRAM " \
1850                               "architecture may not be available" \
1851                               "anywhere on the (LCG) grid.")
1852             if not cmd is None:
1853                 self.logger.debug("      (command used: `%s')" % cmd)
1854         else:
1855             self.logger.debug("  --> selected site `%s'" % site_name)
1856 
1857         # Return something more descriptive (than `None') in case we
1858         # found nothing.
1859         if site_name is None:
1860             site_name = self.no_matching_site_found_str
1861             # Keep track of our global flag signifying that this
1862             # happened.
1863             self.all_sites_found = False
1864 
1865         # End of pick_a_site.
1866         return site_name
1867 
1868     ##########
1869 
1870     def parse_cmd_line_options(self):
1871 
1872         # Set up the command line parser. Note that we fix up the help
1873         # formatter so that we can add some text pointing people to
1874         # the Twiki etc.
1875         parser = optparse.OptionParser(version="%s %s" % \
1876                                        ("%prog", self.version),
1877                                        formatter=CMSHarvesterHelpFormatter())
1878 
1879         self.option_parser = parser
1880 
1881         # The debug switch.
1882         parser.add_option("-d", "--debug",
1883                           help="Switch on debug mode",
1884                           action="callback",
1885                           callback=self.option_handler_debug)
1886 
1887         # The quiet switch.
1888         parser.add_option("-q", "--quiet",
1889                           help="Be less verbose",
1890                           action="callback",
1891                           callback=self.option_handler_quiet)
1892 
1893         # The force switch. If this switch is used sanity checks are
1894         # performed but failures do not lead to aborts. Use with care.
1895         parser.add_option("", "--force",
1896                           help="Force mode. Do not abort on sanity check "
1897                           "failures",
1898                           action="callback",
1899                           callback=self.option_handler_force)
1900 
1901         # Choose between the different kinds of harvesting.
1902         parser.add_option("", "--harvesting_type",
1903                           help="Harvesting type: %s" % \
1904                           ", ".join(self.harvesting_types),
1905                           action="callback",
1906                           callback=self.option_handler_harvesting_type,
1907                           type="string",
1908                           metavar="HARVESTING_TYPE")
1909 
1910         # Choose between single-step and two-step mode.
1911         parser.add_option("", "--harvesting_mode",
1912                           help="Harvesting mode: %s (default = %s)" % \
1913                           (", ".join(self.harvesting_modes),
1914                           self.harvesting_mode_default),
1915                           action="callback",
1916                           callback=self.option_handler_harvesting_mode,
1917                           type="string",
1918                           metavar="HARVESTING_MODE")
1919 
1920         # Override the GlobalTag chosen by the cmsHarvester.
1921         parser.add_option("", "--globaltag",
1922                           help="GlobalTag to use. Default is the ones " \
1923                           "the dataset was created with for MC, for data" \
1924                           "a GlobalTag has to be specified.",
1925                           action="callback",
1926                           callback=self.option_handler_globaltag,
1927                           type="string",
1928                           metavar="GLOBALTAG")
1929 
1930         # Allow switching off of reference histograms.
1931         parser.add_option("", "--no-ref-hists",
1932                           help="Don't use any reference histograms",
1933                           action="callback",
1934                           callback=self.option_handler_no_ref_hists)
1935 
1936         # Allow the default (i.e. the one that should be used)
1937         # Frontier connection to be overridden.
1938         parser.add_option("", "--frontier-connection",
1939                           help="Use this Frontier connection to find " \
1940                           "GlobalTags and LocalTags (for reference " \
1941                           "histograms).\nPlease only use this for " \
1942                           "testing.",
1943                           action="callback",
1944                           callback=self.option_handler_frontier_connection,
1945                           type="string",
1946                           metavar="FRONTIER")
1947 
1948         # Similar to the above but specific to the Frontier connection
1949         # to be used for the GlobalTag.
1950         parser.add_option("", "--frontier-connection-for-globaltag",
1951                           help="Use this Frontier connection to find " \
1952                           "GlobalTags.\nPlease only use this for " \
1953                           "testing.",
1954                           action="callback",
1955                           callback=self.option_handler_frontier_connection,
1956                           type="string",
1957                           metavar="FRONTIER")
1958 
1959         # Similar to the above but specific to the Frontier connection
1960         # to be used for the reference histograms.
1961         parser.add_option("", "--frontier-connection-for-refhists",
1962                           help="Use this Frontier connection to find " \
1963                           "LocalTags (for reference " \
1964                           "histograms).\nPlease only use this for " \
1965                           "testing.",
1966                           action="callback",
1967                           callback=self.option_handler_frontier_connection,
1968                           type="string",
1969                           metavar="FRONTIER")
1970 
1971         # Option to specify the name (or a regexp) of the dataset(s)
1972         # to be used.
1973         parser.add_option("", "--dataset",
1974                           help="Name (or regexp) of dataset(s) to process",
1975                           action="callback",
1976                           #callback=self.option_handler_dataset_name,
1977                           callback=self.option_handler_input_spec,
1978                           type="string",
1979                           #dest="self.input_name",
1980                           metavar="DATASET")
1981 
1982         # Option to specify the name (or a regexp) of the dataset(s)
1983         # to be ignored.
1984         parser.add_option("", "--dataset-ignore",
1985                           help="Name (or regexp) of dataset(s) to ignore",
1986                           action="callback",
1987                           callback=self.option_handler_input_spec,
1988                           type="string",
1989                           metavar="DATASET-IGNORE")
1990 
1991         # Option to specify the name (or a regexp) of the run(s)
1992         # to be used.
1993         parser.add_option("", "--runs",
1994                           help="Run number(s) to process",
1995                           action="callback",
1996                           callback=self.option_handler_input_spec,
1997                           type="string",
1998                           metavar="RUNS")                   
1999 
2000         # Option to specify the name (or a regexp) of the run(s)
2001         # to be ignored.
2002         parser.add_option("", "--runs-ignore",
2003                           help="Run number(s) to ignore",
2004                           action="callback",
2005                           callback=self.option_handler_input_spec,
2006                           type="string",
2007                           metavar="RUNS-IGNORE")
2008 
2009         # Option to specify a file containing a list of dataset names
2010         # (or regexps) to be used.
2011         parser.add_option("", "--datasetfile",
2012                           help="File containing list of dataset names " \
2013                           "(or regexps) to process",
2014                           action="callback",
2015                           #callback=self.option_handler_listfile_name,
2016                           callback=self.option_handler_input_spec,
2017                           type="string",
2018                           #dest="self.input_name",
2019                           metavar="DATASETFILE")
2020 
2021         # Option to specify a file containing a list of dataset names
2022         # (or regexps) to be ignored.
2023         parser.add_option("", "--datasetfile-ignore",
2024                           help="File containing list of dataset names " \
2025                           "(or regexps) to ignore",
2026                           action="callback",
2027                           callback=self.option_handler_input_spec,
2028                           type="string",
2029                           metavar="DATASETFILE-IGNORE")
2030 
2031         # Option to specify a file containing a list of runs to be
2032         # used.
2033         parser.add_option("", "--runslistfile",
2034                           help="File containing list of run numbers " \
2035                           "to process",
2036                           action="callback",
2037                           callback=self.option_handler_input_spec,
2038                           type="string",
2039                           metavar="RUNSLISTFILE")
2040 
2041         # Option to specify a file containing a list of runs
2042         # to be ignored.
2043         parser.add_option("", "--runslistfile-ignore",
2044                           help="File containing list of run numbers " \
2045                           "to ignore",
2046                           action="callback",
2047                           callback=self.option_handler_input_spec,
2048                           type="string",
2049                           metavar="RUNSLISTFILE-IGNORE")
2050 
2051         # Option to specify a Jsonfile contaning a list of runs
2052         # to be used.
2053         parser.add_option("", "--Jsonrunfile",
2054                           help="Jsonfile containing dictionary of run/lumisections pairs. " \
2055                           "All lumisections of runs contained in dictionary are processed.",
2056                           action="callback",
2057                           callback=self.option_handler_input_Jsonrunfile,
2058                           type="string",
2059                           metavar="JSONRUNFILE")
2060 
2061         # Option to specify a Jsonfile contaning a dictionary of run/lumisections pairs
2062         # to be used.
2063         parser.add_option("", "--Jsonfile",
2064                           help="Jsonfile containing dictionary of run/lumisections pairs. " \
2065                           "Only specified lumisections of runs contained in dictionary are processed.",
2066                           action="callback",
2067                           callback=self.option_handler_input_Jsonfile,
2068                           type="string",
2069                           metavar="JSONFILE")
2070 
2071         # Option to specify a ToDo file contaning a list of runs
2072         # to be used.
2073         parser.add_option("", "--todo-file",
2074                           help="Todo file containing a list of runs to process.",
2075                           action="callback",
2076                           callback=self.option_handler_input_todofile,
2077                           type="string",
2078                           metavar="TODO-FILE")
2079 
2080         # Option to specify which file to use for the dataset name to
2081         # reference histogram name mappings.
2082         parser.add_option("", "--refhistmappingfile",
2083                           help="File to be use for the reference " \
2084                           "histogram mappings. Default: `%s'." % \
2085                           self.ref_hist_mappings_file_name_default,
2086                           action="callback",
2087                           callback=self.option_handler_ref_hist_mapping_file,
2088                           type="string",
2089                           metavar="REFHISTMAPPING-FILE")
2090 
2091         # Specify the place in CASTOR where the output should go.
2092         # NOTE: Only output to CASTOR is supported for the moment,
2093         # since the central DQM results place is on CASTOR anyway.
2094         parser.add_option("", "--castordir",
2095                           help="Place on CASTOR to store results. " \
2096                           "Default: `%s'." % \
2097                           self.castor_base_dir_default,
2098                           action="callback",
2099                           callback=self.option_handler_castor_dir,
2100                           type="string",
2101                           metavar="CASTORDIR")
2102 
2103         # Use this to try and create jobs that will run without
2104         # special `t1access' role.
2105         parser.add_option("", "--no-t1access",
2106                           help="Try to create jobs that will run " \
2107                           "without special `t1access' role",
2108                           action="callback",
2109                           callback=self.option_handler_no_t1access)
2110 
2111         # Use this to create jobs that may run on CAF
2112         parser.add_option("", "--caf-access",
2113                           help="Crab jobs may run " \
2114                           "on CAF",
2115                           action="callback",
2116                           callback=self.option_handler_caf_access)
2117 
2118         # set process.dqmSaver.saveByLumiSection=1 in harvesting cfg file
2119         parser.add_option("", "--saveByLumiSection",
2120                           help="set saveByLumiSection=1 in harvesting cfg file",
2121                           action="callback",
2122                           callback=self.option_handler_saveByLumiSection)
2123 
2124         # Use this to enable automatic creation and submission of crab jobs
2125         parser.add_option("", "--automatic-crab-submission",
2126                           help="Crab jobs are created and " \
2127                           "submitted automatically",
2128                           action="callback",
2129                           callback=self.option_handler_crab_submission)
2130 
2131         # Option to set the max number of sites, each
2132         #job is submitted to 
2133         parser.add_option("", "--max-sites",
2134                           help="Max. number of sites each job is submitted to",
2135                           action="callback",
2136                           callback=self.option_handler_sites,
2137                           type="int") 
2138 
2139         # Option to set the preferred site
2140         parser.add_option("", "--site",
2141                           help="Crab jobs are submitted to specified site. T1 sites may be shortened by the following (country) codes: \
2142                           srm-cms.cern.ch : CH \
2143                           ccsrm.in2p3.fr : FR \
2144                           cmssrm-fzk.gridka.de : DE \
2145                           cmssrm.fnal.gov : GOV \
2146                           gridka-dCache.fzk.de : DE2 \
2147                           rm-cms.gridpp.rl.ac.uk : UK \
2148                           srm.grid.sinica.edu.tw : TW \
2149                           srm2.grid.sinica.edu.tw : TW2 \
2150                           srmcms.pic.es : ES \
2151                           storm-fe-cms.cr.cnaf.infn.it : IT",
2152                           action="callback",
2153                           callback=self.option_handler_preferred_site,
2154                           type="string") 
2155 
2156         # This is the command line flag to list all harvesting
2157         # type-to-sequence mappings.
2158         parser.add_option("-l", "--list",
2159                           help="List all harvesting types and their" \
2160                           "corresponding sequence names",
2161                           action="callback",
2162                           callback=self.option_handler_list_types)
2163 
2164         # If nothing was specified: tell the user how to do things the
2165         # next time and exit.
2166         # NOTE: We just use the OptParse standard way of doing this by
2167         #       acting as if a '--help' was specified.
2168         if len(self.cmd_line_opts) < 1:
2169             self.cmd_line_opts = ["--help"]
2170 
2171         # Some trickery with the options. Why? Well, since these
2172         # options change the output level immediately from the option
2173         # handlers, the results differ depending on where they are on
2174         # the command line. Let's just make sure they are at the
2175         # front.
2176         # NOTE: Not very efficient or sophisticated, but it works and
2177         # it only has to be done once anyway.
2178         for i in ["-d", "--debug",
2179                   "-q", "--quiet"]:
2180             if i in self.cmd_line_opts:
2181                 self.cmd_line_opts.remove(i)
2182                 self.cmd_line_opts.insert(0, i)
2183 
2184         # Everything is set up, now parse what we were given.
2185         parser.set_defaults()
2186         (self.options, self.args) = parser.parse_args(self.cmd_line_opts)
2187 
2188         # End of parse_cmd_line_options.
2189 
2190     ##########
2191 
2192     def check_input_status(self):
2193         """Check completeness and correctness of input information.
2194 
2195         Check that all required information has been specified and
2196         that, at least as far as can be easily checked, it makes
2197         sense.
2198 
2199         NOTE: This is also where any default values are applied.
2200 
2201         """
2202 
2203         self.logger.info("Checking completeness/correctness of input...")
2204 
2205         # The cmsHarvester does not take (i.e. understand) any
2206         # arguments so there should not be any.
2207         if len(self.args) > 0:
2208             msg = "Sorry but I don't understand `%s'" % \
2209                   (" ".join(self.args))
2210             self.logger.fatal(msg)
2211             raise Usage(msg)
2212 
2213         # BUG BUG BUG
2214         # While we wait for some bugs left and right to get fixed, we
2215         # disable two-step.
2216         if self.harvesting_mode == "two-step":
2217             msg = "--------------------\n" \
2218                   "  Sorry, but for the moment (well, till it works)" \
2219                   "  the two-step mode has been disabled.\n" \
2220                   "--------------------\n"
2221             self.logger.fatal(msg)
2222             raise Error(msg)
2223         # BUG BUG BUG end
2224 
2225         # We need a harvesting method to be specified
2226         if self.harvesting_type is None:
2227             msg = "Please specify a harvesting type"
2228             self.logger.fatal(msg)
2229             raise Usage(msg)
2230         # as well as a harvesting mode.
2231         if self.harvesting_mode is None:
2232             self.harvesting_mode = self.harvesting_mode_default
2233             msg = "No harvesting mode specified --> using default `%s'" % \
2234                   self.harvesting_mode
2235             self.logger.warning(msg)
2236             #raise Usage(msg)
2237 
2238         ###
2239 
2240         # We need an input method so we can find the dataset name(s).
2241         if self.input_method["datasets"]["use"] is None:
2242             msg = "Please specify an input dataset name " \
2243                   "or a list file name"
2244             self.logger.fatal(msg)
2245             raise Usage(msg)
2246 
2247         # DEBUG DEBUG DEBUG
2248         # If we get here, we should also have an input name.
2249         assert not self.input_name["datasets"]["use"] is None
2250         # DEBUG DEBUG DEBUG end
2251 
2252         ###
2253 
2254         # The same holds for the reference histogram mapping file (if
2255         # we're using references).
2256         if self.use_ref_hists:
2257             if self.ref_hist_mappings_file_name is None:
2258                 self.ref_hist_mappings_file_name = self.ref_hist_mappings_file_name_default
2259                 msg = "No reference histogram mapping file specified --> " \
2260                       "using default `%s'" % \
2261                       self.ref_hist_mappings_file_name
2262                 self.logger.warning(msg)
2263 
2264         ###
2265 
2266         # We need to know where to put the stuff (okay, the results)
2267         # on CASTOR.
2268         if self.castor_base_dir is None:
2269             self.castor_base_dir = self.castor_base_dir_default
2270             msg = "No CASTOR area specified -> using default `%s'" % \
2271                   self.castor_base_dir
2272             self.logger.warning(msg)
2273             #raise Usage(msg)
2274 
2275         # Only the CERN CASTOR area is supported.
2276         if not self.castor_base_dir.startswith(self.castor_prefix):
2277             msg = "CASTOR area does not start with `%s'" % \
2278                   self.castor_prefix
2279             self.logger.fatal(msg)
2280             if self.castor_base_dir.find("castor") > -1 and \
2281                not self.castor_base_dir.find("cern.ch") > -1:
2282                 self.logger.fatal("Only CERN CASTOR is supported")
2283             raise Usage(msg)
2284 
2285         ###
2286 
2287         # TODO TODO TODO
2288         # This should be removed in the future, once I find out how to
2289         # get the config file used to create a given dataset from DBS.
2290 
2291         # For data we need to have a GlobalTag. (For MC we can figure
2292         # it out by ourselves.)
2293         if self.globaltag is None:
2294             self.logger.warning("No GlobalTag specified. This means I cannot")
2295             self.logger.warning("run on data, only on MC.")
2296             self.logger.warning("I will skip all data datasets.")
2297 
2298         # TODO TODO TODO end
2299 
2300         # Make sure the GlobalTag ends with `::All'.
2301         if not self.globaltag is None:
2302             if not self.globaltag.endswith("::All"):
2303                 self.logger.warning("Specified GlobalTag `%s' does " \
2304                                     "not end in `::All' --> " \
2305                                     "appending this missing piece" % \
2306                                     self.globaltag)
2307                 self.globaltag = "%s::All" % self.globaltag
2308 
2309         ###
2310 
2311         # Dump some info about the Frontier connections used.
2312         for (key, value) in self.frontier_connection_name.items():
2313             frontier_type_str = "unknown"
2314             if key == "globaltag":
2315                 frontier_type_str = "the GlobalTag"
2316             elif key == "refhists":
2317                 frontier_type_str = "the reference histograms"
2318             non_str = None
2319             if self.frontier_connection_overridden[key] == True:
2320                 non_str = "non-"
2321             else:
2322                 non_str = ""
2323             self.logger.info("Using %sdefault Frontier " \
2324                              "connection for %s: `%s'" % \
2325                              (non_str, frontier_type_str, value))
2326 
2327         ###
2328 
2329         # End of check_input_status.
2330 
2331     ##########
2332 
2333     def check_cmssw(self):
2334         """Check if CMSSW is setup.
2335 
2336         """
2337 
2338         # Try to access the CMSSW_VERSION environment variable. If
2339         # it's something useful we consider CMSSW to be set up
2340         # properly. Otherwise we raise an error.
2341         cmssw_version = os.getenv("CMSSW_VERSION")
2342         if cmssw_version is None:
2343             self.logger.fatal("It seems CMSSW is not setup...")
2344             self.logger.fatal("($CMSSW_VERSION is empty)")
2345             raise Error("ERROR: CMSSW needs to be setup first!")
2346 
2347         self.cmssw_version = cmssw_version
2348         self.logger.info("Found CMSSW version %s properly set up" % \
2349                           self.cmssw_version)
2350 
2351         # End of check_cmsssw.
2352         return True
2353 
2354     ##########
2355 
2356     def check_dbs(self):
2357         """Check if DBS is setup.
2358 
2359         """
2360 
2361         # Try to access the DBSCMD_HOME environment variable. If this
2362         # looks useful we consider DBS to be set up
2363         # properly. Otherwise we raise an error.
2364         dbs_home = os.getenv("DBSCMD_HOME")
2365         if dbs_home is None:
2366             self.logger.fatal("It seems DBS is not setup...")
2367             self.logger.fatal("  $DBSCMD_HOME is empty")
2368             raise Error("ERROR: DBS needs to be setup first!")
2369 
2370 ##        # Now we try to do a very simple DBS search. If that works
2371 ##        # instead of giving us the `Unsupported API call' crap, we
2372 ##        # should be good to go.
2373 ##        # NOTE: Not ideal, I know, but it reduces the amount of
2374 ##        #       complaints I get...
2375 ##        cmd = "dbs search --query=\"find dataset where dataset = impossible\""
2376 ##        (status, output) = subprocess.getstatusoutput(cmd)
2377 ##        pdb.set_trace()
2378 ##        if status != 0 or \
2379 ##           output.lower().find("unsupported api call") > -1:
2380 ##            self.logger.fatal("It seems DBS is not setup...")
2381 ##            self.logger.fatal("  %s returns crap:" % cmd)
2382 ##            for line in output.split("\n"):
2383 ##                self.logger.fatal("  %s" % line)
2384 ##            raise Error("ERROR: DBS needs to be setup first!")
2385 
2386         self.logger.debug("Found DBS properly set up")
2387 
2388         # End of check_dbs.
2389         return True
2390 
2391     ##########
2392 
2393     def setup_dbs(self):
2394         """Setup the Python side of DBS.
2395 
2396         For more information see the DBS Python API documentation:
2397         https://twiki.cern.ch/twiki/bin/view/CMS/DBSApiDocumentation
2398 
2399         """
2400 
2401         try:
2402             args={}
2403             args["url"]= "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/" \
2404                          "servlet/DBSServlet"
2405             api = DbsApi(args)
2406             self.dbs_api = api
2407 
2408         except DBSAPI.dbsApiException.DbsApiException as ex:
2409             self.logger.fatal("Caught DBS API exception %s: %s "  % \
2410                               (ex.getClassName(), ex.getErrorMessage()))
2411             if ex.getErrorCode() not in (None, ""):
2412                 logger.debug("DBS exception error code: ", ex.getErrorCode())
2413             raise
2414 
2415         # End of setup_dbs.
2416 
2417     ##########
2418 
2419     def dbs_resolve_dataset_name(self, dataset_name):
2420         """Use DBS to resolve a wildcarded dataset name.
2421 
2422         """
2423 
2424         # DEBUG DEBUG DEBUG
2425         # If we get here DBS should have been set up already.
2426         assert not self.dbs_api is None
2427         # DEBUG DEBUG DEBUG end
2428 
2429         # Some minor checking to make sure that whatever we've been
2430         # given as dataset name actually sounds like a dataset name.
2431         if not (dataset_name.startswith("/") and \
2432                 dataset_name.endswith("RECO")):
2433             self.logger.warning("Dataset name `%s' does not sound " \
2434                                  "like a valid dataset name!" % \
2435                                 dataset_name)
2436 
2437         #----------
2438 
2439         api = self.dbs_api
2440         dbs_query = "find dataset where dataset like %s " \
2441                     "and dataset.status = VALID" % \
2442                     dataset_name
2443         try:
2444             api_result = api.executeQuery(dbs_query)
2445         except DBSAPI.dbsApiException.DbsApiException:
2446             msg = "ERROR: Could not execute DBS query"
2447             self.logger.fatal(msg)
2448             raise Error(msg)
2449 
2450         # Setup parsing.
2451         handler = DBSXMLHandler(["dataset"])
2452         parser = xml.sax.make_parser()
2453         parser.setContentHandler(handler)
2454 
2455         # Parse.
2456         try:
2457             xml.sax.parseString(api_result, handler)
2458         except SAXParseException:
2459             msg = "ERROR: Could not parse DBS server output"
2460             self.logger.fatal(msg)
2461             raise Error(msg)
2462 
2463         # DEBUG DEBUG DEBUG
2464         assert(handler.check_results_validity()), "ERROR The DBSXMLHandler screwed something up!"
2465         # DEBUG DEBUG DEBUG end
2466 
2467         # Extract the results.
2468         datasets = handler.results.values()[0]
2469 
2470         # End of dbs_resolve_dataset_name.
2471         return datasets
2472 
2473     ##########
2474 
2475     def dbs_resolve_cmssw_version(self, dataset_name):
2476         """Ask DBS for the CMSSW version used to create this dataset.
2477 
2478         """
2479 
2480         # DEBUG DEBUG DEBUG
2481         # If we get here DBS should have been set up already.
2482         assert not self.dbs_api is None
2483         # DEBUG DEBUG DEBUG end
2484 
2485         api = self.dbs_api
2486         dbs_query = "find algo.version where dataset = %s " \
2487                     "and dataset.status = VALID" % \
2488                     dataset_name
2489         try:
2490             api_result = api.executeQuery(dbs_query)
2491         except DBSAPI.dbsApiException.DbsApiException:
2492             msg = "ERROR: Could not execute DBS query"
2493             self.logger.fatal(msg)
2494             raise Error(msg)
2495 
2496         handler = DBSXMLHandler(["algo.version"])
2497         parser = xml.sax.make_parser()
2498         parser.setContentHandler(handler)
2499 
2500         try:
2501             xml.sax.parseString(api_result, handler)
2502         except SAXParseException:
2503             msg = "ERROR: Could not parse DBS server output"
2504             self.logger.fatal(msg)
2505             raise Error(msg)
2506 
2507         # DEBUG DEBUG DEBUG
2508         assert(handler.check_results_validity()), "ERROR The DBSXMLHandler screwed something up!"
2509         # DEBUG DEBUG DEBUG end
2510 
2511         cmssw_version = handler.results.values()[0]
2512 
2513         # DEBUG DEBUG DEBUG
2514         assert len(cmssw_version) == 1
2515         # DEBUG DEBUG DEBUG end
2516 
2517         cmssw_version = cmssw_version[0]
2518 
2519         # End of dbs_resolve_cmssw_version.
2520         return cmssw_version
2521 
2522     ##########
2523 
2524 ##    def dbs_resolve_dataset_number_of_events(self, dataset_name):
2525 ##        """Ask DBS across how many events this dataset has been spread
2526 ##        out.
2527 
2528 ##        This is especially useful to check that we do not submit a job
2529 ##        supposed to run on a complete sample that is not contained at
2530 ##        a single site.
2531 
2532 ##        """
2533 
2534 ##        # DEBUG DEBUG DEBUG
2535 ##        # If we get here DBS should have been set up already.
2536 ##        assert not self.dbs_api is None
2537 ##        # DEBUG DEBUG DEBUG end
2538 
2539 ##        api = self.dbs_api
2540 ##        dbs_query = "find count(site) where dataset = %s " \
2541 ##                    "and dataset.status = VALID" % \
2542 ##                    dataset_name
2543 ##        try:
2544 ##            api_result = api.executeQuery(dbs_query)
2545 ##        except DbsApiException:
2546 ##            raise Error("ERROR: Could not execute DBS query")
2547 
2548 ##        try:
2549 ##            num_events = []
2550 ##            class Handler(xml.sax.handler.ContentHandler):
2551 ##                def startElement(self, name, attrs):
2552 ##                    if name == "result":
2553 ##                        num_events.append(str(attrs["COUNT_STORAGEELEMENT"]))
2554 ##            xml.sax.parseString(api_result, Handler())
2555 ##        except SAXParseException:
2556 ##            raise Error("ERROR: Could not parse DBS server output")
2557 
2558 ##        # DEBUG DEBUG DEBUG
2559 ##        assert len(num_events) == 1
2560 ##        # DEBUG DEBUG DEBUG end
2561 
2562 ##        num_events = int(num_events[0])
2563 
2564 ##        # End of dbs_resolve_dataset_number_of_events.
2565 ##        return num_events
2566 
2567     ##########
2568 
2569     def dbs_resolve_runs(self, dataset_name):
2570         """Ask DBS for the list of runs in a given dataset.
2571 
2572         # NOTE: This does not (yet?) skip/remove empty runs. There is
2573         # a bug in the DBS entry run.numevents (i.e. it always returns
2574         # zero) which should be fixed in the `next DBS release'.
2575         # See also:
2576         #   https://savannah.cern.ch/bugs/?53452
2577         #   https://savannah.cern.ch/bugs/?53711
2578 
2579         """
2580 
2581         # TODO TODO TODO
2582         # We should remove empty runs as soon as the above mentioned
2583         # bug is fixed.
2584         # TODO TODO TODO end
2585 
2586         # DEBUG DEBUG DEBUG
2587         # If we get here DBS should have been set up already.
2588         assert not self.dbs_api is None
2589         # DEBUG DEBUG DEBUG end
2590 
2591         api = self.dbs_api
2592         dbs_query = "find run where dataset = %s " \
2593                     "and dataset.status = VALID" % \
2594                     dataset_name
2595         try:
2596             api_result = api.executeQuery(dbs_query)
2597         except DBSAPI.dbsApiException.DbsApiException:
2598             msg = "ERROR: Could not execute DBS query"
2599             self.logger.fatal(msg)
2600             raise Error(msg)
2601 
2602         handler = DBSXMLHandler(["run"])
2603         parser = xml.sax.make_parser()
2604         parser.setContentHandler(handler)
2605 
2606         try:
2607             xml.sax.parseString(api_result, handler)
2608         except SAXParseException:
2609             msg = "ERROR: Could not parse DBS server output"
2610             self.logger.fatal(msg)
2611             raise Error(msg)
2612 
2613         # DEBUG DEBUG DEBUG
2614         assert(handler.check_results_validity()), "ERROR The DBSXMLHandler screwed something up!"
2615         # DEBUG DEBUG DEBUG end
2616 
2617         runs = handler.results.values()[0]
2618         # Turn strings into integers.
2619         runs = sorted([int(i) for i in runs])
2620 
2621         # End of dbs_resolve_runs.
2622         return runs
2623 
2624     ##########
2625 
2626     def dbs_resolve_globaltag(self, dataset_name):
2627         """Ask DBS for the globaltag corresponding to a given dataset.
2628 
2629         # BUG BUG BUG
2630         # This does not seem to work for data datasets? E.g. for
2631         # /Cosmics/Commissioning08_CRAFT0831X_V1_311_ReReco_FromSuperPointing_v1/RAW-RECO
2632         # Probaly due to the fact that the GlobalTag changed during
2633         # datataking...
2634         BUG BUG BUG end
2635 
2636         """
2637 
2638         # DEBUG DEBUG DEBUG
2639         # If we get here DBS should have been set up already.
2640         assert not self.dbs_api is None
2641         # DEBUG DEBUG DEBUG end
2642 
2643         api = self.dbs_api
2644         dbs_query = "find dataset.tag where dataset = %s " \
2645                     "and dataset.status = VALID" % \
2646                     dataset_name
2647         try:
2648             api_result = api.executeQuery(dbs_query)
2649         except DBSAPI.dbsApiException.DbsApiException:
2650             msg = "ERROR: Could not execute DBS query"
2651             self.logger.fatal(msg)
2652             raise Error(msg)
2653 
2654         handler = DBSXMLHandler(["dataset.tag"])
2655         parser = xml.sax.make_parser()
2656         parser.setContentHandler(parser)
2657 
2658         try:
2659             xml.sax.parseString(api_result, handler)
2660         except SAXParseException:
2661             msg = "ERROR: Could not parse DBS server output"
2662             self.logger.fatal(msg)
2663             raise Error(msg)
2664 
2665         # DEBUG DEBUG DEBUG
2666         assert(handler.check_results_validity()), "ERROR The DBSXMLHandler screwed something up!"
2667         # DEBUG DEBUG DEBUG end
2668 
2669         globaltag = handler.results.values()[0]
2670 
2671         # DEBUG DEBUG DEBUG
2672         assert len(globaltag) == 1
2673         # DEBUG DEBUG DEBUG end
2674 
2675         globaltag = globaltag[0]
2676 
2677         # End of dbs_resolve_globaltag.
2678         return globaltag
2679 
2680     ##########
2681 
2682     def dbs_resolve_datatype(self, dataset_name):
2683         """Ask DBS for the the data type (data or mc) of a given
2684         dataset.
2685 
2686         """
2687 
2688         # DEBUG DEBUG DEBUG
2689         # If we get here DBS should have been set up already.
2690         assert not self.dbs_api is None
2691         # DEBUG DEBUG DEBUG end
2692 
2693         api = self.dbs_api
2694         dbs_query = "find datatype.type where dataset = %s " \
2695                     "and dataset.status = VALID" % \
2696                     dataset_name
2697         try:
2698             api_result = api.executeQuery(dbs_query)
2699         except DBSAPI.dbsApiException.DbsApiException:
2700             msg = "ERROR: Could not execute DBS query"
2701             self.logger.fatal(msg)
2702             raise Error(msg)
2703 
2704         handler = DBSXMLHandler(["datatype.type"])
2705         parser = xml.sax.make_parser()
2706         parser.setContentHandler(handler)
2707 
2708         try:
2709             xml.sax.parseString(api_result, handler)
2710         except SAXParseException:
2711             msg = "ERROR: Could not parse DBS server output"
2712             self.logger.fatal(msg)
2713             raise Error(msg)
2714 
2715         # DEBUG DEBUG DEBUG
2716         assert(handler.check_results_validity()), "ERROR The DBSXMLHandler screwed something up!"
2717         # DEBUG DEBUG DEBUG end
2718 
2719         datatype = handler.results.values()[0]
2720 
2721         # DEBUG DEBUG DEBUG
2722         assert len(datatype) == 1
2723         # DEBUG DEBUG DEBUG end
2724 
2725         datatype = datatype[0]
2726 
2727         # End of dbs_resolve_datatype.
2728         return datatype
2729 
2730     ##########
2731 
2732     # OBSOLETE OBSOLETE OBSOLETE
2733     # This method is no longer used.
2734 
2735     def dbs_resolve_number_of_events(self, dataset_name, run_number=None):
2736         """Determine the number of events in a given dataset (and run).
2737 
2738         Ask DBS for the number of events in a dataset. If a run number
2739         is specified the number of events returned is that in that run
2740         of that dataset. If problems occur we throw an exception.
2741 
2742         # BUG BUG BUG
2743         # Since DBS does not return the number of events correctly,
2744         # neither for runs nor for whole datasets, we have to work
2745         # around that a bit...
2746         # BUG BUG BUG end
2747 
2748         """
2749 
2750         # DEBUG DEBUG DEBUG
2751         # If we get here DBS should have been set up already.
2752         assert not self.dbs_api is None
2753         # DEBUG DEBUG DEBUG end
2754 
2755         api = self.dbs_api
2756         dbs_query = "find file.name, file.numevents where dataset = %s " \
2757                     "and dataset.status = VALID" % \
2758                     dataset_name
2759         if not run_number is None:
2760             dbs_query = dbq_query + (" and run = %d" % run_number)
2761         try:
2762             api_result = api.executeQuery(dbs_query)
2763         except DBSAPI.dbsApiException.DbsApiException:
2764             msg = "ERROR: Could not execute DBS query"
2765             self.logger.fatal(msg)
2766             raise Error(msg)
2767 
2768         handler = DBSXMLHandler(["file.name", "file.numevents"])
2769         parser = xml.sax.make_parser()
2770         parser.setContentHandler(handler)
2771 
2772         try:
2773             xml.sax.parseString(api_result, handler)
2774         except SAXParseException:
2775             msg = "ERROR: Could not parse DBS server output"
2776             self.logger.fatal(msg)
2777             raise Error(msg)
2778 
2779         # DEBUG DEBUG DEBUG
2780         assert(handler.check_results_validity()), "ERROR The DBSXMLHandler screwed something up!"
2781         # DEBUG DEBUG DEBUG end
2782 
2783         num_events = sum(handler.results["file.numevents"])
2784 
2785         # End of dbs_resolve_number_of_events.
2786         return num_events
2787 
2788     # OBSOLETE OBSOLETE OBSOLETE end
2789 
2790     ##########
2791 
2792 ##    def dbs_resolve_dataset_number_of_sites(self, dataset_name):
2793 ##        """Ask DBS across how many sites this dataset has been spread
2794 ##        out.
2795 
2796 ##        This is especially useful to check that we do not submit a job
2797 ##        supposed to run on a complete sample that is not contained at
2798 ##        a single site.
2799 
2800 ##        """
2801 
2802 ##        # DEBUG DEBUG DEBUG
2803 ##        # If we get here DBS should have been set up already.
2804 ##        assert not self.dbs_api is None
2805 ##        # DEBUG DEBUG DEBUG end
2806 
2807 ##        api = self.dbs_api
2808 ##        dbs_query = "find count(site) where dataset = %s " \
2809 ##                    "and dataset.status = VALID" % \
2810 ##                    dataset_name
2811 ##        try:
2812 ##            api_result = api.executeQuery(dbs_query)
2813 ##        except DbsApiException:
2814 ##            raise Error("ERROR: Could not execute DBS query")
2815 
2816 ##        try:
2817 ##            num_sites = []
2818 ##            class Handler(xml.sax.handler.ContentHandler):
2819 ##                def startElement(self, name, attrs):
2820 ##                    if name == "result":
2821 ##                        num_sites.append(str(attrs["COUNT_STORAGEELEMENT"]))
2822 ##            xml.sax.parseString(api_result, Handler())
2823 ##        except SAXParseException:
2824 ##            raise Error("ERROR: Could not parse DBS server output")
2825 
2826 ##        # DEBUG DEBUG DEBUG
2827 ##        assert len(num_sites) == 1
2828 ##        # DEBUG DEBUG DEBUG end
2829 
2830 ##        num_sites = int(num_sites[0])
2831 
2832 ##        # End of dbs_resolve_dataset_number_of_sites.
2833 ##        return num_sites
2834 
2835     ##########
2836 
2837 ##    def dbs_check_dataset_spread(self, dataset_name):
2838 ##        """Figure out across how many sites this dataset is spread.
2839 
2840 ##        NOTE: This is something we need to figure out per run, since
2841 ##        we want to submit harvesting jobs per run.
2842 
2843 ##        Basically three things can happen with a given dataset:
2844 ##        - the whole dataset is available on a single site,
2845 ##        - the whole dataset is available (mirrored) at multiple sites,
2846 ##        - the dataset is spread across multiple sites and there is no
2847 ##          single site containing the full dataset in one place.
2848 
2849 ##        NOTE: If all goes well, it should not be possible that
2850 ##        anything but a _full_ dataset is mirrored. So we ignore the
2851 ##        possibility in which for example one site contains the full
2852 ##        dataset and two others mirror half of it.
2853 ##        ANOTHER NOTE: According to some people this last case _could_
2854 ##        actually happen. I will not design for it, but make sure it
2855 ##        ends up as a false negative, in which case we just loose some
2856 ##        efficiency and treat the dataset (unnecessarily) as
2857 ##        spread-out.
2858 
2859 ##        We don't really care about the first two possibilities, but in
2860 ##        the third case we need to make sure to run the harvesting in
2861 ##        two-step mode.
2862 
2863 ##        This method checks with DBS which of the above cases is true
2864 ##        for the dataset name given, and returns a 1 for the first two
2865 ##        cases, and the number of sites across which the dataset is
2866 ##        spread for the third case.
2867 
2868 ##        The way in which this is done is by asking how many files each
2869 ##        site has for the dataset. In the first case there is only one
2870 ##        site, in the second case all sites should have the same number
2871 ##        of files (i.e. the total number of files in the dataset) and
2872 ##        in the third case the file counts from all sites should add up
2873 ##        to the total file count for the dataset.
2874 
2875 ##        """
2876 
2877 ##        # DEBUG DEBUG DEBUG
2878 ##        # If we get here DBS should have been set up already.
2879 ##        assert not self.dbs_api is None
2880 ##        # DEBUG DEBUG DEBUG end
2881 
2882 ##        api = self.dbs_api
2883 ##        dbs_query = "find run, run.numevents, site, file.count " \
2884 ##                    "where dataset = %s " \
2885 ##                    "and dataset.status = VALID" % \
2886 ##                    dataset_name
2887 ##        try:
2888 ##            api_result = api.executeQuery(dbs_query)
2889 ##        except DbsApiException:
2890 ##            msg = "ERROR: Could not execute DBS query"
2891 ##            self.logger.fatal(msg)
2892 ##            raise Error(msg)
2893 
2894 ##        # Index things by run number. No cross-check is done to make
2895 ##        # sure we get results for each and every run in the
2896 ##        # dataset. I'm not sure this would make sense since we'd be
2897 ##        # cross-checking DBS info with DBS info anyway. Note that we
2898 ##        # use the file count per site to see if we're dealing with an
2899 ##        # incomplete vs. a mirrored dataset.
2900 ##        sample_info = {}
2901 ##        try:
2902 ##            class Handler(xml.sax.handler.ContentHandler):
2903 ##                def startElement(self, name, attrs):
2904 ##                    if name == "result":
2905 ##                        run_number = int(attrs["RUNS_RUNNUMBER"])
2906 ##                        site_name = str(attrs["STORAGEELEMENT_SENAME"])
2907 ##                        file_count = int(attrs["COUNT_FILES"])
2908 ##                        # BUG BUG BUG
2909 ##                        # Doh! For some reason DBS never returns any other
2910 ##                        # event count than zero.
2911 ##                        event_count = int(attrs["RUNS_NUMBEROFEVENTS"])
2912 ##                        # BUG BUG BUG end
2913 ##                        info = (site_name, file_count, event_count)
2914 ##                        try:
2915 ##                            sample_info[run_number].append(info)
2916 ##                        except KeyError:
2917 ##                            sample_info[run_number] = [info]
2918 ##            xml.sax.parseString(api_result, Handler())
2919 ##        except SAXParseException:
2920 ##            msg = "ERROR: Could not parse DBS server output"
2921 ##            self.logger.fatal(msg)
2922 ##            raise Error(msg)
2923 
2924 ##        # Now translate this into a slightly more usable mapping.
2925 ##        sites = {}
2926 ##        for (run_number, site_info) in sample_info.items():
2927 ##            # Quick-n-dirty trick to see if all file counts are the
2928 ##            # same.
2929 ##            unique_file_counts = set([i[1] for i in site_info])
2930 ##            if len(unique_file_counts) == 1:
2931 ##                # Okay, so this must be a mirrored dataset.
2932 ##                # We have to pick one but we have to be careful. We
2933 ##                # cannot submit to things like a T0, a T1, or CAF.
2934 ##                site_names = [self.pick_a_site([i[0] for i in site_info])]
2935 ##                nevents = [site_info[0][2]]
2936 ##            else:
2937 ##                # Looks like this is a spread-out sample.
2938 ##                site_names = [i[0] for i in site_info]
2939 ##                nevents = [i[2] for i in site_info]
2940 ##            sites[run_number] = zip(site_names, nevents)
2941 
2942 ##        self.logger.debug("Sample `%s' spread is:" % dataset_name)
2943 ##        run_numbers = sites.keys()
2944 ##        run_numbers.sort()
2945 ##        for run_number in run_numbers:
2946 ##            self.logger.debug("  run # %6d: %d sites (%s)" % \
2947 ##                              (run_number,
2948 ##                               len(sites[run_number]),
2949 ##                               ", ".join([i[0] for i in sites[run_number]])))
2950 
2951 ##        # End of dbs_check_dataset_spread.
2952 ##        return sites
2953 
2954 ##    # DEBUG DEBUG DEBUG
2955 ##    # Just kept for debugging now.
2956 ##    def dbs_check_dataset_spread_old(self, dataset_name):
2957 ##        """Figure out across how many sites this dataset is spread.
2958 
2959 ##        NOTE: This is something we need to figure out per run, since
2960 ##        we want to submit harvesting jobs per run.
2961 
2962 ##        Basically three things can happen with a given dataset:
2963 ##        - the whole dataset is available on a single site,
2964 ##        - the whole dataset is available (mirrored) at multiple sites,
2965 ##        - the dataset is spread across multiple sites and there is no
2966 ##          single site containing the full dataset in one place.
2967 
2968 ##        NOTE: If all goes well, it should not be possible that
2969 ##        anything but a _full_ dataset is mirrored. So we ignore the
2970 ##        possibility in which for example one site contains the full
2971 ##        dataset and two others mirror half of it.
2972 ##        ANOTHER NOTE: According to some people this last case _could_
2973 ##        actually happen. I will not design for it, but make sure it
2974 ##        ends up as a false negative, in which case we just lose some
2975 ##        efficiency and treat the dataset (unnecessarily) as
2976 ##        spread-out.
2977 
2978 ##        We don't really care about the first two possibilities, but in
2979 ##        the third case we need to make sure to run the harvesting in
2980 ##        two-step mode.
2981 
2982 ##        This method checks with DBS which of the above cases is true
2983 ##        for the dataset name given, and returns a 1 for the first two
2984 ##        cases, and the number of sites across which the dataset is
2985 ##        spread for the third case.
2986 
2987 ##        The way in which this is done is by asking how many files each
2988 ##        site has for the dataset. In the first case there is only one
2989 ##        site, in the second case all sites should have the same number
2990 ##        of files (i.e. the total number of files in the dataset) and
2991 ##        in the third case the file counts from all sites should add up
2992 ##        to the total file count for the dataset.
2993 
2994 ##        """
2995 
2996 ##        # DEBUG DEBUG DEBUG
2997 ##        # If we get here DBS should have been set up already.
2998 ##        assert not self.dbs_api is None
2999 ##        # DEBUG DEBUG DEBUG end
3000 
3001 ##        api = self.dbs_api
3002 ##        dbs_query = "find run, run.numevents, site, file.count " \
3003 ##                    "where dataset = %s " \
3004 ##                    "and dataset.status = VALID" % \
3005 ##                    dataset_name
3006 ##        try:
3007 ##            api_result = api.executeQuery(dbs_query)
3008 ##        except DbsApiException:
3009 ##            msg = "ERROR: Could not execute DBS query"
3010 ##            self.logger.fatal(msg)
3011 ##            raise Error(msg)
3012 
3013 ##        # Index things by run number. No cross-check is done to make
3014 ##        # sure we get results for each and every run in the
3015 ##        # dataset. I'm not sure this would make sense since we'd be
3016 ##        # cross-checking DBS info with DBS info anyway. Note that we
3017 ##        # use the file count per site to see if we're dealing with an
3018 ##        # incomplete vs. a mirrored dataset.
3019 ##        sample_info = {}
3020 ##        try:
3021 ##            class Handler(xml.sax.handler.ContentHandler):
3022 ##                def startElement(self, name, attrs):
3023 ##                    if name == "result":
3024 ##                        run_number = int(attrs["RUNS_RUNNUMBER"])
3025 ##                        site_name = str(attrs["STORAGEELEMENT_SENAME"])
3026 ##                        file_count = int(attrs["COUNT_FILES"])
3027 ##                        # BUG BUG BUG
3028 ##                        # Doh! For some reason DBS never returns any other
3029 ##                        # event count than zero.
3030 ##                        event_count = int(attrs["RUNS_NUMBEROFEVENTS"])
3031 ##                        # BUG BUG BUG end
3032 ##                        info = (site_name, file_count, event_count)
3033 ##                        try:
3034 ##                            sample_info[run_number].append(info)
3035 ##                        except KeyError:
3036 ##                            sample_info[run_number] = [info]
3037 ##            xml.sax.parseString(api_result, Handler())
3038 ##        except SAXParseException:
3039 ##            msg = "ERROR: Could not parse DBS server output"
3040 ##            self.logger.fatal(msg)
3041 ##            raise Error(msg)
3042 
3043 ##        # Now translate this into a slightly more usable mapping.
3044 ##        sites = {}
3045 ##        for (run_number, site_info) in sample_info.items():
3046 ##            # Quick-n-dirty trick to see if all file counts are the
3047 ##            # same.
3048 ##            unique_file_counts = set([i[1] for i in site_info])
3049 ##            if len(unique_file_counts) == 1:
3050 ##                # Okay, so this must be a mirrored dataset.
3051 ##                # We have to pick one but we have to be careful. We
3052 ##                # cannot submit to things like a T0, a T1, or CAF.
3053 ##                site_names = [self.pick_a_site([i[0] for i in site_info])]
3054 ##                nevents = [site_info[0][2]]
3055 ##            else:
3056 ##                # Looks like this is a spread-out sample.
3057 ##                site_names = [i[0] for i in site_info]
3058 ##                nevents = [i[2] for i in site_info]
3059 ##            sites[run_number] = zip(site_names, nevents)
3060 
3061 ##        self.logger.debug("Sample `%s' spread is:" % dataset_name)
3062 ##        run_numbers = sites.keys()
3063 ##        run_numbers.sort()
3064 ##        for run_number in run_numbers:
3065 ##            self.logger.debug("  run # %6d: %d site(s) (%s)" % \
3066 ##                              (run_number,
3067 ##                               len(sites[run_number]),
3068 ##                               ", ".join([i[0] for i in sites[run_number]])))
3069 
3070 ##        # End of dbs_check_dataset_spread_old.
3071 ##        return sites
3072 ##    # DEBUG DEBUG DEBUG end
3073 
3074     ##########
3075 
3076     def dbs_check_dataset_spread(self, dataset_name):
3077         """Figure out the number of events in each run of this dataset.
3078 
3079         This is a more efficient way of doing this than calling
3080         dbs_resolve_number_of_events for each run.
3081 
3082         """
3083 
3084         self.logger.debug("Checking spread of dataset `%s'" % dataset_name)
3085 
3086         # DEBUG DEBUG DEBUG
3087         # If we get here DBS should have been set up already.
3088         assert not self.dbs_api is None
3089         # DEBUG DEBUG DEBUG end
3090 
3091         api = self.dbs_api
3092         dbs_query = "find run.number, site, file.name, file.numevents " \
3093                     "where dataset = %s " \
3094                     "and dataset.status = VALID" % \
3095                     dataset_name
3096         try:
3097             api_result = api.executeQuery(dbs_query)
3098         except DBSAPI.dbsApiException.DbsApiException:
3099             msg = "ERROR: Could not execute DBS query"
3100             self.logger.fatal(msg)
3101             raise Error(msg)
3102 
3103         handler = DBSXMLHandler(["run.number", "site", "file.name", "file.numevents"])
3104         parser = xml.sax.make_parser()
3105         parser.setContentHandler(handler)
3106 
3107         try:
3108             # OBSOLETE OBSOLETE OBSOLETE
3109 ##            class Handler(xml.sax.handler.ContentHandler):
3110 ##                def startElement(self, name, attrs):
3111 ##                    if name == "result":
3112 ##                        site_name = str(attrs["STORAGEELEMENT_SENAME"])
3113 ##                        # TODO TODO TODO
3114 ##                        # Ugly hack to get around cases like this:
3115 ##                        #   $ dbs search --query="find dataset, site, file.count where dataset=/RelValQCD_Pt_3000_3500/CMSSW_3_3_0_pre1-STARTUP31X_V4-v1/GEN-SIM-RECO"
3116 ##                        #   Using DBS instance at: http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet
3117 ##                        #   Processing ... \
3118 ##                        #   PATH    STORAGEELEMENT_SENAME   COUNT_FILES
3119 ##                        #   _________________________________________________________________________________
3120 ##                        #   /RelValQCD_Pt_3000_3500/CMSSW_3_3_0_pre1-STARTUP31X_V4-v1/GEN-SIM-RECO          1
3121 ##                        #   /RelValQCD_Pt_3000_3500/CMSSW_3_3_0_pre1-STARTUP31X_V4-v1/GEN-SIM-RECO  cmssrm.fnal.gov 12
3122 ##                        #   /RelValQCD_Pt_3000_3500/CMSSW_3_3_0_pre1-STARTUP31X_V4-v1/GEN-SIM-RECO  srm-cms.cern.ch 12
3123 ##                        if len(site_name) < 1:
3124 ##                            return
3125 ##                        # TODO TODO TODO end
3126 ##                        run_number = int(attrs["RUNS_RUNNUMBER"])
3127 ##                        file_name = str(attrs["FILES_LOGICALFILENAME"])
3128 ##                        nevents = int(attrs["FILES_NUMBEROFEVENTS"])
3129 ##                        # I know, this is a bit of a kludge.
3130 ##                        if not files_info.has_key(run_number):
3131 ##                            # New run.
3132 ##                            files_info[run_number] = {}
3133 ##                            files_info[run_number][file_name] = (nevents,
3134 ##                                                                 [site_name])
3135 ##                        elif not files_info[run_number].has_key(file_name):
3136 ##                            # New file for a known run.
3137 ##                            files_info[run_number][file_name] = (nevents,
3138 ##                                                                 [site_name])
3139 ##                        else:
3140 ##                            # New entry for a known file for a known run.
3141 ##                            # DEBUG DEBUG DEBUG
3142 ##                            # Each file should have the same number of
3143 ##                            # events independent of the site it's at.
3144 ##                            assert nevents == files_info[run_number][file_name][0]
3145 ##                            # DEBUG DEBUG DEBUG end
3146 ##                            files_info[run_number][file_name][1].append(site_name)
3147             # OBSOLETE OBSOLETE OBSOLETE end
3148             xml.sax.parseString(api_result, handler)
3149         except SAXParseException:
3150             msg = "ERROR: Could not parse DBS server output"
3151             self.logger.fatal(msg)
3152             raise Error(msg)
3153 
3154         # DEBUG DEBUG DEBUG
3155         assert(handler.check_results_validity()), "ERROR The DBSXMLHandler screwed something up!"
3156         # DEBUG DEBUG DEBUG end
3157 
3158         # Now reshuffle all results a bit so we can more easily use
3159         # them later on. (Remember that all arrays in the results
3160         # should have equal length.)
3161         files_info = {}
3162         for (index, site_name) in enumerate(handler.results["site"]):
3163             # Ugly hack to get around cases like this:
3164             #   $ dbs search --query="find dataset, site, file.count where dataset=/RelValQCD_Pt_3000_3500/CMSSW_3_3_0_pre1-STARTUP31X_V4-v1/GEN-SIM-RECO"
3165             #   Using DBS instance at: http://cmsdbsprod.cern.ch/cms_dbs_prod_global/servlet/DBSServlet
3166             #   Processing ... \
3167             #   PATH    STORAGEELEMENT_SENAME   COUNT_FILES
3168             #   _________________________________________________________________________________
3169             #   /RelValQCD_Pt_3000_3500/CMSSW_3_3_0_pre1-STARTUP31X_V4-v1/GEN-SIM-RECO          1
3170             #   /RelValQCD_Pt_3000_3500/CMSSW_3_3_0_pre1-STARTUP31X_V4-v1/GEN-SIM-RECO  cmssrm.fnal.gov 12
3171             #   /RelValQCD_Pt_3000_3500/CMSSW_3_3_0_pre1-STARTUP31X_V4-v1/GEN-SIM-RECO  srm-cms.cern.ch 12
3172             if len(site_name) < 1:
3173                 continue
3174             run_number = int(handler.results["run.number"][index])
3175             file_name = handler.results["file.name"][index]
3176             nevents = int(handler.results["file.numevents"][index])
3177 
3178             # I know, this is a bit of a kludge.
3179             if run_number not in files_info:
3180                 # New run.
3181                 files_info[run_number] = {}
3182                 files_info[run_number][file_name] = (nevents,
3183                                                      [site_name])
3184             elif file_name not in files_info[run_number]:
3185                 # New file for a known run.
3186                 files_info[run_number][file_name] = (nevents,
3187                                                      [site_name])
3188             else:
3189                 # New entry for a known file for a known run.
3190                 # DEBUG DEBUG DEBUG
3191                 # Each file should have the same number of
3192                 # events independent of the site it's at.
3193                 assert nevents == files_info[run_number][file_name][0]
3194                 # DEBUG DEBUG DEBUG end
3195                 files_info[run_number][file_name][1].append(site_name)
3196 
3197         # Remove any information for files that are not available
3198         # anywhere. NOTE: After introducing the ugly hack above, this
3199         # is a bit redundant, but let's keep it for the moment.
3200         for run_number in files_info.keys():
3201             files_without_sites = [i for (i, j) in \
3202                                    files_info[run_number].items() \
3203                                    if len(j[1]) < 1]
3204             if len(files_without_sites) > 0:
3205                 self.logger.warning("Removing %d file(s)" \
3206                                     " with empty site names" % \
3207                                     len(files_without_sites))
3208                 for file_name in files_without_sites:
3209                     del files_info[run_number][file_name]
3210                     # files_info[run_number][file_name] = (files_info \
3211                     #                                     [run_number] \
3212                     #                                      [file_name][0], [])
3213 
3214         # And another bit of a kludge.
3215         num_events_catalog = {}
3216         for run_number in files_info.keys():
3217             site_names = list(set([j for i in files_info[run_number].values() for j in i[1]]))
3218 
3219             # NOTE: The term `mirrored' does not have the usual
3220             # meaning here. It basically means that we can apply
3221             # single-step harvesting.
3222             mirrored = None
3223             if len(site_names) > 1:
3224                 # Now we somehow need to figure out if we're dealing
3225                 # with a mirrored or a spread-out dataset. The rule we
3226                 # use here is that we're dealing with a spread-out
3227                 # dataset unless we can find at least one site
3228                 # containing exactly the full list of files for this
3229                 # dataset that DBS knows about. In that case we just
3230                 # use only that site.
3231                 all_file_names = files_info[run_number].keys()
3232                 all_file_names = set(all_file_names)
3233                 sites_with_complete_copies = []
3234                 for site_name in site_names:
3235                     files_at_site = [i for (i, (j, k)) \
3236                                      in files_info[run_number].items() \
3237                                      if site_name in k]
3238                     files_at_site = set(files_at_site)
3239                     if files_at_site == all_file_names:
3240                         sites_with_complete_copies.append(site_name)
3241                 if len(sites_with_complete_copies) < 1:
3242                     # This dataset/run is available at more than one
3243                     # site, but no one has a complete copy. So this is
3244                     # a spread-out sample.
3245                     mirrored = False
3246                 else:
3247                     if len(sites_with_complete_copies) > 1:
3248                         # This sample is available (and complete) at
3249                         # more than one site. Definitely mirrored.
3250                         mirrored = True
3251                     else:
3252                         # This dataset/run is available at more than
3253                         # one site and at least one of them has a
3254                         # complete copy. Even if this is only a single
3255                         # site, let's call this `mirrored' and run the
3256                         # single-step harvesting.
3257                         mirrored = True
3258 
3259 ##                site_names_ref = set(files_info[run_number].values()[0][1])
3260 ##                for site_names_tmp in files_info[run_number].values()[1:]:
3261 ##                    if set(site_names_tmp[1]) != site_names_ref:
3262 ##                        mirrored = False
3263 ##                        break
3264 
3265                 if mirrored:
3266                     self.logger.debug("    -> run appears to be `mirrored'")
3267                 else:
3268                     self.logger.debug("    -> run appears to be spread-out")
3269 
3270                 if mirrored and \
3271                        len(sites_with_complete_copies) != len(site_names):
3272                     # Remove any references to incomplete sites if we
3273                     # have at least one complete site (and if there
3274                     # are incomplete sites).
3275                     for (file_name, (i, sites)) in files_info[run_number].items():
3276                         complete_sites = [site for site in sites \
3277                                           if site in sites_with_complete_copies]
3278                         files_info[run_number][file_name] = (i, complete_sites)
3279                     site_names = sites_with_complete_copies
3280 
3281             self.logger.debug("  for run #%d:" % run_number)
3282             num_events_catalog[run_number] = {}
3283             num_events_catalog[run_number]["all_sites"] = sum([i[0] for i in files_info[run_number].values()])
3284             if len(site_names) < 1:
3285                 self.logger.debug("    run is not available at any site")
3286                 self.logger.debug("      (but should contain %d events" % \
3287                                   num_events_catalog[run_number]["all_sites"])
3288             else:
3289                 self.logger.debug("    at all sites combined there are %d events" % \
3290                                   num_events_catalog[run_number]["all_sites"])
3291                 for site_name in site_names:
3292                     num_events_catalog[run_number][site_name] = sum([i[0] for i in files_info[run_number].values() if site_name in i[1]])
3293                     self.logger.debug("    at site `%s' there are %d events" % \
3294                                       (site_name, num_events_catalog[run_number][site_name]))
3295             num_events_catalog[run_number]["mirrored"] = mirrored
3296 
3297         # End of dbs_check_dataset_spread.
3298         return num_events_catalog
3299 
3300     # Beginning of old version.
3301 ##    def dbs_check_dataset_num_events(self, dataset_name):
3302 ##        """Figure out the number of events in each run of this dataset.
3303 
3304 ##        This is a more efficient way of doing this than calling
3305 ##        dbs_resolve_number_of_events for each run.
3306 
3307 ##        # BUG BUG BUG
3308 ##        # This might very well not work at all for spread-out samples. (?)
3309 ##        # BUG BUG BUG end
3310 
3311 ##        """
3312 
3313 ##        # DEBUG DEBUG DEBUG
3314 ##        # If we get here DBS should have been set up already.
3315 ##        assert not self.dbs_api is None
3316 ##        # DEBUG DEBUG DEBUG end
3317 
3318 ##        api = self.dbs_api
3319 ##        dbs_query = "find run.number, file.name, file.numevents where dataset = %s " \
3320 ##                    "and dataset.status = VALID" % \
3321 ##                    dataset_name
3322 ##        try:
3323 ##            api_result = api.executeQuery(dbs_query)
3324 ##        except DbsApiException:
3325 ##            msg = "ERROR: Could not execute DBS query"
3326 ##            self.logger.fatal(msg)
3327 ##            raise Error(msg)
3328 
3329 ##        try:
3330 ##            files_info = {}
3331 ##            class Handler(xml.sax.handler.ContentHandler):
3332 ##                def startElement(self, name, attrs):
3333 ##                    if name == "result":
3334 ##                        run_number = int(attrs["RUNS_RUNNUMBER"])
3335 ##                        file_name = str(attrs["FILES_LOGICALFILENAME"])
3336 ##                        nevents = int(attrs["FILES_NUMBEROFEVENTS"])
3337 ##                        try:
3338 ##                            files_info[run_number][file_name] = nevents
3339 ##                        except KeyError:
3340 ##                            files_info[run_number] = {file_name: nevents}
3341 ##            xml.sax.parseString(api_result, Handler())
3342 ##        except SAXParseException:
3343 ##            msg = "ERROR: Could not parse DBS server output"
3344 ##            self.logger.fatal(msg)
3345 ##            raise Error(msg)
3346 
3347 ##        num_events_catalog = {}
3348 ##        for run_number in files_info.keys():
3349 ##            num_events_catalog[run_number] = sum(files_info[run_number].values())
3350 
3351 ##        # End of dbs_check_dataset_num_events.
3352 ##        return num_events_catalog
3353     # End of old version.
3354 
3355     ##########
3356 
3357     def build_dataset_list(self, input_method, input_name):
3358         """Build a list of all datasets to be processed.
3359 
3360         """
3361 
3362         dataset_names = []
3363 
3364         # It may be, but only for the list of datasets to ignore, that
3365         # the input method and name are None because nothing was
3366         # specified. In that case just an empty list is returned.
3367         if input_method is None:
3368             pass
3369         elif input_method == "dataset":
3370             # Input comes from a dataset name directly on the command
3371             # line. But, this can also contain wildcards so we need
3372             # DBS to translate it conclusively into a list of explicit
3373             # dataset names.
3374             self.logger.info("Asking DBS for dataset names")
3375             dataset_names = self.dbs_resolve_dataset_name(input_name)
3376         elif input_method == "datasetfile":
3377             # In this case a file containing a list of dataset names
3378             # is specified. Still, each line may contain wildcards so
3379             # this step also needs help from DBS.
3380             # NOTE: Lines starting with a `#' are ignored.
3381             self.logger.info("Reading input from list file `%s'" % \
3382                              input_name)
3383             try:
3384                 listfile = open("/afs/cern.ch/cms/CAF/CMSCOMM/COMM_DQM/harvesting/bin/%s" %input_name, "r")
3385                 print("open listfile")
3386                 for dataset in listfile:
3387                     # Skip empty lines.
3388                     dataset_stripped = dataset.strip()
3389                     if len(dataset_stripped) < 1:
3390                         continue
3391                     # Skip lines starting with a `#'.
3392                     if dataset_stripped[0] != "#":
3393                         dataset_names.extend(self. \
3394                                              dbs_resolve_dataset_name(dataset_stripped))
3395                 listfile.close()
3396             except IOError:
3397                 msg = "ERROR: Could not open input list file `%s'" % \
3398                       input_name
3399                 self.logger.fatal(msg)
3400                 raise Error(msg)
3401         else:
3402             # DEBUG DEBUG DEBUG
3403             # We should never get here.
3404             assert False, "Unknown input method `%s'" % input_method
3405             # DEBUG DEBUG DEBUG end
3406 
3407         # Remove duplicates from the dataset list.
3408         # NOTE: There should not be any duplicates in any list coming
3409         # from DBS, but maybe the user provided a list file with less
3410         # care.
3411         # Store for later use.
3412         dataset_names = sorted(set(dataset_names))
3413 
3414 
3415         # End of build_dataset_list.
3416         return dataset_names
3417 
3418     ##########
3419 
3420     def build_dataset_use_list(self):
3421         """Build a list of datasets to process.
3422 
3423         """
3424 
3425         self.logger.info("Building list of datasets to consider...")
3426 
3427         input_method = self.input_method["datasets"]["use"]
3428         input_name = self.input_name["datasets"]["use"]
3429         dataset_names = self.build_dataset_list(input_method,
3430                                                 input_name)
3431         self.datasets_to_use = dict(list(zip(dataset_names,
3432                                         [None] * len(dataset_names))))
3433 
3434         self.logger.info("  found %d dataset(s) to process:" % \
3435                          len(dataset_names))
3436         for dataset in dataset_names:
3437             self.logger.info("  `%s'" % dataset)
3438 
3439         # End of build_dataset_use_list.
3440 
3441     ##########
3442 
3443     def build_dataset_ignore_list(self):
3444         """Build a list of datasets to ignore.
3445 
3446         NOTE: We should always have a list of datasets to process, but
3447         it may be that we don't have a list of datasets to ignore.
3448 
3449         """
3450 
3451         self.logger.info("Building list of datasets to ignore...")
3452 
3453         input_method = self.input_method["datasets"]["ignore"]
3454         input_name = self.input_name["datasets"]["ignore"]
3455         dataset_names = self.build_dataset_list(input_method,
3456                                                 input_name)
3457         self.datasets_to_ignore = dict(list(zip(dataset_names,
3458                                            [None] * len(dataset_names))))
3459 
3460         self.logger.info("  found %d dataset(s) to ignore:" % \
3461                          len(dataset_names))
3462         for dataset in dataset_names:
3463             self.logger.info("  `%s'" % dataset)
3464 
3465         # End of build_dataset_ignore_list.
3466 
3467     ##########
3468 
3469     def build_runs_list(self, input_method, input_name):
3470 
3471         runs = []
3472 
3473         # A list of runs (either to use or to ignore) is not
3474         # required. This protects against `empty cases.'
3475         if input_method is None:
3476             pass
3477         elif input_method == "runs":
3478             # A list of runs was specified directly from the command
3479             # line.
3480             self.logger.info("Reading list of runs from the " \
3481                              "command line")
3482             runs.extend([int(i.strip()) \
3483                          for i in input_name.split(",") \
3484                          if len(i.strip()) > 0])
3485         elif input_method == "runslistfile":
3486             # We were passed a file containing a list of runs.
3487             self.logger.info("Reading list of runs from file `%s'" % \
3488                              input_name)
3489             try:
3490                 listfile = open(input_name, "r")
3491                 for run in listfile:
3492                     # Skip empty lines.
3493                     run_stripped = run.strip()
3494                     if len(run_stripped) < 1:
3495                         continue
3496                     # Skip lines starting with a `#'.
3497                     if run_stripped[0] != "#":
3498                         runs.append(int(run_stripped))
3499                 listfile.close()
3500             except IOError:
3501                 msg = "ERROR: Could not open input list file `%s'" % \
3502                       input_name
3503                 self.logger.fatal(msg)
3504                 raise Error(msg)
3505 
3506         else:
3507             # DEBUG DEBUG DEBUG
3508             # We should never get here.
3509             assert False, "Unknown input method `%s'" % input_method
3510             # DEBUG DEBUG DEBUG end
3511 
3512         # Remove duplicates, sort and done.
3513         runs = list(set(runs))
3514 
3515         # End of build_runs_list().
3516         return runs
3517 
3518     ##########
3519 
3520     def build_runs_use_list(self):
3521         """Build a list of runs to process.
3522 
3523         """
3524 
3525         self.logger.info("Building list of runs to consider...")
3526 
3527         input_method = self.input_method["runs"]["use"]
3528         input_name = self.input_name["runs"]["use"]
3529         runs = self.build_runs_list(input_method, input_name)
3530         self.runs_to_use = dict(list(zip(runs, [None] * len(runs))))
3531 
3532         self.logger.info("  found %d run(s) to process:" % \
3533                          len(runs))
3534         if len(runs) > 0:
3535             self.logger.info("  %s" % ", ".join([str(i) for i in runs]))
3536 
3537         # End of build_runs_list().
3538 
3539     ##########
3540 
3541     def build_runs_ignore_list(self):
3542         """Build a list of runs to ignore.
3543 
3544         NOTE: We should always have a list of runs to process, but
3545         it may be that we don't have a list of runs to ignore.
3546 
3547         """
3548 
3549         self.logger.info("Building list of runs to ignore...")
3550 
3551         input_method = self.input_method["runs"]["ignore"]
3552         input_name = self.input_name["runs"]["ignore"]
3553         runs = self.build_runs_list(input_method, input_name)
3554         self.runs_to_ignore = dict(list(zip(runs, [None] * len(runs))))
3555 
3556         self.logger.info("  found %d run(s) to ignore:" % \
3557                          len(runs))
3558         if len(runs) > 0:
3559             self.logger.info("  %s" % ", ".join([str(i) for i in runs]))
3560 
3561         # End of build_runs_ignore_list().
3562 
3563     ##########
3564 
3565     def process_dataset_ignore_list(self):
3566         """Update the list of datasets taking into account the ones to
3567         ignore.
3568 
3569         Both lists have been generated before from DBS and both are
3570         assumed to be unique.
3571 
3572         NOTE: The advantage of creating the ignore list from DBS (in
3573         case a regexp is given) and matching that instead of directly
3574         matching the ignore criterion against the list of datasets (to
3575         consider) built from DBS is that in the former case we're sure
3576         that all regexps are treated exactly as DBS would have done
3577         without the cmsHarvester.
3578 
3579         NOTE: This only removes complete samples. Exclusion of single
3580         runs is done by the book keeping. So the assumption is that a
3581         user never wants to harvest just part (i.e. n out of N runs)
3582         of a sample.
3583 
3584         """
3585 
3586         self.logger.info("Processing list of datasets to ignore...")
3587 
3588         self.logger.debug("Before processing ignore list there are %d " \
3589                           "datasets in the list to be processed" % \
3590                           len(self.datasets_to_use))
3591 
3592         # Simple approach: just loop and search.
3593         dataset_names_filtered = copy.deepcopy(self.datasets_to_use)
3594         for dataset_name in self.datasets_to_use.keys():
3595             if dataset_name in self.datasets_to_ignore.keys():
3596                 del dataset_names_filtered[dataset_name]
3597 
3598         self.logger.info("  --> Removed %d dataset(s)" % \
3599                          (len(self.datasets_to_use) -
3600                           len(dataset_names_filtered)))
3601 
3602         self.datasets_to_use = dataset_names_filtered
3603 
3604         self.logger.debug("After processing ignore list there are %d " \
3605                           "datasets in the list to be processed" % \
3606                           len(self.datasets_to_use))
3607 
3608     # End of process_dataset_ignore_list.
3609 
3610     ##########
3611 
3612     def process_runs_use_and_ignore_lists(self):
3613 
3614         self.logger.info("Processing list of runs to use and ignore...")
3615 
3616         # This basically adds all runs in a dataset to be processed,
3617         # except for any runs that are not specified in the `to use'
3618         # list and any runs that are specified in the `to ignore'
3619         # list.
3620 
3621         # NOTE: It is assumed that those lists make sense. The input
3622         # should be checked against e.g. overlapping `use' and
3623         # `ignore' lists.
3624 
3625         runs_to_use = self.runs_to_use
3626         runs_to_ignore = self.runs_to_ignore
3627 
3628         for dataset_name in self.datasets_to_use:
3629             runs_in_dataset = self.datasets_information[dataset_name]["runs"]
3630 
3631             # First some sanity checks.
3632             runs_to_use_tmp = []
3633             for run in runs_to_use:
3634                 if not run in runs_in_dataset:
3635                     self.logger.warning("Dataset `%s' does not contain " \
3636                                         "requested run %d " \
3637                                         "--> ignoring `use' of this run" % \
3638                                         (dataset_name, run))
3639                 else:
3640                     runs_to_use_tmp.append(run)
3641 
3642             if len(runs_to_use) > 0:
3643                 runs = runs_to_use_tmp
3644                 self.logger.info("Using %d out of %d runs " \
3645                                  "of dataset `%s'" % \
3646                                  (len(runs), len(runs_in_dataset),
3647                                   dataset_name))
3648             else:
3649                 runs = runs_in_dataset
3650 
3651             if len(runs_to_ignore) > 0:
3652                 runs_tmp = []
3653                 for run in runs:
3654                     if not run in runs_to_ignore:
3655                         runs_tmp.append(run)
3656                 self.logger.info("Ignoring %d out of %d runs " \
3657                                  "of dataset `%s'" % \
3658                                  (len(runs)- len(runs_tmp),
3659                                   len(runs_in_dataset),
3660                                   dataset_name))
3661                 runs = runs_tmp
3662 
3663             if self.todofile != "YourToDofile.txt":
3664                 runs_todo = []
3665                 print("Reading runs from file /afs/cern.ch/cms/CAF/CMSCOMM/COMM_DQM/harvesting/%s" %self.todofile)
3666                 cmd="grep %s /afs/cern.ch/cms/CAF/CMSCOMM/COMM_DQM/harvesting/%s | cut -f5 -d' '" %(dataset_name,self.todofile)
3667                 (status, output)=subprocess.getstatusoutput(cmd)
3668                 for run in runs:
3669                     run_str="%s" %run
3670                     if run_str in output:
3671                         runs_todo.append(run)
3672                 self.logger.info("Using %d runs " \
3673                                  "of dataset `%s'" % \
3674                                  (len(runs_todo),
3675                                   dataset_name))
3676                 runs=runs_todo
3677 
3678             Json_runs = []
3679             if self.Jsonfilename != "YourJSON.txt":
3680                 good_runs = []
3681                 self.Jsonlumi = True
3682                 # We were passed a Jsonfile containing a dictionary of
3683                 # run/lunisection-pairs
3684                 self.logger.info("Reading runs and lumisections from file `%s'" % \
3685                                 self.Jsonfilename)
3686                 try:
3687                     Jsonfile = open(self.Jsonfilename, "r")
3688                     for names in Jsonfile:
3689                         dictNames= eval(str(names))
3690                         for key in dictNames:
3691                             intkey=int(key)
3692                             Json_runs.append(intkey)
3693                     Jsonfile.close()
3694                 except IOError:
3695                     msg = "ERROR: Could not open Jsonfile `%s'" % \
3696                           input_name
3697                     self.logger.fatal(msg)
3698                     raise Error(msg)
3699                 for run in runs:
3700                     if run in Json_runs:
3701                         good_runs.append(run)
3702                 self.logger.info("Using %d runs " \
3703                                  "of dataset `%s'" % \
3704                                  (len(good_runs),
3705                                   dataset_name))
3706                 runs=good_runs
3707             if (self.Jsonrunfilename != "YourJSON.txt") and (self.Jsonfilename == "YourJSON.txt"):
3708                 good_runs = []
3709                 # We were passed a Jsonfile containing a dictionary of
3710                 # run/lunisection-pairs
3711                 self.logger.info("Reading runs from file `%s'" % \
3712                                 self.Jsonrunfilename)
3713                 try:
3714                     Jsonfile = open(self.Jsonrunfilename, "r")
3715                     for names in Jsonfile:
3716                         dictNames= eval(str(names))
3717                         for key in dictNames:
3718                             intkey=int(key)
3719                             Json_runs.append(intkey)
3720                     Jsonfile.close()
3721                 except IOError:
3722                     msg = "ERROR: Could not open Jsonfile `%s'" % \
3723                           input_name
3724                     self.logger.fatal(msg)
3725                     raise Error(msg)
3726                 for run in runs:
3727                     if run in Json_runs: 
3728                         good_runs.append(run)
3729                 self.logger.info("Using %d runs " \
3730                                  "of dataset `%s'" % \
3731                                  (len(good_runs),
3732                                   dataset_name))
3733                 runs=good_runs
3734 
3735             self.datasets_to_use[dataset_name] = runs
3736 
3737         # End of process_runs_use_and_ignore_lists().
3738 
3739     ##########
3740 
3741     def singlify_datasets(self):
3742         """Remove all but the largest part of all datasets.
3743 
3744         This allows us to harvest at least part of these datasets
3745         using single-step harvesting until the two-step approach
3746         works.
3747 
3748         """
3749 
3750         # DEBUG DEBUG DEBUG
3751         assert self.harvesting_mode == "single-step-allow-partial"
3752         # DEBUG DEBUG DEBUG end
3753 
3754         for dataset_name in self.datasets_to_use:
3755             for run_number in self.datasets_information[dataset_name]["runs"]:
3756                 max_events = max(self.datasets_information[dataset_name]["sites"][run_number].values())
3757                 sites_with_max_events = [i[0] for i in self.datasets_information[dataset_name]["sites"][run_number].items() if i[1] == max_events]
3758                 self.logger.warning("Singlifying dataset `%s', " \
3759                                     "run %d" % \
3760                                     (dataset_name, run_number))
3761                 cmssw_version = self.datasets_information[dataset_name] \
3762                                 ["cmssw_version"]
3763                 selected_site = self.pick_a_site(sites_with_max_events,
3764                                                  cmssw_version)
3765 
3766                 # Let's tell the user that we're manhandling this dataset.
3767                 nevents_old = self.datasets_information[dataset_name]["num_events"][run_number]
3768                 self.logger.warning("  --> " \
3769                                     "only harvesting partial statistics: " \
3770                                     "%d out of %d events (5.1%f%%) " \
3771                                     "at site `%s'" % \
3772                                     (max_events,
3773                                      nevents_old,
3774                                      100. * max_events / nevents_old,
3775                                      selected_site))
3776                 self.logger.warning("!!! Please note that the number of " \
3777                                     "events in the output path name will " \
3778                                     "NOT reflect the actual statistics in " \
3779                                     "the harvested results !!!")
3780 
3781                 # We found the site with the highest statistics and
3782                 # the corresponding number of events. (CRAB gets upset
3783                 # if we ask for more events than there are at a given
3784                 # site.) Now update this information in our main
3785                 # datasets_information variable.
3786                 self.datasets_information[dataset_name]["sites"][run_number] = {selected_site: max_events}
3787                 self.datasets_information[dataset_name]["num_events"][run_number] = max_events
3788                 #self.datasets_information[dataset_name]["sites"][run_number] = [selected_site]
3789 
3790         # End of singlify_datasets.
3791 
3792     ##########
3793 
3794     def check_dataset_list(self):
3795         """Check list of dataset names for impossible ones.
3796 
3797         Two kinds of checks are done:
3798         - Checks for things that do not make sense. These lead to
3799           errors and skipped datasets.
3800         - Sanity checks. For these warnings are issued but the user is
3801           considered to be the authoritative expert.
3802 
3803         Checks performed:
3804         - The CMSSW version encoded in the dataset name should match
3805           self.cmssw_version. This is critical.
3806         - There should be some events in the dataset/run. This is
3807           critical in the sense that CRAB refuses to create jobs for
3808           zero events. And yes, this does happen in practice. E.g. the
3809           reprocessed CRAFT08 datasets contain runs with zero events.
3810         - A cursory check is performed to see if the harvesting type
3811           makes sense for the data type. This should prevent the user
3812           from inadvertently running RelVal for data.
3813         - It is not possible to run single-step harvesting jobs on
3814           samples that are not fully contained at a single site.
3815         - Each dataset/run has to be available at at least one site.
3816 
3817         """
3818 
3819         self.logger.info("Performing sanity checks on dataset list...")
3820 
3821         dataset_names_after_checks = copy.deepcopy(self.datasets_to_use)
3822 
3823         for dataset_name in self.datasets_to_use.keys():
3824 
3825             # Check CMSSW version.
3826             version_from_dataset = self.datasets_information[dataset_name] \
3827                                    ["cmssw_version"]
3828             if version_from_dataset != self.cmssw_version:
3829                 msg = "  CMSSW version mismatch for dataset `%s' " \
3830                       "(%s vs. %s)" % \
3831                       (dataset_name,
3832                        self.cmssw_version, version_from_dataset)
3833                 if self.force_running:
3834                     # Expert mode: just warn, then continue.
3835                     self.logger.warning("%s " \
3836                                         "--> `force mode' active: " \
3837                                         "run anyway" % msg)
3838                 else:
3839                     del dataset_names_after_checks[dataset_name]
3840                     self.logger.warning("%s " \
3841                                         "--> skipping" % msg)
3842                     continue
3843 
3844             ###
3845 
3846             # Check that the harvesting type makes sense for the
3847             # sample. E.g. normally one would not run the DQMOffline
3848             # harvesting on Monte Carlo.
3849             # TODO TODO TODO
3850             # This should be further refined.
3851             suspicious = False
3852             datatype = self.datasets_information[dataset_name]["datatype"]
3853             if datatype == "data":
3854                 # Normally only DQM harvesting is run on data.
3855                 if self.harvesting_type != "DQMOffline":
3856                     suspicious = True
3857             elif datatype == "mc":
3858                 if self.harvesting_type == "DQMOffline":
3859                     suspicious = True
3860             else:
3861                 # Doh!
3862                 assert False, "ERROR Impossible data type `%s' " \
3863                        "for dataset `%s'" % \
3864                        (datatype, dataset_name)
3865             if suspicious:
3866                 msg = "  Normally one does not run `%s' harvesting " \
3867                       "on %s samples, are you sure?" % \
3868                       (self.harvesting_type, datatype)
3869                 if self.force_running:
3870                     self.logger.warning("%s " \
3871                                         "--> `force mode' active: " \
3872                                         "run anyway" % msg)
3873                 else:
3874                     del dataset_names_after_checks[dataset_name]
3875                     self.logger.warning("%s " \
3876                                         "--> skipping" % msg)
3877                     continue
3878 
3879             # TODO TODO TODO end
3880 
3881             ###
3882 
3883             # BUG BUG BUG
3884             # For the moment, due to a problem with DBS, I cannot
3885             # figure out the GlobalTag for data by myself. (For MC
3886             # it's no problem.) This means that unless a GlobalTag was
3887             # specified from the command line, we will have to skip
3888             # any data datasets.
3889 
3890             if datatype == "data":
3891                 if self.globaltag is None:
3892                     msg = "For data datasets (like `%s') " \
3893                           "we need a GlobalTag" % \
3894                           dataset_name
3895                     del dataset_names_after_checks[dataset_name]
3896                     self.logger.warning("%s " \
3897                                         "--> skipping" % msg)
3898                     continue
3899 
3900             # BUG BUG BUG end
3901 
3902             ###
3903 
3904             # Check if the GlobalTag exists and (if we're using
3905             # reference histograms) if it's ready to be used with
3906             # reference histograms.
3907             globaltag = self.datasets_information[dataset_name]["globaltag"]
3908             if not globaltag in self.globaltag_check_cache:
3909                 if self.check_globaltag(globaltag):
3910                     self.globaltag_check_cache.append(globaltag)
3911                 else:
3912                     msg = "Something is wrong with GlobalTag `%s' " \
3913                           "used by dataset `%s'!" % \
3914                           (globaltag, dataset_name)
3915                     if self.use_ref_hists:
3916                         msg += "\n(Either it does not exist or it " \
3917                                "does not contain the required key to " \
3918                                "be used with reference histograms.)"
3919                     else:
3920                         msg += "\n(It probably just does not exist.)"
3921                     self.logger.fatal(msg)
3922                     raise Usage(msg)
3923 
3924             ###
3925 
3926             # Require that each run is available at least somewhere.
3927             runs_without_sites = [i for (i, j) in \
3928                                   self.datasets_information[dataset_name] \
3929                                   ["sites"].items() \
3930                                   if len(j) < 1 and \
3931                                   i in self.datasets_to_use[dataset_name]]
3932             if len(runs_without_sites) > 0:
3933                 for run_without_sites in runs_without_sites:
3934                     try:
3935                         dataset_names_after_checks[dataset_name].remove(run_without_sites)
3936                     except KeyError:
3937                         pass
3938                 self.logger.warning("  removed %d unavailable run(s) " \
3939                                     "from dataset `%s'" % \
3940                                     (len(runs_without_sites), dataset_name))
3941                 self.logger.debug("    (%s)" % \
3942                                   ", ".join([str(i) for i in \
3943                                              runs_without_sites]))
3944 
3945             ###
3946 
3947             # Unless we're running two-step harvesting: only allow
3948             # samples located on a single site.
3949             if not self.harvesting_mode == "two-step":
3950                 for run_number in self.datasets_to_use[dataset_name]:
3951                     # DEBUG DEBUG DEBUG
3952 ##                    if self.datasets_information[dataset_name]["num_events"][run_number] != 0:
3953 ##                        pdb.set_trace()
3954                     # DEBUG DEBUG DEBUG end
3955                     num_sites = len(self.datasets_information[dataset_name] \
3956                                 ["sites"][run_number])
3957                     if num_sites > 1 and \
3958                            not self.datasets_information[dataset_name] \
3959                            ["mirrored"][run_number]:
3960                         # Cannot do this with a single-step job, not
3961                         # even in force mode. It just does not make
3962                         # sense.
3963                         msg = "  Dataset `%s', run %d is spread across more " \
3964                               "than one site.\n" \
3965                               "  Cannot run single-step harvesting on " \
3966                               "samples spread across multiple sites" % \
3967                               (dataset_name, run_number)
3968                         try:
3969                             dataset_names_after_checks[dataset_name].remove(run_number)
3970                         except KeyError:
3971                             pass
3972                         self.logger.warning("%s " \
3973                                             "--> skipping" % msg)
3974 
3975             ###
3976 
3977             # Require that the dataset/run is non-empty.
3978             # NOTE: To avoid reconsidering empty runs/datasets next
3979             # time around, we do include them in the book keeping.
3980             # BUG BUG BUG
3981             # This should sum only over the runs that we use!
3982             tmp = [j for (i, j) in self.datasets_information \
3983                    [dataset_name]["num_events"].items() \
3984                    if i in self.datasets_to_use[dataset_name]]
3985             num_events_dataset = sum(tmp)
3986             # BUG BUG BUG end
3987             if num_events_dataset < 1:
3988                 msg = "  dataset `%s' is empty" % dataset_name
3989                 del dataset_names_after_checks[dataset_name]
3990                 self.logger.warning("%s " \
3991                                     "--> skipping" % msg)
3992                 # Update the book keeping with all the runs in the dataset.
3993                 # DEBUG DEBUG DEBUG
3994                 #assert set([j for (i, j) in self.datasets_information \
3995                 #            [dataset_name]["num_events"].items() \
3996                 #            if i in self.datasets_to_use[dataset_name]]) == \
3997                 #           set([0])
3998                 # DEBUG DEBUG DEBUG end
3999                 #self.book_keeping_information[dataset_name] = self.datasets_information \
4000                 #                                              [dataset_name]["num_events"]
4001                 continue
4002 
4003             tmp = [i for i in \
4004                    self.datasets_information[dataset_name] \
4005                    ["num_events"].items() if i[1] < 1]
4006             tmp = [i for i in tmp if i[0] in self.datasets_to_use[dataset_name]]
4007             empty_runs = dict(tmp)
4008             if len(empty_runs) > 0:
4009                 for empty_run in empty_runs:
4010                     try:
4011                         dataset_names_after_checks[dataset_name].remove(empty_run)
4012                     except KeyError:
4013                         pass
4014                 self.logger.info("  removed %d empty run(s) from dataset `%s'" % \
4015                                  (len(empty_runs), dataset_name))
4016                 self.logger.debug("    (%s)" % \
4017                                   ", ".join([str(i) for i in empty_runs]))
4018 
4019         ###
4020 
4021         # If we emptied out a complete dataset, remove the whole
4022         # thing.
4023         dataset_names_after_checks_tmp = copy.deepcopy(dataset_names_after_checks)
4024         for (dataset_name, runs) in dataset_names_after_checks.items():
4025             if len(runs) < 1:
4026                 self.logger.warning("  Removing dataset without any runs " \
4027                                     "(left) `%s'" % \
4028                                     dataset_name)
4029                 del dataset_names_after_checks_tmp[dataset_name]
4030         dataset_names_after_checks = dataset_names_after_checks_tmp
4031 
4032         ###
4033 
4034         self.logger.warning("  --> Removed %d dataset(s)" % \
4035                             (len(self.datasets_to_use) -
4036                              len(dataset_names_after_checks)))
4037 
4038         # Now store the modified version of the dataset list.
4039         self.datasets_to_use = dataset_names_after_checks
4040 
4041         # End of check_dataset_list.
4042 
4043     ##########
4044 
4045     def escape_dataset_name(self, dataset_name):
4046         """Escape a DBS dataset name.
4047 
4048         Escape a DBS dataset name such that it does not cause trouble
4049         with the file system. This means turning each `/' into `__',
4050         except for the first one which is just removed.
4051 
4052         """
4053 
4054         escaped_dataset_name = dataset_name
4055         escaped_dataset_name = escaped_dataset_name.strip("/")
4056         escaped_dataset_name = escaped_dataset_name.replace("/", "__")
4057 
4058         return escaped_dataset_name
4059 
4060     ##########
4061 
4062     # BUG BUG BUG
4063     # This is a bit of a redundant method, isn't it?
4064     def create_config_file_name(self, dataset_name, run_number):
4065         """Generate the name of the configuration file to be run by
4066         CRAB.
4067 
4068         Depending on the harvesting mode (single-step or two-step)
4069         this is the name of the real harvesting configuration or the
4070         name of the first-step ME summary extraction configuration.
4071 
4072         """
4073 
4074         if self.harvesting_mode == "single-step":
4075             config_file_name = self.create_harvesting_config_file_name(dataset_name)
4076         elif self.harvesting_mode == "single-step-allow-partial":
4077             config_file_name = self.create_harvesting_config_file_name(dataset_name)
4078 ##            # Only add the alarming piece to the file name if this is
4079 ##            # a spread-out dataset.
4080 ##            pdb.set_trace()
4081 ##            if self.datasets_information[dataset_name] \
4082 ##                   ["mirrored"][run_number] == False:
4083 ##                config_file_name = config_file_name.replace(".py", "_partial.py")
4084         elif self.harvesting_mode == "two-step":
4085             config_file_name = self.create_me_summary_config_file_name(dataset_name)
4086         else:
4087             assert False, "ERROR Unknown harvesting mode `%s'" % \
4088                    self.harvesting_mode
4089 
4090         # End of create_config_file_name.
4091         return config_file_name
4092     # BUG BUG BUG end
4093 
4094     ##########
4095 
4096     def create_harvesting_config_file_name(self, dataset_name):
4097         "Generate the name to be used for the harvesting config file."
4098 
4099         file_name_base = "harvesting.py"
4100         dataset_name_escaped = self.escape_dataset_name(dataset_name)
4101         config_file_name = file_name_base.replace(".py",
4102                                                   "_%s.py" % \
4103                                                   dataset_name_escaped)
4104 
4105         # End of create_harvesting_config_file_name.
4106         return config_file_name
4107 
4108     ##########
4109 
4110     def create_me_summary_config_file_name(self, dataset_name):
4111         "Generate the name of the ME summary extraction config file."
4112 
4113         file_name_base = "me_extraction.py"
4114         dataset_name_escaped = self.escape_dataset_name(dataset_name)
4115         config_file_name = file_name_base.replace(".py",
4116                                                   "_%s.py" % \
4117                                                   dataset_name_escaped)
4118 
4119         # End of create_me_summary_config_file_name.
4120         return config_file_name
4121 
4122     ##########
4123 
4124     def create_output_file_name(self, dataset_name, run_number=None):
4125         """Create the name of the output file name to be used.
4126 
4127         This is the name of the output file of the `first step'. In
4128         the case of single-step harvesting this is already the final
4129         harvesting output ROOT file. In the case of two-step
4130         harvesting it is the name of the intermediary ME summary
4131         file.
4132 
4133         """
4134 
4135         # BUG BUG BUG
4136         # This method has become a bit of a mess. Originally it was
4137         # nice to have one entry point for both single- and two-step
4138         # output file names. However, now the former needs the run
4139         # number, while the latter does not even know about run
4140         # numbers. This should be fixed up a bit.
4141         # BUG BUG BUG end
4142 
4143         if self.harvesting_mode == "single-step":
4144             # DEBUG DEBUG DEBUG
4145             assert not run_number is None
4146             # DEBUG DEBUG DEBUG end
4147             output_file_name = self.create_harvesting_output_file_name(dataset_name, run_number)
4148         elif self.harvesting_mode == "single-step-allow-partial":
4149             # DEBUG DEBUG DEBUG
4150             assert not run_number is None
4151             # DEBUG DEBUG DEBUG end
4152             output_file_name = self.create_harvesting_output_file_name(dataset_name, run_number)
4153         elif self.harvesting_mode == "two-step":
4154             # DEBUG DEBUG DEBUG
4155             assert run_number is None
4156             # DEBUG DEBUG DEBUG end
4157             output_file_name = self.create_me_summary_output_file_name(dataset_name)
4158         else:
4159             # This should not be possible, but hey...
4160             assert False, "ERROR Unknown harvesting mode `%s'" % \
4161                    self.harvesting_mode
4162 
4163         # End of create_harvesting_output_file_name.
4164         return output_file_name
4165 
4166     ##########
4167 
4168     def create_harvesting_output_file_name(self, dataset_name, run_number):
4169         """Generate the name to be used for the harvesting output file.
4170 
4171         This harvesting output file is the _final_ ROOT output file
4172         containing the harvesting results. In case of two-step
4173         harvesting there is an intermediate ME output file as well.
4174 
4175         """
4176 
4177         dataset_name_escaped = self.escape_dataset_name(dataset_name)
4178 
4179         # Hmmm, looking at the code for the DQMFileSaver this might
4180         # actually be the place where the first part of this file
4181         # naming scheme comes from.
4182         # NOTE: It looks like the `V0001' comes from the DQM
4183         # version. This is something that cannot be looked up from
4184         # here, so let's hope it does not change too often.
4185         output_file_name = "DQM_V0001_R%09d__%s.root" % \
4186                            (run_number, dataset_name_escaped)
4187         if self.harvesting_mode.find("partial") > -1:
4188             # Only add the alarming piece to the file name if this is
4189             # a spread-out dataset.
4190             if self.datasets_information[dataset_name] \
4191                    ["mirrored"][run_number] == False:
4192                 output_file_name = output_file_name.replace(".root", \
4193                                                             "_partial.root")
4194 
4195         # End of create_harvesting_output_file_name.
4196         return output_file_name
4197 
4198     ##########
4199 
4200     def create_me_summary_output_file_name(self, dataset_name):
4201         """Generate the name of the intermediate ME file name to be
4202         used in two-step harvesting.
4203 
4204         """
4205 
4206         dataset_name_escaped = self.escape_dataset_name(dataset_name)
4207         output_file_name = "me_summary_%s.root" % \
4208                            dataset_name_escaped
4209 
4210         # End of create_me_summary_output_file_name.
4211         return output_file_name
4212 
4213     ##########
4214 
4215     def create_multicrab_block_name(self, dataset_name, run_number, index):
4216         """Create the block name to use for this dataset/run number.
4217 
4218         This is what appears in the brackets `[]' in multicrab.cfg. It
4219         is used as the name of the job and to create output
4220         directories.
4221 
4222         """
4223 
4224         dataset_name_escaped = self.escape_dataset_name(dataset_name)
4225         block_name = "%s_%09d_%s" % (dataset_name_escaped, run_number, index)
4226 
4227         # End of create_multicrab_block_name.
4228         return block_name
4229 
4230     ##########
4231 
4232     def create_crab_config(self):
4233         """Create a CRAB configuration for a given job.
4234 
4235         NOTE: This is _not_ a complete (as in: submittable) CRAB
4236         configuration. It is used to store the common settings for the
4237         multicrab configuration.
4238 
4239         NOTE: Only CERN CASTOR area (/castor/cern.ch/) is supported.
4240 
4241         NOTE: According to CRAB, you `Must define exactly two of
4242         total_number_of_events, events_per_job, or
4243         number_of_jobs.'. For single-step harvesting we force one job,
4244         for the rest we don't really care.
4245 
4246         # BUG BUG BUG
4247         # With the current version of CRAB (2.6.1), in which Daniele
4248         # fixed the behaviour of no_block_boundary for me, one _has to
4249         # specify_ the total_number_of_events and one single site in
4250         # the se_white_list.
4251         # BUG BUG BUG end
4252 
4253         """
4254 
4255         tmp = []
4256 
4257         # This is the stuff we will need to fill in.
4258         castor_prefix = self.castor_prefix
4259 
4260         tmp.append(self.config_file_header())
4261         tmp.append("")
4262 
4263         ## CRAB
4264         ##------
4265         tmp.append("[CRAB]")
4266         tmp.append("jobtype = cmssw")
4267         tmp.append("")
4268 
4269         ## GRID
4270         ##------
4271         tmp.append("[GRID]")
4272         tmp.append("virtual_organization=cms")
4273         tmp.append("")
4274 
4275         ## USER
4276         ##------
4277         tmp.append("[USER]")
4278         tmp.append("copy_data = 1")
4279         tmp.append("")
4280 
4281         ## CMSSW
4282         ##-------
4283         tmp.append("[CMSSW]")
4284         tmp.append("# This reveals data hosted on T1 sites,")
4285         tmp.append("# which is normally hidden by CRAB.")
4286         tmp.append("show_prod = 1")
4287         tmp.append("number_of_jobs = 1")
4288         if self.Jsonlumi == True:
4289             tmp.append("lumi_mask = %s" % self.Jsonfilename)
4290             tmp.append("total_number_of_lumis = -1")
4291         else:
4292             if self.harvesting_type == "DQMOffline":
4293                 tmp.append("total_number_of_lumis = -1")
4294             else:
4295                 tmp.append("total_number_of_events = -1")
4296         if self.harvesting_mode.find("single-step") > -1:
4297             tmp.append("# Force everything to run in one job.")
4298             tmp.append("no_block_boundary = 1")
4299         tmp.append("")
4300 
4301         ## CAF
4302         ##-----
4303         tmp.append("[CAF]")
4304 
4305         crab_config = "\n".join(tmp)
4306 
4307         # End of create_crab_config.
4308         return crab_config
4309 
4310     ##########
4311 
4312     def create_multicrab_config(self):
4313         """Create a multicrab.cfg file for all samples.
4314 
4315         This creates the contents for a multicrab.cfg file that uses
4316         the crab.cfg file (generated elsewhere) for the basic settings
4317         and contains blocks for each run of each dataset.
4318 
4319         # BUG BUG BUG
4320         # The fact that it's necessary to specify the se_white_list
4321         # and the total_number_of_events is due to our use of CRAB
4322         # version 2.6.1. This should no longer be necessary in the
4323         # future.
4324         # BUG BUG BUG end
4325 
4326         """
4327 
4328         cmd="who i am | cut -f1 -d' '"
4329         (status, output)=subprocess.getstatusoutput(cmd)
4330         UserName = output
4331 
4332         if self.caf_access == True:
4333             print("Extracting %s as user name" %UserName) 
4334 
4335         number_max_sites = self.nr_max_sites + 1
4336 
4337         multicrab_config_lines = []
4338         multicrab_config_lines.append(self.config_file_header())
4339         multicrab_config_lines.append("")
4340         multicrab_config_lines.append("[MULTICRAB]")
4341         multicrab_config_lines.append("cfg = crab.cfg")
4342         multicrab_config_lines.append("")
4343 
4344         dataset_names = sorted(self.datasets_to_use.keys())
4345 
4346         for dataset_name in dataset_names:
4347             runs = self.datasets_to_use[dataset_name]
4348             dataset_name_escaped = self.escape_dataset_name(dataset_name)
4349             castor_prefix = self.castor_prefix
4350 
4351             for run in runs:
4352 
4353                 # CASTOR output dir.
4354                 castor_dir = self.datasets_information[dataset_name] \
4355                                  ["castor_path"][run]
4356 
4357                 cmd = "rfdir %s" % castor_dir
4358                 (status, output) = subprocess.getstatusoutput(cmd)
4359 
4360                 if len(output) <= 0:
4361 
4362                     # DEBUG DEBUG DEBUG
4363                     # We should only get here if we're treating a
4364                     # dataset/run that is fully contained at a single
4365                     # site.
4366                     assert (len(self.datasets_information[dataset_name] \
4367                             ["sites"][run]) == 1) or \
4368                             self.datasets_information[dataset_name]["mirrored"]
4369                     # DEBUG DEBUG DEBUG end
4370 
4371                     site_names = self.datasets_information[dataset_name] \
4372                              ["sites"][run].keys()
4373 
4374                     for i in range(1, number_max_sites, 1):
4375                         if len(site_names) > 0: 
4376                             index = "site_%02d" % (i)
4377 
4378                             config_file_name = self. \
4379                                        create_config_file_name(dataset_name, run)
4380                             output_file_name = self. \
4381                                        create_output_file_name(dataset_name, run)
4382 
4383 
4384                             # If we're looking at a mirrored dataset we just pick
4385                             # one of the sites. Otherwise there is nothing to
4386                             # choose.
4387 
4388                             # Loop variable
4389                             loop = 0
4390 
4391                             if len(site_names) > 1:
4392                                 cmssw_version = self.datasets_information[dataset_name] \
4393                                             ["cmssw_version"]
4394                                 self.logger.info("Picking site for mirrored dataset " \
4395                                              "`%s', run %d" % \
4396                                              (dataset_name, run))
4397                                 site_name = self.pick_a_site(site_names, cmssw_version)
4398                                 if site_name in site_names:
4399                                     site_names.remove(site_name)
4400 
4401                             else:
4402                                 site_name = site_names[0]
4403                                 site_names.remove(site_name)
4404 
4405                             if site_name is self.no_matching_site_found_str:
4406                                 if loop < 1:
4407                                     break
4408 
4409                             nevents = self.datasets_information[dataset_name]["num_events"][run]
4410 
4411                             # The block name.
4412                             multicrab_block_name = self.create_multicrab_block_name( \
4413                                 dataset_name, run, index)
4414                             multicrab_config_lines.append("[%s]" % \
4415                                                       multicrab_block_name)
4416 
4417                             ## CRAB
4418                             ##------
4419                             if site_name == "caf.cern.ch":
4420                                 multicrab_config_lines.append("CRAB.use_server=0")
4421                                 multicrab_config_lines.append("CRAB.scheduler=caf")
4422                             else:
4423                                 multicrab_config_lines.append("scheduler = glite")
4424 
4425                             ## GRID
4426                             ##------
4427                             if site_name == "caf.cern.ch":
4428                                 pass
4429                             else:
4430                                 multicrab_config_lines.append("GRID.se_white_list = %s" % \
4431                                                       site_name)
4432                                 multicrab_config_lines.append("# This removes the default blacklisting of T1 sites.")
4433                                 multicrab_config_lines.append("GRID.remove_default_blacklist = 1")
4434                                 multicrab_config_lines.append("GRID.rb = CERN")
4435                                 if not self.non_t1access:
4436                                     multicrab_config_lines.append("GRID.role = t1access")
4437 
4438                             ## USER
4439                             ##------
4440 
4441                             castor_dir = castor_dir.replace(castor_prefix, "")
4442                             multicrab_config_lines.append("USER.storage_element=srm-cms.cern.ch")
4443                             multicrab_config_lines.append("USER.user_remote_dir = %s" % \
4444                                                       castor_dir)
4445                             multicrab_config_lines.append("USER.check_user_remote_dir=0")
4446 
4447                             if site_name == "caf.cern.ch":
4448                                 multicrab_config_lines.append("USER.storage_path=%s" % castor_prefix)
4449                                 #multicrab_config_lines.append("USER.storage_element=T2_CH_CAF")
4450                                 #castor_dir = castor_dir.replace("/cms/store/caf/user/%s" %UserName, "")
4451                                 #multicrab_config_lines.append("USER.user_remote_dir = %s" % \
4452                                 #             castor_dir)
4453                             else:
4454                                 multicrab_config_lines.append("USER.storage_path=/srm/managerv2?SFN=%s" % castor_prefix)
4455                                 #multicrab_config_lines.append("USER.user_remote_dir = %s" % \
4456                                 #             castor_dir)
4457                                 #multicrab_config_lines.append("USER.storage_element=srm-cms.cern.ch")
4458 
4459                             ## CMSSW
4460                             ##-------
4461                             multicrab_config_lines.append("CMSSW.pset = %s" % \
4462                                                        config_file_name)
4463                             multicrab_config_lines.append("CMSSW.datasetpath = %s" % \
4464                                                       dataset_name)
4465                             multicrab_config_lines.append("CMSSW.runselection = %d" % \
4466                                                   run)
4467 
4468                             if self.Jsonlumi == True:
4469                                 pass
4470                             else:
4471                                 if self.harvesting_type == "DQMOffline":
4472                                     pass
4473                                 else:
4474                                     multicrab_config_lines.append("CMSSW.total_number_of_events = %d" % \
4475                                                                   nevents)
4476                             # The output file name.
4477                             multicrab_config_lines.append("CMSSW.output_file = %s" % \
4478                                                       output_file_name)
4479 
4480                             ## CAF
4481                             ##-----
4482                             if site_name == "caf.cern.ch":
4483                                 multicrab_config_lines.append("CAF.queue=cmscaf1nd")
4484 
4485 
4486                             # End of block.
4487                             multicrab_config_lines.append("")
4488 
4489                             loop = loop + 1
4490 
4491                             self.all_sites_found = True
4492 
4493         multicrab_config = "\n".join(multicrab_config_lines)
4494 
4495         # End of create_multicrab_config.
4496         return multicrab_config
4497 
4498     ##########
4499 
4500     def check_globaltag(self, globaltag=None):
4501         """Check if globaltag exists.
4502 
4503         Check if globaltag exists as GlobalTag in the database given
4504         by self.frontier_connection_name['globaltag']. If globaltag is
4505         None, self.globaltag is used instead.
4506 
4507         If we're going to use reference histograms this method also
4508         checks for the existence of the required key in the GlobalTag.
4509 
4510         """
4511 
4512         if globaltag is None:
4513             globaltag = self.globaltag
4514 
4515         # All GlobalTags should end in `::All', right?
4516         if globaltag.endswith("::All"):
4517             globaltag = globaltag[:-5]
4518 
4519         connect_name = self.frontier_connection_name["globaltag"]
4520         # BUG BUG BUG
4521         # There is a bug in cmscond_tagtree_list: some magic is
4522         # missing from the implementation requiring one to specify
4523         # explicitly the name of the squid to connect to. Since the
4524         # cmsHarvester can only be run from the CERN network anyway,
4525         # cmsfrontier:8000 is hard-coded in here. Not nice but it
4526         # works.
4527         connect_name = connect_name.replace("frontier://",
4528                                             "frontier://cmsfrontier:8000/")
4529         # BUG BUG BUG end
4530         connect_name += self.db_account_name_cms_cond_globaltag()
4531 
4532         tag_exists = self.check_globaltag_exists(globaltag, connect_name)
4533 
4534         #----------
4535 
4536         tag_contains_ref_hist_key = False
4537         if self.use_ref_hists and tag_exists:
4538             # Check for the key required to use reference histograms.
4539             tag_contains_ref_hist_key = self.check_globaltag_contains_ref_hist_key(globaltag, connect_name)
4540 
4541         #----------
4542 
4543         if self.use_ref_hists:
4544             ret_val = tag_exists and tag_contains_ref_hist_key
4545         else:
4546             ret_val = tag_exists
4547 
4548         #----------
4549 
4550         # End of check_globaltag.
4551         return ret_val
4552 
4553     ##########
4554 
4555     def check_globaltag_exists(self, globaltag, connect_name):
4556         """Check if globaltag exists.
4557 
4558         """
4559 
4560         self.logger.info("Checking existence of GlobalTag `%s'" % \
4561                          globaltag)
4562         self.logger.debug("  (Using database connection `%s')" % \
4563                           connect_name)
4564 
4565         cmd = "cmscond_tagtree_list -c %s -T %s" % \
4566               (connect_name, globaltag)
4567         (status, output) = subprocess.getstatusoutput(cmd)
4568         if status != 0 or \
4569                output.find("error") > -1:
4570             msg = "Could not check existence of GlobalTag `%s' in `%s'" % \
4571                   (globaltag, connect_name)
4572             if output.find(".ALL_TABLES not found") > -1:
4573                 msg = "%s\n" \
4574                       "Missing database account `%s'" % \
4575                       (msg, output.split(".ALL_TABLES")[0].split()[-1])
4576             self.logger.fatal(msg)
4577             self.logger.debug("Command used:")
4578             self.logger.debug("  %s" % cmd)
4579             self.logger.debug("Output received:")
4580             self.logger.debug(output)
4581             raise Error(msg)
4582         if output.find("does not exist") > -1:
4583             self.logger.debug("GlobalTag `%s' does not exist in `%s':" % \
4584                               (globaltag, connect_name))
4585             self.logger.debug("Output received:")
4586             self.logger.debug(output)
4587             tag_exists = False
4588         else:
4589             tag_exists = True
4590         self.logger.info("  GlobalTag exists? -> %s" % tag_exists)
4591 
4592         # End of check_globaltag_exists.
4593         return tag_exists
4594 
4595     ##########
4596 
4597     def check_globaltag_contains_ref_hist_key(self, globaltag, connect_name):
4598         """Check if globaltag contains the required RefHistos key.
4599 
4600         """
4601 
4602         # Check for the key required to use reference histograms.
4603         tag_contains_key = None
4604         ref_hist_key = "RefHistos"
4605         self.logger.info("Checking existence of reference " \
4606                          "histogram key `%s' in GlobalTag `%s'" % \
4607                          (ref_hist_key, globaltag))
4608         self.logger.debug("  (Using database connection `%s')" % \
4609                               connect_name)
4610         cmd = "cmscond_tagtree_list -c %s -T %s -n %s" % \
4611               (connect_name, globaltag, ref_hist_key)
4612         (status, output) = subprocess.getstatusoutput(cmd)
4613         if status != 0 or \
4614                output.find("error") > -1:
4615             msg = "Could not check existence of key `%s'" % \
4616                   (ref_hist_key, connect_name)
4617             self.logger.fatal(msg)
4618             self.logger.debug("Command used:")
4619             self.logger.debug("  %s" % cmd)
4620             self.logger.debug("Output received:")
4621             self.logger.debug("  %s" % output)
4622             raise Error(msg)
4623         if len(output) < 1:
4624             self.logger.debug("Required key for use of reference " \
4625                               "histograms `%s' does not exist " \
4626                               "in GlobalTag `%s':" % \
4627                               (ref_hist_key, globaltag))
4628             self.logger.debug("Output received:")
4629             self.logger.debug(output)
4630             tag_contains_key = False
4631         else:
4632             tag_contains_key = True
4633 
4634         self.logger.info("  GlobalTag contains `%s' key? -> %s" % \
4635                          (ref_hist_key, tag_contains_key))
4636 
4637         # End of check_globaltag_contains_ref_hist_key.
4638         return tag_contains_key
4639 
4640     ##########
4641 
4642     def check_ref_hist_tag(self, tag_name):
4643         """Check the existence of tag_name in database connect_name.
4644 
4645         Check if tag_name exists as a reference histogram tag in the
4646         database given by self.frontier_connection_name['refhists'].
4647 
4648         """
4649 
4650         connect_name = self.frontier_connection_name["refhists"]
4651         connect_name += self.db_account_name_cms_cond_dqm_summary()
4652 
4653         self.logger.debug("Checking existence of reference " \
4654                           "histogram tag `%s'" % \
4655                           tag_name)
4656         self.logger.debug("  (Using database connection `%s')" % \
4657                           connect_name)
4658 
4659         cmd = "cmscond_list_iov -c %s" % \
4660               connect_name
4661         (status, output) = subprocess.getstatusoutput(cmd)
4662         if status != 0:
4663             msg = "Could not check existence of tag `%s' in `%s'" % \
4664                   (tag_name, connect_name)
4665             self.logger.fatal(msg)
4666             self.logger.debug("Command used:")
4667             self.logger.debug("  %s" % cmd)
4668             self.logger.debug("Output received:")
4669             self.logger.debug(output)
4670             raise Error(msg)
4671         if not tag_name in output.split():
4672             self.logger.debug("Reference histogram tag `%s' " \
4673                               "does not exist in `%s'" % \
4674                               (tag_name, connect_name))
4675             self.logger.debug(" Existing tags: `%s'" % \
4676                               "', `".join(output.split()))
4677             tag_exists = False
4678         else:
4679             tag_exists = True
4680         self.logger.debug("  Reference histogram tag exists? " \
4681                           "-> %s" % tag_exists)
4682 
4683         # End of check_ref_hist_tag.
4684         return tag_exists
4685 
4686     ##########
4687 
4688     def create_harvesting_config(self, dataset_name):
4689         """Create the Python harvesting configuration for harvesting.
4690 
4691         The basic configuration is created by
4692         Configuration.PyReleaseValidation.ConfigBuilder. (This mimics
4693         what cmsDriver.py does.) After that we add some specials
4694         ourselves.
4695 
4696         NOTE: On one hand it may not be nice to circumvent
4697         cmsDriver.py, on the other hand cmsDriver.py does not really
4698         do anything itself. All the real work is done by the
4699         ConfigBuilder so there is not much risk that we miss out on
4700         essential developments of cmsDriver in the future.
4701 
4702         """
4703 
4704         # Setup some options needed by the ConfigBuilder.
4705         config_options = defaultOptions
4706 
4707         # These are fixed for all kinds of harvesting jobs. Some of
4708         # them are not needed for the harvesting config, but to keep
4709         # the ConfigBuilder happy.
4710         config_options.name = "harvesting"
4711         config_options.scenario = "pp"
4712         config_options.number = 1
4713         config_options.arguments = self.ident_string()
4714         config_options.evt_type = config_options.name
4715         config_options.customisation_file = None
4716         config_options.filein = "dummy_value"
4717         config_options.filetype = "EDM"
4718         # This seems to be new in CMSSW 3.3.X, no clue what it does.
4719         config_options.gflash = "dummy_value"
4720         # This seems to be new in CMSSW 3.3.0.pre6, no clue what it
4721         # does.
4722         #config_options.himix = "dummy_value"
4723         config_options.dbsquery = ""
4724 
4725         ###
4726 
4727         # These options depend on the type of harvesting we're doing
4728         # and are stored in self.harvesting_info.
4729 
4730         config_options.step = "HARVESTING:%s" % \
4731                               self.harvesting_info[self.harvesting_type] \
4732                               ["step_string"]
4733         config_options.beamspot = self.harvesting_info[self.harvesting_type] \
4734                                   ["beamspot"]
4735         config_options.eventcontent = self.harvesting_info \
4736                                       [self.harvesting_type] \
4737                                       ["eventcontent"]
4738         config_options.harvesting = self.harvesting_info \
4739                                     [self.harvesting_type] \
4740                                     ["harvesting"]
4741 
4742         ###
4743 
4744         # This one is required (see also above) for each dataset.
4745 
4746         datatype = self.datasets_information[dataset_name]["datatype"]
4747         config_options.isMC = (datatype.lower() == "mc")
4748         config_options.isData = (datatype.lower() == "data")
4749         globaltag = self.datasets_information[dataset_name]["globaltag"]
4750 
4751         config_options.conditions = self.format_conditions_string(globaltag)
4752 
4753         ###
4754 
4755         if "with_input" in getargspec(ConfigBuilder.__init__)[0]:
4756             # This is the case for 3.3.X.
4757             config_builder = ConfigBuilder(config_options, with_input=True)
4758         else:
4759             # This is the case in older CMSSW versions.
4760             config_builder = ConfigBuilder(config_options)
4761         config_builder.prepare(True)
4762         config_contents = config_builder.pythonCfgCode
4763 
4764         ###
4765 
4766         # Add our signature to the top of the configuration.  and add
4767         # some markers to the head and the tail of the Python code
4768         # generated by the ConfigBuilder.
4769         marker_lines = []
4770         sep = "#" * 30
4771         marker_lines.append(sep)
4772         marker_lines.append("# Code between these markers was generated by")
4773         marker_lines.append("# Configuration.PyReleaseValidation." \
4774                             "ConfigBuilder")
4775 
4776         marker_lines.append(sep)
4777         marker = "\n".join(marker_lines)
4778 
4779         tmp = [self.config_file_header()]
4780         tmp.append("")
4781         tmp.append(marker)
4782         tmp.append("")
4783         tmp.append(config_contents)
4784         tmp.append("")
4785         tmp.append(marker)
4786         tmp.append("")
4787         config_contents = "\n".join(tmp)
4788 
4789         ###
4790 
4791         # Now we add some stuff of our own.
4792         customisations = [""]
4793 
4794         customisations.append("# Now follow some customisations")
4795         customisations.append("")
4796         connect_name = self.frontier_connection_name["globaltag"]
4797         connect_name += self.db_account_name_cms_cond_globaltag()
4798         customisations.append("process.GlobalTag.connect = \"%s\"" % \
4799                               connect_name)
4800 
4801 
4802         if self.saveByLumiSection == True:
4803             customisations.append("process.dqmSaver.saveByLumiSection = 1")
4804         ##
4805         ##
4806 
4807         customisations.append("")
4808 
4809         # About the reference histograms... For data there is only one
4810         # set of references and those are picked up automatically
4811         # based on the GlobalTag. For MC we have to do some more work
4812         # since the reference histograms to be used depend on the MC
4813         # sample at hand. In this case we glue in an es_prefer snippet
4814         # to pick up the references. We do this only for RelVals since
4815         # for MC there are no meaningful references so far.
4816 
4817         # NOTE: Due to the lack of meaningful references for
4818         # MC samples reference histograms are explicitly
4819         # switched off in this case.
4820 
4821         use_es_prefer = (self.harvesting_type == "RelVal")
4822         use_refs = use_es_prefer or \
4823                    (not self.harvesting_type == "MC")
4824         # Allow global override.
4825         use_refs = use_refs and self.use_ref_hists
4826 
4827         if not use_refs:
4828             # Disable reference histograms explicitly. The histograms
4829             # are loaded by the dqmRefHistoRootFileGetter
4830             # EDAnalyzer. This analyzer can be run from several
4831             # sequences. Here we remove it from each sequence that
4832             # exists.
4833             customisations.append("print \"Not using reference histograms\"")
4834             customisations.append("if hasattr(process, \"dqmRefHistoRootFileGetter\"):")
4835             customisations.append("    for (sequence_name, sequence) in process.sequences.items():")
4836             customisations.append("        if sequence.remove(process.dqmRefHistoRootFileGetter):")
4837             customisations.append("            print \"Removed process.dqmRefHistoRootFileGetter from sequence `%s'\" % \\")
4838             customisations.append("                  sequence_name")
4839             customisations.append("process.dqmSaver.referenceHandling = \"skip\"")
4840         else:
4841             # This makes sure all reference histograms are saved to
4842             # the output ROOT file.
4843             customisations.append("process.dqmSaver.referenceHandling = \"all\"")
4844             if use_es_prefer:
4845                 es_prefer_snippet = self.create_es_prefer_snippet(dataset_name)
4846                 customisations.append(es_prefer_snippet)
4847 
4848         # Make sure we get the `workflow' correct. As far as I can see
4849         # this is only important for the output file name.
4850         workflow_name = dataset_name
4851         if self.harvesting_mode == "single-step-allow-partial":
4852             workflow_name += "_partial"
4853         customisations.append("process.dqmSaver.workflow = \"%s\"" % \
4854                               workflow_name)
4855 
4856         # BUG BUG BUG
4857         # This still does not work. The current two-step harvesting
4858         # efforts are on hold waiting for the solution to come from
4859         # elsewhere. (In this case the elsewhere is Daniele Spiga.)
4860 
4861 ##        # In case this file is the second step (the real harvesting
4862 ##        # step) of the two-step harvesting we have to tell it to use
4863 ##        # our local files.
4864 ##        if self.harvesting_mode == "two-step":
4865 ##            castor_dir = self.datasets_information[dataset_name] \
4866 ##                         ["castor_path"][run]
4867 ##            customisations.append("")
4868 ##            customisations.append("# This is the second step (the real")
4869 ##            customisations.append("# harvesting step) of a two-step")
4870 ##            customisations.append("# harvesting procedure.")
4871 ##            # BUG BUG BUG
4872 ##            # To be removed in production version.
4873 ##            customisations.append("import pdb")
4874 ##            # BUG BUG BUG end
4875 ##            customisations.append("import subprocess")
4876 ##            customisations.append("import os")
4877 ##            customisations.append("castor_dir = \"%s\"" % castor_dir)
4878 ##            customisations.append("cmd = \"rfdir %s\" % castor_dir")
4879 ##            customisations.append("(status, output) = subprocess.getstatusoutput(cmd)")
4880 ##            customisations.append("if status != 0:")
4881 ##            customisations.append("    print \"ERROR\"")
4882 ##            customisations.append("    raise Exception, \"ERROR\"")
4883 ##            customisations.append("file_names = [os.path.join(\"rfio:%s\" % path, i) for i in output.split() if i.startswith(\"EDM_summary\") and i.endswith(\".root\")]")
4884 ##            #customisations.append("pdb.set_trace()")
4885 ##            customisations.append("process.source.fileNames = cms.untracked.vstring(*file_names)")
4886 ##            customisations.append("")
4887 
4888         # BUG BUG BUG end
4889 
4890         config_contents = config_contents + "\n".join(customisations)
4891 
4892         ###
4893 
4894         # End of create_harvesting_config.
4895         return config_contents
4896 
4897 ##    ##########
4898 
4899 ##    def create_harvesting_config_two_step(self, dataset_name):
4900 ##        """Create the Python harvesting configuration for two-step
4901 ##        harvesting.
4902 
4903 ##        """
4904 
4905 ##        # BUG BUG BUG
4906 ##        config_contents = self.create_harvesting_config_single_step(dataset_name)
4907 ##        # BUG BUG BUG end
4908 
4909 ##        # End of create_harvesting_config_two_step.
4910 ##        return config_contents
4911 
4912     ##########
4913 
4914     def create_me_extraction_config(self, dataset_name):
4915         """
4916 
4917         """
4918 
4919         # Big chunk of hard-coded Python. Not such a big deal since
4920         # this does not do much and is not likely to break.
4921         tmp = []
4922         tmp.append(self.config_file_header())
4923         tmp.append("")
4924         tmp.append("import FWCore.ParameterSet.Config as cms")
4925         tmp.append("")
4926         tmp.append("process = cms.Process(\"ME2EDM\")")
4927         tmp.append("")
4928         tmp.append("# Import of standard configurations")
4929         tmp.append("process.load(\"Configuration/EventContent/EventContent_cff\")")
4930         tmp.append("")
4931         tmp.append("# We don't really process any events, just keep this set to one to")
4932         tmp.append("# make sure things work.")
4933         tmp.append("process.maxEvents = cms.untracked.PSet(")
4934         tmp.append("    input = cms.untracked.int32(1)")
4935         tmp.append("    )")
4936         tmp.append("")
4937         tmp.append("process.options = cms.untracked.PSet(")
4938         tmp.append("    Rethrow = cms.untracked.vstring(\"ProductNotFound\")")
4939         tmp.append("    )")
4940         tmp.append("")
4941         tmp.append("process.source = cms.Source(\"PoolSource\",")
4942         tmp.append("                            processingMode = \\")
4943         tmp.append("                            cms.untracked.string(\"RunsAndLumis\"),")
4944         tmp.append("                            fileNames = \\")
4945         tmp.append("                            cms.untracked.vstring(\"no_file_specified\")")
4946         tmp.append("                            )")
4947         tmp.append("")
4948         tmp.append("# Output definition: drop everything except for the monitoring.")
4949         tmp.append("process.output = cms.OutputModule(")
4950         tmp.append("    \"PoolOutputModule\",")
4951         tmp.append("    outputCommands = \\")
4952         tmp.append("    cms.untracked.vstring(\"drop *\", \\")
4953         tmp.append("                          \"keep *_MEtoEDMConverter_*_*\"),")
4954         output_file_name = self. \
4955                            create_output_file_name(dataset_name)
4956         tmp.append("    fileName = \\")
4957         tmp.append("    cms.untracked.string(\"%s\")," % output_file_name)
4958         tmp.append("    dataset = cms.untracked.PSet(")
4959         tmp.append("    dataTier = cms.untracked.string(\"RECO\"),")
4960         tmp.append("    filterName = cms.untracked.string(\"\")")
4961         tmp.append("    )")
4962         tmp.append("    )")
4963         tmp.append("")
4964         tmp.append("# Additional output definition")
4965         tmp.append("process.out_step = cms.EndPath(process.output)")
4966         tmp.append("")
4967         tmp.append("# Schedule definition")
4968         tmp.append("process.schedule = cms.Schedule(process.out_step)")
4969         tmp.append("")
4970 
4971         config_contents = "\n".join(tmp)
4972 
4973         # End of create_me_extraction_config.
4974         return config_contents
4975 
4976     ##########
4977 
4978 ##    def create_harvesting_config(self, dataset_name):
4979 ##        """Create the Python harvesting configuration for a given job.
4980 
4981 ##        NOTE: The reason to have a single harvesting configuration per
4982 ##        sample is to be able to specify the GlobalTag corresponding to
4983 ##        each sample. Since it has been decided that (apart from the
4984 ##        prompt reco) datasets cannot contain runs with different
4985 ##        GlobalTags, we don't need a harvesting config per run.
4986 
4987 ##        NOTE: This is the place where we distinguish between
4988 ##        single-step and two-step harvesting modes (at least for the
4989 ##        Python job configuration).
4990 
4991 ##        """
4992 
4993 ##        ###
4994 
4995 ##        if self.harvesting_mode == "single-step":
4996 ##            config_contents = self.create_harvesting_config_single_step(dataset_name)
4997 ##        elif self.harvesting_mode == "two-step":
4998 ##            config_contents = self.create_harvesting_config_two_step(dataset_name)
4999 ##        else:
5000 ##            # Impossible harvesting mode, we should never get here.
5001 ##            assert False, "ERROR: unknown harvesting mode `%s'" % \
5002 ##                   self.harvesting_mode
5003 
5004 ##        ###
5005 
5006 ##        # End of create_harvesting_config.
5007 ##        return config_contents
5008 
5009     ##########
5010 
5011     def write_crab_config(self):
5012         """Write a CRAB job configuration Python file.
5013 
5014         """
5015 
5016         self.logger.info("Writing CRAB configuration...")
5017 
5018         file_name_base = "crab.cfg"
5019 
5020         # Create CRAB configuration.
5021         crab_contents = self.create_crab_config()
5022 
5023         # Write configuration to file.
5024         crab_file_name = file_name_base
5025         try:
5026             crab_file = file(crab_file_name, "w")
5027             crab_file.write(crab_contents)
5028             crab_file.close()
5029         except IOError:
5030             self.logger.fatal("Could not write " \
5031                               "CRAB configuration to file `%s'" % \
5032                               crab_file_name)
5033             raise Error("ERROR: Could not write to file `%s'!" % \
5034                         crab_file_name)
5035 
5036         # End of write_crab_config.
5037 
5038     ##########
5039 
5040     def write_multicrab_config(self):
5041         """Write a multi-CRAB job configuration Python file.
5042 
5043         """
5044 
5045         self.logger.info("Writing multi-CRAB configuration...")
5046 
5047         file_name_base = "multicrab.cfg"
5048 
5049         # Create multi-CRAB configuration.
5050         multicrab_contents = self.create_multicrab_config()
5051 
5052         # Write configuration to file.
5053         multicrab_file_name = file_name_base
5054         try:
5055             multicrab_file = file(multicrab_file_name, "w")
5056             multicrab_file.write(multicrab_contents)
5057             multicrab_file.close()
5058         except IOError:
5059             self.logger.fatal("Could not write " \
5060                               "multi-CRAB configuration to file `%s'" % \
5061                               multicrab_file_name)
5062             raise Error("ERROR: Could not write to file `%s'!" % \
5063                         multicrab_file_name)
5064 
5065         # End of write_multicrab_config.
5066 
5067     ##########
5068 
5069     def write_harvesting_config(self, dataset_name):
5070         """Write a harvesting job configuration Python file.
5071 
5072         NOTE: This knows nothing about single-step or two-step
5073         harvesting. That's all taken care of by
5074         create_harvesting_config.
5075 
5076         """
5077 
5078         self.logger.debug("Writing harvesting configuration for `%s'..." % \
5079                           dataset_name)
5080 
5081         # Create Python configuration.
5082         config_contents = self.create_harvesting_config(dataset_name)
5083 
5084         # Write configuration to file.
5085         config_file_name = self. \
5086                            create_harvesting_config_file_name(dataset_name)
5087         try:
5088             config_file = file(config_file_name, "w")
5089             config_file.write(config_contents)
5090             config_file.close()
5091         except IOError:
5092             self.logger.fatal("Could not write " \
5093                               "harvesting configuration to file `%s'" % \
5094                               config_file_name)
5095             raise Error("ERROR: Could not write to file `%s'!" % \
5096                         config_file_name)
5097 
5098         # End of write_harvesting_config.
5099 
5100     ##########
5101 
5102     def write_me_extraction_config(self, dataset_name):
5103         """Write an ME-extraction configuration Python file.
5104 
5105         This `ME-extraction' (ME = Monitoring Element) is the first
5106         step of the two-step harvesting.
5107 
5108         """
5109 
5110         self.logger.debug("Writing ME-extraction configuration for `%s'..." % \
5111                           dataset_name)
5112 
5113         # Create Python configuration.
5114         config_contents = self.create_me_extraction_config(dataset_name)
5115 
5116         # Write configuration to file.
5117         config_file_name = self. \
5118                            create_me_summary_config_file_name(dataset_name)
5119         try:
5120             config_file = file(config_file_name, "w")
5121             config_file.write(config_contents)
5122             config_file.close()
5123         except IOError:
5124             self.logger.fatal("Could not write " \
5125                               "ME-extraction configuration to file `%s'" % \
5126                               config_file_name)
5127             raise Error("ERROR: Could not write to file `%s'!" % \
5128                         config_file_name)
5129 
5130         # End of write_me_extraction_config.
5131 
5132     ##########
5133 
5134 
5135     def ref_hist_mappings_needed(self, dataset_name=None):
5136         """Check if we need to load and check the reference mappings.
5137 
5138         For data the reference histograms should be taken
5139         automatically from the GlobalTag, so we don't need any
5140         mappings. For RelVals we need to know a mapping to be used in
5141         the es_prefer code snippet (different references for each of
5142         the datasets.)
5143 
5144         WARNING: This implementation is a bit convoluted.
5145 
5146         """
5147 
5148         # If no dataset name given, do everything, otherwise check
5149         # only this one dataset.
5150         if not dataset_name is None:
5151             data_type = self.datasets_information[dataset_name] \
5152                         ["datatype"]
5153             mappings_needed = (data_type == "mc")
5154             # DEBUG DEBUG DEBUG
5155             if not mappings_needed:
5156                 assert data_type == "data"
5157             # DEBUG DEBUG DEBUG end
5158         else:
5159             tmp = [self.ref_hist_mappings_needed(dataset_name) \
5160                    for dataset_name in \
5161                    self.datasets_information.keys()]
5162             mappings_needed = (True in tmp)
5163 
5164         # End of ref_hist_mappings_needed.
5165         return mappings_needed
5166 
5167     ##########
5168 
5169     def load_ref_hist_mappings(self):
5170         """Load the reference histogram mappings from file.
5171 
5172         The dataset name to reference histogram name mappings are read
5173         from a text file specified in self.ref_hist_mappings_file_name.
5174 
5175         """
5176 
5177         # DEBUG DEBUG DEBUG
5178         assert len(self.ref_hist_mappings) < 1, \
5179                "ERROR Should not be RE-loading " \
5180                "reference histogram mappings!"
5181         # DEBUG DEBUG DEBUG end
5182 
5183         self.logger.info("Loading reference histogram mappings " \
5184                          "from file `%s'" % \
5185                          self.ref_hist_mappings_file_name)
5186 
5187         mappings_lines = None
5188         try:
5189             mappings_file = file(self.ref_hist_mappings_file_name, "r")
5190             mappings_lines = mappings_file.readlines()
5191             mappings_file.close()
5192         except IOError:
5193             msg = "ERROR: Could not open reference histogram mapping "\
5194                   "file `%s'" % self.ref_hist_mappings_file_name
5195             self.logger.fatal(msg)
5196             raise Error(msg)
5197 
5198         ##########
5199 
5200         # The format we expect is: two white-space separated pieces
5201         # per line. The first the dataset name for which the reference
5202         # should be used, the second one the name of the reference
5203         # histogram in the database.
5204 
5205         for mapping in mappings_lines:
5206             # Skip comment lines.
5207             if not mapping.startswith("#"):
5208                 mapping = mapping.strip()
5209                 if len(mapping) > 0:
5210                     mapping_pieces = mapping.split()
5211                     if len(mapping_pieces) != 2:
5212                         msg = "ERROR: The reference histogram mapping " \
5213                               "file contains a line I don't " \
5214                               "understand:\n  %s" % mapping
5215                         self.logger.fatal(msg)
5216                         raise Error(msg)
5217                     dataset_name = mapping_pieces[0].strip()
5218                     ref_hist_name = mapping_pieces[1].strip()
5219                     # We don't want people to accidentally specify
5220                     # multiple mappings for the same dataset. Just
5221                     # don't accept those cases.
5222                     if dataset_name in self.ref_hist_mappings:
5223                         msg = "ERROR: The reference histogram mapping " \
5224                               "file contains multiple mappings for " \
5225                               "dataset `%s'."
5226                         self.logger.fatal(msg)
5227                         raise Error(msg)
5228 
5229                     # All is well that ends well.
5230                     self.ref_hist_mappings[dataset_name] = ref_hist_name
5231 
5232         ##########
5233 
5234         self.logger.info("  Successfully loaded %d mapping(s)" % \
5235                          len(self.ref_hist_mappings))
5236         max_len = max([len(i) for i in self.ref_hist_mappings.keys()])
5237         for (map_from, map_to) in self.ref_hist_mappings.items():
5238             self.logger.info("    %-*s -> %s" % \
5239                               (max_len, map_from, map_to))
5240 
5241         # End of load_ref_hist_mappings.
5242 
5243     ##########
5244 
5245     def check_ref_hist_mappings(self):
5246         """Make sure all necessary reference histograms exist.
5247 
5248         Check that for each of the datasets to be processed a
5249         reference histogram is specified and that that histogram
5250         exists in the database.
5251 
5252         NOTE: There's a little complication here. Since this whole
5253         thing was designed to allow (in principle) harvesting of both
5254         data and MC datasets in one go, we need to be careful to check
5255         the availability fof reference mappings only for those
5256         datasets that need it.
5257 
5258         """
5259 
5260         self.logger.info("Checking reference histogram mappings")
5261 
5262         for dataset_name in self.datasets_to_use:
5263             try:
5264                 ref_hist_name = self.ref_hist_mappings[dataset_name]
5265             except KeyError:
5266                 msg = "ERROR: No reference histogram mapping found " \
5267                       "for dataset `%s'" % \
5268                       dataset_name
5269                 self.logger.fatal(msg)
5270                 raise Error(msg)
5271 
5272             if not self.check_ref_hist_tag(ref_hist_name):
5273                 msg = "Reference histogram tag `%s' " \
5274                       "(used for dataset `%s') does not exist!" % \
5275                       (ref_hist_name, dataset_name)
5276                 self.logger.fatal(msg)
5277                 raise Usage(msg)
5278 
5279         self.logger.info("  Done checking reference histogram mappings.")
5280 
5281         # End of check_ref_hist_mappings.
5282 
5283     ##########
5284 
5285     def build_datasets_information(self):
5286         """Obtain all information on the datasets that we need to run.
5287 
5288         Use DBS to figure out all required information on our
5289         datasets, like the run numbers and the GlobalTag. All
5290         information is stored in the datasets_information member
5291         variable.
5292 
5293         """
5294 
5295         # Get a list of runs in the dataset.
5296         # NOTE: The harvesting has to be done run-by-run, so we
5297         # split up datasets based on the run numbers. Strictly
5298         # speaking this is not (yet?) necessary for Monte Carlo
5299         # since all those samples use run number 1. Still, this
5300         # general approach should work for all samples.
5301 
5302         # Now loop over all datasets in the list and process them.
5303         # NOTE: This processing has been split into several loops
5304         # to be easier to follow, sacrificing a bit of efficiency.
5305         self.datasets_information = {}
5306         self.logger.info("Collecting information for all datasets to process")
5307         dataset_names = sorted(self.datasets_to_use.keys())
5308         for dataset_name in dataset_names:
5309 
5310             # Tell the user which dataset: nice with many datasets.
5311             sep_line = "-" * 30
5312             self.logger.info(sep_line)
5313             self.logger.info("  `%s'" % dataset_name)
5314             self.logger.info(sep_line)
5315 
5316             runs = self.dbs_resolve_runs(dataset_name)
5317             self.logger.info("    found %d run(s)" % len(runs))
5318             if len(runs) > 0:
5319                 self.logger.debug("      run number(s): %s" % \
5320                                   ", ".join([str(i) for i in runs]))
5321             else:
5322                 # DEBUG DEBUG DEBUG
5323                 # This should never happen after the DBS checks.
5324                 self.logger.warning("  --> skipping dataset "
5325                                     "without any runs")
5326                 assert False, "Panic: found a dataset without runs " \
5327                        "after DBS checks!"
5328                 # DEBUG DEBUG DEBUG end
5329 
5330             cmssw_version = self.dbs_resolve_cmssw_version(dataset_name)
5331             self.logger.info("    found CMSSW version `%s'" % cmssw_version)
5332 
5333             # Figure out if this is data or MC.
5334             datatype = self.dbs_resolve_datatype(dataset_name)
5335             self.logger.info("    sample is data or MC? --> %s" % \
5336                              datatype)
5337 
5338             ###
5339 
5340             # Try and figure out the GlobalTag to be used.
5341             if self.globaltag is None:
5342                 globaltag = self.dbs_resolve_globaltag(dataset_name)
5343             else:
5344                 globaltag = self.globaltag
5345 
5346             self.logger.info("    found GlobalTag `%s'" % globaltag)
5347 
5348             # DEBUG DEBUG DEBUG
5349             if globaltag == "":
5350                 # Actually we should not even reach this point, after
5351                 # our dataset sanity checks.
5352                 assert datatype == "data", \
5353                        "ERROR Empty GlobalTag for MC dataset!!!"
5354             # DEBUG DEBUG DEBUG end
5355 
5356             ###
5357 
5358             # DEBUG DEBUG DEBUG
5359             #tmp = self.dbs_check_dataset_spread_old(dataset_name)
5360             # DEBUG DEBUG DEBUG end
5361             sites_catalog = self.dbs_check_dataset_spread(dataset_name)
5362 
5363             # Extract the total event counts.
5364             num_events = {}
5365             for run_number in sites_catalog.keys():
5366                 num_events[run_number] = sites_catalog \
5367                                          [run_number]["all_sites"]
5368                 del sites_catalog[run_number]["all_sites"]
5369 
5370             # Extract the information about whether or not datasets
5371             # are mirrored.
5372             mirror_catalog = {}
5373             for run_number in sites_catalog.keys():
5374                 mirror_catalog[run_number] = sites_catalog \
5375                                              [run_number]["mirrored"]
5376                 del sites_catalog[run_number]["mirrored"]
5377 
5378             # BUG BUG BUG
5379             # I think I could now get rid of that and just fill the
5380             # "sites" entry with the `inverse' of this
5381             # num_events_catalog(?).
5382             #num_sites = self.dbs_resolve_dataset_number_of_sites(dataset_name)
5383             #sites_catalog = self.dbs_check_dataset_spread(dataset_name)
5384             #sites_catalog = dict(zip(num_events_catalog.keys(),
5385             #                         [[j for i in num_events_catalog.values() for j in i.keys()]]))
5386             # BUG BUG BUG end
5387 
5388 ##            # DEBUG DEBUG DEBUG
5389 ##            # This is probably only useful to make sure we don't muck
5390 ##            # things up, right?
5391 ##            # Figure out across how many sites this sample has been spread.
5392 ##            if num_sites == 1:
5393 ##                self.logger.info("    sample is contained at a single site")
5394 ##            else:
5395 ##                self.logger.info("    sample is spread across %d sites" % \
5396 ##                                 num_sites)
5397 ##            if num_sites < 1:
5398 ##                # NOTE: This _should not_ happen with any valid dataset.
5399 ##                self.logger.warning("  --> skipping dataset which is not " \
5400 ##                                    "hosted anywhere")
5401 ##            # DEBUG DEBUG DEBUG end
5402 
5403             # Now put everything in a place where we can find it again
5404             # if we need it.
5405             self.datasets_information[dataset_name] = {}
5406             self.datasets_information[dataset_name]["runs"] = runs
5407             self.datasets_information[dataset_name]["cmssw_version"] = \
5408                                                                      cmssw_version
5409             self.datasets_information[dataset_name]["globaltag"] = globaltag
5410             self.datasets_information[dataset_name]["datatype"] = datatype
5411             self.datasets_information[dataset_name]["num_events"] = num_events
5412             self.datasets_information[dataset_name]["mirrored"] = mirror_catalog
5413             self.datasets_information[dataset_name]["sites"] = sites_catalog
5414 
5415             # Each run of each dataset has a different CASTOR output
5416             # path.
5417             castor_path_common = self.create_castor_path_name_common(dataset_name)
5418             self.logger.info("    output will go into `%s'" % \
5419                              castor_path_common)
5420 
5421             castor_paths = dict(list(zip(runs,
5422                                     [self.create_castor_path_name_special(dataset_name, i, castor_path_common) \
5423                                      for i in runs])))
5424             for path_name in castor_paths.values():
5425                 self.logger.debug("      %s" % path_name)
5426             self.datasets_information[dataset_name]["castor_path"] = \
5427                                                                    castor_paths
5428 
5429         # End of build_datasets_information.
5430 
5431     ##########
5432 
5433     def show_exit_message(self):
5434         """Tell the user what to do now, after this part is done.
5435 
5436         This should provide the user with some (preferably
5437         copy-pasteable) instructions on what to do now with the setups
5438         and files that have been created.
5439 
5440         """
5441 
5442         # TODO TODO TODO
5443         # This could be improved a bit.
5444         # TODO TODO TODO end
5445 
5446         sep_line = "-" * 60
5447 
5448         self.logger.info("")
5449         self.logger.info(sep_line)
5450         self.logger.info("  Configuration files have been created.")
5451         self.logger.info("  From here on please follow the usual CRAB instructions.")
5452         self.logger.info("  Quick copy-paste instructions are shown below.")
5453         self.logger.info(sep_line)
5454 
5455         self.logger.info("")
5456         self.logger.info("    Create all CRAB jobs:")
5457         self.logger.info("      multicrab -create")
5458         self.logger.info("")
5459         self.logger.info("    Submit all CRAB jobs:")
5460         self.logger.info("      multicrab -submit")
5461         self.logger.info("")
5462         self.logger.info("    Check CRAB status:")
5463         self.logger.info("      multicrab -status")
5464         self.logger.info("")
5465 
5466         self.logger.info("")
5467         self.logger.info("  For more information please see the CMS Twiki:")
5468         self.logger.info("    %s" % twiki_url)
5469         self.logger.info(sep_line)
5470 
5471         # If there were any jobs for which we could not find a
5472         # matching site show a warning message about that.
5473         if not self.all_sites_found:
5474             self.logger.warning("  For some of the jobs no matching " \
5475                                 "site could be found")
5476             self.logger.warning("  --> please scan your multicrab.cfg" \
5477                                 "for occurrences of `%s'." % \
5478                                 self.no_matching_site_found_str)
5479             self.logger.warning("      You will have to fix those " \
5480                                 "by hand, sorry.")
5481 
5482         # End of show_exit_message.
5483 
5484     ##########
5485 
5486     def run(self):
5487         "Main entry point of the CMS harvester."
5488 
5489         # Start with a positive thought.
5490         exit_code = 0
5491 
5492         try:
5493 
5494             try:
5495 
5496                 # Parse all command line options and arguments
5497                 self.parse_cmd_line_options()
5498                 # and check that they make sense.
5499                 self.check_input_status()
5500 
5501                 # Check if CMSSW is setup.
5502                 self.check_cmssw()
5503 
5504                 # Check if DBS is setup,
5505                 self.check_dbs()
5506                 # and if all is fine setup the Python side.
5507                 self.setup_dbs()
5508 
5509                 # Fill our dictionary with all the required info we
5510                 # need to understand harvesting jobs. This needs to be
5511                 # done after the CMSSW version is known.
5512                 self.setup_harvesting_info()
5513 
5514                 # Obtain list of dataset names to consider
5515                 self.build_dataset_use_list()
5516                 # and the list of dataset names to ignore.
5517                 self.build_dataset_ignore_list()
5518 
5519                 # The same for the runs lists (if specified).
5520                 self.build_runs_use_list()
5521                 self.build_runs_ignore_list()
5522 
5523                 # Process the list of datasets to ignore and fold that
5524                 # into the list of datasets to consider.
5525                 # NOTE: The run-based selection is done later since
5526                 # right now we don't know yet which runs a dataset
5527                 # contains.
5528                 self.process_dataset_ignore_list()
5529 
5530                 # Obtain all required information on the datasets,
5531                 # like run numbers and GlobalTags.
5532                 self.build_datasets_information()
5533 
5534                 if self.use_ref_hists and \
5535                        self.ref_hist_mappings_needed():
5536                     # Load the dataset name to reference histogram
5537                     # name mappings from file.
5538                     self.load_ref_hist_mappings()
5539                     # Now make sure that for all datasets we want to
5540                     # process there is a reference defined. Otherwise
5541                     # just bomb out before wasting any more time.
5542                     self.check_ref_hist_mappings()
5543                 else:
5544                     self.logger.info("No need to load reference " \
5545                                      "histogram mappings file")
5546 
5547                 # OBSOLETE OBSOLETE OBSOLETE
5548 ##                # TODO TODO TODO
5549 ##                # Need to think about where this should go, but
5550 ##                # somewhere we have to move over the fact that we want
5551 ##                # to process all runs for each dataset that we're
5552 ##                # considering. This basically means copying over the
5553 ##                # information from self.datasets_information[]["runs"]
5554 ##                # to self.datasets_to_use[].
5555 ##                for dataset_name in self.datasets_to_use.keys():
5556 ##                    self.datasets_to_use[dataset_name] = self.datasets_information[dataset_name]["runs"]
5557 ##                # TODO TODO TODO end
5558                 # OBSOLETE OBSOLETE OBSOLETE end
5559 
5560                 self.process_runs_use_and_ignore_lists()
5561 
5562                 # If we've been asked to sacrifice some parts of
5563                 # spread-out samples in order to be able to partially
5564                 # harvest them, we'll do that here.
5565                 if self.harvesting_mode == "single-step-allow-partial":
5566                     self.singlify_datasets()
5567 
5568                 # Check dataset name(s)
5569                 self.check_dataset_list()
5570                 # and see if there is anything left to do.
5571                 if len(self.datasets_to_use) < 1:
5572                     self.logger.info("After all checks etc. " \
5573                                      "there are no datasets (left?) " \
5574                                      "to process")
5575                 else:
5576 
5577                     self.logger.info("After all checks etc. we are left " \
5578                                      "with %d dataset(s) to process " \
5579                                      "for a total of %d runs" % \
5580                                      (len(self.datasets_to_use),
5581                                       sum([len(i) for i in \
5582                                            self.datasets_to_use.values()])))
5583 
5584                     # NOTE: The order in which things are done here is
5585                     # important. At the end of the job, independent on
5586                     # how it ends (exception, CTRL-C, normal end) the
5587                     # book keeping is written to file. At that time it
5588                     # should be clear which jobs are done and can be
5589                     # submitted. This means we first create the
5590                     # general files, and then the per-job config
5591                     # files.
5592 
5593                     # TODO TODO TODO
5594                     # It would be good to modify the book keeping a
5595                     # bit. Now we write the crab.cfg (which is the
5596                     # same for all samples and runs) and the
5597                     # multicrab.cfg (which contains blocks for all
5598                     # runs of all samples) without updating our book
5599                     # keeping. The only place we update the book
5600                     # keeping is after writing the harvesting config
5601                     # file for a given dataset. Since there is only
5602                     # one single harvesting configuration for each
5603                     # dataset, we have no book keeping information on
5604                     # a per-run basis.
5605                     # TODO TODO TODO end
5606 
5607                     # Check if the CASTOR output area exists. If
5608                     # necessary create it.
5609                     self.create_and_check_castor_dirs()
5610 
5611                     # Create one crab and one multicrab configuration
5612                     # for all jobs together.
5613                     self.write_crab_config()
5614                     self.write_multicrab_config()
5615 
5616                     # Loop over all datasets and create harvesting
5617                     # config files for all of them. One harvesting
5618                     # config per dataset is enough. The same file will
5619                     # be re-used by CRAB for each run.
5620                     # NOTE: We always need a harvesting
5621                     # configuration. For the two-step harvesting we
5622                     # also need a configuration file for the first
5623                     # step: the monitoring element extraction.
5624                     for dataset_name in self.datasets_to_use.keys():
5625                         try:
5626                             self.write_harvesting_config(dataset_name)
5627                             if self.harvesting_mode == "two-step":
5628                                 self.write_me_extraction_config(dataset_name)
5629                         except:
5630                             # Doh! Just re-raise the damn thing.
5631                             raise
5632                         else:
5633 ##                            tmp = self.datasets_information[dataset_name] \
5634 ##                                  ["num_events"]
5635                             tmp = {}
5636                             for run_number in self.datasets_to_use[dataset_name]:
5637                                 tmp[run_number] = self.datasets_information \
5638                                                   [dataset_name]["num_events"][run_number]
5639                             if dataset_name in self.book_keeping_information:
5640                                 self.book_keeping_information[dataset_name].update(tmp)
5641                             else:
5642                                 self.book_keeping_information[dataset_name] = tmp
5643 
5644                     # Explain to the user what to do now.
5645                     self.show_exit_message()
5646 
5647             except Usage as err:
5648                 # self.logger.fatal(err.msg)
5649                 # self.option_parser.print_help()
5650                 pass
5651 
5652             except Error as err:
5653                 # self.logger.fatal(err.msg)
5654                 exit_code = 1
5655 
5656             except Exception as err:
5657                 # Hmmm, ignore keyboard interrupts from the
5658                 # user. These are not a `serious problem'. We also
5659                 # skip SystemExit, which is the exception thrown when
5660                 # one calls sys.exit(). This, for example, is done by
5661                 # the option parser after calling print_help(). We
5662                 # also have to catch all `no such option'
5663                 # complaints. Everything else we catch here is a
5664                 # `serious problem'.
5665                 if isinstance(err, SystemExit):
5666                     self.logger.fatal(err.code)
5667                 elif not isinstance(err, KeyboardInterrupt):
5668                     self.logger.fatal("!" * 50)
5669                     self.logger.fatal("  This looks like a serious problem.")
5670                     self.logger.fatal("  If you are sure you followed all " \
5671                                       "instructions")
5672                     self.logger.fatal("  please copy the below stack trace together")
5673                     self.logger.fatal("  with a description of what you were doing to")
5674                     self.logger.fatal("  jeroen.hegeman@cern.ch.")
5675                     self.logger.fatal("  %s" % self.ident_string())
5676                     self.logger.fatal("!" * 50)
5677                     self.logger.fatal(str(err))
5678                     import traceback
5679                     traceback_string = traceback.format_exc()
5680                     for line in traceback_string.split("\n"):
5681                         self.logger.fatal(line)
5682                     self.logger.fatal("!" * 50)
5683                     exit_code = 2
5684 
5685         # This is the stuff that we should really do, no matter
5686         # what. Of course cleaning up after ourselves is also done
5687         # from this place.  This also keeps track of the book keeping
5688         # so far. (This means that if half of the configuration files
5689         # were created before e.g. the disk was full, we should still
5690         # have a consistent book keeping file.)
5691         finally:
5692 
5693             self.cleanup()
5694 
5695         ###
5696 
5697         if self.crab_submission == True:
5698             os.system("multicrab -create")
5699             os.system("multicrab -submit")
5700 
5701         # End of run.
5702         return exit_code
5703 
5704     # End of CMSHarvester.
5705 
5706 ###########################################################################
5707 ## Main entry point.
5708 ###########################################################################
5709 
if __name__ == "__main__":
    # Main entry point for harvesting.
    #
    # CMSHarvester.run() returns an exit code: it is set to 1 when an
    # `Error' is caught and to 2 when any other exception is caught.
    # Hand that value to the interpreter so that the calling shell or
    # batch system can tell a failed run from a successful one;
    # previously the return value was simply dropped here.
    #
    # Raising SystemExit is what sys.exit() does under the hood, and
    # doing it directly avoids relying on an import in this block.
    raise SystemExit(CMSHarvester().run())
5716 
5717 ###########################################################################