from __future__ import print_function
from __future__ import absolute_import

import abc
import csv
import json
import os
import re

import Utilities.General.cmssw_das_client as das_client

from .utilities import cache

class DatasetError(Exception): pass

defaultdasinstance = "prod/global"

class RunRange(object):
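    """
    A run selection: a run is accepted if it lies in [firstrun, lastrun]
    and, when an explicit run list is given, also appears in that list.
    """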
    def __init__(self, firstrun, lastrun, runs):
        self.firstrun = firstrun
        self.lastrun = lastrun
        self.runs = runs

    def __contains__(self, run):
        if self.runs and run not in self.runs: return False
        return self.firstrun <= run <= self.lastrun

def dasquery(dasQuery, dasLimit=0):
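    """
    Runs a DAS query and returns the "data" field of the JSON reply.
    Raises DatasetError if DAS reports an error or the reply is malformed.
    """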
    dasData = das_client.get_data(dasQuery, dasLimit)
    if isinstance(dasData, str):
        jsondict = json.loads(dasData)
    else:
        jsondict = dasData

    # check whether the DAS query failed
    try:
        error = findinjson(jsondict, "data", "error")
    except KeyError:
        error = None
    if error or findinjson(jsondict, "status") != 'ok' or "data" not in jsondict:
        try:
            jsonstr = findinjson(jsondict, "reason")
        except KeyError:
            jsonstr = str(jsondict)
        if len(jsonstr) > 10000:
            jsonfile = "das_query_output_%i.txt"
            i = 0
            while os.path.lexists(jsonfile % i):
                i += 1
            jsonfile = jsonfile % i
            with open(jsonfile, "w") as theFile:
                theFile.write(jsonstr)
            msg = "The DAS query returned an error. The output is very long, and has been stored in:\n" + jsonfile
        else:
            msg = "The DAS query returned an error. Here is the output:\n" + jsonstr
        msg += "\nIt's possible that this was a server error. If so, it may work if you try again later."
        raise DatasetError(msg)
    return findinjson(jsondict, "data")

def getrunnumbersfromfile(filename, trydas=True, allowunknown=False, dasinstance=defaultdasinstance):
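    """
    Returns the list of run numbers for a file: [1] for /store/mc or
    /store/relval files, the run number parsed from the /store path when
    possible, otherwise a DAS lookup (if trydas) or [-1] (if allowunknown).
    """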
    parts = filename.split("/")
    error = None
    if parts[0] != "" or parts[1] != "store":
        error = "does not start with /store"
    elif parts[2] in ["mc", "relval"]:
        return [1]
    elif not parts[-1].endswith(".root"):
        error = "does not end with something.root"
    elif len(parts) != 12:
        error = "should be exactly 11 slashes counting the first one"
    else:
        runnumberparts = parts[-5:-2]
        if not all(len(part) == 3 for part in runnumberparts):
            error = "the 3 directories {} do not have length 3 each".format("/".join(runnumberparts))
        try:
            return [int("".join(runnumberparts))]
        except ValueError:
            error = "the 3 directories {} do not form an integer".format("/".join(runnumberparts))

    if error and trydas:
        try:
            query = "run file={} instance={}".format(filename, dasinstance)
            dasoutput = dasquery(query)
            result = findinjson(dasoutput, "run")
            return sum((findinjson(run, "run_number") for run in result), [])
        except Exception as e:
            error = str(e)

    if error and allowunknown:
        return [-1]

    if error:
        error = "could not figure out which run number this file is from.\nMaybe try with allowunknown=True?\n {}\n{}".format(filename, error)
        raise DatasetError(error)

def findinjson(jsondict, *strings):
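    """
    Recursively descends into a DAS JSON structure (a dict, or a list of
    dicts) following the given sequence of keys; raises KeyError if the
    path can't be found.
    """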
    if len(strings) == 0:
        return jsondict
    if isinstance(jsondict, dict):
        if strings[0] in jsondict:
            try:
                return findinjson(jsondict[strings[0]], *strings[1:])
            except KeyError:
                pass
    else:
        for a in jsondict:
            if strings[0] in a:
                try:
                    return findinjson(a[strings[0]], *strings[1:])
                except (TypeError, KeyError):
                    pass

    raise KeyError("Can't find " + strings[0])

class DataFile(object):
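    """A single file of a dataset: its name, number of events, and run numbers."""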
    def __init__(self, filename, nevents, runs=None, trydas=True, allowunknown=False, dasinstance=defaultdasinstance):
        self.filename = filename
        self.nevents = int(nevents)
        if runs is None:
            runs = getrunnumbersfromfile(filename, trydas=trydas, allowunknown=allowunknown, dasinstance=dasinstance)
        if isinstance(runs, str):
            runs = runs.split()
        self.runs = [int(_) for _ in runs]

    def getdict(self):
        return {"filename": self.filename, "nevents": str(self.nevents), "runs": " ".join(str(_) for _ in self.runs)}

class DatasetBase(object):
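    """
    Base class for anything that can provide a list of DataFiles and
    write file lists from them.
    """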
    __metaclass__ = abc.ABCMeta

    @abc.abstractmethod
    def getfiles(self, usecache):
        pass

    @abc.abstractproperty
    def headercomment(self):
        pass

    def writefilelist_validation(self, firstrun, lastrun, runs, maxevents, outputfile=None, usecache=True):
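        """
        Writes a _cff.py file list for the offline validation, keeping
        approximately maxevents events within the given run range.
        """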
        runrange = RunRange(firstrun=firstrun, lastrun=lastrun, runs=runs)

        if outputfile is None:
            outputfile = os.path.join(os.environ["CMSSW_BASE"], "src", "Alignment", "OfflineValidation", "python", self.filenamebase+"_cff.py")

        if maxevents < 0: maxevents = float("inf")
        totalevents = sum(datafile.nevents for datafile in self.getfiles(usecache) if all(run in runrange for run in datafile.runs))
        if totalevents == 0:
            raise ValueError("No events within the run range!")
        accepted = rejected = 0.

        fractiontoaccept = 1.*maxevents / totalevents

        with open(outputfile, "w") as f:
            f.write("#"+self.headercomment+"\n")
            f.write(validationheader)
            for datafile in self.getfiles(usecache):
                if all(run in runrange for run in datafile.runs):
                    if accepted == 0 or accepted / (accepted+rejected) <= fractiontoaccept:
                        f.write('"' + datafile.filename + '",\n')
                        accepted += datafile.nevents
                    else:
                        rejected += datafile.nevents
                elif any(run in runrange for run in datafile.runs):
                    raise DatasetError("file {} has multiple runs {}, which straddle firstrun or lastrun".format(datafile.filename, datafile.runs))
            f.write("#total events in these files: {}".format(accepted))
            f.write(validationfooter)

    def writefilelist_hippy(self, firstrun, lastrun, runs, eventsperjob, maxevents, outputfile, usecache=True):
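        """
        Writes a file list for HipPy: comma-separated filenames, with one
        line (= one job) holding roughly eventsperjob events.
        """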
        runrange = RunRange(firstrun=firstrun, lastrun=lastrun, runs=runs)
        if maxevents < 0: maxevents = float("inf")
        totalevents = sum(datafile.nevents for datafile in self.getfiles(usecache) if all(run in runrange for run in datafile.runs))
        if totalevents == 0:
            raise ValueError("No events within the run range!")
        accepted = rejected = inthisjob = 0.

        fractiontoaccept = 1.*maxevents / totalevents
        writecomma = False

        with open(outputfile, "w") as f:
            for datafile in self.getfiles(usecache):
                if all(run in runrange for run in datafile.runs):
                    if accepted == 0 or accepted / (accepted+rejected) <= fractiontoaccept:
                        if writecomma: f.write(",")
                        f.write("'" + datafile.filename + "'")
                        accepted += datafile.nevents
                        inthisjob += datafile.nevents
                        if inthisjob >= eventsperjob:
                            f.write("\n")
                            inthisjob = 0
                            writecomma = False
                        else:
                            writecomma = True
                    else:
                        rejected += datafile.nevents
                elif any(run in runrange for run in datafile.runs):
                    raise DatasetError("file {} has multiple runs {}, which straddle firstrun or lastrun".format(datafile.filename, datafile.runs))
            f.write("\n")

class Dataset(DatasetBase):
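    """
    A single dataset: either an official one registered in DAS
    (/PrimaryDataset/Era/Tier) or a private file list stored as a csv
    file in Alignment/CommonAlignment/data.
    """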
    def __init__(self, datasetname, dasinstance=defaultdasinstance):
        self.datasetname = datasetname
        if re.match(r'/.+/.+/.+', datasetname):
            self.official = True
            self.filenamebase = "Dataset" + self.datasetname.replace("/", "_")
        else:
            self.official = False
            self.filenamebase = datasetname

        self.dasinstance = dasinstance

    @cache
    def getfiles(self, usecache):
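        """
        Returns the list of DataFiles, reading it from the local csv cache
        when present and filling the cache from a DAS query otherwise.
        """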
        filename = os.path.join(os.environ["CMSSW_BASE"], "src", "Alignment", "CommonAlignment", "data", self.filenamebase+".csv")
        if not usecache:
            try:
                os.remove(filename)
            except OSError:
                if os.path.exists(filename):
                    raise

        result = []
        try:
            with open(filename) as f:
                for row in csv.DictReader(f):
                    result.append(DataFile(**row))
            return result
        except IOError:
            pass

        query = "file dataset={} instance={} detail=true | grep file.name, file.nevents".format(self.datasetname, self.dasinstance)
        dasoutput = dasquery(query)
        if not dasoutput:
            raise DatasetError("No files are available for the dataset '{}'. This can be "
                               "due to a typo or due to a DAS problem. Please check the "
                               "spelling of the dataset and/or try again.".format(self.datasetname))
        result = [DataFile(findinjson(_, "file", "name"), findinjson(_, "file", "nevents")) for _ in dasoutput if int(findinjson(_, "file", "nevents"))]
        try:
            with open(filename, "w") as f:
                writer = csv.DictWriter(f, ("filename", "nevents", "runs"))
                writer.writeheader()
                for datafile in result:
                    writer.writerow(datafile.getdict())
        except Exception as e:
            print("Couldn't write the dataset csv file:\n\n{}".format(e))
        return result

    @property
    def headercomment(self):
        return self.datasetname

class MultipleDatasets(DatasetBase):
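    """The union of several datasets, all queried with the same DAS instance."""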
    def __init__(self, *datasets, **kwargs):
        dasinstance = defaultdasinstance
        for kw, kwarg in kwargs.items():
            if kw == "dasinstance":
                dasinstance = kwarg
            else:
                raise TypeError("Unknown kwarg {}={}".format(kw, kwarg))
        self.datasets = [Dataset(dataset, dasinstance=dasinstance) for dataset in datasets]

    @cache
    def getfiles(self, usecache):
        return sum([d.getfiles(usecache=usecache) for d in self.datasets], [])

    @property
    def headercomment(self):
        return ", ".join(d.headercomment for d in self.datasets)

validationheader = """
import FWCore.ParameterSet.Config as cms

maxEvents = cms.untracked.PSet( input = cms.untracked.int32(-1) )
readFiles = cms.untracked.vstring()
secFiles = cms.untracked.vstring()
source = cms.Source("PoolSource", fileNames = readFiles, secondaryFileNames = secFiles)
readFiles.extend( [
"""

validationfooter = """
] )
"""