Line Code
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244
#!/usr/bin/env python3
import re
import json
import ROOT
import sqlite3
import argparse
import subprocess
import multiprocessing
import fnmatch

ROOTPREFIX = "root://cms-xrd-global.cern.ch/"
#ROOTPREFIX = "root://eoscms//eos/cms" # for more local files

parser = argparse.ArgumentParser(description="Collect a MEs from DQMIO data, with maximum possible granularity")

parser.add_argument('dataset', help='dataset name, like "/StreamHIExpress/HIRun2018A-Express-v1/DQMIO"')
parser.add_argument('-o', '--output', help='SQLite file to write', default='dqmio.sqlite')
parser.add_argument('-j', '--njobs', help='Number of threads to read files', type=int, default=1)
parser.add_argument('-l', '--limit', help='Only load up to LIMIT files', type=int, default=-1)
args = parser.parse_args()


# we can save a lot of time by only scanning some types, if we know all interesting MEs are of these types.
interesting_types = {
  "TH1Fs",
  "TH1Ds",
  "TH2Fs"
}

# insert the list of needed histograms below, wild cards are usable
interesting_mes = [

"PixelPhase1/Phase1_MechanicalView/PXBarrel/adc_PXLayer*",

]

inf = re.compile("([- \[])inf([,}\]])")
nan = re.compile("([- \[])nan([,}\]])")

def check_interesting(mename):
  for pattern in interesting_mes:
    if fnmatch.fnmatch(mename,pattern):
      return True
  return False

def tosqlite(x):
    if isinstance(x, ROOT.string):
        try:
            return unicode(x.data())
        except:
            return buffer(x.data())
    if isinstance(x, int):
        return x
    if isinstance(x, float):
        return x
    if isinstance(x, int):
        return x
    else:
        try: 
            rootobj = unicode(ROOT.TBufferJSON.ConvertToJSON(x))
            # turns out ROOT does not generate valid JSON for NaN/inf
            clean = nan.sub('\\g<1>0\\g<2>', inf.sub('\\g<1>1e38\\g<2>', rootobj))
            obj = json.loads(clean)
            jsonobj = json.dumps(obj, allow_nan=False)
            return jsonobj
        except Exception as e:
            return json.dumps({"root2sqlite_error": e.__repr__(), "root2sqlite_object": x.__repr__()})

def dasquery(dataset):
    if not dataset.endswith("DQMIO"):
        raise Exception("This tool probably cannot read the dataset you specified. The name should end with DQMIO.")
    dasquery = ["dasgoclient",  "-query=file dataset=%s" % dataset]
    print("Querying das ... %s" % dasquery)
    files = subprocess.check_output(dasquery)
    files = files.splitlines()
    print("Got %d files." % len(files))
    return files


treenames = { 
  0: "Ints",
  1: "Floats",
  2: "Strings",
  3: "TH1Fs",
  4: "TH1Ss",
  5: "TH1Ds",
  6: "TH2Fs",
  7: "TH2Ss",
  8: "TH2Ds",
  9: "TH3Fs",
  10: "TProfiles",
  11: "TProfile2Ds",
}

maketable = """
  CREATE TABLE IF NOT EXISTS monitorelements (
    name,
    fromrun, fromlumi, torun, tolumi,
    metype,
    value
  ); """
makeindex = """
  CREATE INDEX runorder ON monitorelements(fromrun, fromlumi);
"""
insertinto = """
  INSERT INTO monitorelements (
    name,
    fromrun, fromlumi, torun, tolumi,
    metype,
    value
  ) VALUES (
    ?, ?, ?, ?, ?, ?, ?
  ); """
dumpmes = """
  SELECT fromlumi, tolumi, fromrun, name, value FROM monitorelements ORDER BY fromrun, fromlumi ASC;
"""

db = sqlite3.connect(args.output)
db.execute(maketable)
db.execute(makeindex)

