Back to home page

Project CMSSW displayed by LXR

 
 

    


Warning, /DQMServices/Components/scripts/dqm-access is written in an unsupported language. File is not indexed.

0001 #!/usr/bin/env python3
0002 
0003 from DQMServices.Components.HTTP import RequestManager
0004 from DQMServices.Components.X509 import SSLOptions
0005 from optparse import OptionParser
0006 from time import strptime, time
0007 import sys, re, json, pycurl
0008 from urllib.parse import quote
0009 
0010 DIR = 0
0011 FILE = 1
0012 ANY = 2
0013 
0014 ident = "DQMAccess/1.0 python/%s.%s.%s" % sys.version_info[:3]
0015 url_content = "/%(section)s/%(run)d%(dataset)s%(path)s"
0016 ssl_opts = None
0017 reqman = None
0018 nreq = 0
0019 found = []
0020 
0021 #-------------------------------------------------------------------------------
0022 class filter:
0023   type = FILE
0024   recurse = False
0025   pattern = ""
0026   rx = None
0027 
0028   def __repr__(self):
0029     return "(filter pattern='%s' type=%s recurse=%s)" \
0030       % (self.pattern, self.type, self.recurse)
0031 
0032 #-------------------------------------------------------------------------------
0033 def pattern_to_filter(pattern):
0034   """Converts a search pattern into a search filter.
0035 
0036   The pattern must be of the form of path with '*' wild card pattern, for
0037   example "/*/EventInfo/*Summary". A single star matches any string except
0038   slashes, i.e. matching within a single directory. A double star matches
0039   directories recursively.
0040 
0041   A name with trailing slash matches directory. A name without the trailing
0042   slash matches non-directories. A triple star will match either directory
0043   or non-directory.
0044 
0045   The patterns match against the full path, and therefore must always start
0046   with a slash. If you want to search entire tree, use "/**/Name*".
0047 
0048   The pattern "/*/EventInfo/" matches folders named 'EventInfo' one level
0049   down from the top; it will not recurse further down in the tree as it is
0050   known matches deeper inside are not possible.  "/*/EventInfo/**/*Summary"
0051   pattern matches any non-directory object name ending in "Summary" anywhere
0052   inside "EventInfo" one level down from the top.
0053 
0054   Returns a list of `filter` expressions representing the pattern, each of
0055   which represents one or more levels of matching/recursion.
0056   """
0057   filters = []
0058 
0059   # Check the pattern starts with '/'
0060   if not pattern.startswith("/"):
0061     raise ValueError("pattern must start with slash")
0062 
0063   # Process pattern as directory search specs, but collapse
0064   # repeated slashes into one slash first.
0065   for part in re.sub("/+", "/", pattern).split('/')[1:]:
0066     if filters and filters[-1].type == FILE:
0067       filters[-1].type = DIR
0068     f = filter()
0069     filters.append(f)
0070     for term in re.split("([*]+)", part):
0071       if term == "***":
0072         f.pattern += ".*"
0073         f.recurse = True
0074         f.type = ANY
0075       elif term == "**":
0076         f.pattern += ".*"
0077         f.recurse = True
0078         f.type = DIR
0079       elif term == "*":
0080         f.pattern += "[^/]*"
0081         f.type = FILE
0082       elif term:
0083         f.pattern += re.escape(term)
0084         f.type = FILE
0085     if f.pattern != ".*":
0086       f.pattern = "^%s$" % f.pattern
0087     f.rx = re.compile(f.pattern)
0088 
0089   last = filters[-1]
0090   if last.type == FILE and not last.recurse and last.pattern == "^$":
0091     filters.pop()
0092 
0093   return filters
0094 
0095 #-------------------------------------------------------------------------------
0096 def should_process_sample(s, expr):
0097   """Evaluate sample predicate expression `expr` against sample `s`.
0098   Returns True if the sample should be processed, False otherwise."""
0099   try:
0100     s['match'] = lambda rx, str: re.match(rx, str)
0101     s['run'] = int(s['run'])
0102     val = eval(expr, {}, s)
0103     del s['match']
0104     return val
0105   except:
0106     return False
0107 
0108 #-------------------------------------------------------------------------------
0109 def find_matching_samples(options):
0110   """Generator which returns all samples at target sever which
0111   match the requested predicate expression."""
0112   all_samples = {}
0113 
0114   def req_error(c, url, errmsg, errno):
0115     print("%s: failed to retrieve samples: %s (%d)" \
0116       % (options.server, errmsg, errno), file=sys.stderr)
0117     sys.exit(1)
0118 
0119   def req_done(c):
0120     json_decoder = json.decoder.JSONDecoder()
0121     samples = c.buffer.getvalue().decode('utf-8')
0122     all_samples['result'] = json_decoder.decode(samples)
0123 
0124   reqman = RequestManager(ssl_opts = ssl_opts,
0125                           user_agent = ident,
0126                           request_respond = req_done,
0127                           request_error = req_error)
0128   print(options.server + "/samples")
0129   reqman.put((options.server + "/samples",))
0130   reqman.process()
0131 
0132   if not all_samples:
0133     print("%s: no samples" % options.server, file=sys.stderr)
0134     sys.exit(1)
0135 
0136   for sample_type in all_samples['result']['samples']:
0137     for sample in sample_type['items']:
0138       if should_process_sample(sample, options.sample_expr):
0139         yield sample
0140 
0141 #-------------------------------------------------------------------------------
0142 def fetch_tstreamerinfo(options, dataset):
0143   topdir = {}
0144 
0145   def req_error(c, url, errmsg, errno):
0146     print("%s: failed to retrieve TStreamerInfo: %s (%d)" \
0147       % (options.server, errmsg, errno), file=sys.stderr)
0148     sys.exit(1)
0149 
0150   def req_done(c):
0151     json_decoder = json.decoder.JSONDecoder()
0152     topdir["contents"] = json_decoder.decode(c.buffer.getvalue().decode('utf-8'))['contents']
0153 
0154   reqman = RequestManager(ssl_opts = ssl_opts,
0155                           user_agent = ident,
0156                           request_respond = req_done,
0157                           request_error = req_error)
0158 
0159   reqman.put((options.server + "/archive/" + dataset + "?rootcontent=1",))
0160   reqman.process()
0161 
0162   return topdir["contents"][0]["streamerinfo"]
0163 
0164 #-------------------------------------------------------------------------------
0165 def request_init(c, options, sample, path, filterspec):
0166   sample.update(path = path)
0167   c.url = options.server + quote(url_content % sample)
0168   if options.fetch_root:
0169     c.url+="?rootcontent=1"
0170   c.setopt(pycurl.URL, c.url)
0171   if False and options.verbose:
0172     print(c.url)
0173 
0174 #-------------------------------------------------------------------------------
0175 def report_error(c, task, errmsg, errno):
0176   print("FAILED to retrieve %s: %s (%d)" % (task, errmsg, errno), file=sys.stderr)
0177 
0178 #-------------------------------------------------------------------------------
0179 def match_filters(item, filters, poslist):
0180   newposlist = []
0181   descend = False
0182   matched = False
0183   name = None
0184 
0185   for idx in poslist:
0186     assert idx < len(filters)
0187     f = filters[idx]
0188     fmatched = False
0189     if 'subdir' in item \
0190        and (f.type == DIR or f.type == ANY) \
0191        and f.rx.match(item['subdir']):
0192       fmatched = descend = True
0193       name = item['subdir']
0194     elif 'obj' in item \
0195          and (f.type == FILE or f.type == ANY) \
0196          and f.rx.match(item['obj']):
0197       fmatched = True
0198       name = item['obj']
0199 
0200     if fmatched:
0201       if idx == len(filters)-1:
0202         matched = True
0203       if f.recurse:
0204         newposlist.append(idx)
0205       if idx < len(filters) - 1:
0206         newposlist.append(idx+1)
0207 
0208   return name, matched, descend, newposlist
0209 
0210 #-------------------------------------------------------------------------------
0211 def process(c):
0212   global found, nreq
0213   options, sample, path, filterspec = c.task
0214   json_decoder = json.decoder.JSONDecoder()
0215 
0216   nreq += 1
0217   if options.verbose and nreq % 10 == 0:
0218     sys.stdout.write(".")
0219     sys.stdout.flush()
0220     if nreq % 750 == 0:
0221       print()
0222 
0223   reply = c.buffer.getvalue().decode('utf-8')
0224   reply = re.sub(r'("value": ")"([A-Za-z0-9_]+")"', r'\1\2', reply)
0225   reply = re.sub(r'("(?:mean|rms|min|max)":) nan,', r'\1 "NaN",', reply)
0226   reply = re.sub(r'("(?:mean|rms|min|max|nentries)":) inf,', r'\1 "float(\'inf\')",', reply)
0227   reply = re.sub(r'("(?:mean|rms|min|max|nentries)":) -inf,', r'\1 "-float(\'inf\')",', reply)
0228   reply = json_decoder.decode(reply)
0229 
0230   newreq = {}
0231   for item in reply['contents']:
0232     for filters, pos in filterspec:
0233       name, match, descend, newpos = match_filters(item, filters, pos)
0234       if match and (not found or found[-1][0] != path + name):
0235         found.append((path + name, item))
0236       if descend:
0237         newpath = path + name + "/"
0238         if newpath not in newreq:
0239           newreq[newpath] = []
0240         newreq[newpath].append((filters, newpos))
0241 
0242   for path, filterspec in iter(newreq.items()):
0243     reqman.put((options, sample, path, filterspec))
0244 
0245 #-------------------------------------------------------------------------------
0246 op = OptionParser()
0247 op.add_option("-v", "--verbose", dest = "verbose",
0248               action = "store_true", default = False,
0249               help = "Show verbose scan information")
0250 op.add_option("-c", dest = "dqmCompliant",
0251               action = "store_true", default = False,
0252               help = "Fetch DQM compliant root content")
0253 op.add_option("-d", "--debug", dest = "debug",
0254               action = "store_true", default = False,
0255               help = "Show debug information")
0256 op.add_option("-k", "--debug-streamers", dest = "debug_streamers",
0257               action = "store_true", default = False,
0258               help = "Show debug information on StreamerInfo objects")
0259 op.add_option("-l", dest = "long_listing",
0260               action = "store_true", default = False,
0261               help = "Enable long listing")
0262 op.add_option("-r", dest = "fetch_root",
0263               action = "store_true", default = False,
0264               help = "Fetch root content!")
0265 op.add_option("-w", dest = "write",
0266               action = "store_true", default = False,
0267               help = "Write fetched root objects on disk")
0268 op.add_option("-n", "--connections", dest = "connections",
0269               type = "int", action = "store", metavar = "NUM",
0270               default = 10, help = "Use NUM concurrent connections")
0271 op.add_option("-s", "--server", dest = "server",
0272               type = "string", action = "store", metavar = "SERVER",
0273               default = "https://cmsweb.cern.ch/dqm/relval/data/json",
0274               help = "Pull content from SERVER")
0275 op.add_option("-e", "--samples", dest = "sample_expr", metavar = "EXPR",
0276               help = "Evaluate EXPR to decide which samples to scan")
0277 op.add_option("-f", "--filter", dest = "glob",
0278               type = "string", action = "append", metavar = "GLOB",
0279               default = [],
0280               help = "Filter monitor elements matching GLOB pattern")
0281 options, args = op.parse_args()
0282 if args:
0283   print("Too many arguments", file=sys.stderr)
0284   sys.exit(1)
0285 if not options.sample_expr:
0286   print("Sample predicate expression required", file=sys.stderr)
0287   sys.exit(1)
0288 if not options.glob:
0289   print("Monitor element filter expression(s) required", file=sys.stderr)
0290   sys.exit(1)
0291 if not options.server:
0292   print("Server contact string required", file=sys.stderr)
0293   sys.exit(1)
0294 
0295 # Adjust options
0296 if options.debug:
0297   options.verbose = True
0298 
0299 if options.write:
0300   options.fetch_root = True
0301   from ROOT import TFile
0302   from DQMServices.Components.ROOTData import *
0303 
0304 ssl_opts = SSLOptions()
0305 if options.verbose:
0306   print("Using SSL cert dir", ssl_opts.ca_path)
0307   print("Using SSL private key", ssl_opts.key_file)
0308   print("Using SSL public key", ssl_opts.cert_file)
0309 
0310 # Convert each glob pattern into a filter expression.
0311 filter_sets = map(lambda glob: pattern_to_filter(glob), options.glob)
0312 
0313 # Start a request manager.
0314 reqman = RequestManager(num_connections = options.connections,
0315                         ssl_opts = ssl_opts,
0316                         user_agent = ident,
0317                         request_init = request_init,
0318                         request_respond = process,
0319                         request_error = report_error)
0320 
0321 # Process all samples matching the predicate.
0322 ntotreq = 0
0323 nfound = 0
0324 start = time()
0325 for sample in find_matching_samples(options):
0326   tstreamerinfo = None
0327   if options.write:
0328     tstreamerinfo = fetch_tstreamerinfo(options, "%(run)d%(dataset)s" % sample)
0329     literal2root(tstreamerinfo, "TStreamerInfo", options.debug_streamers)
0330 
0331   nreq = 0
0332   found = []
0333   sample['section'] = 'archive'
0334   if options.verbose:
0335     print("Scanning %s" % sample)
0336   reqman.put((options, sample, "/", list(map(lambda f: (f, [0]), filter_sets))))
0337   reqman.process()
0338   if options.verbose:
0339     print()
0340   if found:
0341     print("%(section)s/%(run)d%(dataset)s:" % sample)
0342     found.sort()
0343     cwd = None
0344 
0345     if options.dqmCompliant:
0346       fname = "DQM_V0001_R%09d" % sample['run'] \
0347               + ("_%(dataset)s.root" % sample)[1:].replace("/", "__")
0348     else:
0349       fname = ("%(dataset)s__run%(run)s.root" % sample)[1:].replace("/", "__")
0350 
0351     # If writing, create output file with directories
0352     ofile = None
0353     if options.write:
0354       ofile = TFile(fname, "RECREATE")
0355       ofile.cd()
0356 
0357     for path, item in found:
0358       if options.dqmCompliant:
0359           path = "DQMData/Run %d" % sample['run'] \
0360           + re.sub("^/"+path.split('/')[1],
0361                    "/" + path.split('/')[1] + "/Run summary",
0362                    path)
0363 
0364       # We are treating a directory
0365       if 'subdir' in item:
0366         print(" %s/%s" % (path, (options.long_listing and " = directory") or ""))
0367         if options.write:
0368           tfile_cd(path,ofile)
0369 
0370       # We are treating an int/double/string
0371       elif 'value' in item:
0372         print(" %s%s" % (path, (options.long_listing and " = %s" % item['value']) or ""))
0373 
0374       # We have a TObject: histo, graph, profile...; maybe write to a file.
0375       else:
0376         message = " %s" % path
0377         if options.long_listing:
0378           message += " = [%s # %d]" % (item['properties']['type'], item['nentries'])
0379         if options.debug:
0380           message += " %s" % item['rootobj']
0381         print(message)
0382         if options.write:
0383           indir = path.rsplit("/", 1)[0]
0384           if cwd != indir:
0385             tfile_cd(indir, ofile)
0386             cwd = indir
0387           obj = literal2root(item['rootobj'], item['properties']['type'])
0388           obj.Write()
0389 
0390     if options.write and ofile:
0391       ofile.Close()
0392 
0393   nfound += len(found)
0394   ntotreq += nreq
0395 end = time()
0396 
0397 if options.verbose:
0398   print("\nFound %d objects in %d directories in %.3f seconds" % (nfound, ntotreq, end - start))