Warning: /DQMServices/Components/scripts/dqm-access is written in an unsupported language. File is not indexed.
0001 #!/usr/bin/env python3
0002
0003 from DQMServices.Components.HTTP import RequestManager
0004 from DQMServices.Components.X509 import SSLOptions
0005 from optparse import OptionParser
0006 from time import strptime, time
0007 import sys, re, json, pycurl
0008 from urllib.parse import quote
0009
0010 DIR = 0
0011 FILE = 1
0012 ANY = 2
0013
0014 ident = "DQMAccess/1.0 python/%s.%s.%s" % sys.version_info[:3]
0015 url_content = "/%(section)s/%(run)d%(dataset)s%(path)s"
0016 ssl_opts = None
0017 reqman = None
0018 nreq = 0
0019 found = []
0020
0021 #-------------------------------------------------------------------------------
0022 class filter:
0023 type = FILE
0024 recurse = False
0025 pattern = ""
0026 rx = None
0027
0028 def __repr__(self):
0029 return "(filter pattern='%s' type=%s recurse=%s)" \
0030 % (self.pattern, self.type, self.recurse)
0031
0032 #-------------------------------------------------------------------------------
0033 def pattern_to_filter(pattern):
0034 """Converts a search pattern into a search filter.
0035
0036 The pattern must be of the form of path with '*' wild card pattern, for
0037 example "/*/EventInfo/*Summary". A single star matches any string except
0038 slashes, i.e. matching within a single directory. A double star matches
0039 directories recursively.
0040
0041 A name with trailing slash matches directory. A name without the trailing
0042 slash matches non-directories. A triple star will match either directory
0043 or non-directory.
0044
0045 The patterns match against the full path, and therefore must always start
0046 with a slash. If you want to search entire tree, use "/**/Name*".
0047
0048 The pattern "/*/EventInfo/" matches folders named 'EventInfo' one level
0049 down from the top; it will not recurse further down in the tree as it is
0050 known matches deeper inside are not possible. "/*/EventInfo/**/*Summary"
0051 pattern matches any non-directory object name ending in "Summary" anywhere
0052 inside "EventInfo" one level down from the top.
0053
0054 Returns a list of `filter` expressions representing the pattern, each of
0055 which represents one or more levels of matching/recursion.
0056 """
0057 filters = []
0058
0059 # Check the pattern starts with '/'
0060 if not pattern.startswith("/"):
0061 raise ValueError("pattern must start with slash")
0062
0063 # Process pattern as directory search specs, but collapse
0064 # repeated slashes into one slash first.
0065 for part in re.sub("/+", "/", pattern).split('/')[1:]:
0066 if filters and filters[-1].type == FILE:
0067 filters[-1].type = DIR
0068 f = filter()
0069 filters.append(f)
0070 for term in re.split("([*]+)", part):
0071 if term == "***":
0072 f.pattern += ".*"
0073 f.recurse = True
0074 f.type = ANY
0075 elif term == "**":
0076 f.pattern += ".*"
0077 f.recurse = True
0078 f.type = DIR
0079 elif term == "*":
0080 f.pattern += "[^/]*"
0081 f.type = FILE
0082 elif term:
0083 f.pattern += re.escape(term)
0084 f.type = FILE
0085 if f.pattern != ".*":
0086 f.pattern = "^%s$" % f.pattern
0087 f.rx = re.compile(f.pattern)
0088
0089 last = filters[-1]
0090 if last.type == FILE and not last.recurse and last.pattern == "^$":
0091 filters.pop()
0092
0093 return filters
0094
0095 #-------------------------------------------------------------------------------
0096 def should_process_sample(s, expr):
0097 """Evaluate sample predicate expression `expr` against sample `s`.
0098 Returns True if the sample should be processed, False otherwise."""
0099 try:
0100 s['match'] = lambda rx, str: re.match(rx, str)
0101 s['run'] = int(s['run'])
0102 val = eval(expr, {}, s)
0103 del s['match']
0104 return val
0105 except:
0106 return False
0107
0108 #-------------------------------------------------------------------------------
0109 def find_matching_samples(options):
0110 """Generator which returns all samples at target sever which
0111 match the requested predicate expression."""
0112 all_samples = {}
0113
0114 def req_error(c, url, errmsg, errno):
0115 print("%s: failed to retrieve samples: %s (%d)" \
0116 % (options.server, errmsg, errno), file=sys.stderr)
0117 sys.exit(1)
0118
0119 def req_done(c):
0120 json_decoder = json.decoder.JSONDecoder()
0121 samples = c.buffer.getvalue().decode('utf-8')
0122 all_samples['result'] = json_decoder.decode(samples)
0123
0124 reqman = RequestManager(ssl_opts = ssl_opts,
0125 user_agent = ident,
0126 request_respond = req_done,
0127 request_error = req_error)
0128 print(options.server + "/samples")
0129 reqman.put((options.server + "/samples",))
0130 reqman.process()
0131
0132 if not all_samples:
0133 print("%s: no samples" % options.server, file=sys.stderr)
0134 sys.exit(1)
0135
0136 for sample_type in all_samples['result']['samples']:
0137 for sample in sample_type['items']:
0138 if should_process_sample(sample, options.sample_expr):
0139 yield sample
0140
0141 #-------------------------------------------------------------------------------
0142 def fetch_tstreamerinfo(options, dataset):
0143 topdir = {}
0144
0145 def req_error(c, url, errmsg, errno):
0146 print("%s: failed to retrieve TStreamerInfo: %s (%d)" \
0147 % (options.server, errmsg, errno), file=sys.stderr)
0148 sys.exit(1)
0149
0150 def req_done(c):
0151 json_decoder = json.decoder.JSONDecoder()
0152 topdir["contents"] = json_decoder.decode(c.buffer.getvalue().decode('utf-8'))['contents']
0153
0154 reqman = RequestManager(ssl_opts = ssl_opts,
0155 user_agent = ident,
0156 request_respond = req_done,
0157 request_error = req_error)
0158
0159 reqman.put((options.server + "/archive/" + dataset + "?rootcontent=1",))
0160 reqman.process()
0161
0162 return topdir["contents"][0]["streamerinfo"]
0163
0164 #-------------------------------------------------------------------------------
0165 def request_init(c, options, sample, path, filterspec):
0166 sample.update(path = path)
0167 c.url = options.server + quote(url_content % sample)
0168 if options.fetch_root:
0169 c.url+="?rootcontent=1"
0170 c.setopt(pycurl.URL, c.url)
0171 if False and options.verbose:
0172 print(c.url)
0173
0174 #-------------------------------------------------------------------------------
0175 def report_error(c, task, errmsg, errno):
0176 print("FAILED to retrieve %s: %s (%d)" % (task, errmsg, errno), file=sys.stderr)
0177
0178 #-------------------------------------------------------------------------------
0179 def match_filters(item, filters, poslist):
0180 newposlist = []
0181 descend = False
0182 matched = False
0183 name = None
0184
0185 for idx in poslist:
0186 assert idx < len(filters)
0187 f = filters[idx]
0188 fmatched = False
0189 if 'subdir' in item \
0190 and (f.type == DIR or f.type == ANY) \
0191 and f.rx.match(item['subdir']):
0192 fmatched = descend = True
0193 name = item['subdir']
0194 elif 'obj' in item \
0195 and (f.type == FILE or f.type == ANY) \
0196 and f.rx.match(item['obj']):
0197 fmatched = True
0198 name = item['obj']
0199
0200 if fmatched:
0201 if idx == len(filters)-1:
0202 matched = True
0203 if f.recurse:
0204 newposlist.append(idx)
0205 if idx < len(filters) - 1:
0206 newposlist.append(idx+1)
0207
0208 return name, matched, descend, newposlist
0209
0210 #-------------------------------------------------------------------------------
0211 def process(c):
0212 global found, nreq
0213 options, sample, path, filterspec = c.task
0214 json_decoder = json.decoder.JSONDecoder()
0215
0216 nreq += 1
0217 if options.verbose and nreq % 10 == 0:
0218 sys.stdout.write(".")
0219 sys.stdout.flush()
0220 if nreq % 750 == 0:
0221 print()
0222
0223 reply = c.buffer.getvalue().decode('utf-8')
0224 reply = re.sub(r'("value": ")"([A-Za-z0-9_]+")"', r'\1\2', reply)
0225 reply = re.sub(r'("(?:mean|rms|min|max)":) nan,', r'\1 "NaN",', reply)
0226 reply = re.sub(r'("(?:mean|rms|min|max|nentries)":) inf,', r'\1 "float(\'inf\')",', reply)
0227 reply = re.sub(r'("(?:mean|rms|min|max|nentries)":) -inf,', r'\1 "-float(\'inf\')",', reply)
0228 reply = json_decoder.decode(reply)
0229
0230 newreq = {}
0231 for item in reply['contents']:
0232 for filters, pos in filterspec:
0233 name, match, descend, newpos = match_filters(item, filters, pos)
0234 if match and (not found or found[-1][0] != path + name):
0235 found.append((path + name, item))
0236 if descend:
0237 newpath = path + name + "/"
0238 if newpath not in newreq:
0239 newreq[newpath] = []
0240 newreq[newpath].append((filters, newpos))
0241
0242 for path, filterspec in iter(newreq.items()):
0243 reqman.put((options, sample, path, filterspec))
0244
0245 #-------------------------------------------------------------------------------
0246 op = OptionParser()
0247 op.add_option("-v", "--verbose", dest = "verbose",
0248 action = "store_true", default = False,
0249 help = "Show verbose scan information")
0250 op.add_option("-c", dest = "dqmCompliant",
0251 action = "store_true", default = False,
0252 help = "Fetch DQM compliant root content")
0253 op.add_option("-d", "--debug", dest = "debug",
0254 action = "store_true", default = False,
0255 help = "Show debug information")
0256 op.add_option("-k", "--debug-streamers", dest = "debug_streamers",
0257 action = "store_true", default = False,
0258 help = "Show debug information on StreamerInfo objects")
0259 op.add_option("-l", dest = "long_listing",
0260 action = "store_true", default = False,
0261 help = "Enable long listing")
0262 op.add_option("-r", dest = "fetch_root",
0263 action = "store_true", default = False,
0264 help = "Fetch root content!")
0265 op.add_option("-w", dest = "write",
0266 action = "store_true", default = False,
0267 help = "Write fetched root objects on disk")
0268 op.add_option("-n", "--connections", dest = "connections",
0269 type = "int", action = "store", metavar = "NUM",
0270 default = 10, help = "Use NUM concurrent connections")
0271 op.add_option("-s", "--server", dest = "server",
0272 type = "string", action = "store", metavar = "SERVER",
0273 default = "https://cmsweb.cern.ch/dqm/relval/data/json",
0274 help = "Pull content from SERVER")
0275 op.add_option("-e", "--samples", dest = "sample_expr", metavar = "EXPR",
0276 help = "Evaluate EXPR to decide which samples to scan")
0277 op.add_option("-f", "--filter", dest = "glob",
0278 type = "string", action = "append", metavar = "GLOB",
0279 default = [],
0280 help = "Filter monitor elements matching GLOB pattern")
0281 options, args = op.parse_args()
0282 if args:
0283 print("Too many arguments", file=sys.stderr)
0284 sys.exit(1)
0285 if not options.sample_expr:
0286 print("Sample predicate expression required", file=sys.stderr)
0287 sys.exit(1)
0288 if not options.glob:
0289 print("Monitor element filter expression(s) required", file=sys.stderr)
0290 sys.exit(1)
0291 if not options.server:
0292 print("Server contact string required", file=sys.stderr)
0293 sys.exit(1)
0294
0295 # Adjust options
0296 if options.debug:
0297 options.verbose = True
0298
0299 if options.write:
0300 options.fetch_root = True
0301 from ROOT import TFile
0302 from DQMServices.Components.ROOTData import *
0303
0304 ssl_opts = SSLOptions()
0305 if options.verbose:
0306 print("Using SSL cert dir", ssl_opts.ca_path)
0307 print("Using SSL private key", ssl_opts.key_file)
0308 print("Using SSL public key", ssl_opts.cert_file)
0309
0310 # Convert each glob pattern into a filter expression.
0311 filter_sets = map(lambda glob: pattern_to_filter(glob), options.glob)
0312
0313 # Start a request manager.
0314 reqman = RequestManager(num_connections = options.connections,
0315 ssl_opts = ssl_opts,
0316 user_agent = ident,
0317 request_init = request_init,
0318 request_respond = process,
0319 request_error = report_error)
0320
0321 # Process all samples matching the predicate.
0322 ntotreq = 0
0323 nfound = 0
0324 start = time()
0325 for sample in find_matching_samples(options):
0326 tstreamerinfo = None
0327 if options.write:
0328 tstreamerinfo = fetch_tstreamerinfo(options, "%(run)d%(dataset)s" % sample)
0329 literal2root(tstreamerinfo, "TStreamerInfo", options.debug_streamers)
0330
0331 nreq = 0
0332 found = []
0333 sample['section'] = 'archive'
0334 if options.verbose:
0335 print("Scanning %s" % sample)
0336 reqman.put((options, sample, "/", list(map(lambda f: (f, [0]), filter_sets))))
0337 reqman.process()
0338 if options.verbose:
0339 print()
0340 if found:
0341 print("%(section)s/%(run)d%(dataset)s:" % sample)
0342 found.sort()
0343 cwd = None
0344
0345 if options.dqmCompliant:
0346 fname = "DQM_V0001_R%09d" % sample['run'] \
0347 + ("_%(dataset)s.root" % sample)[1:].replace("/", "__")
0348 else:
0349 fname = ("%(dataset)s__run%(run)s.root" % sample)[1:].replace("/", "__")
0350
0351 # If writing, create output file with directories
0352 ofile = None
0353 if options.write:
0354 ofile = TFile(fname, "RECREATE")
0355 ofile.cd()
0356
0357 for path, item in found:
0358 if options.dqmCompliant:
0359 path = "DQMData/Run %d" % sample['run'] \
0360 + re.sub("^/"+path.split('/')[1],
0361 "/" + path.split('/')[1] + "/Run summary",
0362 path)
0363
0364 # We are treating a directory
0365 if 'subdir' in item:
0366 print(" %s/%s" % (path, (options.long_listing and " = directory") or ""))
0367 if options.write:
0368 tfile_cd(path,ofile)
0369
0370 # We are treating an int/double/string
0371 elif 'value' in item:
0372 print(" %s%s" % (path, (options.long_listing and " = %s" % item['value']) or ""))
0373
0374 # We have a TObject: histo, graph, profile...; maybe write to a file.
0375 else:
0376 message = " %s" % path
0377 if options.long_listing:
0378 message += " = [%s # %d]" % (item['properties']['type'], item['nentries'])
0379 if options.debug:
0380 message += " %s" % item['rootobj']
0381 print(message)
0382 if options.write:
0383 indir = path.rsplit("/", 1)[0]
0384 if cwd != indir:
0385 tfile_cd(indir, ofile)
0386 cwd = indir
0387 obj = literal2root(item['rootobj'], item['properties']['type'])
0388 obj.Write()
0389
0390 if options.write and ofile:
0391 ofile.Close()
0392
0393 nfound += len(found)
0394 ntotreq += nreq
0395 end = time()
0396
0397 if options.verbose:
0398 print("\nFound %d objects in %d directories in %.3f seconds" % (nfound, ntotreq, end - start))