def harvestfile(fname):
    f = ROOT.TFile.Open(ROOTPREFIX + fname)
    idxtree = getattr(f, "Indices")
    #idxtree.GetEntry._threaded = True # now the blocking call should release the GIL...

    # we have no good way to find out which lumis where processed in a job.
    # so we watch the per-lumi indices and assume that all mentioned lumis 
    # are covered in the end-of-job MEs. This might fail if there are no 
    # per-lumi MEs.
    knownlumis = set()
    mes_to_store = []

    for i in range(idxtree.GetEntries()):
        idxtree.GetEntry(i)
        run, lumi, metype = idxtree.Run, idxtree.Lumi, idxtree.Type
        if lumi != 0:
            knownlumis.add(lumi)

        if not treenames[metype] in interesting_types:
          continue

        endrun = run # assume no multi-run files for now
        if lumi == 0: # per-job ME
            endlumi = max(knownlumis)
            lumi = min(knownlumis)
        else: 
            endlumi = lumi

        # inclusive range -- for 0 entries, row is left out
        firstidx, lastidx = idxtree.FirstIndex, idxtree.LastIndex
        metree = getattr(f, treenames[metype])
        metree.GetEntry(0)
        metree.SetBranchStatus("*",0)
        metree.SetBranchStatus("FullName",1)

        for x in range(firstidx, lastidx+1):
            metree.GetEntry(x)
            mename = str(metree.FullName)

            if mename.find("AlCaReco") != -1: 
              continue

            if mename.find("Isolated") != -1:
              continue
            
            if mename.find("HLT") != -1:
              continue
            
            if not ((mename.find("SiStrip") >= 0) or (mename.find("OfflinePV") >= 0) or (mename.find("PixelPhase1") >= 0) or (mename.find("Tracking") >= 0 )):    
              continue

            if check_interesting(mename):
                metree.GetEntry(x, 1)
                value = metree.Value

                mes_to_store.append((
                  mename,
                  run, lumi, endrun, endlumi,
                  metype,
                  tosqlite(value),
                ))

    return mes_to_store

files = dasquery(args.dataset)
if args.limit > 0: files = files[:args.limit]

pool = multiprocessing.Pool(processes=args.njobs)
ctr = 0
for mes_to_store in pool.imap_unordered(harvestfile, files):
#for mes_to_store in map(harvestfile, files):
    db.executemany(insertinto, mes_to_store);
    db.commit()
    ctr += 1
    print("Processed %d files of %d, got %d MEs...\r" % (ctr, len(files), len(mes_to_store)),  end='')
print("\nDone.")

sqlite2tree = """
// Convert the sqlite format saved above back into a TTree.
// Saving TTrees with objects (TH1's) seems to be close to impossible in Python,
// so we do the roundtrip via SQLite and JSON in a ROOT macro.
// This needs a ROOT with TBufferJSON::FromJSON, which the 6.12 in CMSSW for
// for now does not have. We can load a newer version from SFT (on lxplus6,
// in (!) a cmsenv):
// source /cvmfs/sft.cern.ch/lcg/releases/ROOT/6.16.00-f8770/x86_64-slc6-gcc8-opt/bin/thisroot.sh
// root sqlite2tree.C
// It is rather slow, but the root file is a lot more compact.

int run;
int fromlumi;
int tolumi;
TString* name;
TH2F* value;

int sqlite2tree() {

  auto sql = TSQLiteServer("sqlite:///dev/shm/schneiml/CMSSW_10_5_0_pre1/src/dqmio.sqlite");
  auto query = "SELECT fromlumi, tolumi, fromrun, name, value FROM monitorelements ORDER BY fromrun, fromlumi ASC;";
  auto res = sql.Query(query);

  TFile outfile("/dev/shm/dqmio.root", "RECREATE");
  auto outtree = new TTree("MEs", "MonitorElements by run and lumisection");
  auto nameb     = outtree->Branch("name",    &name);
  auto valueb    = outtree->Branch("value",   &value,128*1024);
  auto runb      = outtree->Branch("run",     &run);
  auto fromlumib = outtree->Branch("fromlumi",&fromlumi);
  auto tolumib   = outtree->Branch("tolumi",  &tolumi);


  while (auto row = res->Next()) {
    fromlumi = atoi(row->GetField(0));
    tolumi   = atoi(row->GetField(1));
    run      = atoi(row->GetField(2));
    name  = new TString(row->GetField(3));
    value = nullptr;
    TBufferJSON::FromJSON(value, row->GetField(4));
    outtree->Fill();
  }
  return 0;
}
"""