0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015 """Main program to run all kinds of harvesting.
0016
0017 These are the basic kinds of harvesting implemented (contact me if
0018 your favourite is missing):
0019
0020 - RelVal : Run for release validation samples. Makes heavy use of MC
0021 truth information.
0022
0023 - RelValFS: FastSim RelVal.
0024
0025 - MC : Run for MC samples.
0026
0027 - DQMOffline : Run for real data (could also be run for MC).
0028
0029 For the mappings of these harvesting types to sequence names please
0030 see the setup_harvesting_info() and option_handler_list_types()
0031 methods.
0032
0033 """
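# A minimal usage sketch (purely illustrative; it assumes a CMSSW and DBS
# environment is already set up, and the dataset name below is made up):
#
#   cmsHarvester.py --harvesting_type=RelVal \
#                   --harvesting_mode=single-step \
#                   --dataset=/RelValTTbar/CMSSW_X_Y_Z-START_VN-v1/GEN-SIM-RECO
#
# Use `cmsHarvester.py --list' to see the type-to-sequence mappings.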
0034
0035
0036
0037 from builtins import range
0038 __version__ = "3.8.2p1"
0039 __author__ = "Jeroen Hegeman (jeroen.hegeman@cern.ch)," \
0040              " Niklas Pietsch (niklas.pietsch@desy.de)"
0041
0042 twiki_url = "https://twiki.cern.ch/twiki/bin/view/CMS/CmsHarvester"
0043
0044
0045
0046
0047
0048
0049
0050
0051
0052
0053
0054
0055
0056
0057
0058
0059
0060
0061
0062
0063
0064
0065
0066
0067
0068
0069
0070
0071
0072
0073
0074
0075
0076
0077
0078
0079
0080
0081
0082
0083
0084
0085
0086
0087
0088
0089
0090 import os
0091 import sys
0092 import subprocess
0093 import re
0094 import logging
0095 import optparse
0096 import datetime
0097 import copy
0098 from inspect import getargspec
0099 from random import choice
0100
0101
0102
0103 from DBSAPI.dbsApi import DbsApi
0104 import DBSAPI.dbsException
0105 import DBSAPI.dbsApiException
0106 from functools import reduce
0107
0108 global xml
0109 global SAXParseException
0110 import xml.sax
0111 from xml.sax import SAXParseException
0112
0113 import Configuration.PyReleaseValidation
0114 from Configuration.PyReleaseValidation.ConfigBuilder import \
0115 ConfigBuilder, defaultOptions
0116
0117
0118
0119
0120
0121 import pdb
0122 try:
0123 import debug_hook
0124 except ImportError:
0125 pass
0126
0127
0128
0129
0130 class Usage(Exception):
0131 def __init__(self, msg):
0132 self.msg = msg
0133 def __str__(self):
0134 return repr(self.msg)
0135
0136
0137
0138
0139
0140
0141 class Error(Exception):
0142 def __init__(self, msg):
0143 self.msg = msg
0144 def __str__(self):
0145 return repr(self.msg)
0146
0147
0148
0149
0150
0151 class CMSHarvesterHelpFormatter(optparse.IndentedHelpFormatter):
0152 """Helper class to add some customised help output to cmsHarvester.
0153
0154 We want to add some instructions, as well as a pointer to the CMS
0155 Twiki.
0156
0157 """
0158
0159 def format_usage(self, usage):
0160
0161 usage_lines = []
0162
0163 sep_line = "-" * 60
0164 usage_lines.append(sep_line)
0165 usage_lines.append("Welcome to the CMS harvester, a (hopefully useful)")
0166 usage_lines.append("tool to create harvesting configurations.")
0167 usage_lines.append("For more information please have a look at the CMS Twiki:")
0168 usage_lines.append(" %s" % twiki_url)
0169 usage_lines.append(sep_line)
0170 usage_lines.append("")
0171
0172
0173
0174 usage_lines.append(optparse.IndentedHelpFormatter. \
0175 format_usage(self, usage))
0176
0177 formatted_usage = "\n".join(usage_lines)
0178 return formatted_usage
0179
0180
0181
0182
0183
0184
0185
0186 class DBSXMLHandler(xml.sax.handler.ContentHandler):
0187 """XML handler class to parse DBS results.
0188
0189 The tricky thing here is that older DBS versions (2.0.5 and
0190 earlier) return results in a different XML format than newer
0191 versions. Previously the result values were returned as attributes
0192 to the `result' element. The new approach returns result values as
0193 contents of named elements.
0194
0195 The old approach is handled directly in startElement(), the new
0196 approach in characters().
0197
0198 NOTE: All results are returned in the form of string values of
0199 course!
0200
0201 """
0202
0203
0204
0205
0206
0207 mapping = {
0208 "dataset" : "PATH",
0209 "dataset.tag" : "PROCESSEDDATASET_GLOBALTAG",
0210 "datatype.type" : "PRIMARYDSTYPE_TYPE",
0211 "run" : "RUNS_RUNNUMBER",
0212 "run.number" : "RUNS_RUNNUMBER",
0213 "file.name" : "FILES_LOGICALFILENAME",
0214 "file.numevents" : "FILES_NUMBEROFEVENTS",
0215 "algo.version" : "APPVERSION_VERSION",
0216 "site" : "STORAGEELEMENT_SENAME",
0217 }
0218
0219 def __init__(self, tag_names):
0220
0221
0222 self.element_position = []
0223 self.tag_names = tag_names
0224 self.results = {}
0225
0226 def startElement(self, name, attrs):
0227 self.element_position.append(name)
0228
0229 self.current_value = []
0230
0231
0232
0233
0234 if name == "result":
0235 for name in self.tag_names:
0236 key = DBSXMLHandler.mapping[name]
0237 value = str(attrs[key])
0238 try:
0239 self.results[name].append(value)
0240 except KeyError:
0241 self.results[name] = [value]
0242
0243
0244
0245 def endElement(self, name):
0246 assert self.current_element() == name, \
0247                "closing unopened element `%s'" % name
0248
0249 if self.current_element() in self.tag_names:
0250 contents = "".join(self.current_value)
0251 if self.current_element() in self.results:
0252 self.results[self.current_element()].append(contents)
0253 else:
0254 self.results[self.current_element()] = [contents]
0255
0256 self.element_position.pop()
0257
0258 def characters(self, content):
0259
0260
0261
0262
0263 if self.current_element() in self.tag_names:
0264 self.current_value.append(content)
0265
0266 def current_element(self):
0267 return self.element_position[-1]
0268
0269 def check_results_validity(self):
0270 """Make sure that all results arrays have equal length.
0271
0272 We should have received complete rows from DBS. I.e. all
0273 results arrays in the handler should be of equal length.
0274
0275 """
0276
0277 results_valid = True
0278
0279         res_names = list(self.results.keys())
0280 if len(res_names) > 1:
0281 for res_name in res_names[1:]:
0282 res_tmp = self.results[res_name]
0283 if len(res_tmp) != len(self.results[res_names[0]]):
0284 results_valid = False
0285
0286 return results_valid
0287
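# A minimal sketch of how this handler is wired up elsewhere in this file
# (`api_result' stands for the raw XML string returned by a DBS query):
#
#   handler = DBSXMLHandler(["dataset"])
#   parser = xml.sax.make_parser()
#   parser.setContentHandler(handler)
#   xml.sax.parseString(api_result, handler)
#   datasets = handler.results["dataset"]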
0288
0289
0290
0291
0292
0293
0294 class CMSHarvester(object):
0295 """Class to perform CMS harvesting.
0296
0297 More documentation `obviously' to follow.
0298
0299 """
0300
0301
0302
0303 def __init__(self, cmd_line_opts=None):
0304 "Initialize class and process command line options."
0305
0306 self.version = __version__
0307
0308
0309
0310 self.harvesting_types = [
0311 "RelVal",
0312 "RelValFS",
0313 "MC",
0314 "DQMOffline",
0315 ]
0316
0317
0318
0319
0320
0321
0322
0323
0324
0325
0326
0327
0328
0329 self.harvesting_modes = [
0330 "single-step",
0331 "single-step-allow-partial",
0332 "two-step"
0333 ]
0334
0335
0336
0337
0338
0339
0340
0341
0342
0343 self.globaltag = None
0344
0345
0346
0347 self.use_ref_hists = True
0348
0349
0350
0351
0352
0353
0354
0355 self.frontier_connection_name = {}
0356 self.frontier_connection_name["globaltag"] = "frontier://" \
0357 "FrontierProd/"
0358 self.frontier_connection_name["refhists"] = "frontier://" \
0359 "FrontierProd/"
0360 self.frontier_connection_overridden = {}
0361 for key in self.frontier_connection_name.keys():
0362 self.frontier_connection_overridden[key] = False
0363
0364
0365
0366
0367 self.harvesting_info = None
0368
0369
0370
0371
0372
0373
0374
0375
0376 self.harvesting_type = None
0377
0378
0379
0380
0381
0382 self.harvesting_mode = None
0383
0384
0385
0386 self.harvesting_mode_default = "single-step"
0387
0388
0389
0390
0391
0392
0393 self.input_method = {}
0394 self.input_method["datasets"] = {}
0395 self.input_method["datasets"]["use"] = None
0396 self.input_method["datasets"]["ignore"] = None
0397 self.input_method["runs"] = {}
0398 self.input_method["runs"]["use"] = None
0399 self.input_method["runs"]["ignore"] = None
0401
0402 self.input_name = {}
0403 self.input_name["datasets"] = {}
0404 self.input_name["datasets"]["use"] = None
0405 self.input_name["datasets"]["ignore"] = None
0406 self.input_name["runs"] = {}
0407 self.input_name["runs"]["use"] = None
0408 self.input_name["runs"]["ignore"] = None
0409
0410 self.Jsonlumi = False
0411 self.Jsonfilename = "YourJSON.txt"
0412 self.Jsonrunfilename = "YourJSON.txt"
0413 self.todofile = "YourToDofile.txt"
0414
0415
0416
0417
0418 self.force_running = None
0419
0420
0421 self.castor_base_dir = None
0422 self.castor_base_dir_default = "/castor/cern.ch/" \
0423 "cms/store/temp/" \
0424 "dqm/offline/harvesting_output/"
0425
0426
0427
0428 self.book_keeping_file_name = None
0429 self.book_keeping_file_name_default = "harvesting_accounting.txt"
0430
0431
0432
0433
0434 self.ref_hist_mappings_file_name = None
0435
0436 self.ref_hist_mappings_file_name_default = "harvesting_ref_hist_mappings.txt"
0437
0438
0439
0440
0441 self.castor_prefix = "/castor/cern.ch"
0442
0443
0444
0445
0446
0447 self.non_t1access = False
0448 self.caf_access = False
0449 self.saveByLumiSection = False
0450 self.crab_submission = False
0451 self.nr_max_sites = 1
0452
0453 self.preferred_site = "no preference"
0454
0455
0456 self.datasets_to_use = {}
0457
0458 self.datasets_to_ignore = {}
0459
0460 self.book_keeping_information = {}
0461
0462
0463 self.ref_hist_mappings = {}
0464
0465
0466
0467 self.runs_to_use = {}
0468 self.runs_to_ignore = {}
0469
0470
0471 self.sites_and_versions_cache = {}
0472
0473
0474 self.globaltag_check_cache = []
0475
0476
0477
0478 self.all_sites_found = True
0479
0480
0481 self.no_matching_site_found_str = "no_matching_site_found"
0482
0483
0484 if cmd_line_opts is None:
0485 cmd_line_opts = sys.argv[1:]
0486 self.cmd_line_opts = cmd_line_opts
0487
0488
0489 log_handler = logging.StreamHandler()
0490
0491
0492 log_formatter = logging.Formatter("%(message)s")
0493 log_handler.setFormatter(log_formatter)
0494 logger = logging.getLogger()
0495 logger.name = "main"
0496 logger.addHandler(log_handler)
0497 self.logger = logger
0498
0499 self.set_output_level("NORMAL")
0500
0501
0502
0503
0504
0505
0506
0507 def cleanup(self):
0508 "Clean up after ourselves."
0509
0510
0511
0512
0513 logging.shutdown()
0514
0515
0516
0517
0518
0519 def time_stamp(self):
0520 "Create a timestamp to use in the created config files."
0521
0522 time_now = datetime.datetime.utcnow()
0523
0524 time_now = time_now.replace(microsecond = 0)
0525 time_stamp = "%sUTC" % datetime.datetime.isoformat(time_now)
0526
0527
0528 return time_stamp
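        # For reference, the resulting string looks like
        # "2010-07-01T12:00:00UTC" (illustrative value).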
0529
0530
0531
0532 def ident_string(self):
0533 "Spit out an identification string for cmsHarvester.py."
0534
0535 ident_str = "`cmsHarvester.py " \
0536 "version %s': cmsHarvester.py %s" % \
0537 (__version__,
0538 reduce(lambda x, y: x+' '+y, sys.argv[1:]))
0539
0540 return ident_str
0541
0542
0543
0544 def format_conditions_string(self, globaltag):
0545 """Create the conditions string needed for `cmsDriver'.
0546
0547 Just glueing the FrontierConditions bit in front of it really.
0548
0549 """
0550
0551
0552
0553
0554
0555
0556
0557
0558 if globaltag.lower().find("conditions") > -1:
0559 conditions_string = globaltag
0560 else:
0561 conditions_string = "FrontierConditions_GlobalTag,%s" % \
0562 globaltag
0563
0564
0565 return conditions_string
0566
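        # Illustrative behaviour (the GlobalTag name below is made up):
        #
        #   format_conditions_string("GR_R_XYZ::All")
        #     --> "FrontierConditions_GlobalTag,GR_R_XYZ::All"
        #
        # A string already containing "conditions" is passed through unchanged.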
0567
0568
0569 def db_account_name_cms_cond_globaltag(self):
0570 """Return the database account name used to store the GlobalTag.
0571
0572 The name of the database account depends (albeit weakly) on
0573 the CMSSW release version.
0574
0575 """
0576
0577
0578
0579 account_name = "CMS_COND_31X_GLOBALTAG"
0580
0581
0582 return account_name
0583
0584
0585
0586 def db_account_name_cms_cond_dqm_summary(self):
0587 """See db_account_name_cms_cond_globaltag."""
0588
0589 account_name = None
0590 version = self.cmssw_version[6:11]
0591 if version < "3_4_0":
0592 account_name = "CMS_COND_31X_DQM_SUMMARY"
0593 else:
0594 account_name = "CMS_COND_34X"
0595
0596
0597 return account_name
0598
0599
0600
0601 def config_file_header(self):
0602 "Create a nice header to be used to mark the generated files."
0603
0604 tmp = []
0605
0606 time_stamp = self.time_stamp()
0607 ident_str = self.ident_string()
0608 tmp.append("# %s" % time_stamp)
0609 tmp.append("# WARNING: This file was created automatically!")
0610 tmp.append("")
0611 tmp.append("# Created by %s" % ident_str)
0612
0613 header = "\n".join(tmp)
0614
0615
0616 return header
0617
0618
0619
0620 def set_output_level(self, output_level):
0621 """Adjust the level of output generated.
0622
0623 Choices are:
0624 - normal : default level of output
0625 - quiet : less output than the default
0626 - verbose : some additional information
0627 - debug : lots more information, may be overwhelming
0628
0629 NOTE: The debug option is a bit special in the sense that it
0630 also modifies the output format.
0631
0632 """
0633
0634
0635
0636 output_levels = {
0637 "NORMAL" : logging.INFO,
0638 "QUIET" : logging.WARNING,
0639 "VERBOSE" : logging.INFO,
0640 "DEBUG" : logging.DEBUG
0641 }
0642
0643 output_level = output_level.upper()
0644
0645 try:
0646
0647 self.log_level = output_levels[output_level]
0648 self.logger.setLevel(self.log_level)
0649 except KeyError:
0650
0651             self.logger.fatal("Unknown output level `%s'" % output_level)
0652
0653 raise Exception
0654
0655
0656
0657
0658
0659 def option_handler_debug(self, option, opt_str, value, parser):
0660 """Switch to debug mode.
0661
0662 This both increases the amount of output generated, as well as
0663 changes the format used (more detailed information is given).
0664
0665 """
0666
0667
0668 log_formatter_debug = logging.Formatter("[%(levelname)s] " \
0669
0670
0671
0672
0673
0674
0675 "%(message)s")
0676
0677
0678 log_handler = self.logger.handlers[0]
0679 log_handler.setFormatter(log_formatter_debug)
0680 self.set_output_level("DEBUG")
0681
0682
0683
0684
0685
0686 def option_handler_quiet(self, option, opt_str, value, parser):
0687 "Switch to quiet mode: less verbose."
0688
0689 self.set_output_level("QUIET")
0690
0691
0692
0693
0694
0695 def option_handler_force(self, option, opt_str, value, parser):
0696 """Switch on `force mode' in which case we don't brake for nobody.
0697
0698 In so-called `force mode' all sanity checks are performed but
0699 we don't halt on failure. Of course this requires some care
0700 from the user.
0701
0702 """
0703
0704 self.logger.debug("Switching on `force mode'.")
0705 self.force_running = True
0706
0707
0708
0709
0710
0711 def option_handler_harvesting_type(self, option, opt_str, value, parser):
0712 """Set the harvesting type to be used.
0713
0714 This checks that no harvesting type is already set, and sets
0715 the harvesting type to be used to the one specified. If a
0716 harvesting type is already set an exception is thrown. The
0717 same happens when an unknown type is specified.
0718
0719 """
0720
0721
0722
0723
0724
0725
0726 value = value.lower()
0727 harvesting_types_lowered = [i.lower() for i in self.harvesting_types]
0728 try:
0729 type_index = harvesting_types_lowered.index(value)
0730
0731
0732 except ValueError:
0733 self.logger.fatal("Unknown harvesting type `%s'" % \
0734 value)
0735 self.logger.fatal(" possible types are: %s" %
0736 ", ".join(self.harvesting_types))
0737 raise Usage("Unknown harvesting type `%s'" % \
0738 value)
0739
0740
0741
0742 if not self.harvesting_type is None:
0743 msg = "Only one harvesting type should be specified"
0744 self.logger.fatal(msg)
0745 raise Usage(msg)
0746 self.harvesting_type = self.harvesting_types[type_index]
0747
0748 self.logger.info("Harvesting type to be used: `%s'" % \
0749 self.harvesting_type)
0750
0751
0752
0753
0754
0755 def option_handler_harvesting_mode(self, option, opt_str, value, parser):
0756 """Set the harvesting mode to be used.
0757
0758 Single-step harvesting can be used for samples that are
0759 located completely at a single site (= SE). Otherwise use
0760 two-step mode.
0761
0762 """
0763
0764
0765 harvesting_mode = value.lower()
0766 if not harvesting_mode in self.harvesting_modes:
0767 msg = "Unknown harvesting mode `%s'" % harvesting_mode
0768 self.logger.fatal(msg)
0769 self.logger.fatal(" possible modes are: %s" % \
0770 ", ".join(self.harvesting_modes))
0771 raise Usage(msg)
0772
0773
0774
0775 if not self.harvesting_mode is None:
0776 msg = "Only one harvesting mode should be specified"
0777 self.logger.fatal(msg)
0778 raise Usage(msg)
0779 self.harvesting_mode = harvesting_mode
0780
0781 self.logger.info("Harvesting mode to be used: `%s'" % \
0782 self.harvesting_mode)
0783
0784
0785
0786
0787
0788 def option_handler_globaltag(self, option, opt_str, value, parser):
0789 """Set the GlobalTag to be used, overriding our own choices.
0790
0791 By default the cmsHarvester will use the GlobalTag with which
0792 a given dataset was created also for the harvesting. The
0793 --globaltag option is the way to override this behaviour.
0794
0795 """
0796
0797
0798 if not self.globaltag is None:
0799 msg = "Only one GlobalTag should be specified"
0800 self.logger.fatal(msg)
0801 raise Usage(msg)
0802 self.globaltag = value
0803
0804 self.logger.info("GlobalTag to be used: `%s'" % \
0805 self.globaltag)
0806
0807
0808
0809
0810
0811 def option_handler_no_ref_hists(self, option, opt_str, value, parser):
0812 "Switch use of all reference histograms off."
0813
0814 self.use_ref_hists = False
0815
0816 self.logger.warning("Switching off all use of reference histograms")
0817
0818
0819
0820
0821
0822 def option_handler_frontier_connection(self, option, opt_str,
0823 value, parser):
0824 """Override the default Frontier connection string.
0825
0826 Please only use this for testing (e.g. when a test payload has
0827 been inserted into cms_orc_off instead of cms_orc_on).
0828
0829 This method gets called for three different command line
0830 options:
0831 - --frontier-connection,
0832 - --frontier-connection-for-globaltag,
0833 - --frontier-connection-for-refhists.
0834 Appropriate care has to be taken to make sure things are only
0835 specified once.
0836
0837 """
0838
0839
0840 frontier_type = opt_str.split("-")[-1]
0841 if frontier_type == "connection":
0842
0843 frontier_types = self.frontier_connection_name.keys()
0844 else:
0845 frontier_types = [frontier_type]
0846
0847
0848
0849 for connection_name in frontier_types:
0850 if self.frontier_connection_overridden[connection_name] == True:
0851 msg = "Please specify either:\n" \
0852 " `--frontier-connection' to change the " \
0853 "Frontier connection used for everything, or\n" \
0854 "either one or both of\n" \
0855 " `--frontier-connection-for-globaltag' to " \
0856 "change the Frontier connection used for the " \
0857 "GlobalTag and\n" \
0858 " `--frontier-connection-for-refhists' to change " \
0859 "the Frontier connection used for the " \
0860 "reference histograms."
0861 self.logger.fatal(msg)
0862 raise Usage(msg)
0863
0864 frontier_prefix = "frontier://"
0865 if not value.startswith(frontier_prefix):
0866 msg = "Expecting Frontier connections to start with " \
0867 "`%s'. You specified `%s'." % \
0868 (frontier_prefix, value)
0869 self.logger.fatal(msg)
0870 raise Usage(msg)
0871
0872
0873 if value.find("FrontierProd") < 0 and \
0874            value.find("FrontierPrep") < 0:
0875 msg = "Expecting Frontier connections to contain either " \
0876 "`FrontierProd' or `FrontierPrep'. You specified " \
0877 "`%s'. Are you sure?" % \
0878 value
0879 self.logger.warning(msg)
0880
0881 if not value.endswith("/"):
0882 value += "/"
0883
0884 for connection_name in frontier_types:
0885 self.frontier_connection_name[connection_name] = value
0886 self.frontier_connection_overridden[connection_name] = True
0887
0888 frontier_type_str = "unknown"
0889 if connection_name == "globaltag":
0890 frontier_type_str = "the GlobalTag"
0891 elif connection_name == "refhists":
0892 frontier_type_str = "the reference histograms"
0893
0894 self.logger.warning("Overriding default Frontier " \
0895 "connection for %s " \
0896 "with `%s'" % \
0897 (frontier_type_str,
0898 self.frontier_connection_name[connection_name]))
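        # Example invocation (illustrative; only meant for testing, as the
        # docstring above explains):
        #
        #   cmsHarvester.py ... --frontier-connection-for-globaltag=frontier://FrontierPrep/
        #
        # This overrides only the GlobalTag connection; the reference-histogram
        # connection keeps its default.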
0899
0900
0901
0902
0903
0904 def option_handler_input_todofile(self, option, opt_str, value, parser):
0905
0906 self.todofile = value
0907
0908
0909
0910
0911 def option_handler_input_Jsonfile(self, option, opt_str, value, parser):
0912
0913 self.Jsonfilename = value
0914
0915
0916
0917
0918 def option_handler_input_Jsonrunfile(self, option, opt_str, value, parser):
0919
0920 self.Jsonrunfilename = value
0921
0922
0923
0924
0925 def option_handler_input_spec(self, option, opt_str, value, parser):
0926         """Store an input specification (datasets/runs, use/ignore).
0927         Both the selection type and the input method are derived
0928         from the option string itself (see below).
0929         """
0930
0931
0932
0933 if opt_str.lower().find("ignore") > -1:
0934 spec_type = "ignore"
0935 else:
0936 spec_type = "use"
0937
0938
0939 if opt_str.lower().find("dataset") > -1:
0940 select_type = "datasets"
0941 else:
0942 select_type = "runs"
0943
0944 if not self.input_method[select_type][spec_type] is None:
0945 msg = "Please only specify one input method " \
0946 "(for the `%s' case)" % opt_str
0947 self.logger.fatal(msg)
0948 raise Usage(msg)
0949
0950 input_method = opt_str.replace("-", "").replace("ignore", "")
0951 self.input_method[select_type][spec_type] = input_method
0952 self.input_name[select_type][spec_type] = value
0953
0954 self.logger.debug("Input method for the `%s' case: %s" % \
0955 (spec_type, input_method))
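        # For example (illustrative): the option string `--dataset-ignore'
        # decodes to select_type = "datasets", spec_type = "ignore" and
        # input_method = "dataset", whereas `--runslistfile' decodes to
        # select_type = "runs", spec_type = "use" and
        # input_method = "runslistfile".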
0956
0957
0958
0959
0960
0961 def option_handler_book_keeping_file(self, option, opt_str, value, parser):
0962 """Store the name of the file to be used for book keeping.
0963
0964 The only check done here is that only a single book keeping
0965 file is specified.
0966
0967 """
0968
0969 file_name = value
0970
0971 if not self.book_keeping_file_name is None:
0972 msg = "Only one book keeping file should be specified"
0973 self.logger.fatal(msg)
0974 raise Usage(msg)
0975 self.book_keeping_file_name = file_name
0976
0977 self.logger.info("Book keeping file to be used: `%s'" % \
0978 self.book_keeping_file_name)
0979
0980
0981
0982
0983
0984 def option_handler_ref_hist_mapping_file(self, option, opt_str, value, parser):
0985 """Store the name of the file for the ref. histogram mapping.
0986
0987 """
0988
0989 file_name = value
0990
0991 if not self.ref_hist_mappings_file_name is None:
0992 msg = "Only one reference histogram mapping file " \
0993 "should be specified"
0994 self.logger.fatal(msg)
0995 raise Usage(msg)
0996 self.ref_hist_mappings_file_name = file_name
0997
0998 self.logger.info("Reference histogram mapping file " \
0999 "to be used: `%s'" % \
1000 self.ref_hist_mappings_file_name)
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060 def option_handler_castor_dir(self, option, opt_str, value, parser):
1061 """Specify where on CASTOR the output should go.
1062
1063 At the moment only output to CERN CASTOR is
1064 supported. Eventually the harvested results should go into the
1065 central place for DQM on CASTOR anyway.
1066
1067 """
1068
1069
1070 castor_dir = value
1071
1072 castor_prefix = self.castor_prefix
1073
1074
1075 castor_dir = os.path.join(os.path.sep, castor_dir)
1076 self.castor_base_dir = os.path.normpath(castor_dir)
1077
1078 self.logger.info("CASTOR (base) area to be used: `%s'" % \
1079 self.castor_base_dir)
1080
1081
1082
1083
1084
1085 def option_handler_no_t1access(self, option, opt_str, value, parser):
1086         """Set the self.non_t1access flag to try and create jobs that
1087 run without special `t1access' role.
1088
1089 """
1090
1091 self.non_t1access = True
1092
1093 self.logger.warning("Running in `non-t1access' mode. " \
1094 "Will try to create jobs that run " \
1095 "without special rights but no " \
1096 "further promises...")
1097
1098
1099
1100
1101
1102 def option_handler_caf_access(self, option, opt_str, value, parser):
1103 """Set the self.caf_access flag to try and create jobs that
1104 run on the CAF.
1105
1106 """
1107 self.caf_access = True
1108
1109 self.logger.warning("Running in `caf_access' mode. " \
1110 "Will try to create jobs that run " \
1111                             "on CAF but no " \
1112 "further promises...")
1113
1114
1115
1116
1117
1118 def option_handler_saveByLumiSection(self, option, opt_str, value, parser):
1119         """Set process.dqmSaver.saveByLumiSection=1 in the harvesting cfg file
1120 """
1121 self.saveByLumiSection = True
1122
1123         self.logger.warning("Enabling saveByLumiSection: DQM output will be saved per lumi section")
1124
1125
1126
1127
1128
1129
1130 def option_handler_crab_submission(self, option, opt_str, value, parser):
1131         """Flag that CRAB jobs are to be created and submitted
1132         automatically.
1133         """
1134 self.crab_submission = True
1135
1136
1137
1138
1139
1140 def option_handler_sites(self, option, opt_str, value, parser):
1141
1142 self.nr_max_sites = value
1143
1144
1145
1146 def option_handler_preferred_site(self, option, opt_str, value, parser):
1147
1148 self.preferred_site = value
1149
1150
1151
1152 def option_handler_list_types(self, option, opt_str, value, parser):
1153 """List all harvesting types and their mappings.
1154
1155 This lists all implemented harvesting types with their
1156 corresponding mappings to sequence names. This had to be
1157 separated out from the help since it depends on the CMSSW
1158 version and was making things a bit of a mess.
1159
1160 NOTE: There is no way (at least not that I could come up with)
1161 to code this in a neat generic way that can be read both by
1162 this method and by setup_harvesting_info(). Please try hard to
1163 keep these two methods in sync!
1164
1165 """
1166
1167 sep_line = "-" * 50
1168 sep_line_short = "-" * 20
1169
1170 print(sep_line)
1171 print("The following harvesting types are available:")
1172 print(sep_line)
1173
1174 print("`RelVal' maps to:")
1175 print(" pre-3_3_0 : HARVESTING:validationHarvesting")
1176 print(" 3_4_0_pre2 and later: HARVESTING:validationHarvesting+dqmHarvesting")
1177 print(" Exceptions:")
1178 print(" 3_3_0_pre1-4 : HARVESTING:validationHarvesting")
1179 print(" 3_3_0_pre6 : HARVESTING:validationHarvesting")
1180 print(" 3_4_0_pre1 : HARVESTING:validationHarvesting")
1181
1182 print(sep_line_short)
1183
1184 print("`RelValFS' maps to:")
1185 print(" always : HARVESTING:validationHarvestingFS")
1186
1187 print(sep_line_short)
1188
1189 print("`MC' maps to:")
1190 print(" always : HARVESTING:validationprodHarvesting")
1191
1192 print(sep_line_short)
1193
1194 print("`DQMOffline' maps to:")
1195 print(" always : HARVESTING:dqmHarvesting")
1196
1197 print(sep_line)
1198
1199
1200
1201 raise SystemExit
1202
1203
1204
1205
1206
1207 def setup_harvesting_info(self):
1208 """Fill our dictionary with all info needed to understand
1209 harvesting.
1210
1211 This depends on the CMSSW version since at some point the
1212 names and sequences were modified.
1213
1214 NOTE: There is no way (at least not that I could come up with)
1215 to code this in a neat generic way that can be read both by
1216 this method and by option_handler_list_types(). Please try
1217 hard to keep these two methods in sync!
1218
1219 """
1220
1221 assert not self.cmssw_version is None, \
1222                "ERROR setup_harvesting_info() requires " \
1223 "self.cmssw_version to be set!!!"
1224
1225 harvesting_info = {}
1226
1227
1228 harvesting_info["DQMOffline"] = {}
1229 harvesting_info["DQMOffline"]["beamspot"] = None
1230 harvesting_info["DQMOffline"]["eventcontent"] = None
1231 harvesting_info["DQMOffline"]["harvesting"] = "AtRunEnd"
1232
1233 harvesting_info["RelVal"] = {}
1234 harvesting_info["RelVal"]["beamspot"] = None
1235 harvesting_info["RelVal"]["eventcontent"] = None
1236 harvesting_info["RelVal"]["harvesting"] = "AtRunEnd"
1237
1238 harvesting_info["RelValFS"] = {}
1239 harvesting_info["RelValFS"]["beamspot"] = None
1240 harvesting_info["RelValFS"]["eventcontent"] = None
1241 harvesting_info["RelValFS"]["harvesting"] = "AtRunEnd"
1242
1243 harvesting_info["MC"] = {}
1244 harvesting_info["MC"]["beamspot"] = None
1245 harvesting_info["MC"]["eventcontent"] = None
1246 harvesting_info["MC"]["harvesting"] = "AtRunEnd"
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256 assert self.cmssw_version.startswith("CMSSW_")
1257
1258
1259 version = self.cmssw_version[6:]
1260
1261
1262
1263
1264 step_string = None
1265 if version < "3_3_0":
1266 step_string = "validationHarvesting"
1267 elif version in ["3_3_0_pre1", "3_3_0_pre2",
1268 "3_3_0_pre3", "3_3_0_pre4",
1269 "3_3_0_pre6", "3_4_0_pre1"]:
1270 step_string = "validationHarvesting"
1271 else:
1272 step_string = "validationHarvesting+dqmHarvesting"
1273
1274 harvesting_info["RelVal"]["step_string"] = step_string
1275
1276
1277
1278 assert not step_string is None, \
1279 "ERROR Could not decide a RelVal harvesting sequence " \
1280 "for CMSSW version %s" % self.cmssw_version
1281
1282
1283
1284
1285
1286 step_string = "validationHarvestingFS"
1287
1288 harvesting_info["RelValFS"]["step_string"] = step_string
1289
1290
1291
1292
1293 step_string = "validationprodHarvesting"
1294
1295 harvesting_info["MC"]["step_string"] = step_string
1296
1297
1298
1299 assert not step_string is None, \
1300 "ERROR Could not decide a MC harvesting " \
1301 "sequence for CMSSW version %s" % self.cmssw_version
1302
1303
1304
1305
1306
1307 step_string = "dqmHarvesting"
1308
1309 harvesting_info["DQMOffline"]["step_string"] = step_string
1310
1311
1312
1313 self.harvesting_info = harvesting_info
1314
1315 self.logger.info("Based on the CMSSW version (%s) " \
1316 "I decided to use the `HARVESTING:%s' " \
1317 "sequence for %s harvesting" % \
1318 (self.cmssw_version,
1319 self.harvesting_info[self.harvesting_type]["step_string"],
1320 self.harvesting_type))
1321
1322
1323
1324
1325
1326 def create_castor_path_name_common(self, dataset_name):
1327 """Build the common part of the output path to be used on
1328 CASTOR.
1329
1330 This consists of the CASTOR area base path specified by the
1331 user and a piece depending on the data type (data vs. MC), the
1332 harvesting type and the dataset name followed by a piece
1333 containing the run number and event count. (See comments in
1334 create_castor_path_name_special for details.) This method
1335 creates the common part, without run number and event count.
1336
1337 """
1338
1339 castor_path = self.castor_base_dir
1340
1341
1342
1343
1344 datatype = self.datasets_information[dataset_name]["datatype"]
1345 datatype = datatype.lower()
1346 castor_path = os.path.join(castor_path, datatype)
1347
1348
1349 harvesting_type = self.harvesting_type
1350 harvesting_type = harvesting_type.lower()
1351 castor_path = os.path.join(castor_path, harvesting_type)
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361 release_version = self.cmssw_version
1362 release_version = release_version.lower(). \
1363 replace("cmssw", ""). \
1364 strip("_")
1365 castor_path = os.path.join(castor_path, release_version)
1366
1367
1368 dataset_name_escaped = self.escape_dataset_name(dataset_name)
1369 castor_path = os.path.join(castor_path, dataset_name_escaped)
1370
1371
1372
1373 castor_path = os.path.normpath(castor_path)
1374
1375
1376 return castor_path
1377
1378
1379
1380 def create_castor_path_name_special(self,
1381 dataset_name, run_number,
1382 castor_path_common):
1383 """Create the specialised part of the CASTOR output dir name.
1384
1385 NOTE: To avoid clashes with `incremental harvesting'
1386 (re-harvesting when a dataset grows) we have to include the
1387 event count in the path name. The underlying `problem' is that
1388 CRAB does not overwrite existing output files so if the output
1389 file already exists CRAB will fail to copy back the output.
1390
1391 NOTE: It's not possible to create different kinds of
1392 harvesting jobs in a single call to this tool. However, in
1393 principle it could be possible to create both data and MC jobs
1394 in a single go.
1395
1396 NOTE: The number of events used in the path name is the
1397 _total_ number of events in the dataset/run at the time of
1398 harvesting. If we're doing partial harvesting the final
1399 results will reflect lower statistics. This is a) the easiest
1400 to code and b) the least likely to lead to confusion if
1401 someone ever decides to swap/copy around file blocks between
1402 sites.
1403
1404 """
1405
1406 castor_path = castor_path_common
1407
1408
1409
1410
1411 castor_path = os.path.join(castor_path, "run_%d" % run_number)
1412
1413
1414
1415
1416
1417
1418
1419 castor_path = os.path.join(castor_path, "nevents")
1420
1421
1422
1423 castor_path = os.path.normpath(castor_path)
1424
1425
1426 return castor_path
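        # Putting the common and special parts together yields paths of the
        # form (all pieces below are illustrative):
        #
        #   <castor_base_dir>/data/dqmoffline/3_8_2/<escaped_dataset_name>/run_123456/nevents
        #
        # i.e. base dir, data type, harvesting type, release version, escaped
        # dataset name, run number and the event-count piece.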
1427
1428
1429
1430 def create_and_check_castor_dirs(self):
1431 """Make sure all required CASTOR output dirs exist.
1432
1433 This checks the CASTOR base dir specified by the user as well
1434 as all the subdirs required by the current set of jobs.
1435
1436 """
1437
1438 self.logger.info("Checking (and if necessary creating) CASTOR " \
1439 "output area(s)...")
1440
1441
1442 self.create_and_check_castor_dir(self.castor_base_dir)
1443
1444
1445 castor_dirs = []
1446 for (dataset_name, runs) in self.datasets_to_use.items():
1447
1448 for run in runs:
1449 castor_dirs.append(self.datasets_information[dataset_name] \
1450 ["castor_path"][run])
1451 castor_dirs_unique = sorted(set(castor_dirs))
1452
1453
1454
1455 ndirs = len(castor_dirs_unique)
1456         step = max(ndirs // 10, 1)
1457 for (i, castor_dir) in enumerate(castor_dirs_unique):
1458 if (i + 1) % step == 0 or \
1459 (i + 1) == ndirs:
1460 self.logger.info(" %d/%d" % \
1461 (i + 1, ndirs))
1462 self.create_and_check_castor_dir(castor_dir)
1463
1464
1465
1466
1467
1468
1469 self.logger.debug("Checking if path `%s' is empty" % \
1470 castor_dir)
1471 cmd = "rfdir %s" % castor_dir
1472 (status, output) = subprocess.getstatusoutput(cmd)
1473 if status != 0:
1474 msg = "Could not access directory `%s'" \
1475 " !!! This is bad since I should have just" \
1476 " created it !!!" % castor_dir
1477 self.logger.fatal(msg)
1478 raise Error(msg)
1479 if len(output) > 0:
1480 self.logger.warning("Output directory `%s' is not empty:" \
1481 " new jobs will fail to" \
1482 " copy back output" % \
1483 castor_dir)
1484
1485
1486
1487
1488
1489 def create_and_check_castor_dir(self, castor_dir):
1490         """Check existence of the given CASTOR dir, if necessary create
1491 it.
1492
1493 Some special care has to be taken with several things like
1494 setting the correct permissions such that CRAB can store the
1495 output results. Of course this means that things like
1496 /castor/cern.ch/ and user/j/ have to be recognised and treated
1497 properly.
1498
1499 NOTE: Only CERN CASTOR area (/castor/cern.ch/) supported for
1500 the moment.
1501
1502 NOTE: This method uses some slightly tricky caching to make
1503 sure we don't keep over and over checking the same base paths.
1504
1505 """
1506
1507
1508
1509
1510 def split_completely(path):
1511 (parent_path, name) = os.path.split(path)
1512 if name == "":
1513 return (parent_path, )
1514 else:
1515 return split_completely(parent_path) + (name, )
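        # For example (purely illustrative):
        #
        #   split_completely("/castor/cern.ch/cms")
        #     --> ("/", "castor", "cern.ch", "cms")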
1516
1517
1518
1519
1520
1521 def extract_permissions(rfstat_output):
1522 """Parse the output from rfstat and return the
1523 5-digit permissions string."""
1524
1525             permissions_line = [i for i in rfstat_output.split("\n") \
1526                                 if i.lower().find("protection") > -1]
1527             regexp = re.compile(r".*\(([0123456789]{5})\).*")
1528 match = regexp.search(rfstat_output)
1529 if not match or len(match.groups()) != 1:
1530 msg = "Could not extract permissions " \
1531 "from output: %s" % rfstat_output
1532 self.logger.fatal(msg)
1533 raise Error(msg)
1534 permissions = match.group(1)
1535
1536
1537 return permissions
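            # The returned value is the 5-digit octal string found between
            # parentheses in the rfstat "Protection" line, e.g. "40755"
            # (the exact rfstat output format is assumed here).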
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551 castor_paths_dont_touch = {
1552 0: ["/", "castor", "cern.ch", "cms", "store", "temp",
1553 "dqm", "offline", "user"],
1554 -1: ["user", "store"]
1555 }
1556
1557 self.logger.debug("Checking CASTOR path `%s'" % castor_dir)
1558
1559
1560
1561
1562 castor_path_pieces = split_completely(castor_dir)
1563
1564
1565
1566
1567 path = ""
1568 check_sizes = sorted(castor_paths_dont_touch.keys())
1569 len_castor_path_pieces = len(castor_path_pieces)
1570 for piece_index in range (len_castor_path_pieces):
1571 skip_this_path_piece = False
1572 piece = castor_path_pieces[piece_index]
1573
1574
1575 for check_size in check_sizes:
1576
1577 if (piece_index + check_size) > -1:
1578
1579
1580
1581 if castor_path_pieces[piece_index + check_size] in castor_paths_dont_touch[check_size]:
1582
1583 skip_this_path_piece = True
1584
1585
1586
1587
1588
1589
1590
1591 path = os.path.join(path, piece)
1592
1593
1594
1595
1596
1597 try:
1598 if path in self.castor_path_checks_cache:
1599 continue
1600 except AttributeError:
1601
1602 self.castor_path_checks_cache = []
1603 self.castor_path_checks_cache.append(path)
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621 if not skip_this_path_piece:
1622
1623
1624
1625
1626
1627
1628
1629 self.logger.debug("Checking if path `%s' exists" % \
1630 path)
1631 cmd = "rfstat %s" % path
1632 (status, output) = subprocess.getstatusoutput(cmd)
1633 if status != 0:
1634
1635 self.logger.debug("Creating path `%s'" % path)
1636 cmd = "nsmkdir -m 775 %s" % path
1637 (status, output) = subprocess.getstatusoutput(cmd)
1638 if status != 0:
1639 msg = "Could not create directory `%s'" % path
1640 self.logger.fatal(msg)
1641 raise Error(msg)
1642 cmd = "rfstat %s" % path
1643 (status, output) = subprocess.getstatusoutput(cmd)
1644
1645
1646
1647
1648 permissions = extract_permissions(output)
1649 if not permissions.startswith("40"):
1650 msg = "Path `%s' is not a directory(?)" % path
1651 self.logger.fatal(msg)
1652 raise Error(msg)
1653
1654
1655
1656 self.logger.debug("Checking permissions for path `%s'" % path)
1657 cmd = "rfstat %s" % path
1658 (status, output) = subprocess.getstatusoutput(cmd)
1659 if status != 0:
1660 msg = "Could not obtain permissions for directory `%s'" % \
1661 path
1662 self.logger.fatal(msg)
1663 raise Error(msg)
1664
1665 permissions = extract_permissions(output)[-3:]
1666
1667
1668
1669 if piece_index == (len_castor_path_pieces - 1):
1670
1671
1672 permissions_target = "775"
1673 else:
1674
1675 permissions_target = "775"
1676
1677
1678 permissions_new = []
1679 for (i, j) in zip(permissions, permissions_target):
1680 permissions_new.append(str(max(int(i), int(j))))
1681 permissions_new = "".join(permissions_new)
1682 self.logger.debug(" current permissions: %s" % \
1683 permissions)
1684 self.logger.debug(" target permissions : %s" % \
1685 permissions_target)
1686 if permissions_new != permissions:
1687
1688 self.logger.debug("Changing permissions of `%s' " \
1689 "to %s (were %s)" % \
1690 (path, permissions_new, permissions))
1691 cmd = "rfchmod %s %s" % (permissions_new, path)
1692 (status, output) = subprocess.getstatusoutput(cmd)
1693 if status != 0:
1694 msg = "Could not change permissions for path `%s' " \
1695 "to %s" % (path, permissions_new)
1696 self.logger.fatal(msg)
1697 raise Error(msg)
1698
1699 self.logger.debug(" Permissions ok (%s)" % permissions_new)
1700
1701
1702
1703
1704
1705 def pick_a_site(self, sites, cmssw_version):
1706
1707
1708 sites_forbidden = []
1709
1710 if (self.preferred_site == "CAF") or (self.preferred_site == "caf.cern.ch"):
1711 self.caf_access = True
1712
1713 if self.caf_access == False:
1714 sites_forbidden.append("caf.cern.ch")
1715
1716
1717
1718
1719
1720
1721
1722 all_t1 = [
1723 "srm-cms.cern.ch",
1724 "ccsrm.in2p3.fr",
1725 "cmssrm-fzk.gridka.de",
1726 "cmssrm.fnal.gov",
1727 "gridka-dCache.fzk.de",
1728 "srm-cms.gridpp.rl.ac.uk",
1729 "srm.grid.sinica.edu.tw",
1730 "srm2.grid.sinica.edu.tw",
1731 "srmcms.pic.es",
1732 "storm-fe-cms.cr.cnaf.infn.it"
1733 ]
1734
1735 country_codes = {
1736 "CAF" : "caf.cern.ch",
1737 "CH" : "srm-cms.cern.ch",
1738 "FR" : "ccsrm.in2p3.fr",
1739 "DE" : "cmssrm-fzk.gridka.de",
1740 "GOV" : "cmssrm.fnal.gov",
1741 "DE2" : "gridka-dCache.fzk.de",
1742 "UK" : "srm-cms.gridpp.rl.ac.uk",
1743 "TW" : "srm.grid.sinica.edu.tw",
1744 "TW2" : "srm2.grid.sinica.edu.tw",
1745 "ES" : "srmcms.pic.es",
1746 "IT" : "storm-fe-cms.cr.cnaf.infn.it"
1747 }
1748
1749 if self.non_t1access:
1750 sites_forbidden.extend(all_t1)
1751
1752 for site in sites_forbidden:
1753 if site in sites:
1754 sites.remove(site)
1755
1756 if self.preferred_site in country_codes:
1757 self.preferred_site = country_codes[self.preferred_site]
1758
1759 if self.preferred_site != "no preference":
1760 if self.preferred_site in sites:
1761 sites = [self.preferred_site]
1762 else:
1763 sites= []
1764
1765
1766
1767
1768
1769
1770
1771
1772 site_name = None
1773 cmd = None
1774 while len(sites) > 0 and \
1775 site_name is None:
1776
1777
1778 t1_sites = []
1779 for site in sites:
1780 if site in all_t1:
1781 t1_sites.append(site)
1782 if site == "caf.cern.ch":
1783 t1_sites.append(site)
1784
1785
1786
1787
1788
1789
1790 if len(t1_sites) > 0:
1791 se_name = choice(t1_sites)
1792
1793 else:
1794 se_name = choice(sites)
1795
1796
1797
1798 if se_name in self.sites_and_versions_cache and \
1799 cmssw_version in self.sites_and_versions_cache[se_name]:
1800 if self.sites_and_versions_cache[se_name][cmssw_version]:
1801 site_name = se_name
1802 break
1803 else:
1804 self.logger.debug(" --> rejecting site `%s'" % se_name)
1805 sites.remove(se_name)
1806
1807 else:
1808 self.logger.info("Checking if site `%s' " \
1809 "has CMSSW version `%s'" % \
1810 (se_name, cmssw_version))
1811 self.sites_and_versions_cache[se_name] = {}
1812
1813
1814
1815
1816
1817
1818
1819
1820
1821
1822
1823
1824
1825
1826 cmd = "lcg-info --list-ce " \
1827 "--query '" \
1828 "Tag=VO-cms-%s," \
1829 "CEStatus=Production," \
1830 "CloseSE=%s'" % \
1831 (cmssw_version, se_name)
1832 (status, output) = subprocess.getstatusoutput(cmd)
1833 if status != 0:
1834 self.logger.error("Could not check site information " \
1835 "for site `%s'" % se_name)
1836 else:
1837 if (len(output) > 0) or (se_name == "caf.cern.ch"):
1838 self.sites_and_versions_cache[se_name][cmssw_version] = True
1839 site_name = se_name
1840 break
1841 else:
1842 self.sites_and_versions_cache[se_name][cmssw_version] = False
1843 self.logger.debug(" --> rejecting site `%s'" % se_name)
1844 sites.remove(se_name)
1845
1846         if site_name is None:
1847             self.logger.error("  --> no matching site found")
1848             self.logger.error("  --> Your release or SCRAM " \
1849                               "architecture may not be available " \
1850                               "anywhere on the (LCG) grid.")
1851 if not cmd is None:
1852 self.logger.debug(" (command used: `%s')" % cmd)
1853 else:
1854 self.logger.debug(" --> selected site `%s'" % site_name)
1855
1856
1857
1858 if site_name is None:
1859 site_name = self.no_matching_site_found_str
1860
1861
1862 self.all_sites_found = False
1863
1864
1865 return site_name
1866
1867
1868
1869 def parse_cmd_line_options(self):
1870
1871
1872
1873
1874 parser = optparse.OptionParser(version="%s %s" % \
1875 ("%prog", self.version),
1876 formatter=CMSHarvesterHelpFormatter())
1877
1878 self.option_parser = parser
1879
1880
1881 parser.add_option("-d", "--debug",
1882 help="Switch on debug mode",
1883 action="callback",
1884 callback=self.option_handler_debug)
1885
1886
1887 parser.add_option("-q", "--quiet",
1888 help="Be less verbose",
1889 action="callback",
1890 callback=self.option_handler_quiet)
1891
1892
1893
1894 parser.add_option("", "--force",
1895 help="Force mode. Do not abort on sanity check "
1896 "failures",
1897 action="callback",
1898 callback=self.option_handler_force)
1899
1900
1901 parser.add_option("", "--harvesting_type",
1902 help="Harvesting type: %s" % \
1903 ", ".join(self.harvesting_types),
1904 action="callback",
1905 callback=self.option_handler_harvesting_type,
1906 type="string",
1907 metavar="HARVESTING_TYPE")
1908
1909
1910 parser.add_option("", "--harvesting_mode",
1911 help="Harvesting mode: %s (default = %s)" % \
1912 (", ".join(self.harvesting_modes),
1913 self.harvesting_mode_default),
1914 action="callback",
1915 callback=self.option_handler_harvesting_mode,
1916 type="string",
1917 metavar="HARVESTING_MODE")
1918
1919
1920 parser.add_option("", "--globaltag",
1921                           help="GlobalTag to use. Default is the one " \
1922                           "the dataset was created with for MC; for data " \
1923                           "a GlobalTag has to be specified.",
1924 action="callback",
1925 callback=self.option_handler_globaltag,
1926 type="string",
1927 metavar="GLOBALTAG")
1928
1929
1930 parser.add_option("", "--no-ref-hists",
1931 help="Don't use any reference histograms",
1932 action="callback",
1933 callback=self.option_handler_no_ref_hists)
1934
1935
1936
1937 parser.add_option("", "--frontier-connection",
1938 help="Use this Frontier connection to find " \
1939 "GlobalTags and LocalTags (for reference " \
1940 "histograms).\nPlease only use this for " \
1941 "testing.",
1942 action="callback",
1943 callback=self.option_handler_frontier_connection,
1944 type="string",
1945 metavar="FRONTIER")
1946
1947
1948
1949 parser.add_option("", "--frontier-connection-for-globaltag",
1950 help="Use this Frontier connection to find " \
1951 "GlobalTags.\nPlease only use this for " \
1952 "testing.",
1953 action="callback",
1954 callback=self.option_handler_frontier_connection,
1955 type="string",
1956 metavar="FRONTIER")
1957
1958
1959
1960 parser.add_option("", "--frontier-connection-for-refhists",
1961 help="Use this Frontier connection to find " \
1962 "LocalTags (for reference " \
1963 "histograms).\nPlease only use this for " \
1964 "testing.",
1965 action="callback",
1966 callback=self.option_handler_frontier_connection,
1967 type="string",
1968 metavar="FRONTIER")
1969
1970
1971
1972 parser.add_option("", "--dataset",
1973 help="Name (or regexp) of dataset(s) to process",
1974 action="callback",
1975
1976 callback=self.option_handler_input_spec,
1977 type="string",
1978
1979 metavar="DATASET")
1980
1981
1982
1983 parser.add_option("", "--dataset-ignore",
1984 help="Name (or regexp) of dataset(s) to ignore",
1985 action="callback",
1986 callback=self.option_handler_input_spec,
1987 type="string",
1988 metavar="DATASET-IGNORE")
1989
1990
1991
1992 parser.add_option("", "--runs",
1993 help="Run number(s) to process",
1994 action="callback",
1995 callback=self.option_handler_input_spec,
1996 type="string",
1997 metavar="RUNS")
1998
1999
2000
2001 parser.add_option("", "--runs-ignore",
2002 help="Run number(s) to ignore",
2003 action="callback",
2004 callback=self.option_handler_input_spec,
2005 type="string",
2006 metavar="RUNS-IGNORE")
2007
2008
2009
2010 parser.add_option("", "--datasetfile",
2011 help="File containing list of dataset names " \
2012 "(or regexps) to process",
2013 action="callback",
2014
2015 callback=self.option_handler_input_spec,
2016 type="string",
2017
2018 metavar="DATASETFILE")
2019
2020
2021
2022 parser.add_option("", "--datasetfile-ignore",
2023 help="File containing list of dataset names " \
2024 "(or regexps) to ignore",
2025 action="callback",
2026 callback=self.option_handler_input_spec,
2027 type="string",
2028 metavar="DATASETFILE-IGNORE")
2029
2030
2031
2032 parser.add_option("", "--runslistfile",
2033 help="File containing list of run numbers " \
2034 "to process",
2035 action="callback",
2036 callback=self.option_handler_input_spec,
2037 type="string",
2038 metavar="RUNSLISTFILE")
2039
2040
2041
2042 parser.add_option("", "--runslistfile-ignore",
2043 help="File containing list of run numbers " \
2044 "to ignore",
2045 action="callback",
2046 callback=self.option_handler_input_spec,
2047 type="string",
2048 metavar="RUNSLISTFILE-IGNORE")
2049
2050
2051
2052 parser.add_option("", "--Jsonrunfile",
2053 help="Jsonfile containing dictionary of run/lumisections pairs. " \
2054 "All lumisections of runs contained in dictionary are processed.",
2055 action="callback",
2056 callback=self.option_handler_input_Jsonrunfile,
2057 type="string",
2058 metavar="JSONRUNFILE")
2059
2060
2061
2062 parser.add_option("", "--Jsonfile",
2063 help="Jsonfile containing dictionary of run/lumisections pairs. " \
2064 "Only specified lumisections of runs contained in dictionary are processed.",
2065 action="callback",
2066 callback=self.option_handler_input_Jsonfile,
2067 type="string",
2068 metavar="JSONFILE")
2069
2070
2071
2072 parser.add_option("", "--todo-file",
2073 help="Todo file containing a list of runs to process.",
2074 action="callback",
2075 callback=self.option_handler_input_todofile,
2076 type="string",
2077 metavar="TODO-FILE")
2078
2079
2080
2081 parser.add_option("", "--refhistmappingfile",
2082                           help="File to be used for the reference " \
2083 "histogram mappings. Default: `%s'." % \
2084 self.ref_hist_mappings_file_name_default,
2085 action="callback",
2086 callback=self.option_handler_ref_hist_mapping_file,
2087 type="string",
2088 metavar="REFHISTMAPPING-FILE")
2089
2090
2091
2092
2093 parser.add_option("", "--castordir",
2094 help="Place on CASTOR to store results. " \
2095 "Default: `%s'." % \
2096 self.castor_base_dir_default,
2097 action="callback",
2098 callback=self.option_handler_castor_dir,
2099 type="string",
2100 metavar="CASTORDIR")
2101
2102
2103
2104 parser.add_option("", "--no-t1access",
2105 help="Try to create jobs that will run " \
2106 "without special `t1access' role",
2107 action="callback",
2108 callback=self.option_handler_no_t1access)
2109
2110
2111 parser.add_option("", "--caf-access",
2112 help="Crab jobs may run " \
2113 "on CAF",
2114 action="callback",
2115 callback=self.option_handler_caf_access)
2116
2117
2118 parser.add_option("", "--saveByLumiSection",
2119 help="set saveByLumiSection=1 in harvesting cfg file",
2120 action="callback",
2121 callback=self.option_handler_saveByLumiSection)
2122
2123
2124 parser.add_option("", "--automatic-crab-submission",
2125 help="Crab jobs are created and " \
2126 "submitted automatically",
2127 action="callback",
2128 callback=self.option_handler_crab_submission)
2129
2130
2131
2132 parser.add_option("", "--max-sites",
2133 help="Max. number of sites each job is submitted to",
2134 action="callback",
2135 callback=self.option_handler_sites,
2136 type="int")
2137
2138
2139 parser.add_option("", "--site",
2140 help="Crab jobs are submitted to specified site. T1 sites may be shortened by the following (country) codes: \
2141 srm-cms.cern.ch : CH \
2142 ccsrm.in2p3.fr : FR \
2143 cmssrm-fzk.gridka.de : DE \
2144 cmssrm.fnal.gov : GOV \
2145 gridka-dCache.fzk.de : DE2 \
2146                           srm-cms.gridpp.rl.ac.uk : UK \
2147 srm.grid.sinica.edu.tw : TW \
2148 srm2.grid.sinica.edu.tw : TW2 \
2149 srmcms.pic.es : ES \
2150 storm-fe-cms.cr.cnaf.infn.it : IT",
2151 action="callback",
2152 callback=self.option_handler_preferred_site,
2153 type="string")
2154
2155
2156
2157 parser.add_option("-l", "--list",
2158                           help="List all harvesting types and their " \
2159 "corresponding sequence names",
2160 action="callback",
2161 callback=self.option_handler_list_types)
2162
2163
2164
2165
2166
2167 if len(self.cmd_line_opts) < 1:
2168 self.cmd_line_opts = ["--help"]
2169
2170
2171
2172
2173
2174
2175
2176
2177 for i in ["-d", "--debug",
2178 "-q", "--quiet"]:
2179 if i in self.cmd_line_opts:
2180 self.cmd_line_opts.remove(i)
2181 self.cmd_line_opts.insert(0, i)
2182
2183
2184 parser.set_defaults()
2185 (self.options, self.args) = parser.parse_args(self.cmd_line_opts)
2186
2187
2188
2189
2190
2191 def check_input_status(self):
2192 """Check completeness and correctness of input information.
2193
2194 Check that all required information has been specified and
2195 that, at least as far as can be easily checked, it makes
2196 sense.
2197
2198 NOTE: This is also where any default values are applied.
2199
2200 """
2201
2202 self.logger.info("Checking completeness/correctness of input...")
2203
2204
2205
2206 if len(self.args) > 0:
2207 msg = "Sorry but I don't understand `%s'" % \
2208 (" ".join(self.args))
2209 self.logger.fatal(msg)
2210 raise Usage(msg)
2211
2212
2213
2214
2215 if self.harvesting_mode == "two-step":
2216 msg = "--------------------\n" \
2217 " Sorry, but for the moment (well, till it works)" \
2218 " the two-step mode has been disabled.\n" \
2219 "--------------------\n"
2220 self.logger.fatal(msg)
2221 raise Error(msg)
2222
2223
2224
2225 if self.harvesting_type is None:
2226 msg = "Please specify a harvesting type"
2227 self.logger.fatal(msg)
2228 raise Usage(msg)
2229
2230 if self.harvesting_mode is None:
2231 self.harvesting_mode = self.harvesting_mode_default
2232 msg = "No harvesting mode specified --> using default `%s'" % \
2233 self.harvesting_mode
2234 self.logger.warning(msg)
2235
2236
2237
2238
2239
2240 if self.input_method["datasets"]["use"] is None:
2241 msg = "Please specify an input dataset name " \
2242 "or a list file name"
2243 self.logger.fatal(msg)
2244 raise Usage(msg)
2245
2246
2247
2248 assert not self.input_name["datasets"]["use"] is None
2249
2250
2251
2252
2253
2254
2255 if self.use_ref_hists:
2256 if self.ref_hist_mappings_file_name is None:
2257 self.ref_hist_mappings_file_name = self.ref_hist_mappings_file_name_default
2258 msg = "No reference histogram mapping file specified --> " \
2259 "using default `%s'" % \
2260 self.ref_hist_mappings_file_name
2261 self.logger.warning(msg)
2262
2263
2264
2265
2266
2267 if self.castor_base_dir is None:
2268 self.castor_base_dir = self.castor_base_dir_default
2269 msg = "No CASTOR area specified -> using default `%s'" % \
2270 self.castor_base_dir
2271 self.logger.warning(msg)
2272
2273
2274
2275 if not self.castor_base_dir.startswith(self.castor_prefix):
2276 msg = "CASTOR area does not start with `%s'" % \
2277 self.castor_prefix
2278 self.logger.fatal(msg)
2279 if self.castor_base_dir.find("castor") > -1 and \
2280 not self.castor_base_dir.find("cern.ch") > -1:
2281 self.logger.fatal("Only CERN CASTOR is supported")
2282 raise Usage(msg)
2283
2284
2285
2286
2287
2288
2289
2290
2291
2292 if self.globaltag is None:
2293 self.logger.warning("No GlobalTag specified. This means I cannot")
2294 self.logger.warning("run on data, only on MC.")
2295 self.logger.warning("I will skip all data datasets.")
2296
2297
2298
2299
2300 if not self.globaltag is None:
2301 if not self.globaltag.endswith("::All"):
2302 self.logger.warning("Specified GlobalTag `%s' does " \
2303 "not end in `::All' --> " \
2304 "appending this missing piece" % \
2305 self.globaltag)
2306 self.globaltag = "%s::All" % self.globaltag
2307
2308
2309
2310
2311 for (key, value) in self.frontier_connection_name.items():
2312 frontier_type_str = "unknown"
2313 if key == "globaltag":
2314 frontier_type_str = "the GlobalTag"
2315 elif key == "refhists":
2316 frontier_type_str = "the reference histograms"
2317 non_str = None
2318 if self.frontier_connection_overridden[key] == True:
2319 non_str = "non-"
2320 else:
2321 non_str = ""
2322 self.logger.info("Using %sdefault Frontier " \
2323 "connection for %s: `%s'" % \
2324 (non_str, frontier_type_str, value))
2325
2326
2327
2328
2329
2330
2331
2332 def check_cmssw(self):
2333 """Check if CMSSW is setup.
2334
2335 """
2336
2337
2338
2339
2340 cmssw_version = os.getenv("CMSSW_VERSION")
2341 if cmssw_version is None:
2342 self.logger.fatal("It seems CMSSW is not setup...")
2343 self.logger.fatal("($CMSSW_VERSION is empty)")
2344 raise Error("ERROR: CMSSW needs to be setup first!")
2345
2346 self.cmssw_version = cmssw_version
2347 self.logger.info("Found CMSSW version %s properly set up" % \
2348 self.cmssw_version)
2349
2350
2351 return True
2352
2353
2354
2355 def check_dbs(self):
2356 """Check if DBS is setup.
2357
2358 """
2359
2360
2361
2362
2363 dbs_home = os.getenv("DBSCMD_HOME")
2364 if dbs_home is None:
2365 self.logger.fatal("It seems DBS is not setup...")
2366 self.logger.fatal(" $DBSCMD_HOME is empty")
2367 raise Error("ERROR: DBS needs to be setup first!")
2368
2369
2370
2371
2372
2373
2374
2375
2376
2377
2378
2379
2380
2381
2382
2383
2384
2385 self.logger.debug("Found DBS properly set up")
2386
2387
2388 return True
2389
2390
2391
2392 def setup_dbs(self):
2393 """Setup the Python side of DBS.
2394
2395 For more information see the DBS Python API documentation:
2396 https://twiki.cern.ch/twiki/bin/view/CMS/DBSApiDocumentation
2397
2398 """
2399
2400 try:
2401 args={}
2402 args["url"]= "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/" \
2403 "servlet/DBSServlet"
2404 api = DbsApi(args)
2405 self.dbs_api = api
2406
2407 except DBSAPI.dbsApiException.DbsApiException as ex:
2408 self.logger.fatal("Caught DBS API exception %s: %s " % \
2409 (ex.getClassName(), ex.getErrorMessage()))
2410 if ex.getErrorCode() not in (None, ""):
2411                 self.logger.debug("DBS exception error code: %s" % ex.getErrorCode())
2412 raise
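        # Once set up, the handle is used elsewhere in this file roughly like
        # this (the query text is shortened for illustration):
        #
        #   api = self.dbs_api
        #   api_result = api.executeQuery("find dataset where dataset like "
        #                                 "/A/B/RECO and dataset.status = VALID")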
2413
2414
2415
2416
2417
2418 def dbs_resolve_dataset_name(self, dataset_name):
2419 """Use DBS to resolve a wildcarded dataset name.
2420
2421 """
2422
2423
2424
2425 assert not self.dbs_api is None
2426
2427
2428
2429
2430 if not (dataset_name.startswith("/") and \
2431 dataset_name.endswith("RECO")):
2432 self.logger.warning("Dataset name `%s' does not sound " \
2433 "like a valid dataset name!" % \
2434 dataset_name)
2435
2436
2437
2438 api = self.dbs_api
2439 dbs_query = "find dataset where dataset like %s " \
2440 "and dataset.status = VALID" % \
2441 dataset_name
2442 try:
2443 api_result = api.executeQuery(dbs_query)
2444 except DBSAPI.dbsApiException.DbsApiException:
2445 msg = "ERROR: Could not execute DBS query"
2446 self.logger.fatal(msg)
2447 raise Error(msg)
2448
2449
2450 handler = DBSXMLHandler(["dataset"])
2451 parser = xml.sax.make_parser()
2452 parser.setContentHandler(handler)
2453
2454
2455 try:
2456 xml.sax.parseString(api_result, handler)
2457 except SAXParseException:
2458 msg = "ERROR: Could not parse DBS server output"
2459 self.logger.fatal(msg)
2460 raise Error(msg)
2461
2462
2463 assert(handler.check_results_validity()), "ERROR The DBSXMLHandler screwed something up!"
2464
2465
2466
2467         datasets = list(handler.results.values())[0]
2468
2469
2470 return datasets
2471
2472
2473
2474 def dbs_resolve_cmssw_version(self, dataset_name):
2475 """Ask DBS for the CMSSW version used to create this dataset.
2476
2477 """
2478
2479
2480
2481 assert not self.dbs_api is None
2482
2483
2484 api = self.dbs_api
2485 dbs_query = "find algo.version where dataset = %s " \
2486 "and dataset.status = VALID" % \
2487 dataset_name
2488 try:
2489 api_result = api.executeQuery(dbs_query)
2490 except DBSAPI.dbsApiException.DbsApiException:
2491 msg = "ERROR: Could not execute DBS query"
2492 self.logger.fatal(msg)
2493 raise Error(msg)
2494
2495 handler = DBSXMLHandler(["algo.version"])
2496 parser = xml.sax.make_parser()
2497 parser.setContentHandler(handler)
2498
2499 try:
2500 xml.sax.parseString(api_result, handler)
2501 except SAXParseException:
2502 msg = "ERROR: Could not parse DBS server output"
2503 self.logger.fatal(msg)
2504 raise Error(msg)
2505
2506
2507 assert(handler.check_results_validity()), "ERROR The DBSXMLHandler screwed something up!"
2508
2509
2510         cmssw_version = list(handler.results.values())[0]
2511
2512
2513 assert len(cmssw_version) == 1
2514
2515
2516 cmssw_version = cmssw_version[0]
2517
2518
2519 return cmssw_version
2520
2521
2522
2523
2524
2525
2526
2527
2528
2529
2530
2531
2532
2533
2534
2535
2536
2537
2538
2539
2540
2541
2542
2543
2544
2545
2546
2547
2548
2549
2550
2551
2552
2553
2554
2555
2556
2557
2558
2559
2560
2561
2562
2563
2564
2565
2566
2567
2568 def dbs_resolve_runs(self, dataset_name):
2569 """Ask DBS for the list of runs in a given dataset.
2570
2571 # NOTE: This does not (yet?) skip/remove empty runs. There is
2572 # a bug in the DBS entry run.numevents (i.e. it always returns
2573 # zero) which should be fixed in the `next DBS release'.
2574 # See also:
2575 # https://savannah.cern.ch/bugs/?53452
2576 # https://savannah.cern.ch/bugs/?53711
2577
2578 """
2579
2580
2581
2582
2583
2584
2585
2586
2587 assert not self.dbs_api is None
2588
2589
2590 api = self.dbs_api
2591 dbs_query = "find run where dataset = %s " \
2592 "and dataset.status = VALID" % \
2593 dataset_name
2594 try:
2595 api_result = api.executeQuery(dbs_query)
2596 except DBSAPI.dbsApiException.DbsApiException:
2597 msg = "ERROR: Could not execute DBS query"
2598 self.logger.fatal(msg)
2599 raise Error(msg)
2600
2601 handler = DBSXMLHandler(["run"])
2602 parser = xml.sax.make_parser()
2603 parser.setContentHandler(handler)
2604
2605 try:
2606 xml.sax.parseString(api_result, handler)
2607 except SAXParseException:
2608 msg = "ERROR: Could not parse DBS server output"
2609 self.logger.fatal(msg)
2610 raise Error(msg)
2611
2612
2613 assert(handler.check_results_validity()), "ERROR The DBSXMLHandler screwed something up!"
2614
2615
2616         runs = list(handler.results.values())[0]
2617
2618 runs = sorted([int(i) for i in runs])
2619
2620
2621 return runs
2622
2623
2624
2625 def dbs_resolve_globaltag(self, dataset_name):
2626 """Ask DBS for the globaltag corresponding to a given dataset.
2627
2628 # BUG BUG BUG
2629 # This does not seem to work for data datasets? E.g. for
2630 # /Cosmics/Commissioning08_CRAFT0831X_V1_311_ReReco_FromSuperPointing_v1/RAW-RECO
2631         # Probably due to the fact that the GlobalTag changed during
2632         # data taking...
2633         # BUG BUG BUG end
2634
2635 """
2636
2637
2638
2639 assert not self.dbs_api is None
2640
2641
2642 api = self.dbs_api
2643 dbs_query = "find dataset.tag where dataset = %s " \
2644 "and dataset.status = VALID" % \
2645 dataset_name
2646 try:
2647 api_result = api.executeQuery(dbs_query)
2648 except DBSAPI.dbsApiException.DbsApiException:
2649 msg = "ERROR: Could not execute DBS query"
2650 self.logger.fatal(msg)
2651 raise Error(msg)
2652
2653 handler = DBSXMLHandler(["dataset.tag"])
2654 parser = xml.sax.make_parser()
2655         parser.setContentHandler(handler)
2656
2657 try:
2658 xml.sax.parseString(api_result, handler)
2659 except SAXParseException:
2660 msg = "ERROR: Could not parse DBS server output"
2661 self.logger.fatal(msg)
2662 raise Error(msg)
2663
2664
2665 assert(handler.check_results_validity()), "ERROR The DBSXMLHandler screwed something up!"
2666
2667
2668         globaltag = list(handler.results.values())[0]
2669
2670
2671 assert len(globaltag) == 1
2672
2673
2674 globaltag = globaltag[0]
2675
2676
2677 return globaltag
2678
2679
2680
2681 def dbs_resolve_datatype(self, dataset_name):
2682 """Ask DBS for the the data type (data or mc) of a given
2683 dataset.
2684
2685 """
2686
2687
2688
2689 assert not self.dbs_api is None
2690
2691
2692 api = self.dbs_api
2693 dbs_query = "find datatype.type where dataset = %s " \
2694 "and dataset.status = VALID" % \
2695 dataset_name
2696 try:
2697 api_result = api.executeQuery(dbs_query)
2698 except DBSAPI.dbsApiException.DbsApiException:
2699 msg = "ERROR: Could not execute DBS query"
2700 self.logger.fatal(msg)
2701 raise Error(msg)
2702
2703 handler = DBSXMLHandler(["datatype.type"])
2704 parser = xml.sax.make_parser()
2705 parser.setContentHandler(handler)
2706
2707 try:
2708 xml.sax.parseString(api_result, handler)
2709 except SAXParseException:
2710 msg = "ERROR: Could not parse DBS server output"
2711 self.logger.fatal(msg)
2712 raise Error(msg)
2713
2714
2715 assert(handler.check_results_validity()), "ERROR The DBSXMLHandler screwed something up!"
2716
2717
2718         datatype = list(handler.results.values())[0]
2719
2720
2721 assert len(datatype) == 1
2722
2723
2724 datatype = datatype[0]
2725
2726
2727 return datatype
2728
2729
2730
2731
2732
2733
2734 def dbs_resolve_number_of_events(self, dataset_name, run_number=None):
2735 """Determine the number of events in a given dataset (and run).
2736
2737 Ask DBS for the number of events in a dataset. If a run number
2738         is specified, the number of events returned is that of the given
2739         run in that dataset. If problems occur we throw an exception.
2740
2741 # BUG BUG BUG
2742 # Since DBS does not return the number of events correctly,
2743 # neither for runs nor for whole datasets, we have to work
2744 # around that a bit...
2745 # BUG BUG BUG end
2746
2747 """
2748
2749
2750
2751 assert not self.dbs_api is None
2752
2753
2754 api = self.dbs_api
2755 dbs_query = "find file.name, file.numevents where dataset = %s " \
2756 "and dataset.status = VALID" % \
2757 dataset_name
2758 if not run_number is None:
2759             dbs_query = dbs_query + (" and run = %d" % run_number)
2760 try:
2761 api_result = api.executeQuery(dbs_query)
2762 except DBSAPI.dbsApiException.DbsApiException:
2763 msg = "ERROR: Could not execute DBS query"
2764 self.logger.fatal(msg)
2765 raise Error(msg)
2766
2767 handler = DBSXMLHandler(["file.name", "file.numevents"])
2768 parser = xml.sax.make_parser()
2769 parser.setContentHandler(handler)
2770
2771 try:
2772 xml.sax.parseString(api_result, handler)
2773 except SAXParseException:
2774 msg = "ERROR: Could not parse DBS server output"
2775 self.logger.fatal(msg)
2776 raise Error(msg)
2777
2778
2779 assert(handler.check_results_validity()), "ERROR The DBSXMLHandler screwed something up!"
2780
2781
2782         num_events = sum([int(i) for i in handler.results["file.numevents"]])
2783
2784
2785 return num_events
2786
2787
2788
2789
2790
2791
2792
2793
2794
2795
2796
2797
2798
2799
2800
2801
2802
2803
2804
2805
2806
2807
2808
2809
2810
2811
2812
2813
2814
2815
2816
2817
2818
2819
2820
2821
2822
2823
2824
2825
2826
2827
2828
2829
2830
2831
2832
2833
2834
2835
2836
2837
2838
2839
2840
2841
2842
2843
2844
2845
2846
2847
2848
2849
2850
2851
2852
2853
2854
2855
2856
2857
2858
2859
2860
2861
2862
2863
2864
2865
2866
2867
2868
2869
2870
2871
2872
2873
2874
2875
2876
2877
2878
2879
2880
2881
2882
2883
2884
2885
2886
2887
2888
2889
2890
2891
2892
2893
2894
2895
2896
2897
2898
2899
2900
2901
2902
2903
2904
2905
2906
2907
2908
2909
2910
2911
2912
2913
2914
2915
2916
2917
2918
2919
2920
2921
2922
2923
2924
2925
2926
2927
2928
2929
2930
2931
2932
2933
2934
2935
2936
2937
2938
2939
2940
2941
2942
2943
2944
2945
2946
2947
2948
2949
2950
2951
2952
2953
2954
2955
2956
2957
2958
2959
2960
2961
2962
2963
2964
2965
2966
2967
2968
2969
2970
2971
2972
2973
2974
2975
2976
2977
2978
2979
2980
2981
2982
2983
2984
2985
2986
2987
2988
2989
2990
2991
2992
2993
2994
2995
2996
2997
2998
2999
3000
3001
3002
3003
3004
3005
3006
3007
3008
3009
3010
3011
3012
3013
3014
3015
3016
3017
3018
3019
3020
3021
3022
3023
3024
3025
3026
3027
3028
3029
3030
3031
3032
3033
3034
3035
3036
3037
3038
3039
3040
3041
3042
3043
3044
3045
3046
3047
3048
3049
3050
3051
3052
3053
3054
3055
3056
3057
3058
3059
3060
3061
3062
3063
3064
3065
3066
3067
3068
3069
3070
3071
3072
3073
3074
3075 def dbs_check_dataset_spread(self, dataset_name):
3076 """Figure out the number of events in each run of this dataset.
3077
3078         This is more efficient than calling
3079         dbs_resolve_number_of_events separately for each run.
3080
3081 """
3082
3083 self.logger.debug("Checking spread of dataset `%s'" % dataset_name)
3084
3085
3086
3087 assert not self.dbs_api is None
3088
3089
3090 api = self.dbs_api
3091 dbs_query = "find run.number, site, file.name, file.numevents " \
3092 "where dataset = %s " \
3093 "and dataset.status = VALID" % \
3094 dataset_name
3095 try:
3096 api_result = api.executeQuery(dbs_query)
3097 except DBSAPI.dbsApiException.DbsApiException:
3098 msg = "ERROR: Could not execute DBS query"
3099 self.logger.fatal(msg)
3100 raise Error(msg)
3101
3102 handler = DBSXMLHandler(["run.number", "site", "file.name", "file.numevents"])
3103 parser = xml.sax.make_parser()
3104 parser.setContentHandler(handler)
3105
3106 try:
3107
3108
3109
3110
3111
3112
3113
3114
3115
3116
3117
3118
3119
3120
3121
3122
3123
3124
3125
3126
3127
3128
3129
3130
3131
3132
3133
3134
3135
3136
3137
3138
3139
3140
3141
3142
3143
3144
3145
3146
3147 xml.sax.parseString(api_result, handler)
3148 except SAXParseException:
3149 msg = "ERROR: Could not parse DBS server output"
3150 self.logger.fatal(msg)
3151 raise Error(msg)
3152
3153
3154 assert(handler.check_results_validity()), "ERROR The DBSXMLHandler screwed something up!"
3155
3156
3157
3158
3159
3160 files_info = {}
3161 for (index, site_name) in enumerate(handler.results["site"]):
3162
3163
3164
3165
3166
3167
3168
3169
3170
3171 if len(site_name) < 1:
3172 continue
3173 run_number = int(handler.results["run.number"][index])
3174 file_name = handler.results["file.name"][index]
3175 nevents = int(handler.results["file.numevents"][index])
3176
3177
3178 if run_number not in files_info:
3179
3180 files_info[run_number] = {}
3181 files_info[run_number][file_name] = (nevents,
3182 [site_name])
3183 elif file_name not in files_info[run_number]:
3184
3185 files_info[run_number][file_name] = (nevents,
3186 [site_name])
3187 else:
3188
3189
3190
3191
3192 assert nevents == files_info[run_number][file_name][0]
3193
3194 files_info[run_number][file_name][1].append(site_name)
3195
3196
3197
3198
3199 for run_number in files_info.keys():
3200 files_without_sites = [i for (i, j) in \
3201 files_info[run_number].items() \
3202 if len(j[1]) < 1]
3203 if len(files_without_sites) > 0:
3204 self.logger.warning("Removing %d file(s)" \
3205 " with empty site names" % \
3206 len(files_without_sites))
3207 for file_name in files_without_sites:
3208 del files_info[run_number][file_name]
3209
3210
3211
3212
3213
3214 num_events_catalog = {}
3215 for run_number in files_info.keys():
3216 site_names = list(set([j for i in files_info[run_number].values() for j in i[1]]))
3217
3218
3219
3220
3221 mirrored = None
3222 if len(site_names) > 1:
3223
3224
3225
3226
3227
3228
3229
3230 all_file_names = files_info[run_number].keys()
3231 all_file_names = set(all_file_names)
3232 sites_with_complete_copies = []
3233 for site_name in site_names:
3234 files_at_site = [i for (i, (j, k)) \
3235 in files_info[run_number].items() \
3236 if site_name in k]
3237 files_at_site = set(files_at_site)
3238 if files_at_site == all_file_names:
3239 sites_with_complete_copies.append(site_name)
3240 if len(sites_with_complete_copies) < 1:
3241
3242
3243
3244 mirrored = False
3245 else:
3246 if len(sites_with_complete_copies) > 1:
3247
3248
3249 mirrored = True
3250 else:
3251
3252
3253
3254
3255
3256 mirrored = True
3257
3258
3259
3260
3261
3262
3263
3264 if mirrored:
3265 self.logger.debug(" -> run appears to be `mirrored'")
3266 else:
3267 self.logger.debug(" -> run appears to be spread-out")
3268
3269 if mirrored and \
3270 len(sites_with_complete_copies) != len(site_names):
3271
3272
3273
3274 for (file_name, (i, sites)) in files_info[run_number].items():
3275 complete_sites = [site for site in sites \
3276 if site in sites_with_complete_copies]
3277 files_info[run_number][file_name] = (i, complete_sites)
3278 site_names = sites_with_complete_copies
3279
3280 self.logger.debug(" for run #%d:" % run_number)
3281 num_events_catalog[run_number] = {}
3282 num_events_catalog[run_number]["all_sites"] = sum([i[0] for i in files_info[run_number].values()])
3283 if len(site_names) < 1:
3284 self.logger.debug(" run is not available at any site")
3285 self.logger.debug(" (but should contain %d events" % \
3286 num_events_catalog[run_number]["all_sites"])
3287 else:
3288 self.logger.debug(" at all sites combined there are %d events" % \
3289 num_events_catalog[run_number]["all_sites"])
3290 for site_name in site_names:
3291 num_events_catalog[run_number][site_name] = sum([i[0] for i in files_info[run_number].values() if site_name in i[1]])
3292 self.logger.debug(" at site `%s' there are %d events" % \
3293 (site_name, num_events_catalog[run_number][site_name]))
3294 num_events_catalog[run_number]["mirrored"] = mirrored
3295
3296
3297 return num_events_catalog
3298
3299
3300
3301
3302
3303
3304
3305
3306
3307
3308
3309
3310
3311
3312
3313
3314
3315
3316
3317
3318
3319
3320
3321
3322
3323
3324
3325
3326
3327
3328
3329
3330
3331
3332
3333
3334
3335
3336
3337
3338
3339
3340
3341
3342
3343
3344
3345
3346
3347
3348
3349
3350
3351
3352
3353
3354
3355
3356 def build_dataset_list(self, input_method, input_name):
3357 """Build a list of all datasets to be processed.
3358
3359 """
3360
3361 dataset_names = []
3362
3363
3364
3365
3366 if input_method is None:
3367 pass
3368 elif input_method == "dataset":
3369
3370
3371
3372
3373 self.logger.info("Asking DBS for dataset names")
3374 dataset_names = self.dbs_resolve_dataset_name(input_name)
3375 elif input_method == "datasetfile":
3376
3377
3378
3379
3380 self.logger.info("Reading input from list file `%s'" % \
3381 input_name)
3382 try:
3383 listfile = open("/afs/cern.ch/cms/CAF/CMSCOMM/COMM_DQM/harvesting/bin/%s" %input_name, "r")
3384 print("open listfile")
3385 for dataset in listfile:
3386
3387 dataset_stripped = dataset.strip()
3388 if len(dataset_stripped) < 1:
3389 continue
3390
3391 if dataset_stripped[0] != "#":
3392 dataset_names.extend(self. \
3393 dbs_resolve_dataset_name(dataset_stripped))
3394 listfile.close()
3395 except IOError:
3396 msg = "ERROR: Could not open input list file `%s'" % \
3397 input_name
3398 self.logger.fatal(msg)
3399 raise Error(msg)
3400 else:
3401
3402
3403 assert False, "Unknown input method `%s'" % input_method
3404
3405
3406
3407
3408
3409
3410
3411 dataset_names = sorted(set(dataset_names))
3412
3413
3414
3415 return dataset_names
3416
3417
3418
3419 def build_dataset_use_list(self):
3420 """Build a list of datasets to process.
3421
3422 """
3423
3424 self.logger.info("Building list of datasets to consider...")
3425
3426 input_method = self.input_method["datasets"]["use"]
3427 input_name = self.input_name["datasets"]["use"]
3428 dataset_names = self.build_dataset_list(input_method,
3429 input_name)
3430 self.datasets_to_use = dict(list(zip(dataset_names,
3431 [None] * len(dataset_names))))
3432
3433 self.logger.info(" found %d dataset(s) to process:" % \
3434 len(dataset_names))
3435 for dataset in dataset_names:
3436 self.logger.info(" `%s'" % dataset)
3437
3438
3439
3440
3441
3442 def build_dataset_ignore_list(self):
3443 """Build a list of datasets to ignore.
3444
3445 NOTE: We should always have a list of datasets to process, but
3446 it may be that we don't have a list of datasets to ignore.
3447
3448 """
3449
3450 self.logger.info("Building list of datasets to ignore...")
3451
3452 input_method = self.input_method["datasets"]["ignore"]
3453 input_name = self.input_name["datasets"]["ignore"]
3454 dataset_names = self.build_dataset_list(input_method,
3455 input_name)
3456 self.datasets_to_ignore = dict(list(zip(dataset_names,
3457 [None] * len(dataset_names))))
3458
3459 self.logger.info(" found %d dataset(s) to ignore:" % \
3460 len(dataset_names))
3461 for dataset in dataset_names:
3462 self.logger.info(" `%s'" % dataset)
3463
3464
3465
3466
3467
3468 def build_runs_list(self, input_method, input_name):
3469
3470 runs = []
3471
3472
3473
3474 if input_method is None:
3475 pass
3476 elif input_method == "runs":
3477
3478
3479 self.logger.info("Reading list of runs from the " \
3480 "command line")
3481 runs.extend([int(i.strip()) \
3482 for i in input_name.split(",") \
3483 if len(i.strip()) > 0])
3484 elif input_method == "runslistfile":
3485
3486 self.logger.info("Reading list of runs from file `%s'" % \
3487 input_name)
3488 try:
3489 listfile = open(input_name, "r")
3490 for run in listfile:
3491
3492 run_stripped = run.strip()
3493 if len(run_stripped) < 1:
3494 continue
3495
3496 if run_stripped[0] != "#":
3497 runs.append(int(run_stripped))
3498 listfile.close()
3499 except IOError:
3500 msg = "ERROR: Could not open input list file `%s'" % \
3501 input_name
3502 self.logger.fatal(msg)
3503 raise Error(msg)
3504
3505 else:
3506
3507
3508 assert False, "Unknown input method `%s'" % input_method
3509
3510
3511
3512 runs = list(set(runs))
3513
3514
3515 return runs
3516
3517
3518
3519 def build_runs_use_list(self):
3520 """Build a list of runs to process.
3521
3522 """
3523
3524 self.logger.info("Building list of runs to consider...")
3525
3526 input_method = self.input_method["runs"]["use"]
3527 input_name = self.input_name["runs"]["use"]
3528 runs = self.build_runs_list(input_method, input_name)
3529 self.runs_to_use = dict(list(zip(runs, [None] * len(runs))))
3530
3531 self.logger.info(" found %d run(s) to process:" % \
3532 len(runs))
3533 if len(runs) > 0:
3534 self.logger.info(" %s" % ", ".join([str(i) for i in runs]))
3535
3536
3537
3538
3539
3540 def build_runs_ignore_list(self):
3541 """Build a list of runs to ignore.
3542
3543 NOTE: We should always have a list of runs to process, but
3544 it may be that we don't have a list of runs to ignore.
3545
3546 """
3547
3548 self.logger.info("Building list of runs to ignore...")
3549
3550 input_method = self.input_method["runs"]["ignore"]
3551 input_name = self.input_name["runs"]["ignore"]
3552 runs = self.build_runs_list(input_method, input_name)
3553 self.runs_to_ignore = dict(list(zip(runs, [None] * len(runs))))
3554
3555 self.logger.info(" found %d run(s) to ignore:" % \
3556 len(runs))
3557 if len(runs) > 0:
3558 self.logger.info(" %s" % ", ".join([str(i) for i in runs]))
3559
3560
3561
3562
3563
3564 def process_dataset_ignore_list(self):
3565 """Update the list of datasets taking into account the ones to
3566 ignore.
3567
3568 Both lists have been generated before from DBS and both are
3569 assumed to be unique.
3570
3571 NOTE: The advantage of creating the ignore list from DBS (in
3572 case a regexp is given) and matching that instead of directly
3573 matching the ignore criterion against the list of datasets (to
3574 consider) built from DBS is that in the former case we're sure
3575 that all regexps are treated exactly as DBS would have done
3576 without the cmsHarvester.
3577
3578 NOTE: This only removes complete samples. Exclusion of single
3579 runs is done by the book keeping. So the assumption is that a
3580 user never wants to harvest just part (i.e. n out of N runs)
3581 of a sample.
3582
3583 """
3584
3585 self.logger.info("Processing list of datasets to ignore...")
3586
3587 self.logger.debug("Before processing ignore list there are %d " \
3588 "datasets in the list to be processed" % \
3589 len(self.datasets_to_use))
3590
3591
3592 dataset_names_filtered = copy.deepcopy(self.datasets_to_use)
3593 for dataset_name in self.datasets_to_use.keys():
3594 if dataset_name in self.datasets_to_ignore.keys():
3595 del dataset_names_filtered[dataset_name]
3596
3597 self.logger.info(" --> Removed %d dataset(s)" % \
3598 (len(self.datasets_to_use) -
3599 len(dataset_names_filtered)))
3600
3601 self.datasets_to_use = dataset_names_filtered
3602
3603 self.logger.debug("After processing ignore list there are %d " \
3604 "datasets in the list to be processed" % \
3605 len(self.datasets_to_use))
3606
3607
3608
3609
3610
3611 def process_runs_use_and_ignore_lists(self):
3612
3613 self.logger.info("Processing list of runs to use and ignore...")
3614
3615
3616
3617
3618
3619
3620
3621
3622
3623
3624 runs_to_use = self.runs_to_use
3625 runs_to_ignore = self.runs_to_ignore
3626
3627 for dataset_name in self.datasets_to_use:
3628 runs_in_dataset = self.datasets_information[dataset_name]["runs"]
3629
3630
3631 runs_to_use_tmp = []
3632 for run in runs_to_use:
3633 if not run in runs_in_dataset:
3634 self.logger.warning("Dataset `%s' does not contain " \
3635 "requested run %d " \
3636 "--> ignoring `use' of this run" % \
3637 (dataset_name, run))
3638 else:
3639 runs_to_use_tmp.append(run)
3640
3641 if len(runs_to_use) > 0:
3642 runs = runs_to_use_tmp
3643 self.logger.info("Using %d out of %d runs " \
3644 "of dataset `%s'" % \
3645 (len(runs), len(runs_in_dataset),
3646 dataset_name))
3647 else:
3648 runs = runs_in_dataset
3649
3650 if len(runs_to_ignore) > 0:
3651 runs_tmp = []
3652 for run in runs:
3653 if not run in runs_to_ignore:
3654 runs_tmp.append(run)
3655 self.logger.info("Ignoring %d out of %d runs " \
3656 "of dataset `%s'" % \
3657 (len(runs)- len(runs_tmp),
3658 len(runs_in_dataset),
3659 dataset_name))
3660 runs = runs_tmp
3661
3662 if self.todofile != "YourToDofile.txt":
3663 runs_todo = []
3664 print("Reading runs from file /afs/cern.ch/cms/CAF/CMSCOMM/COMM_DQM/harvesting/%s" %self.todofile)
3665 cmd="grep %s /afs/cern.ch/cms/CAF/CMSCOMM/COMM_DQM/harvesting/%s | cut -f5 -d' '" %(dataset_name,self.todofile)
3666 (status, output)=subprocess.getstatusoutput(cmd)
3667 for run in runs:
3668 run_str="%s" %run
3669 if run_str in output:
3670 runs_todo.append(run)
3671 self.logger.info("Using %d runs " \
3672 "of dataset `%s'" % \
3673 (len(runs_todo),
3674 dataset_name))
3675 runs=runs_todo
3676
3677 Json_runs = []
3678 if self.Jsonfilename != "YourJSON.txt":
3679 good_runs = []
3680 self.Jsonlumi = True
3681
3682
3683 self.logger.info("Reading runs and lumisections from file `%s'" % \
3684 self.Jsonfilename)
3685 try:
3686 Jsonfile = open(self.Jsonfilename, "r")
3687 for names in Jsonfile:
3688 dictNames= eval(str(names))
3689 for key in dictNames:
3690 intkey=int(key)
3691 Json_runs.append(intkey)
3692 Jsonfile.close()
3693 except IOError:
3694 msg = "ERROR: Could not open Jsonfile `%s'" % \
3695                           self.Jsonfilename
3696 self.logger.fatal(msg)
3697 raise Error(msg)
3698 for run in runs:
3699 if run in Json_runs:
3700 good_runs.append(run)
3701 self.logger.info("Using %d runs " \
3702 "of dataset `%s'" % \
3703 (len(good_runs),
3704 dataset_name))
3705 runs=good_runs
3706 if (self.Jsonrunfilename != "YourJSON.txt") and (self.Jsonfilename == "YourJSON.txt"):
3707 good_runs = []
3708
3709
3710 self.logger.info("Reading runs from file `%s'" % \
3711 self.Jsonrunfilename)
3712 try:
3713 Jsonfile = open(self.Jsonrunfilename, "r")
3714 for names in Jsonfile:
3715 dictNames= eval(str(names))
3716 for key in dictNames:
3717 intkey=int(key)
3718 Json_runs.append(intkey)
3719 Jsonfile.close()
3720 except IOError:
3721 msg = "ERROR: Could not open Jsonfile `%s'" % \
3722                           self.Jsonrunfilename
3723 self.logger.fatal(msg)
3724 raise Error(msg)
3725 for run in runs:
3726 if run in Json_runs:
3727 good_runs.append(run)
3728 self.logger.info("Using %d runs " \
3729 "of dataset `%s'" % \
3730 (len(good_runs),
3731 dataset_name))
3732 runs=good_runs
3733
3734 self.datasets_to_use[dataset_name] = runs
3735
3736
3737
3738
3739
3740 def singlify_datasets(self):
3741 """Remove all but the largest part of all datasets.
3742
3743 This allows us to harvest at least part of these datasets
3744 using single-step harvesting until the two-step approach
3745 works.
3746
3747 """
3748
3749
3750 assert self.harvesting_mode == "single-step-allow-partial"
3751
3752
3753 for dataset_name in self.datasets_to_use:
3754 for run_number in self.datasets_information[dataset_name]["runs"]:
3755 max_events = max(self.datasets_information[dataset_name]["sites"][run_number].values())
3756 sites_with_max_events = [i[0] for i in self.datasets_information[dataset_name]["sites"][run_number].items() if i[1] == max_events]
3757 self.logger.warning("Singlifying dataset `%s', " \
3758 "run %d" % \
3759 (dataset_name, run_number))
3760 cmssw_version = self.datasets_information[dataset_name] \
3761 ["cmssw_version"]
3762 selected_site = self.pick_a_site(sites_with_max_events,
3763 cmssw_version)
3764
3765
3766 nevents_old = self.datasets_information[dataset_name]["num_events"][run_number]
3767 self.logger.warning(" --> " \
3768 "only harvesting partial statistics: " \
3769 "%d out of %d events (5.1%f%%) " \
3770 "at site `%s'" % \
3771 (max_events,
3772 nevents_old,
3773 100. * max_events / nevents_old,
3774 selected_site))
3775 self.logger.warning("!!! Please note that the number of " \
3776 "events in the output path name will " \
3777 "NOT reflect the actual statistics in " \
3778 "the harvested results !!!")
3779
3780
3781
3782
3783
3784
3785 self.datasets_information[dataset_name]["sites"][run_number] = {selected_site: max_events}
3786 self.datasets_information[dataset_name]["num_events"][run_number] = max_events
3787
3788
3789
3790
3791
3792
3793 def check_dataset_list(self):
3794 """Check list of dataset names for impossible ones.
3795
3796 Two kinds of checks are done:
3797 - Checks for things that do not make sense. These lead to
3798 errors and skipped datasets.
3799 - Sanity checks. For these warnings are issued but the user is
3800 considered to be the authoritative expert.
3801
3802 Checks performed:
3803 - The CMSSW version encoded in the dataset name should match
3804 self.cmssw_version. This is critical.
3805 - There should be some events in the dataset/run. This is
3806 critical in the sense that CRAB refuses to create jobs for
3807 zero events. And yes, this does happen in practice. E.g. the
3808 reprocessed CRAFT08 datasets contain runs with zero events.
3809 - A cursory check is performed to see if the harvesting type
3810 makes sense for the data type. This should prevent the user
3811 from inadvertently running RelVal for data.
3812 - It is not possible to run single-step harvesting jobs on
3813 samples that are not fully contained at a single site.
3814 - Each dataset/run has to be available at at least one site.
3815
3816 """
3817
3818 self.logger.info("Performing sanity checks on dataset list...")
3819
3820 dataset_names_after_checks = copy.deepcopy(self.datasets_to_use)
3821
3822 for dataset_name in self.datasets_to_use.keys():
3823
3824
3825 version_from_dataset = self.datasets_information[dataset_name] \
3826 ["cmssw_version"]
3827 if version_from_dataset != self.cmssw_version:
3828 msg = " CMSSW version mismatch for dataset `%s' " \
3829 "(%s vs. %s)" % \
3830 (dataset_name,
3831 self.cmssw_version, version_from_dataset)
3832 if self.force_running:
3833
3834 self.logger.warning("%s " \
3835 "--> `force mode' active: " \
3836 "run anyway" % msg)
3837 else:
3838 del dataset_names_after_checks[dataset_name]
3839 self.logger.warning("%s " \
3840 "--> skipping" % msg)
3841 continue
3842
3843
3844
3845
3846
3847
3848
3849
3850 suspicious = False
3851 datatype = self.datasets_information[dataset_name]["datatype"]
3852 if datatype == "data":
3853
3854 if self.harvesting_type != "DQMOffline":
3855 suspicious = True
3856 elif datatype == "mc":
3857 if self.harvesting_type == "DQMOffline":
3858 suspicious = True
3859 else:
3860
3861 assert False, "ERROR Impossible data type `%s' " \
3862 "for dataset `%s'" % \
3863 (datatype, dataset_name)
3864 if suspicious:
3865 msg = " Normally one does not run `%s' harvesting " \
3866 "on %s samples, are you sure?" % \
3867 (self.harvesting_type, datatype)
3868 if self.force_running:
3869 self.logger.warning("%s " \
3870 "--> `force mode' active: " \
3871 "run anyway" % msg)
3872 else:
3873 del dataset_names_after_checks[dataset_name]
3874 self.logger.warning("%s " \
3875 "--> skipping" % msg)
3876 continue
3877
3878
3879
3880
3881
3882
3883
3884
3885
3886
3887
3888
3889 if datatype == "data":
3890 if self.globaltag is None:
3891 msg = "For data datasets (like `%s') " \
3892 "we need a GlobalTag" % \
3893 dataset_name
3894 del dataset_names_after_checks[dataset_name]
3895 self.logger.warning("%s " \
3896 "--> skipping" % msg)
3897 continue
3898
3899
3900
3901
3902
3903
3904
3905
3906 globaltag = self.datasets_information[dataset_name]["globaltag"]
3907 if not globaltag in self.globaltag_check_cache:
3908 if self.check_globaltag(globaltag):
3909 self.globaltag_check_cache.append(globaltag)
3910 else:
3911 msg = "Something is wrong with GlobalTag `%s' " \
3912 "used by dataset `%s'!" % \
3913 (globaltag, dataset_name)
3914 if self.use_ref_hists:
3915 msg += "\n(Either it does not exist or it " \
3916 "does not contain the required key to " \
3917 "be used with reference histograms.)"
3918 else:
3919 msg += "\n(It probably just does not exist.)"
3920 self.logger.fatal(msg)
3921 raise Usage(msg)
3922
3923
3924
3925
3926 runs_without_sites = [i for (i, j) in \
3927 self.datasets_information[dataset_name] \
3928 ["sites"].items() \
3929 if len(j) < 1 and \
3930 i in self.datasets_to_use[dataset_name]]
3931 if len(runs_without_sites) > 0:
3932 for run_without_sites in runs_without_sites:
3933 try:
3934 dataset_names_after_checks[dataset_name].remove(run_without_sites)
3935                     except (KeyError, ValueError):
3936 pass
3937 self.logger.warning(" removed %d unavailable run(s) " \
3938 "from dataset `%s'" % \
3939 (len(runs_without_sites), dataset_name))
3940 self.logger.debug(" (%s)" % \
3941 ", ".join([str(i) for i in \
3942 runs_without_sites]))
3943
3944
3945
3946
3947
3948 if not self.harvesting_mode == "two-step":
3949 for run_number in self.datasets_to_use[dataset_name]:
3950
3951
3952
3953
3954 num_sites = len(self.datasets_information[dataset_name] \
3955 ["sites"][run_number])
3956 if num_sites > 1 and \
3957 not self.datasets_information[dataset_name] \
3958 ["mirrored"][run_number]:
3959
3960
3961
3962 msg = " Dataset `%s', run %d is spread across more " \
3963 "than one site.\n" \
3964 " Cannot run single-step harvesting on " \
3965 "samples spread across multiple sites" % \
3966 (dataset_name, run_number)
3967 try:
3968 dataset_names_after_checks[dataset_name].remove(run_number)
3969                         except (KeyError, ValueError):
3970 pass
3971 self.logger.warning("%s " \
3972 "--> skipping" % msg)
3973
3974
3975
3976
3977
3978
3979
3980
3981 tmp = [j for (i, j) in self.datasets_information \
3982 [dataset_name]["num_events"].items() \
3983 if i in self.datasets_to_use[dataset_name]]
3984 num_events_dataset = sum(tmp)
3985
3986 if num_events_dataset < 1:
3987 msg = " dataset `%s' is empty" % dataset_name
3988 del dataset_names_after_checks[dataset_name]
3989 self.logger.warning("%s " \
3990 "--> skipping" % msg)
3991
3992
3993
3994
3995
3996
3997
3998
3999
4000 continue
4001
4002 tmp = [i for i in \
4003 self.datasets_information[dataset_name] \
4004 ["num_events"].items() if i[1] < 1]
4005 tmp = [i for i in tmp if i[0] in self.datasets_to_use[dataset_name]]
4006 empty_runs = dict(tmp)
4007 if len(empty_runs) > 0:
4008 for empty_run in empty_runs:
4009 try:
4010 dataset_names_after_checks[dataset_name].remove(empty_run)
4011                     except (KeyError, ValueError):
4012 pass
4013 self.logger.info(" removed %d empty run(s) from dataset `%s'" % \
4014 (len(empty_runs), dataset_name))
4015 self.logger.debug(" (%s)" % \
4016 ", ".join([str(i) for i in empty_runs]))
4017
4018
4019
4020
4021
4022 dataset_names_after_checks_tmp = copy.deepcopy(dataset_names_after_checks)
4023 for (dataset_name, runs) in dataset_names_after_checks.items():
4024 if len(runs) < 1:
4025 self.logger.warning(" Removing dataset without any runs " \
4026 "(left) `%s'" % \
4027 dataset_name)
4028 del dataset_names_after_checks_tmp[dataset_name]
4029 dataset_names_after_checks = dataset_names_after_checks_tmp
4030
4031
4032
4033 self.logger.warning(" --> Removed %d dataset(s)" % \
4034 (len(self.datasets_to_use) -
4035 len(dataset_names_after_checks)))
4036
4037
4038 self.datasets_to_use = dataset_names_after_checks
4039
4040
4041
4042
4043
4044 def escape_dataset_name(self, dataset_name):
4045 """Escape a DBS dataset name.
4046
4047 Escape a DBS dataset name such that it does not cause trouble
4048 with the file system. This means turning each `/' into `__',
4049 except for the first one which is just removed.
4050
4051 """
4052
4053 escaped_dataset_name = dataset_name
4054 escaped_dataset_name = escaped_dataset_name.strip("/")
4055 escaped_dataset_name = escaped_dataset_name.replace("/", "__")
4056
4057 return escaped_dataset_name
4058
4059
4060
4061
4062
4063 def create_config_file_name(self, dataset_name, run_number):
4064 """Generate the name of the configuration file to be run by
4065 CRAB.
4066
4067 Depending on the harvesting mode (single-step or two-step)
4068 this is the name of the real harvesting configuration or the
4069 name of the first-step ME summary extraction configuration.
4070
4071 """
4072
4073 if self.harvesting_mode == "single-step":
4074 config_file_name = self.create_harvesting_config_file_name(dataset_name)
4075 elif self.harvesting_mode == "single-step-allow-partial":
4076 config_file_name = self.create_harvesting_config_file_name(dataset_name)
4077
4078
4079
4080
4081
4082
4083 elif self.harvesting_mode == "two-step":
4084 config_file_name = self.create_me_summary_config_file_name(dataset_name)
4085 else:
4086 assert False, "ERROR Unknown harvesting mode `%s'" % \
4087 self.harvesting_mode
4088
4089
4090 return config_file_name
4091
4092
4093
4094
4095 def create_harvesting_config_file_name(self, dataset_name):
4096 "Generate the name to be used for the harvesting config file."
4097
4098 file_name_base = "harvesting.py"
4099 dataset_name_escaped = self.escape_dataset_name(dataset_name)
4100 config_file_name = file_name_base.replace(".py",
4101 "_%s.py" % \
4102 dataset_name_escaped)
4103
4104
4105 return config_file_name
4106
4107
4108
4109 def create_me_summary_config_file_name(self, dataset_name):
4110 "Generate the name of the ME summary extraction config file."
4111
4112 file_name_base = "me_extraction.py"
4113 dataset_name_escaped = self.escape_dataset_name(dataset_name)
4114 config_file_name = file_name_base.replace(".py",
4115 "_%s.py" % \
4116 dataset_name_escaped)
4117
4118
4119 return config_file_name
4120
4121
4122
4123 def create_output_file_name(self, dataset_name, run_number=None):
4124 """Create the name of the output file name to be used.
4125
4126 This is the name of the output file of the `first step'. In
4127 the case of single-step harvesting this is already the final
4128 harvesting output ROOT file. In the case of two-step
4129 harvesting it is the name of the intermediary ME summary
4130 file.
4131
4132 """
4133
4134
4135
4136
4137
4138
4139
4140
4141
4142 if self.harvesting_mode == "single-step":
4143
4144 assert not run_number is None
4145
4146 output_file_name = self.create_harvesting_output_file_name(dataset_name, run_number)
4147 elif self.harvesting_mode == "single-step-allow-partial":
4148
4149 assert not run_number is None
4150
4151 output_file_name = self.create_harvesting_output_file_name(dataset_name, run_number)
4152 elif self.harvesting_mode == "two-step":
4153
4154 assert run_number is None
4155
4156 output_file_name = self.create_me_summary_output_file_name(dataset_name)
4157 else:
4158
4159 assert False, "ERROR Unknown harvesting mode `%s'" % \
4160 self.harvesting_mode
4161
4162
4163 return output_file_name
4164
4165
4166
4167 def create_harvesting_output_file_name(self, dataset_name, run_number):
4168 """Generate the name to be used for the harvesting output file.
4169
4170 This harvesting output file is the _final_ ROOT output file
4171 containing the harvesting results. In case of two-step
4172 harvesting there is an intermediate ME output file as well.
4173
4174 """
4175
4176 dataset_name_escaped = self.escape_dataset_name(dataset_name)
4177
4178
4179
4180
4181
4182
4183
4184 output_file_name = "DQM_V0001_R%09d__%s.root" % \
4185 (run_number, dataset_name_escaped)
4186 if self.harvesting_mode.find("partial") > -1:
4187
4188
4189 if self.datasets_information[dataset_name] \
4190 ["mirrored"][run_number] == False:
4191 output_file_name = output_file_name.replace(".root", \
4192 "_partial.root")
4193
4194
4195 return output_file_name
4196
4197
4198
4199 def create_me_summary_output_file_name(self, dataset_name):
4200 """Generate the name of the intermediate ME file name to be
4201 used in two-step harvesting.
4202
4203 """
4204
4205 dataset_name_escaped = self.escape_dataset_name(dataset_name)
4206 output_file_name = "me_summary_%s.root" % \
4207 dataset_name_escaped
4208
4209
4210 return output_file_name
4211
4212
4213
4214 def create_multicrab_block_name(self, dataset_name, run_number, index):
4215 """Create the block name to use for this dataset/run number.
4216
4217 This is what appears in the brackets `[]' in multicrab.cfg. It
4218 is used as the name of the job and to create output
4219 directories.
4220
4221 """
4222
4223 dataset_name_escaped = self.escape_dataset_name(dataset_name)
4224 block_name = "%s_%09d_%s" % (dataset_name_escaped, run_number, index)
4225
4226
4227 return block_name
4228
4229
4230
4231 def create_crab_config(self):
4232 """Create a CRAB configuration for a given job.
4233
4234 NOTE: This is _not_ a complete (as in: submittable) CRAB
4235 configuration. It is used to store the common settings for the
4236 multicrab configuration.
4237
4238 NOTE: Only CERN CASTOR area (/castor/cern.ch/) is supported.
4239
4240 NOTE: According to CRAB, you `Must define exactly two of
4241 total_number_of_events, events_per_job, or
4242 number_of_jobs.'. For single-step harvesting we force one job,
4243 for the rest we don't really care.
4244
4245 # BUG BUG BUG
4246 # With the current version of CRAB (2.6.1), in which Daniele
4247 # fixed the behaviour of no_block_boundary for me, one _has to
4248 # specify_ the total_number_of_events and one single site in
4249 # the se_white_list.
4250 # BUG BUG BUG end
4251
4252 """
4253
4254 tmp = []
4255
4256
4257 castor_prefix = self.castor_prefix
4258
4259 tmp.append(self.config_file_header())
4260 tmp.append("")
4261
4262
4263
4264 tmp.append("[CRAB]")
4265 tmp.append("jobtype = cmssw")
4266 tmp.append("")
4267
4268
4269
4270 tmp.append("[GRID]")
4271 tmp.append("virtual_organization=cms")
4272 tmp.append("")
4273
4274
4275
4276 tmp.append("[USER]")
4277 tmp.append("copy_data = 1")
4278 tmp.append("")
4279
4280
4281
4282 tmp.append("[CMSSW]")
4283 tmp.append("# This reveals data hosted on T1 sites,")
4284 tmp.append("# which is normally hidden by CRAB.")
4285 tmp.append("show_prod = 1")
4286 tmp.append("number_of_jobs = 1")
4287 if self.Jsonlumi == True:
4288 tmp.append("lumi_mask = %s" % self.Jsonfilename)
4289 tmp.append("total_number_of_lumis = -1")
4290 else:
4291 if self.harvesting_type == "DQMOffline":
4292 tmp.append("total_number_of_lumis = -1")
4293 else:
4294 tmp.append("total_number_of_events = -1")
4295 if self.harvesting_mode.find("single-step") > -1:
4296 tmp.append("# Force everything to run in one job.")
4297 tmp.append("no_block_boundary = 1")
4298 tmp.append("")
4299
4300
4301
4302 tmp.append("[CAF]")
4303
4304 crab_config = "\n".join(tmp)
4305
4306
4307 return crab_config
4308
4309
4310
4311 def create_multicrab_config(self):
4312 """Create a multicrab.cfg file for all samples.
4313
4314 This creates the contents for a multicrab.cfg file that uses
4315 the crab.cfg file (generated elsewhere) for the basic settings
4316 and contains blocks for each run of each dataset.
4317
4318 # BUG BUG BUG
4319 # The fact that it's necessary to specify the se_white_list
4320 # and the total_number_of_events is due to our use of CRAB
4321 # version 2.6.1. This should no longer be necessary in the
4322 # future.
4323 # BUG BUG BUG end
4324
4325 """
4326
4327 cmd="who i am | cut -f1 -d' '"
4328 (status, output)=subprocess.getstatusoutput(cmd)
4329 UserName = output
4330
4331 if self.caf_access == True:
4332 print("Extracting %s as user name" %UserName)
4333
4334 number_max_sites = self.nr_max_sites + 1
4335
4336 multicrab_config_lines = []
4337 multicrab_config_lines.append(self.config_file_header())
4338 multicrab_config_lines.append("")
4339 multicrab_config_lines.append("[MULTICRAB]")
4340 multicrab_config_lines.append("cfg = crab.cfg")
4341 multicrab_config_lines.append("")
4342
4343 dataset_names = sorted(self.datasets_to_use.keys())
4344
4345 for dataset_name in dataset_names:
4346 runs = self.datasets_to_use[dataset_name]
4347 dataset_name_escaped = self.escape_dataset_name(dataset_name)
4348 castor_prefix = self.castor_prefix
4349
4350 for run in runs:
4351
4352
4353 castor_dir = self.datasets_information[dataset_name] \
4354 ["castor_path"][run]
4355
4356 cmd = "rfdir %s" % castor_dir
4357 (status, output) = subprocess.getstatusoutput(cmd)
4358
4359 if len(output) <= 0:
4360
4361
4362
4363
4364
4365 assert (len(self.datasets_information[dataset_name] \
4366 ["sites"][run]) == 1) or \
4367 self.datasets_information[dataset_name]["mirrored"]
4368
4369
4370                 site_names = list(self.datasets_information[dataset_name] \
4371                              ["sites"][run].keys())
4372
4373 for i in range(1, number_max_sites, 1):
4374 if len(site_names) > 0:
4375 index = "site_%02d" % (i)
4376
4377 config_file_name = self. \
4378 create_config_file_name(dataset_name, run)
4379 output_file_name = self. \
4380 create_output_file_name(dataset_name, run)
4381
4382
4383
4384
4385
4386
4387
4388 loop = 0
4389
4390 if len(site_names) > 1:
4391 cmssw_version = self.datasets_information[dataset_name] \
4392 ["cmssw_version"]
4393 self.logger.info("Picking site for mirrored dataset " \
4394 "`%s', run %d" % \
4395 (dataset_name, run))
4396 site_name = self.pick_a_site(site_names, cmssw_version)
4397 if site_name in site_names:
4398 site_names.remove(site_name)
4399
4400 else:
4401 site_name = site_names[0]
4402 site_names.remove(site_name)
4403
4404                         if site_name == self.no_matching_site_found_str:
4405 if loop < 1:
4406 break
4407
4408 nevents = self.datasets_information[dataset_name]["num_events"][run]
4409
4410
4411 multicrab_block_name = self.create_multicrab_block_name( \
4412 dataset_name, run, index)
4413 multicrab_config_lines.append("[%s]" % \
4414 multicrab_block_name)
4415
4416
4417
4418 if site_name == "caf.cern.ch":
4419 multicrab_config_lines.append("CRAB.use_server=0")
4420 multicrab_config_lines.append("CRAB.scheduler=caf")
4421 else:
4422 multicrab_config_lines.append("scheduler = glite")
4423
4424
4425
4426 if site_name == "caf.cern.ch":
4427 pass
4428 else:
4429 multicrab_config_lines.append("GRID.se_white_list = %s" % \
4430 site_name)
4431 multicrab_config_lines.append("# This removes the default blacklisting of T1 sites.")
4432 multicrab_config_lines.append("GRID.remove_default_blacklist = 1")
4433 multicrab_config_lines.append("GRID.rb = CERN")
4434 if not self.non_t1access:
4435 multicrab_config_lines.append("GRID.role = t1access")
4436
4437
4438
4439
4440 castor_dir = castor_dir.replace(castor_prefix, "")
4441 multicrab_config_lines.append("USER.storage_element=srm-cms.cern.ch")
4442 multicrab_config_lines.append("USER.user_remote_dir = %s" % \
4443 castor_dir)
4444 multicrab_config_lines.append("USER.check_user_remote_dir=0")
4445
4446 if site_name == "caf.cern.ch":
4447 multicrab_config_lines.append("USER.storage_path=%s" % castor_prefix)
4448
4449
4450
4451
4452 else:
4453 multicrab_config_lines.append("USER.storage_path=/srm/managerv2?SFN=%s" % castor_prefix)
4454
4455
4456
4457
4458
4459
4460 multicrab_config_lines.append("CMSSW.pset = %s" % \
4461 config_file_name)
4462 multicrab_config_lines.append("CMSSW.datasetpath = %s" % \
4463 dataset_name)
4464 multicrab_config_lines.append("CMSSW.runselection = %d" % \
4465 run)
4466
4467 if self.Jsonlumi == True:
4468 pass
4469 else:
4470 if self.harvesting_type == "DQMOffline":
4471 pass
4472 else:
4473 multicrab_config_lines.append("CMSSW.total_number_of_events = %d" % \
4474 nevents)
4475
4476 multicrab_config_lines.append("CMSSW.output_file = %s" % \
4477 output_file_name)
4478
4479
4480
4481 if site_name == "caf.cern.ch":
4482 multicrab_config_lines.append("CAF.queue=cmscaf1nd")
4483
4484
4485
4486 multicrab_config_lines.append("")
4487
4488 loop = loop + 1
4489
4490 self.all_sites_found = True
4491
4492 multicrab_config = "\n".join(multicrab_config_lines)
4493
4494
4495 return multicrab_config
4496
4497
4498
4499 def check_globaltag(self, globaltag=None):
4500 """Check if globaltag exists.
4501
4502 Check if globaltag exists as GlobalTag in the database given
4503 by self.frontier_connection_name['globaltag']. If globaltag is
4504 None, self.globaltag is used instead.
4505
4506 If we're going to use reference histograms this method also
4507 checks for the existence of the required key in the GlobalTag.
4508
4509 """
4510
4511 if globaltag is None:
4512 globaltag = self.globaltag
4513
4514
4515 if globaltag.endswith("::All"):
4516 globaltag = globaltag[:-5]
4517
4518 connect_name = self.frontier_connection_name["globaltag"]
4519
4520
4521
4522
4523
4524
4525
4526 connect_name = connect_name.replace("frontier://",
4527 "frontier://cmsfrontier:8000/")
4528
4529 connect_name += self.db_account_name_cms_cond_globaltag()
4530
4531 tag_exists = self.check_globaltag_exists(globaltag, connect_name)
4532
4533
4534
4535 tag_contains_ref_hist_key = False
4536 if self.use_ref_hists and tag_exists:
4537
4538 tag_contains_ref_hist_key = self.check_globaltag_contains_ref_hist_key(globaltag, connect_name)
4539
4540
4541
4542 if self.use_ref_hists:
4543 ret_val = tag_exists and tag_contains_ref_hist_key
4544 else:
4545 ret_val = tag_exists
4546
4547
4548
4549
4550 return ret_val
4551
4552
4553
4554 def check_globaltag_exists(self, globaltag, connect_name):
4555 """Check if globaltag exists.
4556
4557 """
4558
4559 self.logger.info("Checking existence of GlobalTag `%s'" % \
4560 globaltag)
4561 self.logger.debug(" (Using database connection `%s')" % \
4562 connect_name)
4563
4564 cmd = "cmscond_tagtree_list -c %s -T %s" % \
4565 (connect_name, globaltag)
4566 (status, output) = subprocess.getstatusoutput(cmd)
4567 if status != 0 or \
4568 output.find("error") > -1:
4569 msg = "Could not check existence of GlobalTag `%s' in `%s'" % \
4570 (globaltag, connect_name)
4571 if output.find(".ALL_TABLES not found") > -1:
4572 msg = "%s\n" \
4573 "Missing database account `%s'" % \
4574 (msg, output.split(".ALL_TABLES")[0].split()[-1])
4575 self.logger.fatal(msg)
4576 self.logger.debug("Command used:")
4577 self.logger.debug(" %s" % cmd)
4578 self.logger.debug("Output received:")
4579 self.logger.debug(output)
4580 raise Error(msg)
4581 if output.find("does not exist") > -1:
4582 self.logger.debug("GlobalTag `%s' does not exist in `%s':" % \
4583 (globaltag, connect_name))
4584 self.logger.debug("Output received:")
4585 self.logger.debug(output)
4586 tag_exists = False
4587 else:
4588 tag_exists = True
4589 self.logger.info(" GlobalTag exists? -> %s" % tag_exists)
4590
4591
4592 return tag_exists
4593
4594
4595
4596 def check_globaltag_contains_ref_hist_key(self, globaltag, connect_name):
4597 """Check if globaltag contains the required RefHistos key.
4598
4599 """
4600
4601
4602 tag_contains_key = None
4603 ref_hist_key = "RefHistos"
4604 self.logger.info("Checking existence of reference " \
4605 "histogram key `%s' in GlobalTag `%s'" % \
4606 (ref_hist_key, globaltag))
4607 self.logger.debug(" (Using database connection `%s')" % \
4608 connect_name)
4609 cmd = "cmscond_tagtree_list -c %s -T %s -n %s" % \
4610 (connect_name, globaltag, ref_hist_key)
4611 (status, output) = subprocess.getstatusoutput(cmd)
4612 if status != 0 or \
4613 output.find("error") > -1:
4614 msg = "Could not check existence of key `%s'" % \
4615 (ref_hist_key, connect_name)
4616 self.logger.fatal(msg)
4617 self.logger.debug("Command used:")
4618 self.logger.debug(" %s" % cmd)
4619 self.logger.debug("Output received:")
4620 self.logger.debug(" %s" % output)
4621 raise Error(msg)
4622 if len(output) < 1:
4623 self.logger.debug("Required key for use of reference " \
4624 "histograms `%s' does not exist " \
4625 "in GlobalTag `%s':" % \
4626 (ref_hist_key, globaltag))
4627 self.logger.debug("Output received:")
4628 self.logger.debug(output)
4629 tag_contains_key = False
4630 else:
4631 tag_contains_key = True
4632
4633 self.logger.info(" GlobalTag contains `%s' key? -> %s" % \
4634 (ref_hist_key, tag_contains_key))
4635
4636
4637 return tag_contains_key
4638
4639
4640
4641 def check_ref_hist_tag(self, tag_name):
4642 """Check the existence of tag_name in database connect_name.
4643
4644 Check if tag_name exists as a reference histogram tag in the
4645 database given by self.frontier_connection_name['refhists'].
4646
4647 """
4648
4649 connect_name = self.frontier_connection_name["refhists"]
4650 connect_name += self.db_account_name_cms_cond_dqm_summary()
4651
4652 self.logger.debug("Checking existence of reference " \
4653 "histogram tag `%s'" % \
4654 tag_name)
4655 self.logger.debug(" (Using database connection `%s')" % \
4656 connect_name)
4657
4658 cmd = "cmscond_list_iov -c %s" % \
4659 connect_name
4660 (status, output) = subprocess.getstatusoutput(cmd)
4661 if status != 0:
4662 msg = "Could not check existence of tag `%s' in `%s'" % \
4663 (tag_name, connect_name)
4664 self.logger.fatal(msg)
4665 self.logger.debug("Command used:")
4666 self.logger.debug(" %s" % cmd)
4667 self.logger.debug("Output received:")
4668 self.logger.debug(output)
4669 raise Error(msg)
4670 if not tag_name in output.split():
4671 self.logger.debug("Reference histogram tag `%s' " \
4672 "does not exist in `%s'" % \
4673 (tag_name, connect_name))
4674 self.logger.debug(" Existing tags: `%s'" % \
4675 "', `".join(output.split()))
4676 tag_exists = False
4677 else:
4678 tag_exists = True
4679 self.logger.debug(" Reference histogram tag exists? " \
4680 "-> %s" % tag_exists)
4681
4682
4683 return tag_exists
4684
4685
4686
4687 def create_harvesting_config(self, dataset_name):
4688 """Create the Python harvesting configuration for harvesting.
4689
4690 The basic configuration is created by
4691 Configuration.PyReleaseValidation.ConfigBuilder. (This mimics
4692         what cmsDriver.py does.) After that we add some customisations
4693         of our own.
4694
4695 NOTE: On one hand it may not be nice to circumvent
4696 cmsDriver.py, on the other hand cmsDriver.py does not really
4697 do anything itself. All the real work is done by the
4698 ConfigBuilder so there is not much risk that we miss out on
4699 essential developments of cmsDriver in the future.
4700
4701 """
4702
4703
4704 config_options = defaultOptions
4705
4706
4707
4708
4709 config_options.name = "harvesting"
4710 config_options.scenario = "pp"
4711 config_options.number = 1
4712 config_options.arguments = self.ident_string()
4713 config_options.evt_type = config_options.name
4714 config_options.customisation_file = None
4715 config_options.filein = "dummy_value"
4716 config_options.filetype = "EDM"
4717
4718 config_options.gflash = "dummy_value"
4719
4720
4721
4722 config_options.dbsquery = ""
4723
4724
4725
4726
4727
4728
4729 config_options.step = "HARVESTING:%s" % \
4730 self.harvesting_info[self.harvesting_type] \
4731 ["step_string"]
4732 config_options.beamspot = self.harvesting_info[self.harvesting_type] \
4733 ["beamspot"]
4734 config_options.eventcontent = self.harvesting_info \
4735 [self.harvesting_type] \
4736 ["eventcontent"]
4737 config_options.harvesting = self.harvesting_info \
4738 [self.harvesting_type] \
4739 ["harvesting"]
4740
4741
4742
4743
4744
4745 datatype = self.datasets_information[dataset_name]["datatype"]
4746 config_options.isMC = (datatype.lower() == "mc")
4747 config_options.isData = (datatype.lower() == "data")
4748 globaltag = self.datasets_information[dataset_name]["globaltag"]
4749
4750 config_options.conditions = self.format_conditions_string(globaltag)
4751
4752
4753
4754 if "with_input" in getargspec(ConfigBuilder.__init__)[0]:
4755
4756 config_builder = ConfigBuilder(config_options, with_input=True)
4757 else:
4758
4759 config_builder = ConfigBuilder(config_options)
4760 config_builder.prepare(True)
4761 config_contents = config_builder.pythonCfgCode
4762
4763
4764
4765
4766
4767
4768 marker_lines = []
4769 sep = "#" * 30
4770 marker_lines.append(sep)
4771 marker_lines.append("# Code between these markers was generated by")
4772 marker_lines.append("# Configuration.PyReleaseValidation." \
4773 "ConfigBuilder")
4774
4775 marker_lines.append(sep)
4776 marker = "\n".join(marker_lines)
4777
4778 tmp = [self.config_file_header()]
4779 tmp.append("")
4780 tmp.append(marker)
4781 tmp.append("")
4782 tmp.append(config_contents)
4783 tmp.append("")
4784 tmp.append(marker)
4785 tmp.append("")
4786 config_contents = "\n".join(tmp)
4787
4788
4789
4790
4791 customisations = [""]
4792
4793 customisations.append("# Now follow some customisations")
4794 customisations.append("")
4795 connect_name = self.frontier_connection_name["globaltag"]
4796 connect_name += self.db_account_name_cms_cond_globaltag()
4797 customisations.append("process.GlobalTag.connect = \"%s\"" % \
4798 connect_name)
4799
4800
4801 if self.saveByLumiSection == True:
4802 customisations.append("process.dqmSaver.saveByLumiSection = 1")
4803
4804
4805
4806 customisations.append("")
4807
4808
4809
4810
4811
4812
4813
4814
4815
4816
4817
4818
4819
4820 use_es_prefer = (self.harvesting_type == "RelVal")
4821 use_refs = use_es_prefer or \
4822 (not self.harvesting_type == "MC")
4823
4824 use_refs = use_refs and self.use_ref_hists
4825
4826 if not use_refs:
4827
4828
4829
4830
4831
4832 customisations.append("print \"Not using reference histograms\"")
4833 customisations.append("if hasattr(process, \"dqmRefHistoRootFileGetter\"):")
4834 customisations.append(" for (sequence_name, sequence) in process.sequences.items():")
4835 customisations.append(" if sequence.remove(process.dqmRefHistoRootFileGetter):")
4836 customisations.append(" print \"Removed process.dqmRefHistoRootFileGetter from sequence `%s'\" % \\")
4837 customisations.append(" sequence_name")
4838 customisations.append("process.dqmSaver.referenceHandling = \"skip\"")
4839 else:
4840
4841
4842 customisations.append("process.dqmSaver.referenceHandling = \"all\"")
4843 if use_es_prefer:
4844 es_prefer_snippet = self.create_es_prefer_snippet(dataset_name)
4845 customisations.append(es_prefer_snippet)
4846
4847
4848
4849 workflow_name = dataset_name
4850 if self.harvesting_mode == "single-step-allow-partial":
4851 workflow_name += "_partial"
4852 customisations.append("process.dqmSaver.workflow = \"%s\"" % \
4853 workflow_name)
4854
4855
4856
4857
4858
4859
4860
4861
4862
4863
4864
4865
4866
4867
4868
4869
4870
4871
4872
4873
4874
4875
4876
4877
4878
4879
4880
4881
4882
4883
4884
4885
4886
4887
4888
4889 config_contents = config_contents + "\n".join(customisations)
4890
4891
4892
4893
4894 return config_contents
4895
4896
4897
4898
4899
4900
4901
4902
4903
4904
4905
4906
4907
4908
4909
4910
4911
4912
4913 def create_me_extraction_config(self, dataset_name):
4914 """
4915
4916 """
4917
4918
4919
4920 tmp = []
4921 tmp.append(self.config_file_header())
4922 tmp.append("")
4923 tmp.append("import FWCore.ParameterSet.Config as cms")
4924 tmp.append("")
4925 tmp.append("process = cms.Process(\"ME2EDM\")")
4926 tmp.append("")
4927 tmp.append("# Import of standard configurations")
4928 tmp.append("process.load(\"Configuration/EventContent/EventContent_cff\")")
4929 tmp.append("")
4930 tmp.append("# We don't really process any events, just keep this set to one to")
4931 tmp.append("# make sure things work.")
4932 tmp.append("process.maxEvents = cms.untracked.PSet(")
4933 tmp.append(" input = cms.untracked.int32(1)")
4934 tmp.append(" )")
4935 tmp.append("")
4936 tmp.append("process.options = cms.untracked.PSet(")
4937 tmp.append(" Rethrow = cms.untracked.vstring(\"ProductNotFound\")")
4938 tmp.append(" )")
4939 tmp.append("")
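# NOTE: the file name below is only a placeholder; the actual input
# files are expected to be filled in later (presumably by the CRAB job
# wrapper).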
4940 tmp.append("process.source = cms.Source(\"PoolSource\",")
4941 tmp.append(" processingMode = \\")
4942 tmp.append(" cms.untracked.string(\"RunsAndLumis\"),")
4943 tmp.append(" fileNames = \\")
4944 tmp.append(" cms.untracked.vstring(\"no_file_specified\")")
4945 tmp.append(" )")
4946 tmp.append("")
4947 tmp.append("# Output definition: drop everything except for the monitoring.")
4948 tmp.append("process.output = cms.OutputModule(")
4949 tmp.append(" \"PoolOutputModule\",")
4950 tmp.append(" outputCommands = \\")
4951 tmp.append(" cms.untracked.vstring(\"drop *\", \\")
4952 tmp.append(" \"keep *_MEtoEDMConverter_*_*\"),")
4953 output_file_name = self. \
4954 create_output_file_name(dataset_name)
4955 tmp.append(" fileName = \\")
4956 tmp.append(" cms.untracked.string(\"%s\")," % output_file_name)
4957 tmp.append(" dataset = cms.untracked.PSet(")
4958 tmp.append(" dataTier = cms.untracked.string(\"RECO\"),")
4959 tmp.append(" filterName = cms.untracked.string(\"\")")
4960 tmp.append(" )")
4961 tmp.append(" )")
4962 tmp.append("")
4963 tmp.append("# Additional output definition")
4964 tmp.append("process.out_step = cms.EndPath(process.output)")
4965 tmp.append("")
4966 tmp.append("# Schedule definition")
4967 tmp.append("process.schedule = cms.Schedule(process.out_step)")
4968 tmp.append("")
4969
4970 config_contents = "\n".join(tmp)
4971
4972
4973 return config_contents
4974
4975
4976
4977
4978
4979
4980
4981
4982
4983
4984
4985
4986
4987
4988
4989
4990
4991
4992
4993
4994
4995
4996
4997
4998
4999
5000
5001
5002
5003
5004
5005
5006
5007
5008
5009
5010 def write_crab_config(self):
5011 """Write a CRAB job configuration Python file.
5012
5013 """
5014
5015 self.logger.info("Writing CRAB configuration...")
5016
5017 file_name_base = "crab.cfg"
5018
5019
5020 crab_contents = self.create_crab_config()
5021
5022
5023 crab_file_name = file_name_base
5024 try:
5025 crab_file = open(crab_file_name, "w")
5026 crab_file.write(crab_contents)
5027 crab_file.close()
5028 except IOError:
5029 self.logger.fatal("Could not write " \
5030 "CRAB configuration to file `%s'" % \
5031 crab_file_name)
5032 raise Error("ERROR: Could not write to file `%s'!" % \
5033 crab_file_name)
5034
5035
5036
5037
5038
5039 def write_multicrab_config(self):
5040 """Write a multi-CRAB job configuration Python file.
5041
5042 """
5043
5044 self.logger.info("Writing multi-CRAB configuration...")
5045
5046 file_name_base = "multicrab.cfg"
5047
5048
5049 multicrab_contents = self.create_multicrab_config()
5050
5051
5052 multicrab_file_name = file_name_base
5053 try:
5054 multicrab_file = open(multicrab_file_name, "w")
5055 multicrab_file.write(multicrab_contents)
5056 multicrab_file.close()
5057 except IOError:
5058 self.logger.fatal("Could not write " \
5059 "multi-CRAB configuration to file `%s'" % \
5060 multicrab_file_name)
5061 raise Error("ERROR: Could not write to file `%s'!" % \
5062 multicrab_file_name)
5063
5064
5065
5066
5067
5068 def write_harvesting_config(self, dataset_name):
5069 """Write a harvesting job configuration Python file.
5070
5071 NOTE: This knows nothing about single-step or two-step
5072 harvesting. That's all taken care of by
5073 create_harvesting_config.
5074
5075 """
5076
5077 self.logger.debug("Writing harvesting configuration for `%s'..." % \
5078 dataset_name)
5079
5080
5081 config_contents = self.create_harvesting_config(dataset_name)
5082
5083
5084 config_file_name = self. \
5085 create_harvesting_config_file_name(dataset_name)
5086 try:
5087 config_file = open(config_file_name, "w")
5088 config_file.write(config_contents)
5089 config_file.close()
5090 except IOError:
5091 self.logger.fatal("Could not write " \
5092 "harvesting configuration to file `%s'" % \
5093 config_file_name)
5094 raise Error("ERROR: Could not write to file `%s'!" % \
5095 config_file_name)
5096
5097
5098
5099
5100
5101 def write_me_extraction_config(self, dataset_name):
5102 """Write an ME-extraction configuration Python file.
5103
5104 This `ME-extraction' (ME = Monitoring Element) is the first
5105 step of the two-step harvesting.
5106
5107 """
5108
5109 self.logger.debug("Writing ME-extraction configuration for `%s'..." % \
5110 dataset_name)
5111
5112
5113 config_contents = self.create_me_extraction_config(dataset_name)
5114
5115
5116 config_file_name = self. \
5117 create_me_summary_config_file_name(dataset_name)
5118 try:
5119 config_file = open(config_file_name, "w")
5120 config_file.write(config_contents)
5121 config_file.close()
5122 except IOError:
5123 self.logger.fatal("Could not write " \
5124 "ME-extraction configuration to file `%s'" % \
5125 config_file_name)
5126 raise Error("ERROR: Could not write to file `%s'!" % \
5127 config_file_name)
5128
5129
5130
5131
5132
5133
5134 def ref_hist_mappings_needed(self, dataset_name=None):
5135 """Check if we need to load and check the reference mappings.
5136
5137 For data the reference histograms should be taken
5138 automatically from the GlobalTag, so we don't need any
5139 mappings. For RelVals we need to know a mapping to be used in
5140 the es_prefer code snippet (different references for each of
5141 the datasets.)
5142
5143 WARNING: This implementation is a bit convoluted.
5144
5145 """
5146
5147
5148
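# Called with a dataset name we check only that dataset; called without
# one we check whether any of the known datasets needs mappings.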
5149 if dataset_name is not None:
5150 data_type = self.datasets_information[dataset_name] \
5151 ["datatype"]
5152 mappings_needed = (data_type == "mc")
5153
5154 if not mappings_needed:
5155 assert data_type == "data"
5156
5157 else:
5158 tmp = [self.ref_hist_mappings_needed(dataset_name) \
5159 for dataset_name in \
5160 self.datasets_information.keys()]
5161 mappings_needed = any(tmp)
5162
5163
5164 return mappings_needed
5165
5166
5167
5168 def load_ref_hist_mappings(self):
5169 """Load the reference histogram mappings from file.
5170
5171 The dataset name to reference histogram name mappings are read
5172 from a text file specified in self.ref_hist_mappings_file_name.
5173
5174 """
5175
5176
5177 assert len(self.ref_hist_mappings) < 1, \
5178 "ERROR Should not be RE-loading " \
5179 "reference histogram mappings!"
5180
5181
5182 self.logger.info("Loading reference histogram mappings " \
5183 "from file `%s'" % \
5184 self.ref_hist_mappings_file_name)
5185
5186 mappings_lines = None
5187 try:
5188 mappings_file = open(self.ref_hist_mappings_file_name, "r")
5189 mappings_lines = mappings_file.readlines()
5190 mappings_file.close()
5191 except IOError:
5192 msg = "ERROR: Could not open reference histogram mapping "\
5193 "file `%s'" % self.ref_hist_mappings_file_name
5194 self.logger.fatal(msg)
5195 raise Error(msg)
5196
5197
5198
5199
5200
5201
5202
5203
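# Each non-comment, non-empty line must contain exactly two
# whitespace-separated fields: the dataset name and the reference
# histogram name. Illustrative example (not a real mapping):
#   /RelValSomeSample/SomeRelease-SomeConditions/GEN-SIM-RECO  some_ref_hist_name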
5204 for mapping in mappings_lines:
5205
5206 if not mapping.startswith("#"):
5207 mapping = mapping.strip()
5208 if len(mapping) > 0:
5209 mapping_pieces = mapping.split()
5210 if len(mapping_pieces) != 2:
5211 msg = "ERROR: The reference histogram mapping " \
5212 "file contains a line I don't " \
5213 "understand:\n %s" % mapping
5214 self.logger.fatal(msg)
5215 raise Error(msg)
5216 dataset_name = mapping_pieces[0].strip()
5217 ref_hist_name = mapping_pieces[1].strip()
5218
5219
5220
5221 if dataset_name in self.ref_hist_mappings:
5222 msg = "ERROR: The reference histogram mapping " \
5223 "file contains multiple mappings for " \
5224 "dataset `%s'."
5225 self.logger.fatal(msg)
5226 raise Error(msg)
5227
5228
5229 self.ref_hist_mappings[dataset_name] = ref_hist_name
5230
5231
5232
5233 self.logger.info(" Successfully loaded %d mapping(s)" % \
5234 len(self.ref_hist_mappings))
5235 max_len = max([len(i) for i in self.ref_hist_mappings.keys()])
5236 for (map_from, map_to) in self.ref_hist_mappings.items():
5237 self.logger.info(" %-*s -> %s" % \
5238 (max_len, map_from, map_to))
5239
5240
5241
5242
5243
5244 def check_ref_hist_mappings(self):
5245 """Make sure all necessary reference histograms exist.
5246
5247 Check that for each of the datasets to be processed a
5248 reference histogram is specified and that that histogram
5249 exists in the database.
5250
5251 NOTE: There's a little complication here. Since this whole
5252 thing was designed to allow (in principle) harvesting of both
5253 data and MC datasets in one go, we need to be careful to check
5254 the availability of reference mappings only for those
5255 datasets that need it.
5256
5257 """
5258
5259 self.logger.info("Checking reference histogram mappings")
5260
5261 for dataset_name in self.datasets_to_use:
5262 try:
5263 ref_hist_name = self.ref_hist_mappings[dataset_name]
5264 except KeyError:
5265 msg = "ERROR: No reference histogram mapping found " \
5266 "for dataset `%s'" % \
5267 dataset_name
5268 self.logger.fatal(msg)
5269 raise Error(msg)
5270
5271 if not self.check_ref_hist_tag(ref_hist_name):
5272 msg = "Reference histogram tag `%s' " \
5273 "(used for dataset `%s') does not exist!" % \
5274 (ref_hist_name, dataset_name)
5275 self.logger.fatal(msg)
5276 raise Usage(msg)
5277
5278 self.logger.info(" Done checking reference histogram mappings.")
5279
5280
5281
5282
5283
5284 def build_datasets_information(self):
5285 """Obtain all information on the datasets that we need to run.
5286
5287 Use DBS to figure out all required information on our
5288 datasets, like the run numbers and the GlobalTag. All
5289 information is stored in the datasets_information member
5290 variable.
5291
5292 """
5293
5294
5295
5296
5297
5298
5299
5300
5301
5302
5303
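# For each dataset we collect: the list of runs, the CMSSW version, the
# data type (data vs. MC), the GlobalTag, per-run event counts, per-run
# mirroring information, the site catalog and the CASTOR output paths.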
5304 self.datasets_information = {}
5305 self.logger.info("Collecting information for all datasets to process")
5306 dataset_names = sorted(self.datasets_to_use.keys())
5307 for dataset_name in dataset_names:
5308
5309
5310 sep_line = "-" * 30
5311 self.logger.info(sep_line)
5312 self.logger.info(" `%s'" % dataset_name)
5313 self.logger.info(sep_line)
5314
5315 runs = self.dbs_resolve_runs(dataset_name)
5316 self.logger.info(" found %d run(s)" % len(runs))
5317 if len(runs) > 0:
5318 self.logger.debug(" run number(s): %s" % \
5319 ", ".join([str(i) for i in runs]))
5320 else:
5321
5322
5323 self.logger.warning(" --> skipping dataset "
5324 "without any runs")
5325 assert False, "Panic: found a dataset without runs " \
5326 "after DBS checks!"
5327
5328
5329 cmssw_version = self.dbs_resolve_cmssw_version(dataset_name)
5330 self.logger.info(" found CMSSW version `%s'" % cmssw_version)
5331
5332
5333 datatype = self.dbs_resolve_datatype(dataset_name)
5334 self.logger.info(" sample is data or MC? --> %s" % \
5335 datatype)
5336
5337
5338
5339
5340 if self.globaltag is None:
5341 globaltag = self.dbs_resolve_globaltag(dataset_name)
5342 else:
5343 globaltag = self.globaltag
5344
5345 self.logger.info(" found GlobalTag `%s'" % globaltag)
5346
5347
5348 if globaltag == "":
5349
5350
5351 assert datatype == "data", \
5352 "ERROR Empty GlobalTag for MC dataset!!!"
5353
5354
5355
5356
5357
5358
5359
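# Ask DBS where (and how completely) this dataset is stored. The
# resulting catalog is then split into the per-run `all_sites' numbers
# (used as event counts) and the per-run `mirrored' flags.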
5360 sites_catalog = self.dbs_check_dataset_spread(dataset_name)
5361
5362
5363 num_events = {}
5364 for run_number in sites_catalog.keys():
5365 num_events[run_number] = sites_catalog \
5366 [run_number]["all_sites"]
5367 del sites_catalog[run_number]["all_sites"]
5368
5369
5370
5371 mirror_catalog = {}
5372 for run_number in sites_catalog.keys():
5373 mirror_catalog[run_number] = sites_catalog \
5374 [run_number]["mirrored"]
5375 del sites_catalog[run_number]["mirrored"]
5376
5377
5378
5379
5380
5381
5382
5383
5384
5385
5386
5387
5388
5389
5390
5391
5392
5393
5394
5395
5396
5397
5398
5399
5400
5401
5402
5403
5404 self.datasets_information[dataset_name] = {}
5405 self.datasets_information[dataset_name]["runs"] = runs
5406 self.datasets_information[dataset_name]["cmssw_version"] = \
5407 cmssw_version
5408 self.datasets_information[dataset_name]["globaltag"] = globaltag
5409 self.datasets_information[dataset_name]["datatype"] = datatype
5410 self.datasets_information[dataset_name]["num_events"] = num_events
5411 self.datasets_information[dataset_name]["mirrored"] = mirror_catalog
5412 self.datasets_information[dataset_name]["sites"] = sites_catalog
5413
5414
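# Build the CASTOR output paths: one common base path for the dataset
# plus a run-specific path for each run.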
5415
5416 castor_path_common = self.create_castor_path_name_common(dataset_name)
5417 self.logger.info(" output will go into `%s'" % \
5418 castor_path_common)
5419
5420 castor_paths = dict(list(zip(runs,
5421 [self.create_castor_path_name_special(dataset_name, i, castor_path_common) \
5422 for i in runs])))
5423 for path_name in castor_paths.values():
5424 self.logger.debug(" %s" % path_name)
5425 self.datasets_information[dataset_name]["castor_path"] = \
5426 castor_paths
5427
5428
5429
5430
5431
5432 def show_exit_message(self):
5433 """Tell the user what to do now, after this part is done.
5434
5435 This should provide the user with some (preferably
5436 copy-pasteable) instructions on what to do now with the setups
5437 and files that have been created.
5438
5439 """
5440
5441
5442
5443
5444
5445 sep_line = "-" * 60
5446
5447 self.logger.info("")
5448 self.logger.info(sep_line)
5449 self.logger.info(" Configuration files have been created.")
5450 self.logger.info(" From here on please follow the usual CRAB instructions.")
5451 self.logger.info(" Quick copy-paste instructions are shown below.")
5452 self.logger.info(sep_line)
5453
5454 self.logger.info("")
5455 self.logger.info(" Create all CRAB jobs:")
5456 self.logger.info(" multicrab -create")
5457 self.logger.info("")
5458 self.logger.info(" Submit all CRAB jobs:")
5459 self.logger.info(" multicrab -submit")
5460 self.logger.info("")
5461 self.logger.info(" Check CRAB status:")
5462 self.logger.info(" multicrab -status")
5463 self.logger.info("")
5464
5465 self.logger.info("")
5466 self.logger.info(" For more information please see the CMS Twiki:")
5467 self.logger.info(" %s" % twiki_url)
5468 self.logger.info(sep_line)
5469
5470
5471
5472 if not self.all_sites_found:
5473 self.logger.warning(" For some of the jobs no matching " \
5474 "site could be found")
5475 self.logger.warning(" --> please scan your multicrab.cfg" \
5476 "for occurrences of `%s'." % \
5477 self.no_matching_site_found_str)
5478 self.logger.warning(" You will have to fix those " \
5479 "by hand, sorry.")
5480
5481
5482
5483
5484
5485 def run(self):
5486 "Main entry point of the CMS harvester."
5487
5488
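# Overall driver. The outer try/finally guarantees that cleanup() runs,
# while the inner try maps the different exception types onto exit
# codes: 0 after a usage error, 1 for known errors, 2 for anything
# unexpected.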
5489 exit_code = 0
5490
5491 try:
5492
5493 try:
5494
5495
5496 self.parse_cmd_line_options()
5497
5498 self.check_input_status()
5499
5500
5501 self.check_cmssw()
5502
5503
5504 self.check_dbs()
5505
5506 self.setup_dbs()
5507
5508
5509
5510
5511 self.setup_harvesting_info()
5512
5513
5514 self.build_dataset_use_list()
5515
5516 self.build_dataset_ignore_list()
5517
5518
5519 self.build_runs_use_list()
5520 self.build_runs_ignore_list()
5521
5522
5523
5524
5525
5526
5527 self.process_dataset_ignore_list()
5528
5529
5530
5531 self.build_datasets_information()
5532
5533 if self.use_ref_hists and \
5534 self.ref_hist_mappings_needed():
5535
5536
5537 self.load_ref_hist_mappings()
5538
5539
5540
5541 self.check_ref_hist_mappings()
5542 else:
5543 self.logger.info("No need to load reference " \
5544 "histogram mappings file")
5545
5546
5547
5548
5549
5550
5551
5552
5553
5554
5555
5556
5557
5558
5559 self.process_runs_use_and_ignore_lists()
5560
5561
5562
5563
5564 if self.harvesting_mode == "single-step-allow-partial":
5565 self.singlify_datasets()
5566
5567
5568 self.check_dataset_list()
5569
5570 if len(self.datasets_to_use) < 1:
5571 self.logger.info("After all checks etc. " \
5572 "there are no datasets (left?) " \
5573 "to process")
5574 else:
5575
5576 self.logger.info("After all checks etc. we are left " \
5577 "with %d dataset(s) to process " \
5578 "for a total of %d runs" % \
5579 (len(self.datasets_to_use),
5580 sum([len(i) for i in \
5581 self.datasets_to_use.values()])))
5582
5583
5584
5585
5586
5587
5588
5589
5590
5591
5592
5593
5594
5595
5596
5597
5598
5599
5600
5601
5602
5603
5604
5605
5606
5607
5608 self.create_and_check_castor_dirs()
5609
5610
5611
5612 self.write_crab_config()
5613 self.write_multicrab_config()
5614
5615
5616
5617
5618
5619
5620
5621
5622
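# Write one harvesting configuration per dataset (plus an ME-extraction
# configuration when running in two-step mode) and keep the per-run
# event counts for book-keeping.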
5623 for dataset_name in self.datasets_to_use.keys():
5624 try:
5625 self.write_harvesting_config(dataset_name)
5626 if self.harvesting_mode == "two-step":
5627 self.write_me_extraction_config(dataset_name)
5628 except:
5629
5630 raise
5631 else:
5632
5633
5634 tmp = {}
5635 for run_number in self.datasets_to_use[dataset_name]:
5636 tmp[run_number] = self.datasets_information \
5637 [dataset_name]["num_events"][run_number]
5638 if dataset_name in self.book_keeping_information:
5639 self.book_keeping_information[dataset_name].update(tmp)
5640 else:
5641 self.book_keeping_information[dataset_name] = tmp
5642
5643
5644 self.show_exit_message()
5645
5646 except Usage as err:
5647
5648
5649 pass
5650
5651 except Error as err:
5652
5653 exit_code = 1
5654
5655 except Exception as err:
5656
5657
5658
5659
5660
5661
5662
5663
5664 if isinstance(err, SystemExit):
5665 self.logger.fatal(err.code)
5666 elif not isinstance(err, KeyboardInterrupt):
5667 self.logger.fatal("!" * 50)
5668 self.logger.fatal(" This looks like a serious problem.")
5669 self.logger.fatal(" If you are sure you followed all " \
5670 "instructions")
5671 self.logger.fatal(" please copy the below stack trace together")
5672 self.logger.fatal(" with a description of what you were doing to")
5673 self.logger.fatal(" jeroen.hegeman@cern.ch.")
5674 self.logger.fatal(" %s" % self.ident_string())
5675 self.logger.fatal("!" * 50)
5676 self.logger.fatal(str(err))
5677 import traceback
5678 traceback_string = traceback.format_exc()
5679 for line in traceback_string.split("\n"):
5680 self.logger.fatal(line)
5681 self.logger.fatal("!" * 50)
5682 exit_code = 2
5683
5684
5685
5686
5687
5688
5689
5690 finally:
5691
5692 self.cleanup()
5693
5694
5695
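# If requested, create and submit all CRAB jobs right away.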
5696 if self.crab_submission:
5697 os.system("multicrab -create")
5698 os.system("multicrab -submit")
5699
5700
5701 return exit_code
5702
5703
5704
5705
5706
5707
5708
5709 if __name__ == "__main__":
5710 "Main entry point for harvesting."
5711
5712 CMSHarvester().run()
5713
5714
5715
5716