0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015 """Main program to run all kinds of harvesting.
0016
0017 These are the basic kinds of harvesting implemented (contact me if
0018 your favourite is missing):
0019
0020 - RelVal : Run for release validation samples. Makes heavy use of MC
0021 truth information.
0022
0023 - RelValFS: FastSim RelVal.
0024
0025 - MC : Run for MC samples.
0026
0027 - DQMOffline : Run for real data (could also be run for MC).
0028
0029 For the mappings of these harvesting types to sequence names please
0030 see the setup_harvesting_info() and option_handler_list_types()
0031 methods.
0032
0033 """
0034 from __future__ import print_function
0035
0036
0037
0038 from builtins import range
0039 __version__ = "3.8.2p1"
0040 __author__ = "Jeroen Hegeman (jeroen.hegeman@cern.ch)," \
0041 "Niklas Pietsch (niklas.pietsch@desy.de)"
0042
0043 twiki_url = "https://twiki.cern.ch/twiki/bin/view/CMS/CmsHarvester"
0044
0045
0046
0047
0048
0049
0050
0051
0052
0053
0054
0055
0056
0057
0058
0059
0060
0061
0062
0063
0064
0065
0066
0067
0068
0069
0070
0071
0072
0073
0074
0075
0076
0077
0078
0079
0080
0081
0082
0083
0084
0085
0086
0087
0088
0089
0090
0091 import os
0092 import sys
0093 import subprocess
0094 import re
0095 import logging
0096 import optparse
0097 import datetime
0098 import copy
0099 from inspect import getargspec
0100 from random import choice
0101
0102
0103
0104 from DBSAPI.dbsApi import DbsApi
0105 import DBSAPI.dbsException
0106 import DBSAPI.dbsApiException
0107 from functools import reduce
0108
0109 global xml
0110 global SAXParseException
0111 import xml.sax
0112 from xml.sax import SAXParseException
0113
0114 import Configuration.PyReleaseValidation
0115 from Configuration.PyReleaseValidation.ConfigBuilder import \
0116 ConfigBuilder, defaultOptions
0117
0118
0119
0120
0121
0122 import pdb
0123 try:
0124 import debug_hook
0125 except ImportError:
0126 pass
0127
0128
0129
0130
0131 class Usage(Exception):
0132 def __init__(self, msg):
0133 self.msg = msg
0134 def __str__(self):
0135 return repr(self.msg)
0136
0137
0138
0139
0140
0141
0142 class Error(Exception):
0143 def __init__(self, msg):
0144 self.msg = msg
0145 def __str__(self):
0146 return repr(self.msg)
0147
0148
0149
0150
0151
0152 class CMSHarvesterHelpFormatter(optparse.IndentedHelpFormatter):
0153 """Helper class to add some customised help output to cmsHarvester.
0154
0155 We want to add some instructions, as well as a pointer to the CMS
0156 Twiki.
0157
0158 """
0159
0160 def format_usage(self, usage):
0161
0162 usage_lines = []
0163
0164 sep_line = "-" * 60
0165 usage_lines.append(sep_line)
0166 usage_lines.append("Welcome to the CMS harvester, a (hopefully useful)")
0167 usage_lines.append("tool to create harvesting configurations.")
0168 usage_lines.append("For more information please have a look at the CMS Twiki:")
0169 usage_lines.append(" %s" % twiki_url)
0170 usage_lines.append(sep_line)
0171 usage_lines.append("")
0172
0173
0174
0175 usage_lines.append(optparse.IndentedHelpFormatter. \
0176 format_usage(self, usage))
0177
0178 formatted_usage = "\n".join(usage_lines)
0179 return formatted_usage
0180
0181
0182
0183
0184
0185
0186
0187 class DBSXMLHandler(xml.sax.handler.ContentHandler):
0188 """XML handler class to parse DBS results.
0189
0190 The tricky thing here is that older DBS versions (2.0.5 and
0191 earlier) return results in a different XML format than newer
0192 versions. Previously the result values were returned as attributes
0193 to the `result' element. The new approach returns result values as
0194 contents of named elements.
0195
0196 The old approach is handled directly in startElement(), the new
0197 approach in characters().
0198
0199 NOTE: All results are returned in the form of string values of
0200 course!
0201
0202 """
0203
0204
0205
0206
0207
0208 mapping = {
0209 "dataset" : "PATH",
0210 "dataset.tag" : "PROCESSEDDATASET_GLOBALTAG",
0211 "datatype.type" : "PRIMARYDSTYPE_TYPE",
0212 "run" : "RUNS_RUNNUMBER",
0213 "run.number" : "RUNS_RUNNUMBER",
0214 "file.name" : "FILES_LOGICALFILENAME",
0215 "file.numevents" : "FILES_NUMBEROFEVENTS",
0216 "algo.version" : "APPVERSION_VERSION",
0217 "site" : "STORAGEELEMENT_SENAME",
0218 }
0219
0220 def __init__(self, tag_names):
0221
0222
0223 self.element_position = []
0224 self.tag_names = tag_names
0225 self.results = {}
0226
0227 def startElement(self, name, attrs):
0228 self.element_position.append(name)
0229
0230 self.current_value = []
0231
0232
0233
0234
0235 if name == "result":
0236 for name in self.tag_names:
0237 key = DBSXMLHandler.mapping[name]
0238 value = str(attrs[key])
0239 try:
0240 self.results[name].append(value)
0241 except KeyError:
0242 self.results[name] = [value]
0243
0244
0245
0246 def endElement(self, name):
0247 assert self.current_element() == name, \
0248 "closing unopenend element `%s'" % name
0249
0250 if self.current_element() in self.tag_names:
0251 contents = "".join(self.current_value)
0252 if self.current_element() in self.results:
0253 self.results[self.current_element()].append(contents)
0254 else:
0255 self.results[self.current_element()] = [contents]
0256
0257 self.element_position.pop()
0258
0259 def characters(self, content):
0260
0261
0262
0263
0264 if self.current_element() in self.tag_names:
0265 self.current_value.append(content)
0266
0267 def current_element(self):
0268 return self.element_position[-1]
0269
0270 def check_results_validity(self):
0271 """Make sure that all results arrays have equal length.
0272
0273 We should have received complete rows from DBS. I.e. all
0274 results arrays in the handler should be of equal length.
0275
0276 """
0277
0278 results_valid = True
0279
0280 res_names = list(self.results.keys())
0281 if len(res_names) > 1:
0282 for res_name in res_names[1:]:
0283 res_tmp = self.results[res_name]
0284 if len(res_tmp) != len(self.results[res_names[0]]):
0285 results_valid = False
0286
0287 return results_valid
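# Minimal usage sketch of the handler above; this mirrors what the
# dbs_resolve_*() methods of CMSHarvester below actually do:
#
#   handler = DBSXMLHandler(["dataset"])
#   parser = xml.sax.make_parser()
#   parser.setContentHandler(handler)
#   xml.sax.parseString(api_result, handler)
#   datasets = list(handler.results.values())[0]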
0288
0289
0290
0291
0292
0293
0294
0295 class CMSHarvester(object):
0296 """Class to perform CMS harvesting.
0297
0298 More documentation `obviously' to follow.
0299
0300 """
0301
0302
0303
0304 def __init__(self, cmd_line_opts=None):
0305 "Initialize class and process command line options."
0306
0307 self.version = __version__
0308
0309
0310
0311 self.harvesting_types = [
0312 "RelVal",
0313 "RelValFS",
0314 "MC",
0315 "DQMOffline",
0316 ]
0317
0318
0319
0320
0321
0322
0323
0324
0325
0326
0327
0328
0329
0330 self.harvesting_modes = [
0331 "single-step",
0332 "single-step-allow-partial",
0333 "two-step"
0334 ]
0335
0336
0337
0338
0339
0340
0341
0342
0343
0344 self.globaltag = None
0345
0346
0347
0348 self.use_ref_hists = True
0349
0350
0351
0352
0353
0354
0355
0356 self.frontier_connection_name = {}
0357 self.frontier_connection_name["globaltag"] = "frontier://" \
0358 "FrontierProd/"
0359 self.frontier_connection_name["refhists"] = "frontier://" \
0360 "FrontierProd/"
0361 self.frontier_connection_overridden = {}
0362 for key in self.frontier_connection_name.keys():
0363 self.frontier_connection_overridden[key] = False
0364
0365
0366
0367
0368 self.harvesting_info = None
0369
0370
0371
0372
0373
0374
0375
0376
0377 self.harvesting_type = None
0378
0379
0380
0381
0382
0383 self.harvesting_mode = None
0384
0385
0386
0387 self.harvesting_mode_default = "single-step"
0388
0389
0390
0391
0392
0393
0394 self.input_method = {}
0395 self.input_method["datasets"] = {}
0396 self.input_method["datasets"]["use"] = None
0397 self.input_method["datasets"]["ignore"] = None
0398 self.input_method["runs"] = {}
0399 self.input_method["runs"]["use"] = None
0400 self.input_method["runs"]["ignore"] = None
0401 self.input_method["runs"]["ignore"] = None
0402
0403 self.input_name = {}
0404 self.input_name["datasets"] = {}
0405 self.input_name["datasets"]["use"] = None
0406 self.input_name["datasets"]["ignore"] = None
0407 self.input_name["runs"] = {}
0408 self.input_name["runs"]["use"] = None
0409 self.input_name["runs"]["ignore"] = None
0410
0411 self.Jsonlumi = False
0412 self.Jsonfilename = "YourJSON.txt"
0413 self.Jsonrunfilename = "YourJSON.txt"
0414 self.todofile = "YourToDofile.txt"
0415
0416
0417
0418
0419 self.force_running = None
0420
0421
0422 self.castor_base_dir = None
0423 self.castor_base_dir_default = "/castor/cern.ch/" \
0424 "cms/store/temp/" \
0425 "dqm/offline/harvesting_output/"
0426
0427
0428
0429 self.book_keeping_file_name = None
0430 self.book_keeping_file_name_default = "harvesting_accounting.txt"
0431
0432
0433
0434
0435 self.ref_hist_mappings_file_name = None
0436
0437 self.ref_hist_mappings_file_name_default = "harvesting_ref_hist_mappings.txt"
0438
0439
0440
0441
0442 self.castor_prefix = "/castor/cern.ch"
0443
0444
0445
0446
0447
0448 self.non_t1access = False
0449 self.caf_access = False
0450 self.saveByLumiSection = False
0451 self.crab_submission = False
0452 self.nr_max_sites = 1
0453
0454 self.preferred_site = "no preference"
0455
0456
0457 self.datasets_to_use = {}
0458
0459 self.datasets_to_ignore = {}
0460
0461 self.book_keeping_information = {}
0462
0463
0464 self.ref_hist_mappings = {}
0465
0466
0467
0468 self.runs_to_use = {}
0469 self.runs_to_ignore = {}
0470
0471
0472 self.sites_and_versions_cache = {}
0473
0474
0475 self.globaltag_check_cache = []
0476
0477
0478
0479 self.all_sites_found = True
0480
0481
0482 self.no_matching_site_found_str = "no_matching_site_found"
0483
0484
0485 if cmd_line_opts is None:
0486 cmd_line_opts = sys.argv[1:]
0487 self.cmd_line_opts = cmd_line_opts
0488
0489
0490 log_handler = logging.StreamHandler()
0491
0492
0493 log_formatter = logging.Formatter("%(message)s")
0494 log_handler.setFormatter(log_formatter)
0495 logger = logging.getLogger()
0496 logger.name = "main"
0497 logger.addHandler(log_handler)
0498 self.logger = logger
0499
0500 self.set_output_level("NORMAL")
0501
0502
0503
0504
0505
0506
0507
0508 def cleanup(self):
0509 "Clean up after ourselves."
0510
0511
0512
0513
0514 logging.shutdown()
0515
0516
0517
0518
0519
0520 def time_stamp(self):
0521 "Create a timestamp to use in the created config files."
0522
0523 time_now = datetime.datetime.utcnow()
0524
0525 time_now = time_now.replace(microsecond = 0)
0526 time_stamp = "%sUTC" % datetime.datetime.isoformat(time_now)
0527
0528
0529 return time_stamp
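# The timestamp produced above looks something like
# "2011-02-14T13:45:00UTC": UTC time in ISO format, truncated to whole
# seconds, with a literal "UTC" suffix (the date shown is of course just
# an example).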
0530
0531
0532
0533 def ident_string(self):
0534 "Spit out an identification string for cmsHarvester.py."
0535
0536 ident_str = "`cmsHarvester.py " \
0537 "version %s': cmsHarvester.py %s" % \
0538 (__version__,
0539 reduce(lambda x, y: x+' '+y, sys.argv[1:]))
0540
0541 return ident_str
0542
0543
0544
0545 def format_conditions_string(self, globaltag):
0546 """Create the conditions string needed for `cmsDriver'.
0547
0548 Just glueing the FrontierConditions bit in front of it really.
0549
0550 """
0551
0552
0553
0554
0555
0556
0557
0558
0559 if globaltag.lower().find("conditions") > -1:
0560 conditions_string = globaltag
0561 else:
0562 conditions_string = "FrontierConditions_GlobalTag,%s" % \
0563 globaltag
0564
0565
0566 return conditions_string
0567
0568
0569
0570 def db_account_name_cms_cond_globaltag(self):
0571 """Return the database account name used to store the GlobalTag.
0572
0573 The name of the database account depends (albeit weakly) on
0574 the CMSSW release version.
0575
0576 """
0577
0578
0579
0580 account_name = "CMS_COND_31X_GLOBALTAG"
0581
0582
0583 return account_name
0584
0585
0586
0587 def db_account_name_cms_cond_dqm_summary(self):
0588 """See db_account_name_cms_cond_globaltag."""
0589
0590 account_name = None
0591 version = self.cmssw_version[6:11]
0592 if version < "3_4_0":
0593 account_name = "CMS_COND_31X_DQM_SUMMARY"
0594 else:
0595 account_name = "CMS_COND_34X"
0596
0597
0598 return account_name
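# Worked example of the version comparison above (note that this is a
# plain lexicographic string comparison, not a real version comparison):
# for CMSSW_3_3_6 the extracted string "3_3_6" sorts below "3_4_0", so
# CMS_COND_31X_DQM_SUMMARY is used; for CMSSW_3_8_2 the CMS_COND_34X
# account is chosen.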
0599
0600
0601
0602 def config_file_header(self):
0603 "Create a nice header to be used to mark the generated files."
0604
0605 tmp = []
0606
0607 time_stamp = self.time_stamp()
0608 ident_str = self.ident_string()
0609 tmp.append("# %s" % time_stamp)
0610 tmp.append("# WARNING: This file was created automatically!")
0611 tmp.append("")
0612 tmp.append("# Created by %s" % ident_str)
0613
0614 header = "\n".join(tmp)
0615
0616
0617 return header
0618
0619
0620
0621 def set_output_level(self, output_level):
0622 """Adjust the level of output generated.
0623
0624 Choices are:
0625 - normal : default level of output
0626 - quiet : less output than the default
0627 - verbose : some additional information
0628 - debug : lots more information, may be overwhelming
0629
0630 NOTE: The debug option is a bit special in the sense that it
0631 also modifies the output format.
0632
0633 """
0634
0635
0636
0637 output_levels = {
0638 "NORMAL" : logging.INFO,
0639 "QUIET" : logging.WARNING,
0640 "VERBOSE" : logging.INFO,
0641 "DEBUG" : logging.DEBUG
0642 }
0643
0644 output_level = output_level.upper()
0645
0646 try:
0647
0648 self.log_level = output_levels[output_level]
0649 self.logger.setLevel(self.log_level)
0650 except KeyError:
0651
0652 self.logger.fatal("Unknown output level `%s'" % output_level)
0653
0654 raise Exception
0655
0656
0657
0658
0659
0660 def option_handler_debug(self, option, opt_str, value, parser):
0661 """Switch to debug mode.
0662
0663 This both increases the amount of output generated, as well as
0664 changes the format used (more detailed information is given).
0665
0666 """
0667
0668
0669 log_formatter_debug = logging.Formatter("[%(levelname)s] " \
0670
0671
0672
0673
0674
0675
0676 "%(message)s")
0677
0678
0679 log_handler = self.logger.handlers[0]
0680 log_handler.setFormatter(log_formatter_debug)
0681 self.set_output_level("DEBUG")
0682
0683
0684
0685
0686
0687 def option_handler_quiet(self, option, opt_str, value, parser):
0688 "Switch to quiet mode: less verbose."
0689
0690 self.set_output_level("QUIET")
0691
0692
0693
0694
0695
0696 def option_handler_force(self, option, opt_str, value, parser):
0697 """Switch on `force mode' in which case we don't brake for nobody.
0698
0699 In so-called `force mode' all sanity checks are performed but
0700 we don't halt on failure. Of course this requires some care
0701 from the user.
0702
0703 """
0704
0705 self.logger.debug("Switching on `force mode'.")
0706 self.force_running = True
0707
0708
0709
0710
0711
0712 def option_handler_harvesting_type(self, option, opt_str, value, parser):
0713 """Set the harvesting type to be used.
0714
0715 This checks that no harvesting type is already set, and sets
0716 the harvesting type to be used to the one specified. If a
0717 harvesting type is already set an exception is thrown. The
0718 same happens when an unknown type is specified.
0719
0720 """
0721
0722
0723
0724
0725
0726
0727 value = value.lower()
0728 harvesting_types_lowered = [i.lower() for i in self.harvesting_types]
0729 try:
0730 type_index = harvesting_types_lowered.index(value)
0731
0732
0733 except ValueError:
0734 self.logger.fatal("Unknown harvesting type `%s'" % \
0735 value)
0736 self.logger.fatal(" possible types are: %s" %
0737 ", ".join(self.harvesting_types))
0738 raise Usage("Unknown harvesting type `%s'" % \
0739 value)
0740
0741
0742
0743 if not self.harvesting_type is None:
0744 msg = "Only one harvesting type should be specified"
0745 self.logger.fatal(msg)
0746 raise Usage(msg)
0747 self.harvesting_type = self.harvesting_types[type_index]
0748
0749 self.logger.info("Harvesting type to be used: `%s'" % \
0750 self.harvesting_type)
0751
0752
0753
0754
0755
0756 def option_handler_harvesting_mode(self, option, opt_str, value, parser):
0757 """Set the harvesting mode to be used.
0758
0759 Single-step harvesting can be used for samples that are
0760 located completely at a single site (= SE). Otherwise use
0761 two-step mode.
0762
0763 """
0764
0765
0766 harvesting_mode = value.lower()
0767 if not harvesting_mode in self.harvesting_modes:
0768 msg = "Unknown harvesting mode `%s'" % harvesting_mode
0769 self.logger.fatal(msg)
0770 self.logger.fatal(" possible modes are: %s" % \
0771 ", ".join(self.harvesting_modes))
0772 raise Usage(msg)
0773
0774
0775
0776 if not self.harvesting_mode is None:
0777 msg = "Only one harvesting mode should be specified"
0778 self.logger.fatal(msg)
0779 raise Usage(msg)
0780 self.harvesting_mode = harvesting_mode
0781
0782 self.logger.info("Harvesting mode to be used: `%s'" % \
0783 self.harvesting_mode)
0784
0785
0786
0787
0788
0789 def option_handler_globaltag(self, option, opt_str, value, parser):
0790 """Set the GlobalTag to be used, overriding our own choices.
0791
0792 By default the cmsHarvester will use the GlobalTag with which
0793 a given dataset was created also for the harvesting. The
0794 --globaltag option is the way to override this behaviour.
0795
0796 """
0797
0798
0799 if not self.globaltag is None:
0800 msg = "Only one GlobalTag should be specified"
0801 self.logger.fatal(msg)
0802 raise Usage(msg)
0803 self.globaltag = value
0804
0805 self.logger.info("GlobalTag to be used: `%s'" % \
0806 self.globaltag)
0807
0808
0809
0810
0811
0812 def option_handler_no_ref_hists(self, option, opt_str, value, parser):
0813 "Switch use of all reference histograms off."
0814
0815 self.use_ref_hists = False
0816
0817 self.logger.warning("Switching off all use of reference histograms")
0818
0819
0820
0821
0822
0823 def option_handler_frontier_connection(self, option, opt_str,
0824 value, parser):
0825 """Override the default Frontier connection string.
0826
0827 Please only use this for testing (e.g. when a test payload has
0828 been inserted into cms_orc_off instead of cms_orc_on).
0829
0830 This method gets called for three different command line
0831 options:
0832 - --frontier-connection,
0833 - --frontier-connection-for-globaltag,
0834 - --frontier-connection-for-refhists.
0835 Appropriate care has to be taken to make sure things are only
0836 specified once.
0837
0838 """
0839
0840
0841 frontier_type = opt_str.split("-")[-1]
0842 if frontier_type == "connection":
0843
0844 frontier_types = self.frontier_connection_name.keys()
0845 else:
0846 frontier_types = [frontier_type]
0847
0848
0849
0850 for connection_name in frontier_types:
0851 if self.frontier_connection_overridden[connection_name] == True:
0852 msg = "Please specify either:\n" \
0853 " `--frontier-connection' to change the " \
0854 "Frontier connection used for everything, or\n" \
0855 "either one or both of\n" \
0856 " `--frontier-connection-for-globaltag' to " \
0857 "change the Frontier connection used for the " \
0858 "GlobalTag and\n" \
0859 " `--frontier-connection-for-refhists' to change " \
0860 "the Frontier connection used for the " \
0861 "reference histograms."
0862 self.logger.fatal(msg)
0863 raise Usage(msg)
0864
0865 frontier_prefix = "frontier://"
0866 if not value.startswith(frontier_prefix):
0867 msg = "Expecting Frontier connections to start with " \
0868 "`%s'. You specified `%s'." % \
0869 (frontier_prefix, value)
0870 self.logger.fatal(msg)
0871 raise Usage(msg)
0872
0873
0874 if value.find("FrontierProd") < 0 and \
0875 value.find("FrontierPrep") < 0:
0876 msg = "Expecting Frontier connections to contain either " \
0877 "`FrontierProd' or `FrontierPrep'. You specified " \
0878 "`%s'. Are you sure?" % \
0879 value
0880 self.logger.warning(msg)
0881
0882 if not value.endswith("/"):
0883 value += "/"
0884
0885 for connection_name in frontier_types:
0886 self.frontier_connection_name[connection_name] = value
0887 self.frontier_connection_overridden[connection_name] = True
0888
0889 frontier_type_str = "unknown"
0890 if connection_name == "globaltag":
0891 frontier_type_str = "the GlobalTag"
0892 elif connection_name == "refhists":
0893 frontier_type_str = "the reference histograms"
0894
0895 self.logger.warning("Overriding default Frontier " \
0896 "connection for %s " \
0897 "with `%s'" % \
0898 (frontier_type_str,
0899 self.frontier_connection_name[connection_name]))
0900
0901
0902
0903
0904
0905 def option_handler_input_todofile(self, option, opt_str, value, parser):
0906
0907 self.todofile = value
0908
0909
0910
0911
0912 def option_handler_input_Jsonfile(self, option, opt_str, value, parser):
0913
0914 self.Jsonfilename = value
0915
0916
0917
0918
0919 def option_handler_input_Jsonrunfile(self, option, opt_str, value, parser):
0920
0921 self.Jsonrunfilename = value
0922
0923
0924
0925
0926 def option_handler_input_spec(self, option, opt_str, value, parser):
0927 """TODO TODO TODO
0928 Document this...
0929
0930 """
0931
0932
0933
0934 if opt_str.lower().find("ignore") > -1:
0935 spec_type = "ignore"
0936 else:
0937 spec_type = "use"
0938
0939
0940 if opt_str.lower().find("dataset") > -1:
0941 select_type = "datasets"
0942 else:
0943 select_type = "runs"
0944
0945 if not self.input_method[select_type][spec_type] is None:
0946 msg = "Please only specify one input method " \
0947 "(for the `%s' case)" % opt_str
0948 self.logger.fatal(msg)
0949 raise Usage(msg)
0950
0951 input_method = opt_str.replace("-", "").replace("ignore", "")
0952 self.input_method[select_type][spec_type] = input_method
0953 self.input_name[select_type][spec_type] = value
0954
0955 self.logger.debug("Input method for the `%s' case: %s" % \
0956 (spec_type, input_method))
0957
0958
0959
0960
0961
0962 def option_handler_book_keeping_file(self, option, opt_str, value, parser):
0963 """Store the name of the file to be used for book keeping.
0964
0965 The only check done here is that only a single book keeping
0966 file is specified.
0967
0968 """
0969
0970 file_name = value
0971
0972 if not self.book_keeping_file_name is None:
0973 msg = "Only one book keeping file should be specified"
0974 self.logger.fatal(msg)
0975 raise Usage(msg)
0976 self.book_keeping_file_name = file_name
0977
0978 self.logger.info("Book keeping file to be used: `%s'" % \
0979 self.book_keeping_file_name)
0980
0981
0982
0983
0984
0985 def option_handler_ref_hist_mapping_file(self, option, opt_str, value, parser):
0986 """Store the name of the file for the ref. histogram mapping.
0987
0988 """
0989
0990 file_name = value
0991
0992 if not self.ref_hist_mappings_file_name is None:
0993 msg = "Only one reference histogram mapping file " \
0994 "should be specified"
0995 self.logger.fatal(msg)
0996 raise Usage(msg)
0997 self.ref_hist_mappings_file_name = file_name
0998
0999 self.logger.info("Reference histogram mapping file " \
1000 "to be used: `%s'" % \
1001 self.ref_hist_mappings_file_name)
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061 def option_handler_castor_dir(self, option, opt_str, value, parser):
1062 """Specify where on CASTOR the output should go.
1063
1064 At the moment only output to CERN CASTOR is
1065 supported. Eventually the harvested results should go into the
1066 central place for DQM on CASTOR anyway.
1067
1068 """
1069
1070
1071 castor_dir = value
1072
1073 castor_prefix = self.castor_prefix
1074
1075
1076 castor_dir = os.path.join(os.path.sep, castor_dir)
1077 self.castor_base_dir = os.path.normpath(castor_dir)
1078
1079 self.logger.info("CASTOR (base) area to be used: `%s'" % \
1080 self.castor_base_dir)
1081
1082
1083
1084
1085
1086 def option_handler_no_t1access(self, option, opt_str, value, parser):
1087 """Set the self.no_t1access flag to try and create jobs that
1088 run without special `t1access' role.
1089
1090 """
1091
1092 self.non_t1access = True
1093
1094 self.logger.warning("Running in `non-t1access' mode. " \
1095 "Will try to create jobs that run " \
1096 "without special rights but no " \
1097 "further promises...")
1098
1099
1100
1101
1102
1103 def option_handler_caf_access(self, option, opt_str, value, parser):
1104 """Set the self.caf_access flag to try and create jobs that
1105 run on the CAF.
1106
1107 """
1108 self.caf_access = True
1109
1110 self.logger.warning("Running in `caf_access' mode. " \
1111 "Will try to create jobs that run " \
1112 "on CAF but no" \
1113 "further promises...")
1114
1115
1116
1117
1118
1119 def option_handler_saveByLumiSection(self, option, opt_str, value, parser):
1120 """Set process.dqmSaver.saveByLumiSectiont=1 in cfg harvesting file
1121 """
1122 self.saveByLumiSection = True
1123
1124 self.logger.warning("waning concerning saveByLumiSection option")
1125
1126
1127
1128
1129
1130
1131 def option_handler_crab_submission(self, option, opt_str, value, parser):
1132 """Crab jobs are not created and
1133 "submitted automatically",
1134 """
1135 self.crab_submission = True
1136
1137
1138
1139
1140
1141 def option_handler_sites(self, option, opt_str, value, parser):
1142
1143 self.nr_max_sites = value
1144
1145
1146
1147 def option_handler_preferred_site(self, option, opt_str, value, parser):
1148
1149 self.preferred_site = value
1150
1151
1152
1153 def option_handler_list_types(self, option, opt_str, value, parser):
1154 """List all harvesting types and their mappings.
1155
1156 This lists all implemented harvesting types with their
1157 corresponding mappings to sequence names. This had to be
1158 separated out from the help since it depends on the CMSSW
1159 version and was making things a bit of a mess.
1160
1161 NOTE: There is no way (at least not that I could come up with)
1162 to code this in a neat generic way that can be read both by
1163 this method and by setup_harvesting_info(). Please try hard to
1164 keep these two methods in sync!
1165
1166 """
1167
1168 sep_line = "-" * 50
1169 sep_line_short = "-" * 20
1170
1171 print(sep_line)
1172 print("The following harvesting types are available:")
1173 print(sep_line)
1174
1175 print("`RelVal' maps to:")
1176 print(" pre-3_3_0 : HARVESTING:validationHarvesting")
1177 print(" 3_4_0_pre2 and later: HARVESTING:validationHarvesting+dqmHarvesting")
1178 print(" Exceptions:")
1179 print(" 3_3_0_pre1-4 : HARVESTING:validationHarvesting")
1180 print(" 3_3_0_pre6 : HARVESTING:validationHarvesting")
1181 print(" 3_4_0_pre1 : HARVESTING:validationHarvesting")
1182
1183 print(sep_line_short)
1184
1185 print("`RelValFS' maps to:")
1186 print(" always : HARVESTING:validationHarvestingFS")
1187
1188 print(sep_line_short)
1189
1190 print("`MC' maps to:")
1191 print(" always : HARVESTING:validationprodHarvesting")
1192
1193 print(sep_line_short)
1194
1195 print("`DQMOffline' maps to:")
1196 print(" always : HARVESTING:dqmHarvesting")
1197
1198 print(sep_line)
1199
1200
1201
1202 raise SystemExit
1203
1204
1205
1206
1207
1208 def setup_harvesting_info(self):
1209 """Fill our dictionary with all info needed to understand
1210 harvesting.
1211
1212 This depends on the CMSSW version since at some point the
1213 names and sequences were modified.
1214
1215 NOTE: There is no way (at least not that I could come up with)
1216 to code this in a neat generic way that can be read both by
1217 this method and by option_handler_list_types(). Please try
1218 hard to keep these two methods in sync!
1219
1220 """
1221
1222 assert not self.cmssw_version is None, \
1223 "ERROR setup_harvesting() requires " \
1224 "self.cmssw_version to be set!!!"
1225
1226 harvesting_info = {}
1227
1228
1229 harvesting_info["DQMOffline"] = {}
1230 harvesting_info["DQMOffline"]["beamspot"] = None
1231 harvesting_info["DQMOffline"]["eventcontent"] = None
1232 harvesting_info["DQMOffline"]["harvesting"] = "AtRunEnd"
1233
1234 harvesting_info["RelVal"] = {}
1235 harvesting_info["RelVal"]["beamspot"] = None
1236 harvesting_info["RelVal"]["eventcontent"] = None
1237 harvesting_info["RelVal"]["harvesting"] = "AtRunEnd"
1238
1239 harvesting_info["RelValFS"] = {}
1240 harvesting_info["RelValFS"]["beamspot"] = None
1241 harvesting_info["RelValFS"]["eventcontent"] = None
1242 harvesting_info["RelValFS"]["harvesting"] = "AtRunEnd"
1243
1244 harvesting_info["MC"] = {}
1245 harvesting_info["MC"]["beamspot"] = None
1246 harvesting_info["MC"]["eventcontent"] = None
1247 harvesting_info["MC"]["harvesting"] = "AtRunEnd"
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257 assert self.cmssw_version.startswith("CMSSW_")
1258
1259
1260 version = self.cmssw_version[6:]
1261
1262
1263
1264
1265 step_string = None
1266 if version < "3_3_0":
1267 step_string = "validationHarvesting"
1268 elif version in ["3_3_0_pre1", "3_3_0_pre2",
1269 "3_3_0_pre3", "3_3_0_pre4",
1270 "3_3_0_pre6", "3_4_0_pre1"]:
1271 step_string = "validationHarvesting"
1272 else:
1273 step_string = "validationHarvesting+dqmHarvesting"
1274
1275 harvesting_info["RelVal"]["step_string"] = step_string
1276
1277
1278
1279 assert not step_string is None, \
1280 "ERROR Could not decide a RelVal harvesting sequence " \
1281 "for CMSSW version %s" % self.cmssw_version
1282
1283
1284
1285
1286
1287 step_string = "validationHarvestingFS"
1288
1289 harvesting_info["RelValFS"]["step_string"] = step_string
1290
1291
1292
1293
1294 step_string = "validationprodHarvesting"
1295
1296 harvesting_info["MC"]["step_string"] = step_string
1297
1298
1299
1300 assert not step_string is None, \
1301 "ERROR Could not decide a MC harvesting " \
1302 "sequence for CMSSW version %s" % self.cmssw_version
1303
1304
1305
1306
1307
1308 step_string = "dqmHarvesting"
1309
1310 harvesting_info["DQMOffline"]["step_string"] = step_string
1311
1312
1313
1314 self.harvesting_info = harvesting_info
1315
1316 self.logger.info("Based on the CMSSW version (%s) " \
1317 "I decided to use the `HARVESTING:%s' " \
1318 "sequence for %s harvesting" % \
1319 (self.cmssw_version,
1320 self.harvesting_info[self.harvesting_type]["step_string"],
1321 self.harvesting_type))
1322
1323
1324
1325
1326
1327 def create_castor_path_name_common(self, dataset_name):
1328 """Build the common part of the output path to be used on
1329 CASTOR.
1330
1331 This consists of the CASTOR area base path specified by the
1332 user and a piece depending on the data type (data vs. MC), the
1333 harvesting type and the dataset name followed by a piece
1334 containing the run number and event count. (See comments in
1335 create_castor_path_name_special for details.) This method
1336 creates the common part, without run number and event count.
1337
1338 """
1339
1340 castor_path = self.castor_base_dir
1341
1342
1343
1344
1345 datatype = self.datasets_information[dataset_name]["datatype"]
1346 datatype = datatype.lower()
1347 castor_path = os.path.join(castor_path, datatype)
1348
1349
1350 harvesting_type = self.harvesting_type
1351 harvesting_type = harvesting_type.lower()
1352 castor_path = os.path.join(castor_path, harvesting_type)
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362 release_version = self.cmssw_version
1363 release_version = release_version.lower(). \
1364 replace("cmssw", ""). \
1365 strip("_")
1366 castor_path = os.path.join(castor_path, release_version)
1367
1368
1369 dataset_name_escaped = self.escape_dataset_name(dataset_name)
1370 castor_path = os.path.join(castor_path, dataset_name_escaped)
1371
1372
1373
1374 castor_path = os.path.normpath(castor_path)
1375
1376
1377 return castor_path
1378
1379
1380
1381 def create_castor_path_name_special(self,
1382 dataset_name, run_number,
1383 castor_path_common):
1384 """Create the specialised part of the CASTOR output dir name.
1385
1386 NOTE: To avoid clashes with `incremental harvesting'
1387 (re-harvesting when a dataset grows) we have to include the
1388 event count in the path name. The underlying `problem' is that
1389 CRAB does not overwrite existing output files so if the output
1390 file already exists CRAB will fail to copy back the output.
1391
1392 NOTE: It's not possible to create different kinds of
1393 harvesting jobs in a single call to this tool. However, in
1394 principle it could be possible to create both data and MC jobs
1395 in a single go.
1396
1397 NOTE: The number of events used in the path name is the
1398 _total_ number of events in the dataset/run at the time of
1399 harvesting. If we're doing partial harvesting the final
1400 results will reflect lower statistics. This is a) the easiest
1401 to code and b) the least likely to lead to confusion if
1402 someone ever decides to swap/copy around file blocks between
1403 sites.
1404
1405 """
1406
1407 castor_path = castor_path_common
1408
1409
1410
1411
1412 castor_path = os.path.join(castor_path, "run_%d" % run_number)
1413
1414
1415
1416
1417
1418
1419
1420 castor_path = os.path.join(castor_path, "nevents")
1421
1422
1423
1424 castor_path = os.path.normpath(castor_path)
1425
1426
1427 return castor_path
1428
1429
1430
1431 def create_and_check_castor_dirs(self):
1432 """Make sure all required CASTOR output dirs exist.
1433
1434 This checks the CASTOR base dir specified by the user as well
1435 as all the subdirs required by the current set of jobs.
1436
1437 """
1438
1439 self.logger.info("Checking (and if necessary creating) CASTOR " \
1440 "output area(s)...")
1441
1442
1443 self.create_and_check_castor_dir(self.castor_base_dir)
1444
1445
1446 castor_dirs = []
1447 for (dataset_name, runs) in self.datasets_to_use.items():
1448
1449 for run in runs:
1450 castor_dirs.append(self.datasets_information[dataset_name] \
1451 ["castor_path"][run])
1452 castor_dirs_unique = sorted(set(castor_dirs))
1453
1454
1455
1456 ndirs = len(castor_dirs_unique)
1457 step = max(ndirs // 10, 1)
1458 for (i, castor_dir) in enumerate(castor_dirs_unique):
1459 if (i + 1) % step == 0 or \
1460 (i + 1) == ndirs:
1461 self.logger.info(" %d/%d" % \
1462 (i + 1, ndirs))
1463 self.create_and_check_castor_dir(castor_dir)
1464
1465
1466
1467
1468
1469
1470 self.logger.debug("Checking if path `%s' is empty" % \
1471 castor_dir)
1472 cmd = "rfdir %s" % castor_dir
1473 (status, output) = subprocess.getstatusoutput(cmd)
1474 if status != 0:
1475 msg = "Could not access directory `%s'" \
1476 " !!! This is bad since I should have just" \
1477 " created it !!!" % castor_dir
1478 self.logger.fatal(msg)
1479 raise Error(msg)
1480 if len(output) > 0:
1481 self.logger.warning("Output directory `%s' is not empty:" \
1482 " new jobs will fail to" \
1483 " copy back output" % \
1484 castor_dir)
1485
1486
1487
1488
1489
1490 def create_and_check_castor_dir(self, castor_dir):
1491 """Check existence of the give CASTOR dir, if necessary create
1492 it.
1493
1494 Some special care has to be taken with several things like
1495 setting the correct permissions such that CRAB can store the
1496 output results. Of course this means that things like
1497 /castor/cern.ch/ and user/j/ have to be recognised and treated
1498 properly.
1499
1500 NOTE: Only CERN CASTOR area (/castor/cern.ch/) supported for
1501 the moment.
1502
1503 NOTE: This method uses some slightly tricky caching to make
1504 sure we don't keep over and over checking the same base paths.
1505
1506 """
1507
1508
1509
1510
1511 def split_completely(path):
1512 (parent_path, name) = os.path.split(path)
1513 if name == "":
1514 return (parent_path, )
1515 else:
1516 return split_completely(parent_path) + (name, )
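# A quick illustration of the helper above (the path is made up):
#
#   split_completely("/castor/cern.ch/cms")
#     -> ("/", "castor", "cern.ch", "cms")
#
# i.e. the path is decomposed into all of its components starting from
# the root, so that each level can be checked/created one at a time.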
1517
1518
1519
1520
1521
1522 def extract_permissions(rfstat_output):
1523 """Parse the output from rfstat and return the
1524 5-digit permissions string."""
1525
1526 permissions_line = [i for i in rfstat_output.split("\n") \
1527 if i.lower().find("protection") > -1]
1528 regexp = re.compile(r".*\(([0-9]{5})\).*")
1529 match = regexp.search(rfstat_output)
1530 if not match or len(match.groups()) != 1:
1531 msg = "Could not extract permissions " \
1532 "from output: %s" % rfstat_output
1533 self.logger.fatal(msg)
1534 raise Error(msg)
1535 permissions = match.group(1)
1536
1537
1538 return permissions
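# Sketch of what extract_permissions() relies on: somewhere in the
# rfstat output there is a parenthesised five-digit mode, e.g. a
# "Protection" line containing "(40775)", from which "40775" is
# returned. (The exact rfstat output layout is not reproduced here.)
# The leading "40" is later used to recognise directories and the last
# three digits are compared against the target permissions.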
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552 castor_paths_dont_touch = {
1553 0: ["/", "castor", "cern.ch", "cms", "store", "temp",
1554 "dqm", "offline", "user"],
1555 -1: ["user", "store"]
1556 }
1557
1558 self.logger.debug("Checking CASTOR path `%s'" % castor_dir)
1559
1560
1561
1562
1563 castor_path_pieces = split_completely(castor_dir)
1564
1565
1566
1567
1568 path = ""
1569 check_sizes = sorted(castor_paths_dont_touch.keys())
1570 len_castor_path_pieces = len(castor_path_pieces)
1571 for piece_index in range (len_castor_path_pieces):
1572 skip_this_path_piece = False
1573 piece = castor_path_pieces[piece_index]
1574
1575
1576 for check_size in check_sizes:
1577
1578 if (piece_index + check_size) > -1:
1579
1580
1581
1582 if castor_path_pieces[piece_index + check_size] in castor_paths_dont_touch[check_size]:
1583
1584 skip_this_path_piece = True
1585
1586
1587
1588
1589
1590
1591
1592 path = os.path.join(path, piece)
1593
1594
1595
1596
1597
1598 try:
1599 if path in self.castor_path_checks_cache:
1600 continue
1601 except AttributeError:
1602
1603 self.castor_path_checks_cache = []
1604 self.castor_path_checks_cache.append(path)
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622 if not skip_this_path_piece:
1623
1624
1625
1626
1627
1628
1629
1630 self.logger.debug("Checking if path `%s' exists" % \
1631 path)
1632 cmd = "rfstat %s" % path
1633 (status, output) = subprocess.getstatusoutput(cmd)
1634 if status != 0:
1635
1636 self.logger.debug("Creating path `%s'" % path)
1637 cmd = "nsmkdir -m 775 %s" % path
1638 (status, output) = subprocess.getstatusoutput(cmd)
1639 if status != 0:
1640 msg = "Could not create directory `%s'" % path
1641 self.logger.fatal(msg)
1642 raise Error(msg)
1643 cmd = "rfstat %s" % path
1644 (status, output) = subprocess.getstatusoutput(cmd)
1645
1646
1647
1648
1649 permissions = extract_permissions(output)
1650 if not permissions.startswith("40"):
1651 msg = "Path `%s' is not a directory(?)" % path
1652 self.logger.fatal(msg)
1653 raise Error(msg)
1654
1655
1656
1657 self.logger.debug("Checking permissions for path `%s'" % path)
1658 cmd = "rfstat %s" % path
1659 (status, output) = subprocess.getstatusoutput(cmd)
1660 if status != 0:
1661 msg = "Could not obtain permissions for directory `%s'" % \
1662 path
1663 self.logger.fatal(msg)
1664 raise Error(msg)
1665
1666 permissions = extract_permissions(output)[-3:]
1667
1668
1669
1670 if piece_index == (len_castor_path_pieces - 1):
1671
1672
1673 permissions_target = "775"
1674 else:
1675
1676 permissions_target = "775"
1677
1678
1679 permissions_new = []
1680 for (i, j) in zip(permissions, permissions_target):
1681 permissions_new.append(str(max(int(i), int(j))))
1682 permissions_new = "".join(permissions_new)
1683 self.logger.debug(" current permissions: %s" % \
1684 permissions)
1685 self.logger.debug(" target permissions : %s" % \
1686 permissions_target)
1687 if permissions_new != permissions:
1688
1689 self.logger.debug("Changing permissions of `%s' " \
1690 "to %s (were %s)" % \
1691 (path, permissions_new, permissions))
1692 cmd = "rfchmod %s %s" % (permissions_new, path)
1693 (status, output) = subprocess.getstatusoutput(cmd)
1694 if status != 0:
1695 msg = "Could not change permissions for path `%s' " \
1696 "to %s" % (path, permissions_new)
1697 self.logger.fatal(msg)
1698 raise Error(msg)
1699
1700 self.logger.debug(" Permissions ok (%s)" % permissions_new)
1701
1702
1703
1704
1705
1706 def pick_a_site(self, sites, cmssw_version):
1707
1708
1709 sites_forbidden = []
1710
1711 if (self.preferred_site == "CAF") or (self.preferred_site == "caf.cern.ch"):
1712 self.caf_access = True
1713
1714 if self.caf_access == False:
1715 sites_forbidden.append("caf.cern.ch")
1716
1717
1718
1719
1720
1721
1722
1723 all_t1 = [
1724 "srm-cms.cern.ch",
1725 "ccsrm.in2p3.fr",
1726 "cmssrm-fzk.gridka.de",
1727 "cmssrm.fnal.gov",
1728 "gridka-dCache.fzk.de",
1729 "srm-cms.gridpp.rl.ac.uk",
1730 "srm.grid.sinica.edu.tw",
1731 "srm2.grid.sinica.edu.tw",
1732 "srmcms.pic.es",
1733 "storm-fe-cms.cr.cnaf.infn.it"
1734 ]
1735
1736 country_codes = {
1737 "CAF" : "caf.cern.ch",
1738 "CH" : "srm-cms.cern.ch",
1739 "FR" : "ccsrm.in2p3.fr",
1740 "DE" : "cmssrm-fzk.gridka.de",
1741 "GOV" : "cmssrm.fnal.gov",
1742 "DE2" : "gridka-dCache.fzk.de",
1743 "UK" : "srm-cms.gridpp.rl.ac.uk",
1744 "TW" : "srm.grid.sinica.edu.tw",
1745 "TW2" : "srm2.grid.sinica.edu.tw",
1746 "ES" : "srmcms.pic.es",
1747 "IT" : "storm-fe-cms.cr.cnaf.infn.it"
1748 }
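# Illustration of the preferred-site handling below: passing `--site=IT'
# turns self.preferred_site into "storm-fe-cms.cr.cnaf.infn.it" via the
# mapping above; if that SE is not among the candidate sites for a given
# dataset/run, the candidate list becomes empty and no site is selected.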
1749
1750 if self.non_t1access:
1751 sites_forbidden.extend(all_t1)
1752
1753 for site in sites_forbidden:
1754 if site in sites:
1755 sites.remove(site)
1756
1757 if self.preferred_site in country_codes:
1758 self.preferred_site = country_codes[self.preferred_site]
1759
1760 if self.preferred_site != "no preference":
1761 if self.preferred_site in sites:
1762 sites = [self.preferred_site]
1763 else:
1764 sites= []
1765
1766
1767
1768
1769
1770
1771
1772
1773 site_name = None
1774 cmd = None
1775 while len(sites) > 0 and \
1776 site_name is None:
1777
1778
1779 t1_sites = []
1780 for site in sites:
1781 if site in all_t1:
1782 t1_sites.append(site)
1783 if site == "caf.cern.ch":
1784 t1_sites.append(site)
1785
1786
1787
1788
1789
1790
1791 if len(t1_sites) > 0:
1792 se_name = choice(t1_sites)
1793
1794 else:
1795 se_name = choice(sites)
1796
1797
1798
1799 if se_name in self.sites_and_versions_cache and \
1800 cmssw_version in self.sites_and_versions_cache[se_name]:
1801 if self.sites_and_versions_cache[se_name][cmssw_version]:
1802 site_name = se_name
1803 break
1804 else:
1805 self.logger.debug(" --> rejecting site `%s'" % se_name)
1806 sites.remove(se_name)
1807
1808 else:
1809 self.logger.info("Checking if site `%s' " \
1810 "has CMSSW version `%s'" % \
1811 (se_name, cmssw_version))
1812 self.sites_and_versions_cache[se_name] = {}
1813
1814
1815
1816
1817
1818
1819
1820
1821
1822
1823
1824
1825
1826
1827 cmd = "lcg-info --list-ce " \
1828 "--query '" \
1829 "Tag=VO-cms-%s," \
1830 "CEStatus=Production," \
1831 "CloseSE=%s'" % \
1832 (cmssw_version, se_name)
1833 (status, output) = subprocess.getstatusoutput(cmd)
1834 if status != 0:
1835 self.logger.error("Could not check site information " \
1836 "for site `%s'" % se_name)
1837 else:
1838 if (len(output) > 0) or (se_name == "caf.cern.ch"):
1839 self.sites_and_versions_cache[se_name][cmssw_version] = True
1840 site_name = se_name
1841 break
1842 else:
1843 self.sites_and_versions_cache[se_name][cmssw_version] = False
1844 self.logger.debug(" --> rejecting site `%s'" % se_name)
1845 sites.remove(se_name)
1846
1847 if site_name is self.no_matching_site_found_str:
1848 self.logger.error(" --> no matching site found")
1849 self.logger.error(" --> Your release or SCRAM " \
1850 "architecture may not be available" \
1851 "anywhere on the (LCG) grid.")
1852 if not cmd is None:
1853 self.logger.debug(" (command used: `%s')" % cmd)
1854 else:
1855 self.logger.debug(" --> selected site `%s'" % site_name)
1856
1857
1858
1859 if site_name is None:
1860 site_name = self.no_matching_site_found_str
1861
1862
1863 self.all_sites_found = False
1864
1865
1866 return site_name
1867
1868
1869
1870 def parse_cmd_line_options(self):
1871
1872
1873
1874
1875 parser = optparse.OptionParser(version="%s %s" % \
1876 ("%prog", self.version),
1877 formatter=CMSHarvesterHelpFormatter())
1878
1879 self.option_parser = parser
1880
1881
1882 parser.add_option("-d", "--debug",
1883 help="Switch on debug mode",
1884 action="callback",
1885 callback=self.option_handler_debug)
1886
1887
1888 parser.add_option("-q", "--quiet",
1889 help="Be less verbose",
1890 action="callback",
1891 callback=self.option_handler_quiet)
1892
1893
1894
1895 parser.add_option("", "--force",
1896 help="Force mode. Do not abort on sanity check "
1897 "failures",
1898 action="callback",
1899 callback=self.option_handler_force)
1900
1901
1902 parser.add_option("", "--harvesting_type",
1903 help="Harvesting type: %s" % \
1904 ", ".join(self.harvesting_types),
1905 action="callback",
1906 callback=self.option_handler_harvesting_type,
1907 type="string",
1908 metavar="HARVESTING_TYPE")
1909
1910
1911 parser.add_option("", "--harvesting_mode",
1912 help="Harvesting mode: %s (default = %s)" % \
1913 (", ".join(self.harvesting_modes),
1914 self.harvesting_mode_default),
1915 action="callback",
1916 callback=self.option_handler_harvesting_mode,
1917 type="string",
1918 metavar="HARVESTING_MODE")
1919
1920
1921 parser.add_option("", "--globaltag",
1922 help="GlobalTag to use. Default is the ones " \
1923 "the dataset was created with for MC, for data" \
1924 "a GlobalTag has to be specified.",
1925 action="callback",
1926 callback=self.option_handler_globaltag,
1927 type="string",
1928 metavar="GLOBALTAG")
1929
1930
1931 parser.add_option("", "--no-ref-hists",
1932 help="Don't use any reference histograms",
1933 action="callback",
1934 callback=self.option_handler_no_ref_hists)
1935
1936
1937
1938 parser.add_option("", "--frontier-connection",
1939 help="Use this Frontier connection to find " \
1940 "GlobalTags and LocalTags (for reference " \
1941 "histograms).\nPlease only use this for " \
1942 "testing.",
1943 action="callback",
1944 callback=self.option_handler_frontier_connection,
1945 type="string",
1946 metavar="FRONTIER")
1947
1948
1949
1950 parser.add_option("", "--frontier-connection-for-globaltag",
1951 help="Use this Frontier connection to find " \
1952 "GlobalTags.\nPlease only use this for " \
1953 "testing.",
1954 action="callback",
1955 callback=self.option_handler_frontier_connection,
1956 type="string",
1957 metavar="FRONTIER")
1958
1959
1960
1961 parser.add_option("", "--frontier-connection-for-refhists",
1962 help="Use this Frontier connection to find " \
1963 "LocalTags (for reference " \
1964 "histograms).\nPlease only use this for " \
1965 "testing.",
1966 action="callback",
1967 callback=self.option_handler_frontier_connection,
1968 type="string",
1969 metavar="FRONTIER")
1970
1971
1972
1973 parser.add_option("", "--dataset",
1974 help="Name (or regexp) of dataset(s) to process",
1975 action="callback",
1976
1977 callback=self.option_handler_input_spec,
1978 type="string",
1979
1980 metavar="DATASET")
1981
1982
1983
1984 parser.add_option("", "--dataset-ignore",
1985 help="Name (or regexp) of dataset(s) to ignore",
1986 action="callback",
1987 callback=self.option_handler_input_spec,
1988 type="string",
1989 metavar="DATASET-IGNORE")
1990
1991
1992
1993 parser.add_option("", "--runs",
1994 help="Run number(s) to process",
1995 action="callback",
1996 callback=self.option_handler_input_spec,
1997 type="string",
1998 metavar="RUNS")
1999
2000
2001
2002 parser.add_option("", "--runs-ignore",
2003 help="Run number(s) to ignore",
2004 action="callback",
2005 callback=self.option_handler_input_spec,
2006 type="string",
2007 metavar="RUNS-IGNORE")
2008
2009
2010
2011 parser.add_option("", "--datasetfile",
2012 help="File containing list of dataset names " \
2013 "(or regexps) to process",
2014 action="callback",
2015
2016 callback=self.option_handler_input_spec,
2017 type="string",
2018
2019 metavar="DATASETFILE")
2020
2021
2022
2023 parser.add_option("", "--datasetfile-ignore",
2024 help="File containing list of dataset names " \
2025 "(or regexps) to ignore",
2026 action="callback",
2027 callback=self.option_handler_input_spec,
2028 type="string",
2029 metavar="DATASETFILE-IGNORE")
2030
2031
2032
2033 parser.add_option("", "--runslistfile",
2034 help="File containing list of run numbers " \
2035 "to process",
2036 action="callback",
2037 callback=self.option_handler_input_spec,
2038 type="string",
2039 metavar="RUNSLISTFILE")
2040
2041
2042
2043 parser.add_option("", "--runslistfile-ignore",
2044 help="File containing list of run numbers " \
2045 "to ignore",
2046 action="callback",
2047 callback=self.option_handler_input_spec,
2048 type="string",
2049 metavar="RUNSLISTFILE-IGNORE")
2050
2051
2052
2053 parser.add_option("", "--Jsonrunfile",
2054 help="Jsonfile containing dictionary of run/lumisections pairs. " \
2055 "All lumisections of runs contained in dictionary are processed.",
2056 action="callback",
2057 callback=self.option_handler_input_Jsonrunfile,
2058 type="string",
2059 metavar="JSONRUNFILE")
2060
2061
2062
2063 parser.add_option("", "--Jsonfile",
2064 help="Jsonfile containing dictionary of run/lumisections pairs. " \
2065 "Only specified lumisections of runs contained in dictionary are processed.",
2066 action="callback",
2067 callback=self.option_handler_input_Jsonfile,
2068 type="string",
2069 metavar="JSONFILE")
2070
2071
2072
2073 parser.add_option("", "--todo-file",
2074 help="Todo file containing a list of runs to process.",
2075 action="callback",
2076 callback=self.option_handler_input_todofile,
2077 type="string",
2078 metavar="TODO-FILE")
2079
2080
2081
2082 parser.add_option("", "--refhistmappingfile",
2083 help="File to be use for the reference " \
2084 "histogram mappings. Default: `%s'." % \
2085 self.ref_hist_mappings_file_name_default,
2086 action="callback",
2087 callback=self.option_handler_ref_hist_mapping_file,
2088 type="string",
2089 metavar="REFHISTMAPPING-FILE")
2090
2091
2092
2093
2094 parser.add_option("", "--castordir",
2095 help="Place on CASTOR to store results. " \
2096 "Default: `%s'." % \
2097 self.castor_base_dir_default,
2098 action="callback",
2099 callback=self.option_handler_castor_dir,
2100 type="string",
2101 metavar="CASTORDIR")
2102
2103
2104
2105 parser.add_option("", "--no-t1access",
2106 help="Try to create jobs that will run " \
2107 "without special `t1access' role",
2108 action="callback",
2109 callback=self.option_handler_no_t1access)
2110
2111
2112 parser.add_option("", "--caf-access",
2113 help="Crab jobs may run " \
2114 "on CAF",
2115 action="callback",
2116 callback=self.option_handler_caf_access)
2117
2118
2119 parser.add_option("", "--saveByLumiSection",
2120 help="set saveByLumiSection=1 in harvesting cfg file",
2121 action="callback",
2122 callback=self.option_handler_saveByLumiSection)
2123
2124
2125 parser.add_option("", "--automatic-crab-submission",
2126 help="Crab jobs are created and " \
2127 "submitted automatically",
2128 action="callback",
2129 callback=self.option_handler_crab_submission)
2130
2131
2132
2133 parser.add_option("", "--max-sites",
2134 help="Max. number of sites each job is submitted to",
2135 action="callback",
2136 callback=self.option_handler_sites,
2137 type="int")
2138
2139
2140 parser.add_option("", "--site",
2141 help="Crab jobs are submitted to specified site. T1 sites may be shortened by the following (country) codes: \
2142 srm-cms.cern.ch : CH \
2143 ccsrm.in2p3.fr : FR \
2144 cmssrm-fzk.gridka.de : DE \
2145 cmssrm.fnal.gov : GOV \
2146 gridka-dCache.fzk.de : DE2 \
2147 srm-cms.gridpp.rl.ac.uk : UK \
2148 srm.grid.sinica.edu.tw : TW \
2149 srm2.grid.sinica.edu.tw : TW2 \
2150 srmcms.pic.es : ES \
2151 storm-fe-cms.cr.cnaf.infn.it : IT",
2152 action="callback",
2153 callback=self.option_handler_preferred_site,
2154 type="string")
2155
2156
2157
2158 parser.add_option("-l", "--list",
2159 help="List all harvesting types and their" \
2160 "corresponding sequence names",
2161 action="callback",
2162 callback=self.option_handler_list_types)
2163
2164
2165
2166
2167
2168 if len(self.cmd_line_opts) < 1:
2169 self.cmd_line_opts = ["--help"]
2170
2171
2172
2173
2174
2175
2176
2177
2178 for i in ["-d", "--debug",
2179 "-q", "--quiet"]:
2180 if i in self.cmd_line_opts:
2181 self.cmd_line_opts.remove(i)
2182 self.cmd_line_opts.insert(0, i)
2183
2184
2185 parser.set_defaults()
2186 (self.options, self.args) = parser.parse_args(self.cmd_line_opts)
2187
2188
2189
2190
2191
2192 def check_input_status(self):
2193 """Check completeness and correctness of input information.
2194
2195 Check that all required information has been specified and
2196 that, at least as far as can be easily checked, it makes
2197 sense.
2198
2199 NOTE: This is also where any default values are applied.
2200
2201 """
2202
2203 self.logger.info("Checking completeness/correctness of input...")
2204
2205
2206
2207 if len(self.args) > 0:
2208 msg = "Sorry but I don't understand `%s'" % \
2209 (" ".join(self.args))
2210 self.logger.fatal(msg)
2211 raise Usage(msg)
2212
2213
2214
2215
2216 if self.harvesting_mode == "two-step":
2217 msg = "--------------------\n" \
2218 " Sorry, but for the moment (well, till it works)" \
2219 " the two-step mode has been disabled.\n" \
2220 "--------------------\n"
2221 self.logger.fatal(msg)
2222 raise Error(msg)
2223
2224
2225
2226 if self.harvesting_type is None:
2227 msg = "Please specify a harvesting type"
2228 self.logger.fatal(msg)
2229 raise Usage(msg)
2230
2231 if self.harvesting_mode is None:
2232 self.harvesting_mode = self.harvesting_mode_default
2233 msg = "No harvesting mode specified --> using default `%s'" % \
2234 self.harvesting_mode
2235 self.logger.warning(msg)
2236
2237
2238
2239
2240
2241 if self.input_method["datasets"]["use"] is None:
2242 msg = "Please specify an input dataset name " \
2243 "or a list file name"
2244 self.logger.fatal(msg)
2245 raise Usage(msg)
2246
2247
2248
2249 assert not self.input_name["datasets"]["use"] is None
2250
2251
2252
2253
2254
2255
2256 if self.use_ref_hists:
2257 if self.ref_hist_mappings_file_name is None:
2258 self.ref_hist_mappings_file_name = self.ref_hist_mappings_file_name_default
2259 msg = "No reference histogram mapping file specified --> " \
2260 "using default `%s'" % \
2261 self.ref_hist_mappings_file_name
2262 self.logger.warning(msg)
2263
2264
2265
2266
2267
2268 if self.castor_base_dir is None:
2269 self.castor_base_dir = self.castor_base_dir_default
2270 msg = "No CASTOR area specified -> using default `%s'" % \
2271 self.castor_base_dir
2272 self.logger.warning(msg)
2273
2274
2275
2276 if not self.castor_base_dir.startswith(self.castor_prefix):
2277 msg = "CASTOR area does not start with `%s'" % \
2278 self.castor_prefix
2279 self.logger.fatal(msg)
2280 if self.castor_base_dir.find("castor") > -1 and \
2281 not self.castor_base_dir.find("cern.ch") > -1:
2282 self.logger.fatal("Only CERN CASTOR is supported")
2283 raise Usage(msg)
2284
2285
2286
2287
2288
2289
2290
2291
2292
2293 if self.globaltag is None:
2294 self.logger.warning("No GlobalTag specified. This means I cannot")
2295 self.logger.warning("run on data, only on MC.")
2296 self.logger.warning("I will skip all data datasets.")
2297
2298
2299
2300
2301 if not self.globaltag is None:
2302 if not self.globaltag.endswith("::All"):
2303 self.logger.warning("Specified GlobalTag `%s' does " \
2304 "not end in `::All' --> " \
2305 "appending this missing piece" % \
2306 self.globaltag)
2307 self.globaltag = "%s::All" % self.globaltag
2308
2309
2310
2311
2312 for (key, value) in self.frontier_connection_name.items():
2313 frontier_type_str = "unknown"
2314 if key == "globaltag":
2315 frontier_type_str = "the GlobalTag"
2316 elif key == "refhists":
2317 frontier_type_str = "the reference histograms"
2318 non_str = None
2319 if self.frontier_connection_overridden[key] == True:
2320 non_str = "non-"
2321 else:
2322 non_str = ""
2323 self.logger.info("Using %sdefault Frontier " \
2324 "connection for %s: `%s'" % \
2325 (non_str, frontier_type_str, value))
2326
2327
2328
2329
2330
2331
2332
2333 def check_cmssw(self):
2334 """Check if CMSSW is setup.
2335
2336 """
2337
2338
2339
2340
2341 cmssw_version = os.getenv("CMSSW_VERSION")
2342 if cmssw_version is None:
2343 self.logger.fatal("It seems CMSSW is not setup...")
2344 self.logger.fatal("($CMSSW_VERSION is empty)")
2345 raise Error("ERROR: CMSSW needs to be setup first!")
2346
2347 self.cmssw_version = cmssw_version
2348 self.logger.info("Found CMSSW version %s properly set up" % \
2349 self.cmssw_version)
2350
2351
2352 return True
2353
2354
2355
2356 def check_dbs(self):
2357 """Check if DBS is setup.
2358
2359 """
2360
2361
2362
2363
2364 dbs_home = os.getenv("DBSCMD_HOME")
2365 if dbs_home is None:
2366 self.logger.fatal("It seems DBS is not setup...")
2367 self.logger.fatal(" $DBSCMD_HOME is empty")
2368 raise Error("ERROR: DBS needs to be setup first!")
2369
2370
2371
2372
2373
2374
2375
2376
2377
2378
2379
2380
2381
2382
2383
2384
2385
2386 self.logger.debug("Found DBS properly set up")
2387
2388
2389 return True
2390
2391
2392
2393 def setup_dbs(self):
2394 """Setup the Python side of DBS.
2395
2396 For more information see the DBS Python API documentation:
2397 https://twiki.cern.ch/twiki/bin/view/CMS/DBSApiDocumentation
2398
2399 """
2400
2401 try:
2402 args={}
2403 args["url"]= "http://cmsdbsprod.cern.ch/cms_dbs_prod_global/" \
2404 "servlet/DBSServlet"
2405 api = DbsApi(args)
2406 self.dbs_api = api
2407
2408 except DBSAPI.dbsApiException.DbsApiException as ex:
2409 self.logger.fatal("Caught DBS API exception %s: %s " % \
2410 (ex.getClassName(), ex.getErrorMessage()))
2411 if ex.getErrorCode() not in (None, ""):
2412 self.logger.debug("DBS exception error code: %s" % ex.getErrorCode())
2413 raise
2414
2415
2416
2417
2418
2419 def dbs_resolve_dataset_name(self, dataset_name):
2420 """Use DBS to resolve a wildcarded dataset name.
2421
2422 """
2423
2424
2425
2426 assert not self.dbs_api is None
2427
2428
2429
2430
2431 if not (dataset_name.startswith("/") and \
2432 dataset_name.endswith("RECO")):
2433 self.logger.warning("Dataset name `%s' does not sound " \
2434 "like a valid dataset name!" % \
2435 dataset_name)
2436
2437
2438
2439 api = self.dbs_api
2440 dbs_query = "find dataset where dataset like %s " \
2441 "and dataset.status = VALID" % \
2442 dataset_name
2443 try:
2444 api_result = api.executeQuery(dbs_query)
2445 except DBSAPI.dbsApiException.DbsApiException:
2446 msg = "ERROR: Could not execute DBS query"
2447 self.logger.fatal(msg)
2448 raise Error(msg)
2449
2450
2451 handler = DBSXMLHandler(["dataset"])
2452 parser = xml.sax.make_parser()
2453 parser.setContentHandler(handler)
2454
2455
2456 try:
2457 xml.sax.parseString(api_result, handler)
2458 except SAXParseException:
2459 msg = "ERROR: Could not parse DBS server output"
2460 self.logger.fatal(msg)
2461 raise Error(msg)
2462
2463
2464 assert(handler.check_results_validity()), "ERROR The DBSXMLHandler screwed something up!"
2465
2466
2467
2468 datasets = list(handler.results.values())[0]
2469
2470
2471 return datasets
2472
2473
2474
2475 def dbs_resolve_cmssw_version(self, dataset_name):
2476 """Ask DBS for the CMSSW version used to create this dataset.
2477
2478 """
2479
2480
2481
2482 assert not self.dbs_api is None
2483
2484
2485 api = self.dbs_api
2486 dbs_query = "find algo.version where dataset = %s " \
2487 "and dataset.status = VALID" % \
2488 dataset_name
2489 try:
2490 api_result = api.executeQuery(dbs_query)
2491 except DBSAPI.dbsApiException.DbsApiException:
2492 msg = "ERROR: Could not execute DBS query"
2493 self.logger.fatal(msg)
2494 raise Error(msg)
2495
2496 handler = DBSXMLHandler(["algo.version"])
2497 parser = xml.sax.make_parser()
2498 parser.setContentHandler(handler)
2499
2500 try:
2501 xml.sax.parseString(api_result, handler)
2502 except SAXParseException:
2503 msg = "ERROR: Could not parse DBS server output"
2504 self.logger.fatal(msg)
2505 raise Error(msg)
2506
2507
2508 assert(handler.check_results_validity()), "ERROR The DBSXMLHandler screwed something up!"
2509
2510
2511 cmssw_version = list(handler.results.values())[0]
2512
2513
2514 assert len(cmssw_version) == 1
2515
2516
2517 cmssw_version = cmssw_version[0]
2518
2519
2520 return cmssw_version
2521
2522
2523
2524
2525
2526
2527
2528
2529
2530
2531
2532
2533
2534
2535
2536
2537
2538
2539
2540
2541
2542
2543
2544
2545
2546
2547
2548
2549
2550
2551
2552
2553
2554
2555
2556
2557
2558
2559
2560
2561
2562
2563
2564
2565
2566
2567
2568
2569 def dbs_resolve_runs(self, dataset_name):
2570 """Ask DBS for the list of runs in a given dataset.
2571
2572 # NOTE: This does not (yet?) skip/remove empty runs. There is
2573 # a bug in the DBS entry run.numevents (i.e. it always returns
2574 # zero) which should be fixed in the `next DBS release'.
2575 # See also:
2576 # https://savannah.cern.ch/bugs/?53452
2577 # https://savannah.cern.ch/bugs/?53711
2578
2579 """
2580
2581
2582
2583
2584
2585
2586
2587
2588 assert not self.dbs_api is None
2589
2590
2591 api = self.dbs_api
2592 dbs_query = "find run where dataset = %s " \
2593 "and dataset.status = VALID" % \
2594 dataset_name
2595 try:
2596 api_result = api.executeQuery(dbs_query)
2597 except DBSAPI.dbsApiException.DbsApiException:
2598 msg = "ERROR: Could not execute DBS query"
2599 self.logger.fatal(msg)
2600 raise Error(msg)
2601
2602 handler = DBSXMLHandler(["run"])
2603 parser = xml.sax.make_parser()
2604 parser.setContentHandler(handler)
2605
2606 try:
2607 xml.sax.parseString(api_result, handler)
2608 except SAXParseException:
2609 msg = "ERROR: Could not parse DBS server output"
2610 self.logger.fatal(msg)
2611 raise Error(msg)
2612
2613
2614 assert(handler.check_results_validity()), "ERROR The DBSXMLHandler screwed something up!"
2615
2616
2617 runs = list(handler.results.values())[0]
2618
2619 runs = sorted([int(i) for i in runs])
2620
2621
2622 return runs
2623
2624
2625
2626 def dbs_resolve_globaltag(self, dataset_name):
2627 """Ask DBS for the globaltag corresponding to a given dataset.
2628
2629 # BUG BUG BUG
2630 # This does not seem to work for data datasets? E.g. for
2631 # /Cosmics/Commissioning08_CRAFT0831X_V1_311_ReReco_FromSuperPointing_v1/RAW-RECO
2632 # Probably due to the fact that the GlobalTag changed during
2633 # datataking...
2634 BUG BUG BUG end
2635
2636 """
2637
2638
2639
2640 assert not self.dbs_api is None
2641
2642
2643 api = self.dbs_api
2644 dbs_query = "find dataset.tag where dataset = %s " \
2645 "and dataset.status = VALID" % \
2646 dataset_name
2647 try:
2648 api_result = api.executeQuery(dbs_query)
2649 except DBSAPI.dbsApiException.DbsApiException:
2650 msg = "ERROR: Could not execute DBS query"
2651 self.logger.fatal(msg)
2652 raise Error(msg)
2653
2654 handler = DBSXMLHandler(["dataset.tag"])
2655 parser = xml.sax.make_parser()
2656 parser.setContentHandler(handler)
2657
2658 try:
2659 xml.sax.parseString(api_result, handler)
2660 except SAXParseException:
2661 msg = "ERROR: Could not parse DBS server output"
2662 self.logger.fatal(msg)
2663 raise Error(msg)
2664
2665
2666 assert(handler.check_results_validity()), "ERROR The DBSXMLHandler screwed something up!"
2667
2668
2669 globaltag = list(handler.results.values())[0]
2670
2671
2672 assert len(globaltag) == 1
2673
2674
2675 globaltag = globaltag[0]
2676
2677
2678 return globaltag
2679
2680
2681
2682 def dbs_resolve_datatype(self, dataset_name):
2683 """Ask DBS for the the data type (data or mc) of a given
2684 dataset.
2685
2686 """
2687
2688
2689
2690 assert not self.dbs_api is None
2691
2692
2693 api = self.dbs_api
2694 dbs_query = "find datatype.type where dataset = %s " \
2695 "and dataset.status = VALID" % \
2696 dataset_name
2697 try:
2698 api_result = api.executeQuery(dbs_query)
2699 except DBSAPI.dbsApiException.DbsApiException:
2700 msg = "ERROR: Could not execute DBS query"
2701 self.logger.fatal(msg)
2702 raise Error(msg)
2703
2704 handler = DBSXMLHandler(["datatype.type"])
2705 parser = xml.sax.make_parser()
2706 parser.setContentHandler(handler)
2707
2708 try:
2709 xml.sax.parseString(api_result, handler)
2710 except SAXParseException:
2711 msg = "ERROR: Could not parse DBS server output"
2712 self.logger.fatal(msg)
2713 raise Error(msg)
2714
2715
2716 assert(handler.check_results_validity()), "ERROR The DBSXMLHandler screwed something up!"
2717
2718
2719 datatype = list(handler.results.values())[0]
2720
2721
2722 assert len(datatype) == 1
2723
2724
2725 datatype = datatype[0]
2726
2727
2728 return datatype
2729
2730
2731
2732
2733
2734
2735 def dbs_resolve_number_of_events(self, dataset_name, run_number=None):
2736 """Determine the number of events in a given dataset (and run).
2737
2738 Ask DBS for the number of events in a dataset. If a run number
2739 is specified, the number returned is the number of events in
2740 that run of that dataset. If problems occur we raise an exception.
2741
2742 # BUG BUG BUG
2743 # Since DBS does not return the number of events correctly,
2744 # neither for runs nor for whole datasets, we have to work
2745 # around that a bit...
2746 # BUG BUG BUG end
2747
2748 """
2749
2750
2751
2752 assert not self.dbs_api is None
2753
2754
2755 api = self.dbs_api
2756 dbs_query = "find file.name, file.numevents where dataset = %s " \
2757 "and dataset.status = VALID" % \
2758 dataset_name
2759 if not run_number is None:
2760 dbs_query = dbs_query + (" and run = %d" % run_number)
2761 try:
2762 api_result = api.executeQuery(dbs_query)
2763 except DBSAPI.dbsApiException.DbsApiException:
2764 msg = "ERROR: Could not execute DBS query"
2765 self.logger.fatal(msg)
2766 raise Error(msg)
2767
2768 handler = DBSXMLHandler(["file.name", "file.numevents"])
2769 parser = xml.sax.make_parser()
2770 parser.setContentHandler(handler)
2771
2772 try:
2773 xml.sax.parseString(api_result, handler)
2774 except SAXParseException:
2775 msg = "ERROR: Could not parse DBS server output"
2776 self.logger.fatal(msg)
2777 raise Error(msg)
2778
2779
2780 assert(handler.check_results_validity()), "ERROR The DBSXMLHandler screwed something up!"
2781
2782
2783 num_events = sum(int(i) for i in handler.results["file.numevents"])
2784
2785
2786 return num_events
2787
2788
2789
2790
2791
2792
2793
2794
2795
2796
2797
2798
2799
2800
2801
2802
2803
2804
2805
2806
2807
2808
2809
2810
2811
2812
2813
2814
2815
2816
2817
2818
2819
2820
2821
2822
2823
2824
2825
2826
2827
2828
2829
2830
2831
2832
2833
2834
2835
2836
2837
2838
2839
2840
2841
2842
2843
2844
2845
2846
2847
2848
2849
2850
2851
2852
2853
2854
2855
2856
2857
2858
2859
2860
2861
2862
2863
2864
2865
2866
2867
2868
2869
2870
2871
2872
2873
2874
2875
2876
2877
2878
2879
2880
2881
2882
2883
2884
2885
2886
2887
2888
2889
2890
2891
2892
2893
2894
2895
2896
2897
2898
2899
2900
2901
2902
2903
2904
2905
2906
2907
2908
2909
2910
2911
2912
2913
2914
2915
2916
2917
2918
2919
2920
2921
2922
2923
2924
2925
2926
2927
2928
2929
2930
2931
2932
2933
2934
2935
2936
2937
2938
2939
2940
2941
2942
2943
2944
2945
2946
2947
2948
2949
2950
2951
2952
2953
2954
2955
2956
2957
2958
2959
2960
2961
2962
2963
2964
2965
2966
2967
2968
2969
2970
2971
2972
2973
2974
2975
2976
2977
2978
2979
2980
2981
2982
2983
2984
2985
2986
2987
2988
2989
2990
2991
2992
2993
2994
2995
2996
2997
2998
2999
3000
3001
3002
3003
3004
3005
3006
3007
3008
3009
3010
3011
3012
3013
3014
3015
3016
3017
3018
3019
3020
3021
3022
3023
3024
3025
3026
3027
3028
3029
3030
3031
3032
3033
3034
3035
3036
3037
3038
3039
3040
3041
3042
3043
3044
3045
3046
3047
3048
3049
3050
3051
3052
3053
3054
3055
3056
3057
3058
3059
3060
3061
3062
3063
3064
3065
3066
3067
3068
3069
3070
3071
3072
3073
3074
3075
3076 def dbs_check_dataset_spread(self, dataset_name):
3077 """Figure out the number of events in each run of this dataset.
3078
3079 This is a more efficient way of doing this than calling
3080 dbs_resolve_number_of_events for each run.
3081
3082 """
3083
3084 self.logger.debug("Checking spread of dataset `%s'" % dataset_name)
3085
3086
3087
3088 assert not self.dbs_api is None
3089
3090
3091 api = self.dbs_api
3092 dbs_query = "find run.number, site, file.name, file.numevents " \
3093 "where dataset = %s " \
3094 "and dataset.status = VALID" % \
3095 dataset_name
3096 try:
3097 api_result = api.executeQuery(dbs_query)
3098 except DBSAPI.dbsApiException.DbsApiException:
3099 msg = "ERROR: Could not execute DBS query"
3100 self.logger.fatal(msg)
3101 raise Error(msg)
3102
3103 handler = DBSXMLHandler(["run.number", "site", "file.name", "file.numevents"])
3104 parser = xml.sax.make_parser()
3105 parser.setContentHandler(handler)
3106
3107 try:
3108
3109
3110
3111
3112
3113
3114
3115
3116
3117
3118
3119
3120
3121
3122
3123
3124
3125
3126
3127
3128
3129
3130
3131
3132
3133
3134
3135
3136
3137
3138
3139
3140
3141
3142
3143
3144
3145
3146
3147
3148 xml.sax.parseString(api_result, handler)
3149 except SAXParseException:
3150 msg = "ERROR: Could not parse DBS server output"
3151 self.logger.fatal(msg)
3152 raise Error(msg)
3153
3154
3155 assert(handler.check_results_validity()), "ERROR The DBSXMLHandler screwed something up!"
3156
3157
3158
3159
3160
3161 files_info = {}
3162 for (index, site_name) in enumerate(handler.results["site"]):
3163
3164
3165
3166
3167
3168
3169
3170
3171
3172 if len(site_name) < 1:
3173 continue
3174 run_number = int(handler.results["run.number"][index])
3175 file_name = handler.results["file.name"][index]
3176 nevents = int(handler.results["file.numevents"][index])
3177
3178
3179 if run_number not in files_info:
3180
3181 files_info[run_number] = {}
3182 files_info[run_number][file_name] = (nevents,
3183 [site_name])
3184 elif file_name not in files_info[run_number]:
3185
3186 files_info[run_number][file_name] = (nevents,
3187 [site_name])
3188 else:
3189
3190
3191
3192
3193 assert nevents == files_info[run_number][file_name][0]
3194
3195 files_info[run_number][file_name][1].append(site_name)
3196
3197
3198
3199
3200 for run_number in files_info.keys():
3201 files_without_sites = [i for (i, j) in \
3202 files_info[run_number].items() \
3203 if len(j[1]) < 1]
3204 if len(files_without_sites) > 0:
3205 self.logger.warning("Removing %d file(s)" \
3206 " with empty site names" % \
3207 len(files_without_sites))
3208 for file_name in files_without_sites:
3209 del files_info[run_number][file_name]
3210
3211
3212
3213
3214
3215 num_events_catalog = {}
3216 for run_number in files_info.keys():
3217 site_names = list(set([j for i in files_info[run_number].values() for j in i[1]]))
3218
3219
3220
3221
3222 mirrored = None
3223 if len(site_names) > 1:
3224
3225
3226
3227
3228
3229
3230
3231 all_file_names = files_info[run_number].keys()
3232 all_file_names = set(all_file_names)
3233 sites_with_complete_copies = []
3234 for site_name in site_names:
3235 files_at_site = [i for (i, (j, k)) \
3236 in files_info[run_number].items() \
3237 if site_name in k]
3238 files_at_site = set(files_at_site)
3239 if files_at_site == all_file_names:
3240 sites_with_complete_copies.append(site_name)
3241 if len(sites_with_complete_copies) < 1:
3242
3243
3244
3245 mirrored = False
3246 else:
3247 if len(sites_with_complete_copies) > 1:
3248
3249
3250 mirrored = True
3251 else:
3252
3253
3254
3255
3256
3257 mirrored = True
3258
3259
3260
3261
3262
3263
3264
3265 if mirrored:
3266 self.logger.debug(" -> run appears to be `mirrored'")
3267 else:
3268 self.logger.debug(" -> run appears to be spread-out")
3269
3270 if mirrored and \
3271 len(sites_with_complete_copies) != len(site_names):
3272
3273
3274
3275 for (file_name, (i, sites)) in files_info[run_number].items():
3276 complete_sites = [site for site in sites \
3277 if site in sites_with_complete_copies]
3278 files_info[run_number][file_name] = (i, complete_sites)
3279 site_names = sites_with_complete_copies
3280
3281 self.logger.debug(" for run #%d:" % run_number)
3282 num_events_catalog[run_number] = {}
3283 num_events_catalog[run_number]["all_sites"] = sum([i[0] for i in files_info[run_number].values()])
3284 if len(site_names) < 1:
3285 self.logger.debug(" run is not available at any site")
3286 self.logger.debug(" (but should contain %d events" % \
3287 num_events_catalog[run_number]["all_sites"])
3288 else:
3289 self.logger.debug(" at all sites combined there are %d events" % \
3290 num_events_catalog[run_number]["all_sites"])
3291 for site_name in site_names:
3292 num_events_catalog[run_number][site_name] = sum([i[0] for i in files_info[run_number].values() if site_name in i[1]])
3293 self.logger.debug(" at site `%s' there are %d events" % \
3294 (site_name, num_events_catalog[run_number][site_name]))
3295 num_events_catalog[run_number]["mirrored"] = mirrored
3296
3297
3298 return num_events_catalog
3299
3300
3301
3302
3303
3304
3305
3306
3307
3308
3309
3310
3311
3312
3313
3314
3315
3316
3317
3318
3319
3320
3321
3322
3323
3324
3325
3326
3327
3328
3329
3330
3331
3332
3333
3334
3335
3336
3337
3338
3339
3340
3341
3342
3343
3344
3345
3346
3347
3348
3349
3350
3351
3352
3353
3354
3355
3356
3357 def build_dataset_list(self, input_method, input_name):
3358 """Build a list of all datasets to be processed.
3359
3360 """
3361
3362 dataset_names = []
3363
3364
3365
3366
3367 if input_method is None:
3368 pass
3369 elif input_method == "dataset":
3370
3371
3372
3373
3374 self.logger.info("Asking DBS for dataset names")
3375 dataset_names = self.dbs_resolve_dataset_name(input_name)
3376 elif input_method == "datasetfile":
3377
3378
3379
3380
3381 self.logger.info("Reading input from list file `%s'" % \
3382 input_name)
3383 try:
3384 listfile = open("/afs/cern.ch/cms/CAF/CMSCOMM/COMM_DQM/harvesting/bin/%s" %input_name, "r")
3385 print("open listfile")
3386 for dataset in listfile:
3387
3388 dataset_stripped = dataset.strip()
3389 if len(dataset_stripped) < 1:
3390 continue
3391
3392 if dataset_stripped[0] != "#":
3393 dataset_names.extend(self. \
3394 dbs_resolve_dataset_name(dataset_stripped))
3395 listfile.close()
3396 except IOError:
3397 msg = "ERROR: Could not open input list file `%s'" % \
3398 input_name
3399 self.logger.fatal(msg)
3400 raise Error(msg)
3401 else:
3402
3403
3404 assert False, "Unknown input method `%s'" % input_method
3405
3406
3407
3408
3409
3410
3411
3412 dataset_names = sorted(set(dataset_names))
3413
3414
3415
3416 return dataset_names
3417
3418
3419
3420 def build_dataset_use_list(self):
3421 """Build a list of datasets to process.
3422
3423 """
3424
3425 self.logger.info("Building list of datasets to consider...")
3426
3427 input_method = self.input_method["datasets"]["use"]
3428 input_name = self.input_name["datasets"]["use"]
3429 dataset_names = self.build_dataset_list(input_method,
3430 input_name)
3431 self.datasets_to_use = dict(list(zip(dataset_names,
3432 [None] * len(dataset_names))))
3433
3434 self.logger.info(" found %d dataset(s) to process:" % \
3435 len(dataset_names))
3436 for dataset in dataset_names:
3437 self.logger.info(" `%s'" % dataset)
3438
3439
3440
3441
3442
3443 def build_dataset_ignore_list(self):
3444 """Build a list of datasets to ignore.
3445
3446 NOTE: We should always have a list of datasets to process, but
3447 it may be that we don't have a list of datasets to ignore.
3448
3449 """
3450
3451 self.logger.info("Building list of datasets to ignore...")
3452
3453 input_method = self.input_method["datasets"]["ignore"]
3454 input_name = self.input_name["datasets"]["ignore"]
3455 dataset_names = self.build_dataset_list(input_method,
3456 input_name)
3457 self.datasets_to_ignore = dict(list(zip(dataset_names,
3458 [None] * len(dataset_names))))
3459
3460 self.logger.info(" found %d dataset(s) to ignore:" % \
3461 len(dataset_names))
3462 for dataset in dataset_names:
3463 self.logger.info(" `%s'" % dataset)
3464
3465
3466
3467
3468
3469 def build_runs_list(self, input_method, input_name):
3470
3471 runs = []
3472
3473
3474
3475 if input_method is None:
3476 pass
3477 elif input_method == "runs":
3478
3479
3480 self.logger.info("Reading list of runs from the " \
3481 "command line")
3482 runs.extend([int(i.strip()) \
3483 for i in input_name.split(",") \
3484 if len(i.strip()) > 0])
3485 elif input_method == "runslistfile":
3486
3487 self.logger.info("Reading list of runs from file `%s'" % \
3488 input_name)
3489 try:
3490 listfile = open(input_name, "r")
3491 for run in listfile:
3492
3493 run_stripped = run.strip()
3494 if len(run_stripped) < 1:
3495 continue
3496
3497 if run_stripped[0] != "#":
3498 runs.append(int(run_stripped))
3499 listfile.close()
3500 except IOError:
3501 msg = "ERROR: Could not open input list file `%s'" % \
3502 input_name
3503 self.logger.fatal(msg)
3504 raise Error(msg)
3505
3506 else:
3507
3508
3509 assert False, "Unknown input method `%s'" % input_method
3510
3511
3512
3513 runs = list(set(runs))
3514
3515
3516 return runs
3517
3518
3519
3520 def build_runs_use_list(self):
3521 """Build a list of runs to process.
3522
3523 """
3524
3525 self.logger.info("Building list of runs to consider...")
3526
3527 input_method = self.input_method["runs"]["use"]
3528 input_name = self.input_name["runs"]["use"]
3529 runs = self.build_runs_list(input_method, input_name)
3530 self.runs_to_use = dict(list(zip(runs, [None] * len(runs))))
3531
3532 self.logger.info(" found %d run(s) to process:" % \
3533 len(runs))
3534 if len(runs) > 0:
3535 self.logger.info(" %s" % ", ".join([str(i) for i in runs]))
3536
3537
3538
3539
3540
3541 def build_runs_ignore_list(self):
3542 """Build a list of runs to ignore.
3543
3544 NOTE: We should always have a list of runs to process, but
3545 it may be that we don't have a list of runs to ignore.
3546
3547 """
3548
3549 self.logger.info("Building list of runs to ignore...")
3550
3551 input_method = self.input_method["runs"]["ignore"]
3552 input_name = self.input_name["runs"]["ignore"]
3553 runs = self.build_runs_list(input_method, input_name)
3554 self.runs_to_ignore = dict(list(zip(runs, [None] * len(runs))))
3555
3556 self.logger.info(" found %d run(s) to ignore:" % \
3557 len(runs))
3558 if len(runs) > 0:
3559 self.logger.info(" %s" % ", ".join([str(i) for i in runs]))
3560
3561
3562
3563
3564
3565 def process_dataset_ignore_list(self):
3566 """Update the list of datasets taking into account the ones to
3567 ignore.
3568
3569 Both lists have been generated before from DBS and both are
3570 assumed to be unique.
3571
3572 NOTE: The advantage of creating the ignore list from DBS (in
3573 case a regexp is given) and matching that instead of directly
3574 matching the ignore criterion against the list of datasets (to
3575 consider) built from DBS is that in the former case we're sure
3576 that all regexps are treated exactly as DBS would have done
3577 without the cmsHarvester.
3578
3579 NOTE: This only removes complete samples. Exclusion of single
3580 runs is done by the bookkeeping. So the assumption is that a
3581 user never wants to harvest just part (i.e. n out of N runs)
3582 of a sample.
3583
3584 """
3585
3586 self.logger.info("Processing list of datasets to ignore...")
3587
3588 self.logger.debug("Before processing ignore list there are %d " \
3589 "datasets in the list to be processed" % \
3590 len(self.datasets_to_use))
3591
3592
3593 dataset_names_filtered = copy.deepcopy(self.datasets_to_use)
3594 for dataset_name in self.datasets_to_use.keys():
3595 if dataset_name in self.datasets_to_ignore.keys():
3596 del dataset_names_filtered[dataset_name]
3597
3598 self.logger.info(" --> Removed %d dataset(s)" % \
3599 (len(self.datasets_to_use) -
3600 len(dataset_names_filtered)))
3601
3602 self.datasets_to_use = dataset_names_filtered
3603
3604 self.logger.debug("After processing ignore list there are %d " \
3605 "datasets in the list to be processed" % \
3606 len(self.datasets_to_use))
3607
3608
3609
3610
3611
3612 def process_runs_use_and_ignore_lists(self):
3613
3614 self.logger.info("Processing list of runs to use and ignore...")
3615
3616
3617
3618
3619
3620
3621
3622
3623
3624
3625 runs_to_use = self.runs_to_use
3626 runs_to_ignore = self.runs_to_ignore
3627
3628 for dataset_name in self.datasets_to_use:
3629 runs_in_dataset = self.datasets_information[dataset_name]["runs"]
3630
3631
3632 runs_to_use_tmp = []
3633 for run in runs_to_use:
3634 if not run in runs_in_dataset:
3635 self.logger.warning("Dataset `%s' does not contain " \
3636 "requested run %d " \
3637 "--> ignoring `use' of this run" % \
3638 (dataset_name, run))
3639 else:
3640 runs_to_use_tmp.append(run)
3641
3642 if len(runs_to_use) > 0:
3643 runs = runs_to_use_tmp
3644 self.logger.info("Using %d out of %d runs " \
3645 "of dataset `%s'" % \
3646 (len(runs), len(runs_in_dataset),
3647 dataset_name))
3648 else:
3649 runs = runs_in_dataset
3650
3651 if len(runs_to_ignore) > 0:
3652 runs_tmp = []
3653 for run in runs:
3654 if not run in runs_to_ignore:
3655 runs_tmp.append(run)
3656 self.logger.info("Ignoring %d out of %d runs " \
3657 "of dataset `%s'" % \
3658 (len(runs)- len(runs_tmp),
3659 len(runs_in_dataset),
3660 dataset_name))
3661 runs = runs_tmp
3662
3663 if self.todofile != "YourToDofile.txt":
3664 runs_todo = []
3665 print("Reading runs from file /afs/cern.ch/cms/CAF/CMSCOMM/COMM_DQM/harvesting/%s" %self.todofile)
3666 cmd="grep %s /afs/cern.ch/cms/CAF/CMSCOMM/COMM_DQM/harvesting/%s | cut -f5 -d' '" %(dataset_name,self.todofile)
3667 (status, output)=subprocess.getstatusoutput(cmd)
3668 for run in runs:
3669 run_str="%s" %run
3670 if run_str in output:
3671 runs_todo.append(run)
3672 self.logger.info("Using %d runs " \
3673 "of dataset `%s'" % \
3674 (len(runs_todo),
3675 dataset_name))
3676 runs=runs_todo
3677
3678 Json_runs = []
3679 if self.Jsonfilename != "YourJSON.txt":
3680 good_runs = []
3681 self.Jsonlumi = True
3682
3683
3684 self.logger.info("Reading runs and lumisections from file `%s'" % \
3685 self.Jsonfilename)
3686 try:
3687 Jsonfile = open(self.Jsonfilename, "r")
3688 for names in Jsonfile:
3689 dictNames= eval(str(names))
3690 for key in dictNames:
3691 intkey=int(key)
3692 Json_runs.append(intkey)
3693 Jsonfile.close()
3694 except IOError:
3695 msg = "ERROR: Could not open Jsonfile `%s'" % \
3696 self.Jsonfilename
3697 self.logger.fatal(msg)
3698 raise Error(msg)
3699 for run in runs:
3700 if run in Json_runs:
3701 good_runs.append(run)
3702 self.logger.info("Using %d runs " \
3703 "of dataset `%s'" % \
3704 (len(good_runs),
3705 dataset_name))
3706 runs=good_runs
3707 if (self.Jsonrunfilename != "YourJSON.txt") and (self.Jsonfilename == "YourJSON.txt"):
3708 good_runs = []
3709
3710
3711 self.logger.info("Reading runs from file `%s'" % \
3712 self.Jsonrunfilename)
3713 try:
3714 Jsonfile = open(self.Jsonrunfilename, "r")
3715 for names in Jsonfile:
3716 dictNames= eval(str(names))
3717 for key in dictNames:
3718 intkey=int(key)
3719 Json_runs.append(intkey)
3720 Jsonfile.close()
3721 except IOError:
3722 msg = "ERROR: Could not open Jsonfile `%s'" % \
3723 self.Jsonrunfilename
3724 self.logger.fatal(msg)
3725 raise Error(msg)
3726 for run in runs:
3727 if run in Json_runs:
3728 good_runs.append(run)
3729 self.logger.info("Using %d runs " \
3730 "of dataset `%s'" % \
3731 (len(good_runs),
3732 dataset_name))
3733 runs=good_runs
3734
3735 self.datasets_to_use[dataset_name] = runs
3736
3737
3738
3739
3740
3741 def singlify_datasets(self):
3742 """Remove all but the largest part of all datasets.
3743
3744 This allows us to harvest at least part of these datasets
3745 using single-step harvesting until the two-step approach
3746 works.
3747
3748 """
3749
3750
3751 assert self.harvesting_mode == "single-step-allow-partial"
3752
3753
3754 for dataset_name in self.datasets_to_use:
3755 for run_number in self.datasets_information[dataset_name]["runs"]:
3756 max_events = max(self.datasets_information[dataset_name]["sites"][run_number].values())
3757 sites_with_max_events = [i[0] for i in self.datasets_information[dataset_name]["sites"][run_number].items() if i[1] == max_events]
3758 self.logger.warning("Singlifying dataset `%s', " \
3759 "run %d" % \
3760 (dataset_name, run_number))
3761 cmssw_version = self.datasets_information[dataset_name] \
3762 ["cmssw_version"]
3763 selected_site = self.pick_a_site(sites_with_max_events,
3764 cmssw_version)
3765
3766
3767 nevents_old = self.datasets_information[dataset_name]["num_events"][run_number]
3768 self.logger.warning(" --> " \
3769 "only harvesting partial statistics: " \
3770 "%d out of %d events (5.1%f%%) " \
3771 "at site `%s'" % \
3772 (max_events,
3773 nevents_old,
3774 100. * max_events / nevents_old,
3775 selected_site))
3776 self.logger.warning("!!! Please note that the number of " \
3777 "events in the output path name will " \
3778 "NOT reflect the actual statistics in " \
3779 "the harvested results !!!")
3780
3781
3782
3783
3784
3785
3786 self.datasets_information[dataset_name]["sites"][run_number] = {selected_site: max_events}
3787 self.datasets_information[dataset_name]["num_events"][run_number] = max_events
3788
3789
3790
3791
3792
3793
3794 def check_dataset_list(self):
3795 """Check list of dataset names for impossible ones.
3796
3797 Two kinds of checks are done:
3798 - Checks for things that do not make sense. These lead to
3799 errors and skipped datasets.
3800 - Sanity checks. For these warnings are issued but the user is
3801 considered to be the authoritative expert.
3802
3803 Checks performed:
3804 - The CMSSW version encoded in the dataset name should match
3805 self.cmssw_version. This is critical.
3806 - There should be some events in the dataset/run. This is
3807 critical in the sense that CRAB refuses to create jobs for
3808 zero events. And yes, this does happen in practice. E.g. the
3809 reprocessed CRAFT08 datasets contain runs with zero events.
3810 - A cursory check is performed to see if the harvesting type
3811 makes sense for the data type. This should prevent the user
3812 from inadvertently running RelVal for data.
3813 - It is not possible to run single-step harvesting jobs on
3814 samples that are not fully contained at a single site.
3815 - Each dataset/run has to be available at at least one site.
3816
3817 """
3818
3819 self.logger.info("Performing sanity checks on dataset list...")
3820
3821 dataset_names_after_checks = copy.deepcopy(self.datasets_to_use)
3822
3823 for dataset_name in self.datasets_to_use.keys():
3824
3825
3826 version_from_dataset = self.datasets_information[dataset_name] \
3827 ["cmssw_version"]
3828 if version_from_dataset != self.cmssw_version:
3829 msg = " CMSSW version mismatch for dataset `%s' " \
3830 "(%s vs. %s)" % \
3831 (dataset_name,
3832 self.cmssw_version, version_from_dataset)
3833 if self.force_running:
3834
3835 self.logger.warning("%s " \
3836 "--> `force mode' active: " \
3837 "run anyway" % msg)
3838 else:
3839 del dataset_names_after_checks[dataset_name]
3840 self.logger.warning("%s " \
3841 "--> skipping" % msg)
3842 continue
3843
3844
3845
3846
3847
3848
3849
3850
3851 suspicious = False
3852 datatype = self.datasets_information[dataset_name]["datatype"]
3853 if datatype == "data":
3854
3855 if self.harvesting_type != "DQMOffline":
3856 suspicious = True
3857 elif datatype == "mc":
3858 if self.harvesting_type == "DQMOffline":
3859 suspicious = True
3860 else:
3861
3862 assert False, "ERROR Impossible data type `%s' " \
3863 "for dataset `%s'" % \
3864 (datatype, dataset_name)
3865 if suspicious:
3866 msg = " Normally one does not run `%s' harvesting " \
3867 "on %s samples, are you sure?" % \
3868 (self.harvesting_type, datatype)
3869 if self.force_running:
3870 self.logger.warning("%s " \
3871 "--> `force mode' active: " \
3872 "run anyway" % msg)
3873 else:
3874 del dataset_names_after_checks[dataset_name]
3875 self.logger.warning("%s " \
3876 "--> skipping" % msg)
3877 continue
3878
3879
3880
3881
3882
3883
3884
3885
3886
3887
3888
3889
3890 if datatype == "data":
3891 if self.globaltag is None:
3892 msg = "For data datasets (like `%s') " \
3893 "we need a GlobalTag" % \
3894 dataset_name
3895 del dataset_names_after_checks[dataset_name]
3896 self.logger.warning("%s " \
3897 "--> skipping" % msg)
3898 continue
3899
3900
3901
3902
3903
3904
3905
3906
3907 globaltag = self.datasets_information[dataset_name]["globaltag"]
3908 if not globaltag in self.globaltag_check_cache:
3909 if self.check_globaltag(globaltag):
3910 self.globaltag_check_cache.append(globaltag)
3911 else:
3912 msg = "Something is wrong with GlobalTag `%s' " \
3913 "used by dataset `%s'!" % \
3914 (globaltag, dataset_name)
3915 if self.use_ref_hists:
3916 msg += "\n(Either it does not exist or it " \
3917 "does not contain the required key to " \
3918 "be used with reference histograms.)"
3919 else:
3920 msg += "\n(It probably just does not exist.)"
3921 self.logger.fatal(msg)
3922 raise Usage(msg)
3923
3924
3925
3926
3927 runs_without_sites = [i for (i, j) in \
3928 self.datasets_information[dataset_name] \
3929 ["sites"].items() \
3930 if len(j) < 1 and \
3931 i in self.datasets_to_use[dataset_name]]
3932 if len(runs_without_sites) > 0:
3933 for run_without_sites in runs_without_sites:
3934 try:
3935 dataset_names_after_checks[dataset_name].remove(run_without_sites)
3936 except (KeyError, ValueError):
3937 pass
3938 self.logger.warning(" removed %d unavailable run(s) " \
3939 "from dataset `%s'" % \
3940 (len(runs_without_sites), dataset_name))
3941 self.logger.debug(" (%s)" % \
3942 ", ".join([str(i) for i in \
3943 runs_without_sites]))
3944
3945
3946
3947
3948
3949 if not self.harvesting_mode == "two-step":
3950 for run_number in self.datasets_to_use[dataset_name]:
3951
3952
3953
3954
3955 num_sites = len(self.datasets_information[dataset_name] \
3956 ["sites"][run_number])
3957 if num_sites > 1 and \
3958 not self.datasets_information[dataset_name] \
3959 ["mirrored"][run_number]:
3960
3961
3962
3963 msg = " Dataset `%s', run %d is spread across more " \
3964 "than one site.\n" \
3965 " Cannot run single-step harvesting on " \
3966 "samples spread across multiple sites" % \
3967 (dataset_name, run_number)
3968 try:
3969 dataset_names_after_checks[dataset_name].remove(run_number)
3970 except (KeyError, ValueError):
3971 pass
3972 self.logger.warning("%s " \
3973 "--> skipping" % msg)
3974
3975
3976
3977
3978
3979
3980
3981
3982 tmp = [j for (i, j) in self.datasets_information \
3983 [dataset_name]["num_events"].items() \
3984 if i in self.datasets_to_use[dataset_name]]
3985 num_events_dataset = sum(tmp)
3986
3987 if num_events_dataset < 1:
3988 msg = " dataset `%s' is empty" % dataset_name
3989 del dataset_names_after_checks[dataset_name]
3990 self.logger.warning("%s " \
3991 "--> skipping" % msg)
3992
3993
3994
3995
3996
3997
3998
3999
4000
4001 continue
4002
4003 tmp = [i for i in \
4004 self.datasets_information[dataset_name] \
4005 ["num_events"].items() if i[1] < 1]
4006 tmp = [i for i in tmp if i[0] in self.datasets_to_use[dataset_name]]
4007 empty_runs = dict(tmp)
4008 if len(empty_runs) > 0:
4009 for empty_run in empty_runs:
4010 try:
4011 dataset_names_after_checks[dataset_name].remove(empty_run)
4012 except (KeyError, ValueError):
4013 pass
4014 self.logger.info(" removed %d empty run(s) from dataset `%s'" % \
4015 (len(empty_runs), dataset_name))
4016 self.logger.debug(" (%s)" % \
4017 ", ".join([str(i) for i in empty_runs]))
4018
4019
4020
4021
4022
4023 dataset_names_after_checks_tmp = copy.deepcopy(dataset_names_after_checks)
4024 for (dataset_name, runs) in dataset_names_after_checks.items():
4025 if len(runs) < 1:
4026 self.logger.warning(" Removing dataset without any runs " \
4027 "(left) `%s'" % \
4028 dataset_name)
4029 del dataset_names_after_checks_tmp[dataset_name]
4030 dataset_names_after_checks = dataset_names_after_checks_tmp
4031
4032
4033
4034 self.logger.warning(" --> Removed %d dataset(s)" % \
4035 (len(self.datasets_to_use) -
4036 len(dataset_names_after_checks)))
4037
4038
4039 self.datasets_to_use = dataset_names_after_checks
4040
4041
4042
4043
4044
4045 def escape_dataset_name(self, dataset_name):
4046 """Escape a DBS dataset name.
4047
4048 Escape a DBS dataset name such that it does not cause trouble
4049 with the file system. This means turning each `/' into `__',
4050 except for the first one which is just removed.
4051
4052 """
4053
4054 escaped_dataset_name = dataset_name
4055 escaped_dataset_name = escaped_dataset_name.strip("/")
4056 escaped_dataset_name = escaped_dataset_name.replace("/", "__")
4057
4058 return escaped_dataset_name
4059
4060
4061
4062
4063
4064 def create_config_file_name(self, dataset_name, run_number):
4065 """Generate the name of the configuration file to be run by
4066 CRAB.
4067
4068 Depending on the harvesting mode (single-step or two-step)
4069 this is the name of the real harvesting configuration or the
4070 name of the first-step ME summary extraction configuration.
4071
4072 """
4073
4074 if self.harvesting_mode == "single-step":
4075 config_file_name = self.create_harvesting_config_file_name(dataset_name)
4076 elif self.harvesting_mode == "single-step-allow-partial":
4077 config_file_name = self.create_harvesting_config_file_name(dataset_name)
4078
4079
4080
4081
4082
4083
4084 elif self.harvesting_mode == "two-step":
4085 config_file_name = self.create_me_summary_config_file_name(dataset_name)
4086 else:
4087 assert False, "ERROR Unknown harvesting mode `%s'" % \
4088 self.harvesting_mode
4089
4090
4091 return config_file_name
4092
4093
4094
4095
4096 def create_harvesting_config_file_name(self, dataset_name):
4097 "Generate the name to be used for the harvesting config file."
4098
4099 file_name_base = "harvesting.py"
4100 dataset_name_escaped = self.escape_dataset_name(dataset_name)
4101 config_file_name = file_name_base.replace(".py",
4102 "_%s.py" % \
4103 dataset_name_escaped)
4104
4105
4106 return config_file_name
4107
4108
4109
4110 def create_me_summary_config_file_name(self, dataset_name):
4111 "Generate the name of the ME summary extraction config file."
4112
4113 file_name_base = "me_extraction.py"
4114 dataset_name_escaped = self.escape_dataset_name(dataset_name)
4115 config_file_name = file_name_base.replace(".py",
4116 "_%s.py" % \
4117 dataset_name_escaped)
4118
4119
4120 return config_file_name
4121
4122
4123
4124 def create_output_file_name(self, dataset_name, run_number=None):
4125 """Create the name of the output file name to be used.
4126
4127 This is the name of the output file of the `first step'. In
4128 the case of single-step harvesting this is already the final
4129 harvesting output ROOT file. In the case of two-step
4130 harvesting it is the name of the intermediary ME summary
4131 file.
4132
4133 """
4134
4135
4136
4137
4138
4139
4140
4141
4142
4143 if self.harvesting_mode == "single-step":
4144
4145 assert not run_number is None
4146
4147 output_file_name = self.create_harvesting_output_file_name(dataset_name, run_number)
4148 elif self.harvesting_mode == "single-step-allow-partial":
4149
4150 assert not run_number is None
4151
4152 output_file_name = self.create_harvesting_output_file_name(dataset_name, run_number)
4153 elif self.harvesting_mode == "two-step":
4154
4155 assert run_number is None
4156
4157 output_file_name = self.create_me_summary_output_file_name(dataset_name)
4158 else:
4159
4160 assert False, "ERROR Unknown harvesting mode `%s'" % \
4161 self.harvesting_mode
4162
4163
4164 return output_file_name
4165
4166
4167
4168 def create_harvesting_output_file_name(self, dataset_name, run_number):
4169 """Generate the name to be used for the harvesting output file.
4170
4171 This harvesting output file is the _final_ ROOT output file
4172 containing the harvesting results. In case of two-step
4173 harvesting there is an intermediate ME output file as well.
4174
4175 """
4176
4177 dataset_name_escaped = self.escape_dataset_name(dataset_name)
4178
4179
4180
4181
4182
4183
4184
4185 output_file_name = "DQM_V0001_R%09d__%s.root" % \
4186 (run_number, dataset_name_escaped)
4187 if self.harvesting_mode.find("partial") > -1:
4188
4189
4190 if self.datasets_information[dataset_name] \
4191 ["mirrored"][run_number] == False:
4192 output_file_name = output_file_name.replace(".root", \
4193 "_partial.root")
4194
4195
4196 return output_file_name
4197
4198
4199
4200 def create_me_summary_output_file_name(self, dataset_name):
4201 """Generate the name of the intermediate ME file name to be
4202 used in two-step harvesting.
4203
4204 """
4205
4206 dataset_name_escaped = self.escape_dataset_name(dataset_name)
4207 output_file_name = "me_summary_%s.root" % \
4208 dataset_name_escaped
4209
4210
4211 return output_file_name
4212
4213
4214
4215 def create_multicrab_block_name(self, dataset_name, run_number, index):
4216 """Create the block name to use for this dataset/run number.
4217
4218 This is what appears in the brackets `[]' in multicrab.cfg. It
4219 is used as the name of the job and to create output
4220 directories.
4221
4222 """
4223
4224 dataset_name_escaped = self.escape_dataset_name(dataset_name)
4225 block_name = "%s_%09d_%s" % (dataset_name_escaped, run_number, index)
4226
4227
4228 return block_name
4229
4230
4231
4232 def create_crab_config(self):
4233 """Create a CRAB configuration for a given job.
4234
4235 NOTE: This is _not_ a complete (as in: submittable) CRAB
4236 configuration. It is used to store the common settings for the
4237 multicrab configuration.
4238
4239 NOTE: Only CERN CASTOR area (/castor/cern.ch/) is supported.
4240
4241 NOTE: According to CRAB, you `Must define exactly two of
4242 total_number_of_events, events_per_job, or
4243 number_of_jobs.'. For single-step harvesting we force one job,
4244 for the rest we don't really care.
4245
4246 # BUG BUG BUG
4247 # With the current version of CRAB (2.6.1), in which Daniele
4248 # fixed the behaviour of no_block_boundary for me, one _has to
4249 # specify_ the total_number_of_events and one single site in
4250 # the se_white_list.
4251 # BUG BUG BUG end
4252
4253 """
4254
4255 tmp = []
4256
4257
4258 castor_prefix = self.castor_prefix
4259
4260 tmp.append(self.config_file_header())
4261 tmp.append("")
4262
4263
4264
4265 tmp.append("[CRAB]")
4266 tmp.append("jobtype = cmssw")
4267 tmp.append("")
4268
4269
4270
4271 tmp.append("[GRID]")
4272 tmp.append("virtual_organization=cms")
4273 tmp.append("")
4274
4275
4276
4277 tmp.append("[USER]")
4278 tmp.append("copy_data = 1")
4279 tmp.append("")
4280
4281
4282
4283 tmp.append("[CMSSW]")
4284 tmp.append("# This reveals data hosted on T1 sites,")
4285 tmp.append("# which is normally hidden by CRAB.")
4286 tmp.append("show_prod = 1")
4287 tmp.append("number_of_jobs = 1")
4288 if self.Jsonlumi == True:
4289 tmp.append("lumi_mask = %s" % self.Jsonfilename)
4290 tmp.append("total_number_of_lumis = -1")
4291 else:
4292 if self.harvesting_type == "DQMOffline":
4293 tmp.append("total_number_of_lumis = -1")
4294 else:
4295 tmp.append("total_number_of_events = -1")
4296 if self.harvesting_mode.find("single-step") > -1:
4297 tmp.append("# Force everything to run in one job.")
4298 tmp.append("no_block_boundary = 1")
4299 tmp.append("")
4300
4301
4302
4303 tmp.append("[CAF]")
4304
4305 crab_config = "\n".join(tmp)
4306
4307
4308 return crab_config
4309
4310
4311
4312 def create_multicrab_config(self):
4313 """Create a multicrab.cfg file for all samples.
4314
4315 This creates the contents for a multicrab.cfg file that uses
4316 the crab.cfg file (generated elsewhere) for the basic settings
4317 and contains blocks for each run of each dataset.
4318
4319 # BUG BUG BUG
4320 # The fact that it's necessary to specify the se_white_list
4321 # and the total_number_of_events is due to our use of CRAB
4322 # version 2.6.1. This should no longer be necessary in the
4323 # future.
4324 # BUG BUG BUG end
4325
4326 """
4327
4328 cmd="who i am | cut -f1 -d' '"
4329 (status, output)=subprocess.getstatusoutput(cmd)
4330 UserName = output
4331
4332 if self.caf_access == True:
4333 print("Extracting %s as user name" %UserName)
4334
4335 number_max_sites = self.nr_max_sites + 1
4336
4337 multicrab_config_lines = []
4338 multicrab_config_lines.append(self.config_file_header())
4339 multicrab_config_lines.append("")
4340 multicrab_config_lines.append("[MULTICRAB]")
4341 multicrab_config_lines.append("cfg = crab.cfg")
4342 multicrab_config_lines.append("")
4343
4344 dataset_names = sorted(self.datasets_to_use.keys())
4345
4346 for dataset_name in dataset_names:
4347 runs = self.datasets_to_use[dataset_name]
4348 dataset_name_escaped = self.escape_dataset_name(dataset_name)
4349 castor_prefix = self.castor_prefix
4350
4351 for run in runs:
4352
4353
4354 castor_dir = self.datasets_information[dataset_name] \
4355 ["castor_path"][run]
4356
4357 cmd = "rfdir %s" % castor_dir
4358 (status, output) = subprocess.getstatusoutput(cmd)
4359
4360 if len(output) <= 0:
4361
4362
4363
4364
4365
4366 assert (len(self.datasets_information[dataset_name] \
4367 ["sites"][run]) == 1) or \
4368 self.datasets_information[dataset_name]["mirrored"][run]
4369
4370
4371 site_names = list(self.datasets_information[dataset_name] \
4372 ["sites"][run].keys())
4373
4374 for i in range(1, number_max_sites, 1):
4375 if len(site_names) > 0:
4376 index = "site_%02d" % (i)
4377
4378 config_file_name = self. \
4379 create_config_file_name(dataset_name, run)
4380 output_file_name = self. \
4381 create_output_file_name(dataset_name, run)
4382
4383
4384
4385
4386
4387
4388
4389 loop = 0
4390
4391 if len(site_names) > 1:
4392 cmssw_version = self.datasets_information[dataset_name] \
4393 ["cmssw_version"]
4394 self.logger.info("Picking site for mirrored dataset " \
4395 "`%s', run %d" % \
4396 (dataset_name, run))
4397 site_name = self.pick_a_site(site_names, cmssw_version)
4398 if site_name in site_names:
4399 site_names.remove(site_name)
4400
4401 else:
4402 site_name = site_names[0]
4403 site_names.remove(site_name)
4404
4405 if site_name is self.no_matching_site_found_str:
4406 if loop < 1:
4407 break
4408
4409 nevents = self.datasets_information[dataset_name]["num_events"][run]
4410
4411
4412 multicrab_block_name = self.create_multicrab_block_name( \
4413 dataset_name, run, index)
4414 multicrab_config_lines.append("[%s]" % \
4415 multicrab_block_name)
4416
4417
4418
4419 if site_name == "caf.cern.ch":
4420 multicrab_config_lines.append("CRAB.use_server=0")
4421 multicrab_config_lines.append("CRAB.scheduler=caf")
4422 else:
4423 multicrab_config_lines.append("scheduler = glite")
4424
4425
4426
4427 if site_name == "caf.cern.ch":
4428 pass
4429 else:
4430 multicrab_config_lines.append("GRID.se_white_list = %s" % \
4431 site_name)
4432 multicrab_config_lines.append("# This removes the default blacklisting of T1 sites.")
4433 multicrab_config_lines.append("GRID.remove_default_blacklist = 1")
4434 multicrab_config_lines.append("GRID.rb = CERN")
4435 if not self.non_t1access:
4436 multicrab_config_lines.append("GRID.role = t1access")
4437
4438
4439
4440
4441 castor_dir = castor_dir.replace(castor_prefix, "")
4442 multicrab_config_lines.append("USER.storage_element=srm-cms.cern.ch")
4443 multicrab_config_lines.append("USER.user_remote_dir = %s" % \
4444 castor_dir)
4445 multicrab_config_lines.append("USER.check_user_remote_dir=0")
4446
4447 if site_name == "caf.cern.ch":
4448 multicrab_config_lines.append("USER.storage_path=%s" % castor_prefix)
4449
4450
4451
4452
4453 else:
4454 multicrab_config_lines.append("USER.storage_path=/srm/managerv2?SFN=%s" % castor_prefix)
4455
4456
4457
4458
4459
4460
4461 multicrab_config_lines.append("CMSSW.pset = %s" % \
4462 config_file_name)
4463 multicrab_config_lines.append("CMSSW.datasetpath = %s" % \
4464 dataset_name)
4465 multicrab_config_lines.append("CMSSW.runselection = %d" % \
4466 run)
4467
4468 if self.Jsonlumi == True:
4469 pass
4470 else:
4471 if self.harvesting_type == "DQMOffline":
4472 pass
4473 else:
4474 multicrab_config_lines.append("CMSSW.total_number_of_events = %d" % \
4475 nevents)
4476
4477 multicrab_config_lines.append("CMSSW.output_file = %s" % \
4478 output_file_name)
4479
4480
4481
4482 if site_name == "caf.cern.ch":
4483 multicrab_config_lines.append("CAF.queue=cmscaf1nd")
4484
4485
4486
4487 multicrab_config_lines.append("")
4488
4489 loop = loop + 1
4490
4491 self.all_sites_found = True
4492
4493 multicrab_config = "\n".join(multicrab_config_lines)
4494
4495
4496 return multicrab_config
4497
4498
4499
4500 def check_globaltag(self, globaltag=None):
4501 """Check if globaltag exists.
4502
4503 Check if globaltag exists as GlobalTag in the database given
4504 by self.frontier_connection_name['globaltag']. If globaltag is
4505 None, self.globaltag is used instead.
4506
4507 If we're going to use reference histograms this method also
4508 checks for the existence of the required key in the GlobalTag.
4509
4510 """
4511
4512 if globaltag is None:
4513 globaltag = self.globaltag
4514
4515
4516 if globaltag.endswith("::All"):
4517 globaltag = globaltag[:-5]
4518
4519 connect_name = self.frontier_connection_name["globaltag"]
4520
4521
4522
4523
4524
4525
4526
4527 connect_name = connect_name.replace("frontier://",
4528 "frontier://cmsfrontier:8000/")
4529
4530 connect_name += self.db_account_name_cms_cond_globaltag()
4531
4532 tag_exists = self.check_globaltag_exists(globaltag, connect_name)
4533
4534
4535
4536 tag_contains_ref_hist_key = False
4537 if self.use_ref_hists and tag_exists:
4538
4539 tag_contains_ref_hist_key = self.check_globaltag_contains_ref_hist_key(globaltag, connect_name)
4540
4541
4542
4543 if self.use_ref_hists:
4544 ret_val = tag_exists and tag_contains_ref_hist_key
4545 else:
4546 ret_val = tag_exists
4547
4548
4549
4550
4551 return ret_val
4552
4553
4554
4555 def check_globaltag_exists(self, globaltag, connect_name):
4556 """Check if globaltag exists.
4557
4558 """
4559
4560 self.logger.info("Checking existence of GlobalTag `%s'" % \
4561 globaltag)
4562 self.logger.debug(" (Using database connection `%s')" % \
4563 connect_name)
4564
4565 cmd = "cmscond_tagtree_list -c %s -T %s" % \
4566 (connect_name, globaltag)
4567 (status, output) = subprocess.getstatusoutput(cmd)
4568 if status != 0 or \
4569 output.find("error") > -1:
4570 msg = "Could not check existence of GlobalTag `%s' in `%s'" % \
4571 (globaltag, connect_name)
4572 if output.find(".ALL_TABLES not found") > -1:
4573 msg = "%s\n" \
4574 "Missing database account `%s'" % \
4575 (msg, output.split(".ALL_TABLES")[0].split()[-1])
4576 self.logger.fatal(msg)
4577 self.logger.debug("Command used:")
4578 self.logger.debug(" %s" % cmd)
4579 self.logger.debug("Output received:")
4580 self.logger.debug(output)
4581 raise Error(msg)
4582 if output.find("does not exist") > -1:
4583 self.logger.debug("GlobalTag `%s' does not exist in `%s':" % \
4584 (globaltag, connect_name))
4585 self.logger.debug("Output received:")
4586 self.logger.debug(output)
4587 tag_exists = False
4588 else:
4589 tag_exists = True
4590 self.logger.info(" GlobalTag exists? -> %s" % tag_exists)
4591
4592
4593 return tag_exists
4594
4595
4596
4597 def check_globaltag_contains_ref_hist_key(self, globaltag, connect_name):
4598 """Check if globaltag contains the required RefHistos key.
4599
4600 """
4601
4602
4603 tag_contains_key = None
4604 ref_hist_key = "RefHistos"
4605 self.logger.info("Checking existence of reference " \
4606 "histogram key `%s' in GlobalTag `%s'" % \
4607 (ref_hist_key, globaltag))
4608 self.logger.debug(" (Using database connection `%s')" % \
4609 connect_name)
4610 cmd = "cmscond_tagtree_list -c %s -T %s -n %s" % \
4611 (connect_name, globaltag, ref_hist_key)
4612 (status, output) = subprocess.getstatusoutput(cmd)
4613 if status != 0 or \
4614 output.find("error") > -1:
4615 msg = "Could not check existence of key `%s'" % \
4616 (ref_hist_key, connect_name)
4617 self.logger.fatal(msg)
4618 self.logger.debug("Command used:")
4619 self.logger.debug(" %s" % cmd)
4620 self.logger.debug("Output received:")
4621 self.logger.debug(" %s" % output)
4622 raise Error(msg)
4623 if len(output) < 1:
4624 self.logger.debug("Required key for use of reference " \
4625 "histograms `%s' does not exist " \
4626 "in GlobalTag `%s':" % \
4627 (ref_hist_key, globaltag))
4628 self.logger.debug("Output received:")
4629 self.logger.debug(output)
4630 tag_contains_key = False
4631 else:
4632 tag_contains_key = True
4633
4634 self.logger.info(" GlobalTag contains `%s' key? -> %s" % \
4635 (ref_hist_key, tag_contains_key))
4636
4637
4638 return tag_contains_key
4639
4640
4641
4642 def check_ref_hist_tag(self, tag_name):
4643 """Check the existence of tag_name in database connect_name.
4644
4645 Check if tag_name exists as a reference histogram tag in the
4646 database given by self.frontier_connection_name['refhists'].
4647
4648 """
4649
4650 connect_name = self.frontier_connection_name["refhists"]
4651 connect_name += self.db_account_name_cms_cond_dqm_summary()
4652
4653 self.logger.debug("Checking existence of reference " \
4654 "histogram tag `%s'" % \
4655 tag_name)
4656 self.logger.debug(" (Using database connection `%s')" % \
4657 connect_name)
4658
4659 cmd = "cmscond_list_iov -c %s" % \
4660 connect_name
4661 (status, output) = subprocess.getstatusoutput(cmd)
4662 if status != 0:
4663 msg = "Could not check existence of tag `%s' in `%s'" % \
4664 (tag_name, connect_name)
4665 self.logger.fatal(msg)
4666 self.logger.debug("Command used:")
4667 self.logger.debug(" %s" % cmd)
4668 self.logger.debug("Output received:")
4669 self.logger.debug(output)
4670 raise Error(msg)
4671 if not tag_name in output.split():
4672 self.logger.debug("Reference histogram tag `%s' " \
4673 "does not exist in `%s'" % \
4674 (tag_name, connect_name))
4675 self.logger.debug(" Existing tags: `%s'" % \
4676 "', `".join(output.split()))
4677 tag_exists = False
4678 else:
4679 tag_exists = True
4680 self.logger.debug(" Reference histogram tag exists? " \
4681 "-> %s" % tag_exists)
4682
4683
4684 return tag_exists
4685
4686
4687
4688 def create_harvesting_config(self, dataset_name):
4689 """Create the Python harvesting configuration for harvesting.
4690
4691 The basic configuration is created by
4692 Configuration.PyReleaseValidation.ConfigBuilder. (This mimics
4693 what cmsDriver.py does.) After that we add some specials
4694 ourselves.
4695
4696 NOTE: On one hand it may not be nice to circumvent
4697 cmsDriver.py, on the other hand cmsDriver.py does not really
4698 do anything itself. All the real work is done by the
4699 ConfigBuilder so there is not much risk that we miss out on
4700 essential developments of cmsDriver in the future.
4701
4702 """
4703
4704
4705 config_options = defaultOptions
4706
4707
4708
4709
4710 config_options.name = "harvesting"
4711 config_options.scenario = "pp"
4712 config_options.number = 1
4713 config_options.arguments = self.ident_string()
4714 config_options.evt_type = config_options.name
4715 config_options.customisation_file = None
4716 config_options.filein = "dummy_value"
4717 config_options.filetype = "EDM"
4718
4719 config_options.gflash = "dummy_value"
4720
4721
4722
4723 config_options.dbsquery = ""
4724
4725
4726
4727
4728
4729
4730 config_options.step = "HARVESTING:%s" % \
4731 self.harvesting_info[self.harvesting_type] \
4732 ["step_string"]
4733 config_options.beamspot = self.harvesting_info[self.harvesting_type] \
4734 ["beamspot"]
4735 config_options.eventcontent = self.harvesting_info \
4736 [self.harvesting_type] \
4737 ["eventcontent"]
4738 config_options.harvesting = self.harvesting_info \
4739 [self.harvesting_type] \
4740 ["harvesting"]
4741
4742
4743
4744
4745
4746 datatype = self.datasets_information[dataset_name]["datatype"]
4747 config_options.isMC = (datatype.lower() == "mc")
4748 config_options.isData = (datatype.lower() == "data")
4749 globaltag = self.datasets_information[dataset_name]["globaltag"]
4750
4751 config_options.conditions = self.format_conditions_string(globaltag)
4752
4753
4754
4755 if "with_input" in getargspec(ConfigBuilder.__init__)[0]:
4756
4757 config_builder = ConfigBuilder(config_options, with_input=True)
4758 else:
4759
4760 config_builder = ConfigBuilder(config_options)
4761 config_builder.prepare(True)
4762 config_contents = config_builder.pythonCfgCode
4763
4764
4765
4766
4767
4768
4769 marker_lines = []
4770 sep = "#" * 30
4771 marker_lines.append(sep)
4772 marker_lines.append("# Code between these markers was generated by")
4773 marker_lines.append("# Configuration.PyReleaseValidation." \
4774 "ConfigBuilder")
4775
4776 marker_lines.append(sep)
4777 marker = "\n".join(marker_lines)
4778
4779 tmp = [self.config_file_header()]
4780 tmp.append("")
4781 tmp.append(marker)
4782 tmp.append("")
4783 tmp.append(config_contents)
4784 tmp.append("")
4785 tmp.append(marker)
4786 tmp.append("")
4787 config_contents = "\n".join(tmp)
4788
4789
4790
4791
4792 customisations = [""]
4793
4794 customisations.append("# Now follow some customisations")
4795 customisations.append("")
4796 connect_name = self.frontier_connection_name["globaltag"]
4797 connect_name += self.db_account_name_cms_cond_globaltag()
4798 customisations.append("process.GlobalTag.connect = \"%s\"" % \
4799 connect_name)
4800
4801
4802 if self.saveByLumiSection == True:
4803 customisations.append("process.dqmSaver.saveByLumiSection = 1")
4804
4805
4806
4807 customisations.append("")
4808
4809
4810
4811
4812
4813
4814
4815
4816
4817
4818
4819
4820
4821 use_es_prefer = (self.harvesting_type == "RelVal")
4822 use_refs = use_es_prefer or \
4823 (not self.harvesting_type == "MC")
4824
4825 use_refs = use_refs and self.use_ref_hists
4826
4827 if not use_refs:
4828
4829
4830
4831
4832
4833 customisations.append("print \"Not using reference histograms\"")
4834 customisations.append("if hasattr(process, \"dqmRefHistoRootFileGetter\"):")
4835 customisations.append(" for (sequence_name, sequence) in process.sequences.items():")
4836 customisations.append(" if sequence.remove(process.dqmRefHistoRootFileGetter):")
4837 customisations.append(" print \"Removed process.dqmRefHistoRootFileGetter from sequence `%s'\" % \\")
4838 customisations.append(" sequence_name")
4839 customisations.append("process.dqmSaver.referenceHandling = \"skip\"")
4840 else:
4841
4842
4843 customisations.append("process.dqmSaver.referenceHandling = \"all\"")
4844 if use_es_prefer:
4845 es_prefer_snippet = self.create_es_prefer_snippet(dataset_name)
4846 customisations.append(es_prefer_snippet)
4847
4848
4849
4850 workflow_name = dataset_name
4851 if self.harvesting_mode == "single-step-allow-partial":
4852 workflow_name += "_partial"
4853 customisations.append("process.dqmSaver.workflow = \"%s\"" % \
4854 workflow_name)
4855
4856
4857
4858
4859
4860
4861
4862
4863
4864
4865
4866
4867
4868
4869
4870
4871
4872
4873
4874
4875
4876
4877
4878
4879
4880
4881
4882
4883
4884
4885
4886
4887
4888
4889
4890 config_contents = config_contents + "\n".join(customisations)
4891
4892
4893
4894
4895 return config_contents
4896
4897
4898
4899
4900
4901
4902
4903
4904
4905
4906
4907
4908
4909
4910
4911
4912
4913
4914 def create_me_extraction_config(self, dataset_name):
4915 """
4916
4917 """
4918
4919
4920
4921 tmp = []
4922 tmp.append(self.config_file_header())
4923 tmp.append("")
4924 tmp.append("import FWCore.ParameterSet.Config as cms")
4925 tmp.append("")
4926 tmp.append("process = cms.Process(\"ME2EDM\")")
4927 tmp.append("")
4928 tmp.append("# Import of standard configurations")
4929 tmp.append("process.load(\"Configuration/EventContent/EventContent_cff\")")
4930 tmp.append("")
4931 tmp.append("# We don't really process any events, just keep this set to one to")
4932 tmp.append("# make sure things work.")
4933 tmp.append("process.maxEvents = cms.untracked.PSet(")
4934 tmp.append(" input = cms.untracked.int32(1)")
4935 tmp.append(" )")
4936 tmp.append("")
4937 tmp.append("process.options = cms.untracked.PSet(")
4938 tmp.append(" Rethrow = cms.untracked.vstring(\"ProductNotFound\")")
4939 tmp.append(" )")
4940 tmp.append("")
4941 tmp.append("process.source = cms.Source(\"PoolSource\",")
4942 tmp.append(" processingMode = \\")
4943 tmp.append(" cms.untracked.string(\"RunsAndLumis\"),")
4944 tmp.append(" fileNames = \\")
4945 tmp.append(" cms.untracked.vstring(\"no_file_specified\")")
4946 tmp.append(" )")
4947 tmp.append("")
4948 tmp.append("# Output definition: drop everything except for the monitoring.")
4949 tmp.append("process.output = cms.OutputModule(")
4950 tmp.append(" \"PoolOutputModule\",")
4951 tmp.append(" outputCommands = \\")
4952 tmp.append(" cms.untracked.vstring(\"drop *\", \\")
4953 tmp.append(" \"keep *_MEtoEDMConverter_*_*\"),")
4954 output_file_name = self. \
4955 create_output_file_name(dataset_name)
4956 tmp.append(" fileName = \\")
4957 tmp.append(" cms.untracked.string(\"%s\")," % output_file_name)
4958 tmp.append(" dataset = cms.untracked.PSet(")
4959 tmp.append(" dataTier = cms.untracked.string(\"RECO\"),")
4960 tmp.append(" filterName = cms.untracked.string(\"\")")
4961 tmp.append(" )")
4962 tmp.append(" )")
4963 tmp.append("")
4964 tmp.append("# Additional output definition")
4965 tmp.append("process.out_step = cms.EndPath(process.output)")
4966 tmp.append("")
4967 tmp.append("# Schedule definition")
4968 tmp.append("process.schedule = cms.Schedule(process.out_step)")
4969 tmp.append("")
4970
4971 config_contents = "\n".join(tmp)
4972
4973
4974 return config_contents
4975
4976
4977
4978
4979
4980
4981
4982
4983
4984
4985
4986
4987
4988
4989
4990
4991
4992
4993
4994
4995
4996
4997
4998
4999
5000
5001
5002
5003
5004
5005
5006
5007
5008
5009
5010
5011 def write_crab_config(self):
5012 """Write a CRAB job configuration Python file.
5013
5014 """
5015
5016 self.logger.info("Writing CRAB configuration...")
5017
5018 file_name_base = "crab.cfg"
5019
5020
5021 crab_contents = self.create_crab_config()
5022
5023
5024 crab_file_name = file_name_base
5025 try:
5026 crab_file = open(crab_file_name, "w")
5027 crab_file.write(crab_contents)
5028 crab_file.close()
5029 except IOError:
5030 self.logger.fatal("Could not write " \
5031 "CRAB configuration to file `%s'" % \
5032 crab_file_name)
5033 raise Error("ERROR: Could not write to file `%s'!" % \
5034 crab_file_name)
5035
5036
5037
5038
5039
5040 def write_multicrab_config(self):
5041 """Write a multi-CRAB job configuration Python file.
5042
5043 """
5044
5045 self.logger.info("Writing multi-CRAB configuration...")
5046
5047 file_name_base = "multicrab.cfg"
5048
5049
5050 multicrab_contents = self.create_multicrab_config()
5051
5052
5053 multicrab_file_name = file_name_base
5054 try:
5055 multicrab_file = open(multicrab_file_name, "w")
5056 multicrab_file.write(multicrab_contents)
5057 multicrab_file.close()
5058 except IOError:
5059 self.logger.fatal("Could not write " \
5060 "multi-CRAB configuration to file `%s'" % \
5061 multicrab_file_name)
5062 raise Error("ERROR: Could not write to file `%s'!" % \
5063 multicrab_file_name)
5064
5065
5066
5067
5068
5069 def write_harvesting_config(self, dataset_name):
5070 """Write a harvesting job configuration Python file.
5071
5072 NOTE: This knows nothing about single-step or two-step
5073 harvesting. That's all taken care of by
5074 create_harvesting_config.
5075
5076 """
5077
5078 self.logger.debug("Writing harvesting configuration for `%s'..." % \
5079 dataset_name)
5080
5081
5082 config_contents = self.create_harvesting_config(dataset_name)
5083
5084
5085 config_file_name = self. \
5086 create_harvesting_config_file_name(dataset_name)
5087 try:
5088 config_file = open(config_file_name, "w")
5089 config_file.write(config_contents)
5090 config_file.close()
5091 except IOError:
5092 self.logger.fatal("Could not write " \
5093 "harvesting configuration to file `%s'" % \
5094 config_file_name)
5095 raise Error("ERROR: Could not write to file `%s'!" % \
5096 config_file_name)
5097
5098
5099
5100
5101
5102 def write_me_extraction_config(self, dataset_name):
5103 """Write an ME-extraction configuration Python file.
5104
5105 This `ME-extraction' (ME = Monitoring Element) is the first
5106 step of the two-step harvesting.
5107
5108 """
5109
5110 self.logger.debug("Writing ME-extraction configuration for `%s'..." % \
5111 dataset_name)
5112
5113
5114 config_contents = self.create_me_extraction_config(dataset_name)
5115
5116
5117 config_file_name = self. \
5118 create_me_summary_config_file_name(dataset_name)
5119 try:
5120 config_file = open(config_file_name, "w")
5121 config_file.write(config_contents)
5122 config_file.close()
5123 except IOError:
5124 self.logger.fatal("Could not write " \
5125 "ME-extraction configuration to file `%s'" % \
5126 config_file_name)
5127 raise Error("ERROR: Could not write to file `%s'!" % \
5128 config_file_name)
5129
5130
5131
5132
5133
5134
5135 def ref_hist_mappings_needed(self, dataset_name=None):
5136 """Check if we need to load and check the reference mappings.
5137
5138 For data the reference histograms should be taken
5139 automatically from the GlobalTag, so we don't need any
5140 mappings. For RelVals we need to know a mapping to be used in
5141 the es_prefer code snippet (different references for each of
5142 the datasets).
5143
5144 WARNING: This implementation is a bit convoluted.
5145
5146 """
5147
5148
5149
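# With a specific dataset we need mappings only if it is MC; without one,
# check every known dataset and report whether any of them needs mappings.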
5150 if dataset_name is not None:
5151 data_type = self.datasets_information[dataset_name] \
5152 ["datatype"]
5153 mappings_needed = (data_type == "mc")
5154
5155 if not mappings_needed:
5156 assert data_type == "data"
5157
5158 else:
5159 tmp = [self.ref_hist_mappings_needed(dataset_name) \
5160 for dataset_name in \
5161 self.datasets_information.keys()]
5162 mappings_needed = (True in tmp)
5163
5164
5165 return mappings_needed
5166
5167
5168
5169 def load_ref_hist_mappings(self):
5170 """Load the reference histogram mappings from file.
5171
5172 The dataset name to reference histogram name mappings are read
5173 from a text file specified in self.ref_hist_mappings_file_name.
5174
5175 """
5176
5177
5178 assert len(self.ref_hist_mappings) < 1, \
5179 "ERROR Should not be RE-loading " \
5180 "reference histogram mappings!"
5181
5182
5183 self.logger.info("Loading reference histogram mappings " \
5184 "from file `%s'" % \
5185 self.ref_hist_mappings_file_name)
5186
5187 mappings_lines = None
5188 try:
5189 mappings_file = open(self.ref_hist_mappings_file_name, "r")
5190 mappings_lines = mappings_file.readlines()
5191 mappings_file.close()
5192 except IOError:
5193 msg = "ERROR: Could not open reference histogram mapping "\
5194 "file `%s'" % self.ref_hist_mappings_file_name
5195 self.logger.fatal(msg)
5196 raise Error(msg)
5197
5198
5199
5200
5201
5202
5203
5204
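# Parse the mappings file. Lines starting with `#' are comments; every
# other non-empty line must contain exactly two whitespace-separated
# fields, the dataset name and the reference histogram name, e.g.
# (purely illustrative values):
#   /RelValMinBias/SomeRelease-SomeConditions/GEN-SIM-RECO  some_ref_hist_name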
5205 for mapping in mappings_lines:
5206
5207 if not mapping.startswith("#"):
5208 mapping = mapping.strip()
5209 if len(mapping) > 0:
5210 mapping_pieces = mapping.split()
5211 if len(mapping_pieces) != 2:
5212 msg = "ERROR: The reference histogram mapping " \
5213 "file contains a line I don't " \
5214 "understand:\n %s" % mapping
5215 self.logger.fatal(msg)
5216 raise Error(msg)
5217 dataset_name = mapping_pieces[0].strip()
5218 ref_hist_name = mapping_pieces[1].strip()
5219
5220
5221
5222 if dataset_name in self.ref_hist_mappings:
5223 msg = "ERROR: The reference histogram mapping " \
5224 "file contains multiple mappings for " \
5225 "dataset `%s'."
5226 self.logger.fatal(msg)
5227 raise Error(msg)
5228
5229
5230 self.ref_hist_mappings[dataset_name] = ref_hist_name
5231
5232
5233
5234 self.logger.info(" Successfully loaded %d mapping(s)" % \
5235 len(self.ref_hist_mappings))
5236 max_len = max([len(i) for i in self.ref_hist_mappings.keys()])
5237 for (map_from, map_to) in self.ref_hist_mappings.items():
5238 self.logger.info(" %-*s -> %s" % \
5239 (max_len, map_from, map_to))
5240
5241
5242
5243
5244
5245 def check_ref_hist_mappings(self):
5246 """Make sure all necessary reference histograms exist.
5247
5248 Check that for each of the datasets to be processed a
5249 reference histogram is specified and that that histogram
5250 exists in the database.
5251
5252 NOTE: There's a little complication here. Since this whole
5253 thing was designed to allow (in principle) harvesting of both
5254 data and MC datasets in one go, we need to be careful to check
5255 the availability of reference mappings only for those
5256 datasets that need them.
5257
5258 """
5259
5260 self.logger.info("Checking reference histogram mappings")
5261
5262 for dataset_name in self.datasets_to_use:
5263 try:
5264 ref_hist_name = self.ref_hist_mappings[dataset_name]
5265 except KeyError:
5266 msg = "ERROR: No reference histogram mapping found " \
5267 "for dataset `%s'" % \
5268 dataset_name
5269 self.logger.fatal(msg)
5270 raise Error(msg)
5271
5272 if not self.check_ref_hist_tag(ref_hist_name):
5273 msg = "Reference histogram tag `%s' " \
5274 "(used for dataset `%s') does not exist!" % \
5275 (ref_hist_name, dataset_name)
5276 self.logger.fatal(msg)
5277 raise Usage(msg)
5278
5279 self.logger.info(" Done checking reference histogram mappings.")
5280
5281
5282
5283
5284
5285 def build_datasets_information(self):
5286 """Obtain all information on the datasets that we need to run.
5287
5288 Use DBS to figure out all required information on our
5289 datasets, like the run numbers and the GlobalTag. All
5290 information is stored in the datasets_information member
5291 variable.
5292
5293 """
5294
5295
5296
5297
5298
5299
5300
5301
5302
5303
5304
5305 self.datasets_information = {}
5306 self.logger.info("Collecting information for all datasets to process")
5307 dataset_names = sorted(self.datasets_to_use.keys())
5308 for dataset_name in dataset_names:
5309
5310
5311 sep_line = "-" * 30
5312 self.logger.info(sep_line)
5313 self.logger.info(" `%s'" % dataset_name)
5314 self.logger.info(sep_line)
5315
5316 runs = self.dbs_resolve_runs(dataset_name)
5317 self.logger.info(" found %d run(s)" % len(runs))
5318 if len(runs) > 0:
5319 self.logger.debug(" run number(s): %s" % \
5320 ", ".join([str(i) for i in runs]))
5321 else:
5322
5323
5324 self.logger.warning(" --> skipping dataset "
5325 "without any runs")
5326 assert False, "Panic: found a dataset without runs " \
5327 "after DBS checks!"
5328
5329
5330 cmssw_version = self.dbs_resolve_cmssw_version(dataset_name)
5331 self.logger.info(" found CMSSW version `%s'" % cmssw_version)
5332
5333
5334 datatype = self.dbs_resolve_datatype(dataset_name)
5335 self.logger.info(" sample is data or MC? --> %s" % \
5336 datatype)
5337
5338
5339
5340
5341 if self.globaltag is None:
5342 globaltag = self.dbs_resolve_globaltag(dataset_name)
5343 else:
5344 globaltag = self.globaltag
5345
5346 self.logger.info(" found GlobalTag `%s'" % globaltag)
5347
5348
5349 if globaltag == "":
5350
5351
5352 assert datatype == "data", \
5353 "ERROR Empty GlobalTag for MC dataset!!!"
5354
5355
5356
5357
5358
5359
5360
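# Ask DBS where this dataset is located. The per-run catalogue it returns
# also carries the `all_sites' entry (stored here as the per-run event
# count) and the `mirrored' flags; split those off into separate
# dictionaries before keeping the remaining per-site information.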
5361 sites_catalog = self.dbs_check_dataset_spread(dataset_name)
5362
5363
5364 num_events = {}
5365 for run_number in sites_catalog.keys():
5366 num_events[run_number] = sites_catalog \
5367 [run_number]["all_sites"]
5368 del sites_catalog[run_number]["all_sites"]
5369
5370
5371
5372 mirror_catalog = {}
5373 for run_number in sites_catalog.keys():
5374 mirror_catalog[run_number] = sites_catalog \
5375 [run_number]["mirrored"]
5376 del sites_catalog[run_number]["mirrored"]
5377
5378
5379
5380
5381
5382
5383
5384
5385
5386
5387
5388
5389
5390
5391
5392
5393
5394
5395
5396
5397
5398
5399
5400
5401
5402
5403
5404
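# Store everything we learned about this dataset in one place.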
5405 self.datasets_information[dataset_name] = {}
5406 self.datasets_information[dataset_name]["runs"] = runs
5407 self.datasets_information[dataset_name]["cmssw_version"] = \
5408 cmssw_version
5409 self.datasets_information[dataset_name]["globaltag"] = globaltag
5410 self.datasets_information[dataset_name]["datatype"] = datatype
5411 self.datasets_information[dataset_name]["num_events"] = num_events
5412 self.datasets_information[dataset_name]["mirrored"] = mirror_catalog
5413 self.datasets_information[dataset_name]["sites"] = sites_catalog
5414
5415
5416
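# Work out the CASTOR output locations: one common base path per dataset
# plus a run-dependent subdirectory for each run.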
5417 castor_path_common = self.create_castor_path_name_common(dataset_name)
5418 self.logger.info(" output will go into `%s'" % \
5419 castor_path_common)
5420
5421 castor_paths = dict(list(zip(runs,
5422 [self.create_castor_path_name_special(dataset_name, i, castor_path_common) \
5423 for i in runs])))
5424 for path_name in castor_paths.values():
5425 self.logger.debug(" %s" % path_name)
5426 self.datasets_information[dataset_name]["castor_path"] = \
5427 castor_paths
5428
5429
5430
5431
5432
5433 def show_exit_message(self):
5434 """Tell the user what to do now, after this part is done.
5435
5436 This should provide the user with some (preferably
5437 copy-pasteable) instructions on what to do now with the setups
5438 and files that have been created.
5439
5440 """
5441
5442
5443
5444
5445
5446 sep_line = "-" * 60
5447
5448 self.logger.info("")
5449 self.logger.info(sep_line)
5450 self.logger.info(" Configuration files have been created.")
5451 self.logger.info(" From here on please follow the usual CRAB instructions.")
5452 self.logger.info(" Quick copy-paste instructions are shown below.")
5453 self.logger.info(sep_line)
5454
5455 self.logger.info("")
5456 self.logger.info(" Create all CRAB jobs:")
5457 self.logger.info(" multicrab -create")
5458 self.logger.info("")
5459 self.logger.info(" Submit all CRAB jobs:")
5460 self.logger.info(" multicrab -submit")
5461 self.logger.info("")
5462 self.logger.info(" Check CRAB status:")
5463 self.logger.info(" multicrab -status")
5464 self.logger.info("")
5465
5466 self.logger.info("")
5467 self.logger.info(" For more information please see the CMS Twiki:")
5468 self.logger.info(" %s" % twiki_url)
5469 self.logger.info(sep_line)
5470
5471
5472
5473 if not self.all_sites_found:
5474 self.logger.warning(" For some of the jobs no matching " \
5475 "site could be found")
5476 self.logger.warning(" --> please scan your multicrab.cfg " \
5477 "for occurrences of `%s'." % \
5478 self.no_matching_site_found_str)
5479 self.logger.warning(" You will have to fix those " \
5480 "by hand, sorry.")
5481
5482
5483
5484
5485
5486 def run(self):
5487 "Main entry point of the CMS harvester."
5488
5489
5490 exit_code = 0
5491
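# Overall flow: parse the command line, check the environment and DBS,
# build the dataset information, then write the CRAB/multicrab and
# per-dataset harvesting configurations.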
5492 try:
5493
5494 try:
5495
5496
5497 self.parse_cmd_line_options()
5498
5499 self.check_input_status()
5500
5501
5502 self.check_cmssw()
5503
5504
5505 self.check_dbs()
5506
5507 self.setup_dbs()
5508
5509
5510
5511
5512 self.setup_harvesting_info()
5513
5514
5515 self.build_dataset_use_list()
5516
5517 self.build_dataset_ignore_list()
5518
5519
5520 self.build_runs_use_list()
5521 self.build_runs_ignore_list()
5522
5523
5524
5525
5526
5527
5528 self.process_dataset_ignore_list()
5529
5530
5531
5532 self.build_datasets_information()
5533
5534 if self.use_ref_hists and \
5535 self.ref_hist_mappings_needed():
5536
5537
5538 self.load_ref_hist_mappings()
5539
5540
5541
5542 self.check_ref_hist_mappings()
5543 else:
5544 self.logger.info("No need to load reference " \
5545 "histogram mappings file")
5546
5547
5548
5549
5550
5551
5552
5553
5554
5555
5556
5557
5558
5559
5560 self.process_runs_use_and_ignore_lists()
5561
5562
5563
5564
5565 if self.harvesting_mode == "single-step-allow-partial":
5566 self.singlify_datasets()
5567
5568
5569 self.check_dataset_list()
5570
5571 if len(self.datasets_to_use) < 1:
5572 self.logger.info("After all checks etc. " \
5573 "there are no datasets (left?) " \
5574 "to process")
5575 else:
5576
5577 self.logger.info("After all checks etc. we are left " \
5578 "with %d dataset(s) to process " \
5579 "for a total of %d runs" % \
5580 (len(self.datasets_to_use),
5581 sum([len(i) for i in \
5582 self.datasets_to_use.values()])))
5583
5584
5585
5586
5587
5588
5589
5590
5591
5592
5593
5594
5595
5596
5597
5598
5599
5600
5601
5602
5603
5604
5605
5606
5607
5608
5609 self.create_and_check_castor_dirs()
5610
5611
5612
5613 self.write_crab_config()
5614 self.write_multicrab_config()
5615
5616
5617
5618
5619
5620
5621
5622
5623
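# Write one harvesting configuration per dataset (plus an ME-extraction
# configuration for two-step harvesting) and record the per-run event
# counts for the book keeping.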
5624 for dataset_name in self.datasets_to_use.keys():
5625 try:
5626 self.write_harvesting_config(dataset_name)
5627 if self.harvesting_mode == "two-step":
5628 self.write_me_extraction_config(dataset_name)
5629 except:
5630
5631 raise
5632 else:
5633
5634
5635 tmp = {}
5636 for run_number in self.datasets_to_use[dataset_name]:
5637 tmp[run_number] = self.datasets_information \
5638 [dataset_name]["num_events"][run_number]
5639 if dataset_name in self.book_keeping_information:
5640 self.book_keeping_information[dataset_name].update(tmp)
5641 else:
5642 self.book_keeping_information[dataset_name] = tmp
5643
5644
5645 self.show_exit_message()
5646
5647 except Usage as err:
5648
5649
5650 pass
5651
5652 except Error as err:
5653
5654 exit_code = 1
5655
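# Anything else is unexpected: unless it is a SystemExit or a
# KeyboardInterrupt, log a full stack trace and ask the user to report it.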
5656 except Exception as err:
5657
5658
5659
5660
5661
5662
5663
5664
5665 if isinstance(err, SystemExit):
5666 self.logger.fatal(err.code)
5667 elif not isinstance(err, KeyboardInterrupt):
5668 self.logger.fatal("!" * 50)
5669 self.logger.fatal(" This looks like a serious problem.")
5670 self.logger.fatal(" If you are sure you followed all " \
5671 "instructions")
5672 self.logger.fatal(" please copy the below stack trace together")
5673 self.logger.fatal(" with a description of what you were doing to")
5674 self.logger.fatal(" jeroen.hegeman@cern.ch.")
5675 self.logger.fatal(" %s" % self.ident_string())
5676 self.logger.fatal("!" * 50)
5677 self.logger.fatal(str(err))
5678 import traceback
5679 traceback_string = traceback.format_exc()
5680 for line in traceback_string.split("\n"):
5681 self.logger.fatal(line)
5682 self.logger.fatal("!" * 50)
5683 exit_code = 2
5684
5685
5686
5687
5688
5689
5690
5691 finally:
5692
5693 self.cleanup()
5694
5695
5696
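# If automatic submission was requested, create and submit the CRAB jobs
# right away.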
5697 if self.crab_submission:
5698 os.system("multicrab -create")
5699 os.system("multicrab -submit")
5700
5701
5702 return exit_code
5703
5704
5705
5706
5707
5708
5709
5710 if __name__ == "__main__":
5711 "Main entry point for harvesting."
5712
5713 CMSHarvester().run()
5714
5715
5716
5717