import abc
import csv
import json
import os
import re

import Utilities.General.cmssw_das_client as das_client

from .utilities import cache

class DatasetError(Exception): pass

defaultdasinstance = "prod/global"

class RunRange(object):
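    """
    A range of runs.  A run is in the range if firstrun <= run <= lastrun
    and, if an explicit list of runs is given, the run is also in that list.
    """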
    def __init__(self, firstrun, lastrun, runs):
        self.firstrun = firstrun
        self.lastrun = lastrun
        self.runs = runs

    def __contains__(self, run):
        if self.runs and run not in self.runs: return False
        return self.firstrun <= run <= self.lastrun

def dasquery(dasQuery, dasLimit=0):
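    """
    Send a query to DAS and return the "data" field of the JSON response.
    Raises DatasetError if DAS reports an error; very long error output is
    written to a das_query_output_*.txt file instead of being included in
    the exception message.
    """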
    dasData = das_client.get_data(dasQuery, dasLimit)
    if isinstance(dasData, str):
        jsondict = json.loads(dasData)
    else:
        jsondict = dasData

    try:
        error = findinjson(jsondict, "data", "error")
    except KeyError:
        error = None
    if error or findinjson(jsondict, "status") != 'ok' or "data" not in jsondict:
        try:
            jsonstr = findinjson(jsondict, "reason")
        except KeyError:
            jsonstr = str(jsondict)
        if len(jsonstr) > 10000:
            jsonfile = "das_query_output_%i.txt"
            i = 0
            while os.path.lexists(jsonfile % i):
                i += 1
            jsonfile = jsonfile % i
            theFile = open(jsonfile, "w")
            theFile.write(jsonstr)
            theFile.close()
            msg = "The DAS query returned an error.  The output is very long, and has been stored in:\n" + jsonfile
        else:
            msg = "The DAS query returned an error.  Here is the output\n" + jsonstr
        msg += "\nIt's possible that this was a server error.  If so, it may work if you try again later."
        raise DatasetError(msg)
    return findinjson(jsondict, "data")

def getrunnumbersfromfile(filename, trydas=True, allowunknown=False, dasinstance=defaultdasinstance):
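    """
    Deduce the run numbers from an official file name of the form
    /store/.../000/123/456/.../file.root (the three 3-digit directories give
    the run number).  MC and relval files are assigned run 1.  If parsing
    fails and trydas is True, ask DAS for the run numbers; if that also fails
    and allowunknown is True, return [-1] instead of raising DatasetError.
    """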
    parts = filename.split("/")
    error = None
    if parts[0] != "" or parts[1] != "store":
        error = "does not start with /store"
    elif parts[2] in ["mc", "relval"]:
        return [1]
    elif not parts[-1].endswith(".root"):
        error = "does not end with something.root"
    elif len(parts) != 12:
        error = "should have exactly 11 slashes, counting the leading one"
    else:
        runnumberparts = parts[-5:-2]
        if not all(len(part) == 3 for part in runnumberparts):
            error = "the 3 directories {} do not have length 3 each".format("/".join(runnumberparts))
        try:
            return [int("".join(runnumberparts))]
        except ValueError:
            error = "the 3 directories {} do not form an integer".format("/".join(runnumberparts))

    if error and trydas:
        try:
            query = "run file={} instance={}".format(filename, dasinstance)
            dasoutput = dasquery(query)
            result = findinjson(dasoutput, "run")
            return sum((findinjson(run, "run_number") for run in result), [])
        except Exception as e:
            error = str(e)

    if error and allowunknown:
        return [-1]

    if error:
        error = "could not figure out which run number this file is from.\nMaybe try with allowunknown=True?\n {}\n{}".format(filename, error)
        raise DatasetError(error)

def findinjson(jsondict, *strings):
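    """
    Walk into a nested structure of dicts and lists, following the given
    sequence of keys, and return the first value found.  Raises KeyError if
    the chain of keys cannot be matched.
    """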
    if len(strings) == 0:
        return jsondict
    if isinstance(jsondict, dict):
        if strings[0] in jsondict:
            try:
                return findinjson(jsondict[strings[0]], *strings[1:])
            except KeyError:
                pass
    else:
        for a in jsondict:
            if strings[0] in a:
                try:
                    return findinjson(a[strings[0]], *strings[1:])
                except (TypeError, KeyError):
                    pass

    raise KeyError("Can't find " + strings[0])

class DataFile(object):
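    """
    A single file of a dataset: its name, its number of events, and the runs
    it contains.  If runs is not given, the run numbers are deduced from the
    file name (or from DAS) via getrunnumbersfromfile.
    """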
    def __init__(self, filename, nevents, runs=None, trydas=True, allowunknown=False, dasinstance=defaultdasinstance):
        self.filename = filename
        self.nevents = int(nevents)
        if runs is None:
            runs = getrunnumbersfromfile(filename, trydas=trydas, allowunknown=allowunknown, dasinstance=dasinstance)
        if isinstance(runs, str):
            runs = runs.split()
        self.runs = [int(_) for _ in runs]

    def getdict(self):
        return {"filename": self.filename, "nevents": str(self.nevents), "runs": " ".join(str(_) for _ in self.runs)}

class DatasetBase(metaclass=abc.ABCMeta):
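    """
    Base class for datasets.  Subclasses provide getfiles and headercomment;
    the writefilelist_validation and writefilelist_hippy methods are shared.
    """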
    @abc.abstractmethod
    def getfiles(self, usecache):
        pass

    @property
    @abc.abstractmethod
    def headercomment(self):
        pass

    def writefilelist_validation(self, firstrun, lastrun, runs, maxevents, outputfile=None, usecache=True):
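        """
        Write a validation _cff.py file listing the files whose runs all lie
        in the given run range, up to maxevents events (maxevents < 0 means
        no limit).  If outputfile is not given, it is placed in
        Alignment/OfflineValidation/python in the CMSSW working area.
        """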
        runrange = RunRange(firstrun=firstrun, lastrun=lastrun, runs=runs)

        if outputfile is None:
            outputfile = os.path.join(os.environ["CMSSW_BASE"], "src", "Alignment", "OfflineValidation", "python", self.filenamebase + "_cff.py")

        if maxevents < 0: maxevents = float("inf")
        totalevents = sum(datafile.nevents for datafile in self.getfiles(usecache) if all(run in runrange for run in datafile.runs))
        if totalevents == 0:
            raise ValueError("No events within the run range!")
        accepted = rejected = 0.

        fractiontoaccept = 1. * maxevents / totalevents

        with open(outputfile, "w") as f:
            f.write("#" + self.headercomment + "\n")
            f.write(validationheader)
            for datafile in self.getfiles(usecache):
                if all(run in runrange for run in datafile.runs):
                    if accepted == 0 or accepted / (accepted + rejected) <= fractiontoaccept:
                        f.write('"' + datafile.filename + '",\n')
                        accepted += datafile.nevents
                    else:
                        rejected += datafile.nevents
                elif any(run in runrange for run in datafile.runs):
                    raise DatasetError("file {} has multiple runs {}, which straddle firstrun or lastrun".format(datafile.filename, datafile.runs))
            f.write("#total events in these files: {}".format(accepted))
            f.write(validationfooter)

    def writefilelist_hippy(self, firstrun, lastrun, runs, eventsperjob, maxevents, outputfile, usecache=True):
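        """
        Write a file list for HipPy jobs: one line per job, with quoted,
        comma-separated file names, targeting eventsperjob events per line
        and at most maxevents events in total (maxevents < 0 means no limit).
        """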
        runrange = RunRange(firstrun=firstrun, lastrun=lastrun, runs=runs)
        if maxevents < 0: maxevents = float("inf")
        totalevents = sum(datafile.nevents for datafile in self.getfiles(usecache) if all(run in runrange for run in datafile.runs))
        if totalevents == 0:
            raise ValueError("No events within the run range!")
        accepted = rejected = inthisjob = 0.

        fractiontoaccept = 1. * maxevents / totalevents
        writecomma = False

        with open(outputfile, "w") as f:
            for datafile in self.getfiles(usecache):
                if all(run in runrange for run in datafile.runs):
                    if accepted == 0 or accepted / (accepted + rejected) <= fractiontoaccept:
                        if writecomma: f.write(",")
                        f.write("'" + datafile.filename + "'")
                        accepted += datafile.nevents
                        inthisjob += datafile.nevents
                        if inthisjob >= eventsperjob:
                            f.write("\n")
                            inthisjob = 0
                            writecomma = False
                        else:
                            writecomma = True
                    else:
                        rejected += datafile.nevents
                elif any(run in runrange for run in datafile.runs):
                    raise DatasetError("file {} has multiple runs {}, which straddle firstrun or lastrun".format(datafile.filename, datafile.runs))
            f.write("\n")

class Dataset(DatasetBase):
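    """
    A single dataset.  An official dataset name of the form
    /primary/processed/tier is queried through DAS; any other name is taken
    as the base name of a csv file in Alignment/CommonAlignment/data.
    """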
    def __init__(self, datasetname, dasinstance=defaultdasinstance):
        self.datasetname = datasetname
        if re.match(r'/.+/.+/.+', datasetname):
            self.official = True
            self.filenamebase = "Dataset" + self.datasetname.replace("/", "_")
        else:
            self.official = False
            self.filenamebase = datasetname

        self.dasinstance = dasinstance

    @cache
    def getfiles(self, usecache):
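        """
        Return the list of DataFiles in the dataset.  The result is cached in
        a csv file in Alignment/CommonAlignment/data; if usecache is False,
        the cached file is removed first and the list is fetched from DAS.
        """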
        filename = os.path.join(os.environ["CMSSW_BASE"], "src", "Alignment", "CommonAlignment", "data", self.filenamebase + ".csv")
        if not usecache:
            try:
                os.remove(filename)
            except OSError:
                if os.path.exists(filename):
                    raise

        result = []
        try:
            with open(filename) as f:
                for row in csv.DictReader(f):
                    result.append(DataFile(**row))
                return result
        except IOError:
            pass

        query = "file dataset={} instance={} detail=true | grep file.name, file.nevents".format(self.datasetname, self.dasinstance)
        dasoutput = dasquery(query)
        if not dasoutput:
            raise DatasetError("No files are available for the dataset '{}'.  This can be "
                               "due to a typo or due to a DAS problem.  Please check the "
                               "spelling of the dataset and/or try again.".format(self.datasetname))
        result = [DataFile(findinjson(_, "file", "name"), findinjson(_, "file", "nevents")) for _ in dasoutput if int(findinjson(_, "file", "nevents"))]
        try:
            with open(filename, "w") as f:
                writer = csv.DictWriter(f, ("filename", "nevents", "runs"))
                writer.writeheader()
                for datafile in result:
                    writer.writerow(datafile.getdict())
        except Exception as e:
            print("Couldn't write the dataset csv file:\n\n{}".format(e))
        return result

    @property
    def headercomment(self):
        return self.datasetname

class MultipleDatasets(DatasetBase):
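    """
    A combination of several datasets: getfiles concatenates the file lists
    of the individual datasets.
    """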
    def __init__(self, *datasets, **kwargs):
        dasinstance = defaultdasinstance
        for kw, kwarg in kwargs.items():
            if kw == "dasinstance":
                dasinstance = kwarg
            else:
                raise TypeError("Unknown kwarg {}={}".format(kw, kwarg))
        self.datasets = [Dataset(dataset, dasinstance=dasinstance) for dataset in datasets]

    @cache
    def getfiles(self, usecache):
        return sum([d.getfiles(usecache=usecache) for d in self.datasets], [])

    @property
    def headercomment(self):
        return ", ".join(d.headercomment for d in self.datasets)

validationheader = """
import FWCore.ParameterSet.Config as cms

maxEvents = cms.untracked.PSet( input = cms.untracked.int32(-1) )
readFiles = cms.untracked.vstring()
secFiles = cms.untracked.vstring()
source = cms.Source ("PoolSource",fileNames = readFiles, secondaryFileNames = secFiles)
readFiles.extend( [
"""

validationfooter = """
] )
"""