import abc
import csv
import json
import os
import re

import Utilities.General.cmssw_das_client as das_client

from .utilities import cache

class DatasetError(Exception): pass

defaultdasinstance = "prod/global"

class RunRange(object):
  def __init__(self, firstrun, lastrun, runs):
    self.firstrun = firstrun
    self.lastrun = lastrun
    self.runs = runs

  def __contains__(self, run):
    if self.runs and run not in self.runs: return False
    return self.firstrun <= run <= self.lastrun

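# Illustrative usage sketch (the run numbers below are made up): an explicit
# run list restricts membership beyond the [firstrun, lastrun] window.
#   >>> 273500 in RunRange(firstrun=273000, lastrun=284000, runs=None)
#   True
#   >>> 273500 in RunRange(firstrun=273000, lastrun=284000, runs=[273100, 273200])
#   False
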
def dasquery(dasQuery, dasLimit=0):
  dasData = das_client.get_data(dasQuery, dasLimit)
  if isinstance(dasData, str):
    jsondict = json.loads(dasData)
  else:
    jsondict = dasData
  # Check whether the DAS query failed
  try:
    error = findinjson(jsondict, "data", "error")
  except KeyError:
    error = None
  if error or findinjson(jsondict, "status") != 'ok' or "data" not in jsondict:
    try:
      jsonstr = findinjson(jsondict, "reason")
    except KeyError:
      jsonstr = str(jsondict)
    if len(jsonstr) > 10000:
      jsonfile = "das_query_output_%i.txt"
      i = 0
      while os.path.lexists(jsonfile % i):
        i += 1
      jsonfile = jsonfile % i
      with open(jsonfile, "w") as theFile:
        theFile.write(jsonstr)
      msg = "The DAS query returned an error.  The output is very long, and has been stored in:\n" + jsonfile
    else:
      msg = "The DAS query returned an error.  Here is the output\n" + jsonstr
    msg += "\nIt's possible that this was a server error.  If so, it may work if you try again later."
    raise DatasetError(msg)
  return findinjson(jsondict, "data")

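# Illustrative call (hypothetical file name; a real call needs a working DAS
# client, so this is only a sketch):
#   dasoutput = dasquery("run file=/store/data/.../f.root instance=prod/global")
# On failure the function raises DatasetError carrying the server's "reason".
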
def getrunnumbersfromfile(filename, trydas=True, allowunknown=False, dasinstance=defaultdasinstance):
  parts = filename.split("/")
  error = None
  if parts[0] != "" or parts[1] != "store":
    error = "does not start with /store"
  elif parts[2] in ["mc", "relval"]:
    return [1]
  elif not parts[-1].endswith(".root"):
    error = "does not end with something.root"
  elif len(parts) != 12:
    error = "should have exactly 11 slashes, counting the leading one"
  else:
    runnumberparts = parts[-5:-2]
    if not all(len(part) == 3 for part in runnumberparts):
      error = "the 3 directories {} do not have length 3 each".format("/".join(runnumberparts))
    else:
      try:
        return [int("".join(runnumberparts))]
      except ValueError:
        error = "the 3 directories {} do not form an integer".format("/".join(runnumberparts))

  if error and trydas:
    try:
      query = "run file={} instance={}".format(filename, dasinstance)
      dasoutput = dasquery(query)
      result = findinjson(dasoutput, "run")
      return sum((findinjson(run, "run_number") for run in result), [])
    except Exception as e:
      error = str(e)

  if error and allowunknown:
    return [-1]

  if error:
    error = "could not figure out which run number this file is from.\nMaybe try with allowunknown=True?\n  {}\n{}".format(filename, error)
    raise DatasetError(error)

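# Illustrative path parsing (made-up but correctly shaped LFN): the run number
# is read from the three 3-character directories before the last two path
# components, so
#   /store/data/Run2016H/ZeroBias/ALCARECO/TkAlMinBias-v1/000/283/283/00000/f.root
# yields [283283], while any /store/mc/... or /store/relval/... path yields [1].
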
def findinjson(jsondict, *strings):
  if len(strings) == 0:
    return jsondict
  if isinstance(jsondict, dict):
    if strings[0] in jsondict:
      try:
        return findinjson(jsondict[strings[0]], *strings[1:])
      except KeyError:
        pass
  else:
    for a in jsondict:
      if strings[0] in a:
        try:
          return findinjson(a[strings[0]], *strings[1:])
        except (TypeError, KeyError):  #TypeError because a could be a string and contain strings[0]
          pass
  #if it's not found
  raise KeyError("Can't find " + strings[0])

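# Illustrative lookups on a DAS-style nested structure (toy data):
#   >>> findinjson({"data": [{"file": {"name": "/store/x.root"}}]}, "data")
#   [{'file': {'name': '/store/x.root'}}]
#   >>> findinjson([{"file": {"name": "/store/x.root"}}], "file", "name")
#   '/store/x.root'
# A missing key raises KeyError rather than returning None.
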
class DataFile(object):
  def __init__(self, filename, nevents, runs=None, trydas=True, allowunknown=False, dasinstance=defaultdasinstance):
    self.filename = filename
    self.nevents = int(nevents)
    if runs is None:
      runs = getrunnumbersfromfile(filename, trydas=trydas, allowunknown=allowunknown, dasinstance=dasinstance)
    if isinstance(runs, str):
      runs = runs.split()
    self.runs = [int(_) for _ in runs]

  def getdict(self):
    return {"filename": self.filename, "nevents": str(self.nevents), "runs": " ".join(str(_) for _ in self.runs)}

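# Illustrative construction (hypothetical file name and event count): runs may
# be passed explicitly, arrive as a space-separated string when read back from
# the csv cache, or be omitted so getrunnumbersfromfile infers them.
#   >>> df = DataFile("/store/mc/X/f.root", 1000)   # MC, so runs == [1]
#   >>> df.getdict()
#   {'filename': '/store/mc/X/f.root', 'nevents': '1000', 'runs': '1'}
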
class DatasetBase(abc.ABC):
  @abc.abstractmethod
  def getfiles(self, usecache):
    pass

  @property
  @abc.abstractmethod
  def headercomment(self):
    pass

  def writefilelist_validation(self, firstrun, lastrun, runs, maxevents, outputfile=None, usecache=True):
    runrange = RunRange(firstrun=firstrun, lastrun=lastrun, runs=runs)

    if outputfile is None:
      outputfile = os.path.join(os.environ["CMSSW_BASE"], "src", "Alignment", "OfflineValidation", "python", self.filenamebase+"_cff.py")

    if maxevents < 0: maxevents = float("inf")
    totalevents = sum(datafile.nevents for datafile in self.getfiles(usecache) if all(run in runrange for run in datafile.runs))
    if totalevents == 0:
      raise ValueError("No events within the run range!")
    accepted = rejected = 0.  #float so fractions are easier

    fractiontoaccept = 1.*maxevents / totalevents

    with open(outputfile, "w") as f:
      f.write("#"+self.headercomment+"\n")
      f.write(validationheader)
      for datafile in self.getfiles(usecache):
        if all(run in runrange for run in datafile.runs):
          if accepted == 0 or accepted / (accepted+rejected) <= fractiontoaccept:
            f.write('"' + datafile.filename + '",\n')
            accepted += datafile.nevents
          else:
            rejected += datafile.nevents
        elif any(run in runrange for run in datafile.runs):
          raise DatasetError("file {} has multiple runs {}, which straddle firstrun or lastrun".format(datafile.filename, datafile.runs))
      f.write("#total events in these files: {}".format(int(accepted)))
      f.write(validationfooter)

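  # The acceptance logic above is a simple streaming downsampler: a file is
  # kept whenever the running accepted fraction is still at or below
  # maxevents/totalevents, so the output approximates the requested event
  # count without shuffling.  Illustrative call (hypothetical run window):
  #   dataset.writefilelist_validation(firstrun=273000, lastrun=284000,
  #                                    runs=None, maxevents=100000)
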
  def writefilelist_hippy(self, firstrun, lastrun, runs, eventsperjob, maxevents, outputfile, usecache=True):
    runrange = RunRange(firstrun=firstrun, lastrun=lastrun, runs=runs)
    if maxevents < 0: maxevents = float("inf")
    totalevents = sum(datafile.nevents for datafile in self.getfiles(usecache) if all(run in runrange for run in datafile.runs))
    if totalevents == 0:
      raise ValueError("No events within the run range!")
    accepted = rejected = inthisjob = 0.  #float so fractions are easier

    fractiontoaccept = 1.*maxevents / totalevents
    writecomma = False

    with open(outputfile, "w") as f:
      for datafile in self.getfiles(usecache):
        if all(run in runrange for run in datafile.runs):
          if accepted == 0 or accepted / (accepted+rejected) <= fractiontoaccept:
            if writecomma: f.write(",")
            f.write("'" + datafile.filename + "'")
            accepted += datafile.nevents
            inthisjob += datafile.nevents
            if inthisjob >= eventsperjob:
              f.write("\n")
              inthisjob = 0
              writecomma = False
            else:
              writecomma = True
          else:
            rejected += datafile.nevents
        elif any(run in runrange for run in datafile.runs):
          raise DatasetError("file {} has multiple runs {}, which straddle firstrun or lastrun".format(datafile.filename, datafile.runs))
      f.write("\n")

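  # Illustrative output shape (made-up names): each line of the output file
  # corresponds to one HipPy job, filled with comma-separated quoted filenames
  # until eventsperjob is reached, e.g.
  #   '/store/.../f1.root','/store/.../f2.root'
  #   '/store/.../f3.root'
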
class Dataset(DatasetBase):
  def __init__(self, datasetname, dasinstance=defaultdasinstance):
    self.datasetname = datasetname
    if re.match(r'/.+/.+/.+', datasetname):
      self.official = True
      self.filenamebase = "Dataset" + self.datasetname.replace("/","_")
    else:
      self.official = False
      self.filenamebase = datasetname

    self.dasinstance = dasinstance

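  # Illustrative naming (hypothetical dataset): a DAS-style name like
  # "/ZeroBias/Run2016H-TkAlMinBias-v1/ALCARECO" matches /.+/.+/.+ and is
  # treated as official; any other string is used directly as the base name of
  # the csv cache file.
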
  @cache
  def getfiles(self, usecache):
    filename = os.path.join(os.environ["CMSSW_BASE"], "src", "Alignment", "CommonAlignment", "data", self.filenamebase+".csv")
    if not usecache:
      try:
        os.remove(filename)
      except OSError:
        if os.path.exists(filename):
          raise

    result = []
    try:
      with open(filename) as f:
        for row in csv.DictReader(f):
          result.append(DataFile(**row))
        return result
    except IOError:
      pass

    query = "file dataset={} instance={} detail=true | grep file.name, file.nevents".format(self.datasetname, self.dasinstance)
    dasoutput = dasquery(query)
    if not dasoutput:
      raise DatasetError("No files are available for the dataset '{}'. This can be "
                         "due to a typo or due to a DAS problem. Please check the "
                         "spelling of the dataset and/or try again.".format(self.datasetname))
    result = [DataFile(findinjson(_, "file", "name"), findinjson(_, "file", "nevents")) for _ in dasoutput if int(findinjson(_, "file", "nevents"))]
    try:
      with open(filename, "w") as f:
        writer = csv.DictWriter(f, ("filename", "nevents", "runs"))
        writer.writeheader()
        for datafile in result:
          writer.writerow(datafile.getdict())
    except Exception as e:
      print("Couldn't write the dataset csv file:\n\n{}".format(e))
    return result

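  # Illustrative cache layout (toy values): the csv written above looks like
  #   filename,nevents,runs
  #   /store/data/.../f.root,12345,283283
  # so a later call with usecache=True can rebuild the DataFile list without
  # querying DAS.
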
  @property
  def headercomment(self):
    return self.datasetname

class MultipleDatasets(DatasetBase):
  def __init__(self, *datasets, **kwargs):
    dasinstance = defaultdasinstance
    for kw, kwarg in kwargs.items():
      if kw == "dasinstance":
        dasinstance = kwarg
      else:
        raise TypeError("Unknown kwarg {}={}".format(kw, kwarg))
    self.datasets = [Dataset(dataset, dasinstance=dasinstance) for dataset in datasets]

  @cache
  def getfiles(self, usecache):
    return sum([d.getfiles(usecache=usecache) for d in self.datasets], [])

  @property
  def headercomment(self):
    return ", ".join(d.headercomment for d in self.datasets)

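# Illustrative combination (hypothetical dataset names):
#   md = MultipleDatasets("/A/B/ALCARECO", "/C/D/ALCARECO", dasinstance="prod/global")
#   md.getfiles(usecache=True)  # concatenates the file lists of both datasets
#   md.headercomment            # "/A/B/ALCARECO, /C/D/ALCARECO"
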
validationheader = """
import FWCore.ParameterSet.Config as cms

maxEvents = cms.untracked.PSet( input = cms.untracked.int32(-1) )
readFiles = cms.untracked.vstring()
secFiles = cms.untracked.vstring()
source = cms.Source ("PoolSource",fileNames = readFiles, secondaryFileNames = secFiles)
readFiles.extend( [
"""

validationfooter = """
] )
"""
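
# Illustrative _cff.py produced by writefilelist_validation (made-up content):
#   #/MyDataset/Run2016H-v1/ALCARECO
#   import FWCore.ParameterSet.Config as cms
#   ...
#   readFiles.extend( [
#   "/store/data/.../f1.root",
#   #total events in these files: 12345
#   ] )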