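'''Dump a CMS conditions Global Tag from the Condition DB into an HDF5 file.

For every record in the Global Tag the corresponding tags are looked up,
their IOVs and payloads are merged, and the result is written out through
CondCore.CondHDF5ESSource.hdf5Writer.writeH5File (default output file:
test.h5cond).

Example invocation (the command name is an assumption; use however this
script is installed, and replace MyGlobalTag with a real Global Tag name):

    conddb2hdf5.py MyGlobalTag --db pro --output mygt.h5cond
'''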
import argparse
import sys
import os
import logging
import datetime
import sqlalchemy
import copy
import h5py
import numpy as np
import multiprocessing as mp
from collections import OrderedDict

from CondCore.CondHDF5ESSource.hdf5Writer import writeH5File
import CondCore.Utilities.conddblib as conddb


def _inserted_before(_IOV, timestamp):
    '''To be used inside filter(): restrict the query to IOVs inserted before
    the given timestamp. A None timestamp disables the restriction.'''

    if timestamp is None:
        # No snapshot requested: return an always-true clause so the enclosing
        # filter() becomes a no-op.
        return sqlalchemy.literal(True) == sqlalchemy.literal(True)

    return _IOV.insertion_time <= _parse_timestamp(timestamp)

def _parse_timestamp(timestamp):
    # Accept the same formats documented for --snapshot, from most to least precise.
    for fmt in ('%Y-%m-%d %H:%M:%S.%f', '%Y-%m-%d %H:%M:%S', '%Y-%m-%d'):
        try:
            return datetime.datetime.strptime(timestamp, fmt)
        except ValueError:
            pass

    raise Exception("Could not parse timestamp '%s'" % timestamp)

def _exists(session, primary_key, value):
    # Returns True/False, or None if the query itself failed (e.g. the table
    # does not exist in this database).
    ret = None
    try:
        ret = session.query(primary_key).\
            filter(primary_key == value).\
            count() != 0
    except sqlalchemy.exc.OperationalError:
        pass

    return ret

def _connect(db, init, read_only, args, as_admin=False):

    logging.debug('Preparing connection to %s ...', db)

    url = conddb.make_url(db, read_only)
    pretty_url = url
    if url.drivername == 'oracle+frontier':
        ws = url.host.rsplit('%2F')
        if ws is not None:
            pretty_url = 'frontier://%s/%s' % (ws[-1], url.database)
    connTo = '%s [%s]' % (db, pretty_url)
    logging.info('Connecting to %s', connTo)
    logging.debug('DB url: %s', url)
    verbose = 0
    if args.verbose is not None:
        verbose = args.verbose - 1
    connection = conddb.connect(url, args.authPath, verbose, as_admin)

    if not read_only:
        if connection.is_read_only:
            raise Exception('Impossible to edit a read-only database.')

        if connection.is_official:
            if args.force:
                if not args.yes:
                    logging.warning('You are going to edit an official database. If you are not one of the Offline DB experts but have access to the password for other reasons, please stop now.')
            else:
                raise Exception('Editing official databases is forbidden. Use the official DropBox to upload conditions. If you need a special intervention on the database, see the contact help: %s' % conddb.contact_help)

    if url.drivername == 'sqlite':
        if init:
            connection.init()
        if not connection._is_valid:
            raise Exception('No valid schema found in the database.')

    return connection

def connect(args, init=False, read_only=True, as_admin=False):
    args.force = args.force if 'force' in dir(args) else False

    if 'destdb' in args:
        if args.destdb is None:
            args.destdb = args.db
        if args.db == args.destdb:
            conn1 = _connect(args.destdb, init, read_only, args)
            return conn1, conn1
        conn1 = _connect(args.db, init, True, args)
        conn2url = conddb.make_url(args.destdb, False)
        if conn2url.drivername == 'sqlite' and not os.path.exists(args.destdb):
            init = True
        conn2 = _connect(args.destdb, init, False, args)
        return conn1, conn2

    return _connect(args.db, init, read_only, args, as_admin)

def _high(n):
    # Upper 32 bits of a packed 64-bit IOV sync value.
    return int(n) >> 32

def _low(n):
    # Lower 32 bits of a packed 64-bit IOV sync value.
    return int(n) & 0xffffffff

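# Illustrative example (hypothetical numbers): a packed run/lumi since of
# (362760 << 32) | 15 splits as _high(...) == 362760 (the run) and
# _low(...) == 15 (the lumi section); tagInfo below packs run-based sinces
# the same way by shifting the run number into the upper word.
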
def get_payloads_objtype_data(session, payloads):
    # Fetch (hash, object_type, data) for the requested payload hashes.
    Payload = session.get_dbtype(conddb.Payload)
    table = session.query(Payload.hash, Payload.object_type, Payload.data).\
        filter(Payload.hash.in_(payloads)).order_by(Payload.hash).all()
    return table

def external_process_get_payloads_objtype_data(queue, args, payloads):
    # Entry point used by DBPayloadIterator to run the payload query in a
    # separate process and hand the result back through the queue.
    connection = connect(args)
    session = connection.session()
    queue.put(get_payloads_objtype_data(session, payloads))

class IOVSyncValue(object):
    def __init__(self, high, low):
        self.high = high
        self.low = low

class DBPayloadIterator(object):
    '''Iterates over payload hashes, yielding DBPayload objects. Payloads are
    fetched from the database in chunks, and the chunk size is adapted to the
    payload sizes seen so far.'''
    def __init__(self, args, payloads):
        self._args = args
        self._payloadHashs = payloads
        self._payloadCache = {}
        self._payloadHashsIndex = 0
        self._cacheChunking = 1
        self._safeChunkingSize = 1
        self._nextIndex = 0
    def __iter__(self):
        return self
    def __next__(self):
        if self._nextIndex >= len(self._payloadHashs):
            raise StopIteration()
        payloadHash = self._payloadHashs[self._nextIndex]
        if not self._payloadCache:
            # Cache exhausted: fetch the next chunk. The query runs in a
            # separate worker process, so the memory used to read the
            # (possibly large) payload blobs is freed when the worker exits.
            self._cacheChunking = self._safeChunkingSize
            queue = mp.Queue()
            p = mp.Process(target=external_process_get_payloads_objtype_data, args=(queue, self._args, self._payloadHashs[self._payloadHashsIndex:self._payloadHashsIndex + self._cacheChunking]))
            p.start()
            table = queue.get()
            p.join()

            self._payloadHashsIndex += self._cacheChunking
            for r in table:
                self._payloadCache[r[0]] = (r[1], r[2])
        objtype, data = self._payloadCache[payloadHash]
        if len(data) < 1000000:
            # Payloads are small, so it is safe to fetch them in larger chunks.
            self._safeChunkingSize = 10
        del self._payloadCache[payloadHash]
        self._nextIndex += 1
        return DBPayload(payloadHash, canonicalProductName(objtype.encode("ascii")), data)

class DBPayload(object):
    def __init__(self, hash_, type_, data):
        self._hash = hash_
        self._type = type_
        self._data = data
    def name(self):
        return self._hash
    def actualType(self):
        return self._type
    def data(self):
        return self._data

class DBDataProduct(object):
    def __init__(self, ctype, label, payloadHashes, args):
        self._type = ctype
        self._label = label
        self._payloadHashs = payloadHashes
        self._args = args

    def name(self):
        return self._type + "@" + self._label
    def objtype(self):
        return self._type
    def payloads(self):
        return DBPayloadIterator(self._args, self._payloadHashs)

class DBTag(object):
    def __init__(self, session, args, record, productNtags):
        self._session = session
        self._args = args
        self._snapshot = args.snapshot
        self._record = record
        self._productLabels = [x[0] for x in productNtags]
        self._dbtags = [x[1] for x in productNtags]
        self._type = None
        self._iovsNPayloads = None
        self._time_type = None
    def record(self):
        return self._record
    def name(self):
        if len(self._dbtags) == 1:
            return self._dbtags[0]
        return self._dbtags[0] + "@joined"
    def __type(self):
        if self._type is None:
            self._type = recordToType(self._record)
        return self._type
    def time_type(self):
        if self._time_type is None:
            self.iovsNPayloadNames()
        return timeTypeName(self._time_type)
    def originalTagNames(self):
        return self._dbtags
    def iovsNPayloadNames(self):
        # Merge the IOVs of all tags associated to this record into a single
        # list of (first, last, [payload hashes]) entries.
        if self._iovsNPayloads is None:
            finalIOV = []
            for tag in self._dbtags:
                time_type, iovAndPayload = tagInfo(self._session, tag, self._snapshot)
                self._time_type = time_type
                if not finalIOV:
                    finalIOV = [[i[0], [i[1]]] for i in iovAndPayload]
                else:
                    finalIOV = mergeIOVs(finalIOV, iovAndPayload)

            firstValues, lastValues = sinceToIOV((x[0] for x in finalIOV), time_type)

            self._iovsNPayloads = list(zip((IOVSyncValue(x[0], x[1]) for x in firstValues), (IOVSyncValue(x[0], x[1]) for x in lastValues), (x[1] for x in finalIOV)))
            self._session.flush()
            self._session.commit()
        return self._iovsNPayloads

    def dataProducts(self):
        # Build one DBDataProduct per product label, each with the ordered set
        # of payload hashes it uses across the IOVs.
        t = self.__type()
        iovs = self.iovsNPayloadNames()
        payloadForProducts = []
        for p in self._productLabels:
            payloadForProducts.append(OrderedDict())
        for first, last, payloads in iovs:
            for i, p in enumerate(payloads):
                if p is not None:
                    payloadForProducts[i][p] = None
        return [DBDataProduct(t, v, list(payloadForProducts[i]), self._args) for i, v in enumerate(self._productLabels)]

class DBGlobalTag(object):
    def __init__(self, args, session, name):
        self._session = session
        self._args = args
        self._snapshot = args.snapshot
        self._name = name
        self._tags = []
        gt = globalTagInfo(session, name)
        lastRcd = None
        tags = []
        # globalTagInfo returns the entries ordered by record, so a change of
        # record closes the previous group of (label, tag) pairs.
        for rcd, label, tag in gt:
            if rcd != lastRcd:
                if lastRcd is not None:
                    self._tags.append(DBTag(session, args, lastRcd, tags))
                lastRcd = rcd
                tags = []
            tags.append((label, tag))
        if lastRcd is not None:
            self._tags.append(DBTag(session, args, lastRcd, tags))
    def tags(self):
        return self._tags

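# Minimal sketch of how the classes above can be traversed; writeH5File, not
# this snippet, is what actually consumes them, and "SomeGlobalTag" is just a
# placeholder name:
#
#   gt = DBGlobalTag(args, session, "SomeGlobalTag")
#   for tag in gt.tags():
#       print(tag.record(), tag.name(), tag.time_type())
#       for product in tag.dataProducts():
#           for payload in product.payloads():
#               print("  ", product.name(), payload.name(), payload.actualType())
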
def timeTypeName(time_type):
    if time_type == conddb.TimeType.Time.value:
        return 'time'
    if time_type == conddb.TimeType.Run.value or time_type == conddb.TimeType.Lumi.value:
        return 'run_lumi'
    raise RuntimeError("unknown time type %s" % str(time_type))


def parseSince(time_type, since):
    # Unpack a 64-bit since into a (high, low) sync value pair.
    if time_type == conddb.TimeType.Time.value:
        return (_high(since), _low(since))
    if time_type == conddb.TimeType.Run.value:
        return (_high(since), 0)
    if time_type == conddb.TimeType.Lumi.value:
        return (_high(since), _low(since))

def previousSyncValue(syncValue):
    # The end of one IOV is the sync value just before the start of the next.
    if syncValue[1] == 0:
        return (syncValue[0] - 1, 0xffffffff)
    return (syncValue[0], syncValue[1] - 1)

def sinceToIOV(sinceList, time_type):
    firstValues = []
    lastValues = []
    for since in sinceList:
        syncValue = parseSince(time_type, since)
        firstValues.append(syncValue)
        if len(firstValues) != 1:
            lastValues.append(previousSyncValue(syncValue))
    # The last IOV is left open-ended.
    lastValues.append((0xFFFFFFFF, 0xFFFFFFFF))
    return [firstValues, lastValues]

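# Worked example (hypothetical packed run/lumi sinces): for the input
#   [(1 << 32), (100 << 32), (100 << 32) | 6]
# sinceToIOV returns
#   firstValues = [(1, 0), (100, 0), (100, 6)]
#   lastValues  = [(99, 0xffffffff), (100, 5), (0xffffffff, 0xffffffff)]
# i.e. each IOV ends just before the next one starts and the final IOV is
# open-ended.
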
def globalTagInfo(session, name):
    GlobalTag = session.get_dbtype(conddb.GlobalTag)
    GlobalTagMap = session.get_dbtype(conddb.GlobalTagMap)
    try:
        is_global_tag = _exists(session, GlobalTag.name, name)
        if is_global_tag:
            return session.query(GlobalTagMap.record, GlobalTagMap.label, GlobalTagMap.tag_name).\
                filter(GlobalTagMap.global_tag_name == name).\
                order_by(GlobalTagMap.record, GlobalTagMap.label).\
                all()
    except sqlalchemy.exc.OperationalError:
        sys.stderr.write("No table for GlobalTags found in DB.\n\n")
        return None

def tagInfo(session, name, snapshot):
    Tag = session.get_dbtype(conddb.Tag)
    IOV = session.get_dbtype(conddb.IOV)
    is_tag = _exists(session, Tag.name, name)
    if is_tag:
        time_type = session.query(Tag.time_type).\
            filter(Tag.name == name).\
            scalar()

        rawTagInfo = session.query(IOV.since, IOV.insertion_time, IOV.payload_hash).\
            filter(
                IOV.tag_name == name,
                _inserted_before(IOV, snapshot),
            ).\
            order_by(IOV.since.desc(), IOV.insertion_time.desc()).\
            from_self().\
            order_by(IOV.since, IOV.insertion_time).\
            all()
        filteredTagInfo = []
        lastSince = -1
        for since, insertion, payload in rawTagInfo:
            # Keep a single entry per since value.
            if lastSince == since:
                continue
            lastSince = since
            if time_type == conddb.TimeType.Run.value:
                # Run-based sinces are packed into the run/lumi representation
                # by shifting the run number into the upper 32 bits.
                since = int(since) << 32
            filteredTagInfo.append((since, payload))

        if time_type == conddb.TimeType.Run.value:
            time_type = conddb.TimeType.Lumi.value

        return time_type, filteredTagInfo


def _checkMerge(previousIOV, newIOV, debugCopy, nExistingDataProducts):
    # Consistency checks on the result of mergeIOVs: every entry must have one
    # payload per data product, the sinces must be ordered, the pre-existing
    # payloads must have been copied unchanged, and the new payloads must all
    # have been appended.
    previousSince = -1
    for i, e in enumerate(previousIOV):
        if len(e[1]) != nExistingDataProducts + 1:
            raise RuntimeError("entry %i has wrong number of elements %i instead of %i" % (i, len(e[1]), nExistingDataProducts + 1))
        if previousSince >= e[0]:
            raise RuntimeError("IOV not in order for index %i" % i)
        previousSince = e[0]

    previousIndex = 0
    debugIndex = 0
    while debugIndex < len(debugCopy) and previousIndex < len(previousIOV):
        previousSince = previousIOV[previousIndex][0]
        debugSince = debugCopy[debugIndex][0]
        if debugSince != previousSince:
            previousIndex += 1
            continue
        if debugCopy[debugIndex][1] != previousIOV[previousIndex][1][:nExistingDataProducts]:
            raise RuntimeError("payloads were not properly copied for index %i original:%s new:%s" % (debugIndex, ",".join(debugCopy[debugIndex][1]), ",".join(previousIOV[previousIndex][1][:nExistingDataProducts])))
        debugIndex += 1
        previousIndex += 1
    if debugIndex != len(debugCopy):
        raise RuntimeError("failed to copy forward index %i" % debugIndex)

    newIndex = 0
    previousIndex = 0
    while newIndex < len(newIOV) and previousIndex < len(previousIOV):
        previousSince = previousIOV[previousIndex][0]
        newSince = newIOV[newIndex][0]
        if newSince != previousSince:
            previousIndex += 1
            continue
        if previousIOV[previousIndex][1][-1] != newIOV[newIndex][1]:
            raise RuntimeError("failed to append payload at index %i" % newIndex)
        previousIndex += 1
        newIndex += 1
    if newIndex != len(newIOV):
        raise RuntimeError("failed to merge IOV entry %i" % newIndex)


def mergeIOVs(previousIOV, newIOV):
    # Merge the IOV list of an additional tag (newIOV, entries of (since, payload))
    # into the accumulated list (previousIOV, entries of [since, [payload, ...]]
    # with one payload per data product merged so far).
    debugCopy = copy.deepcopy(previousIOV)
    previousSize = len(previousIOV)
    newSize = len(newIOV)
    previousIndex = 0
    newIndex = 0
    nExistingDataProducts = len(previousIOV[0][1])
    while newIndex < newSize and previousIndex < previousSize:
        previousSince = previousIOV[previousIndex][0]
        newSince = newIOV[newIndex][0]
        if previousSince == newSince:
            previousIOV[previousIndex][1].append(newIOV[newIndex][1])
            newIndex += 1
            previousIndex += 1
            continue
        elif newSince < previousSince:
            if previousIndex == 0:
                payloads = [None] * nExistingDataProducts
                payloads.append(newIOV[newIndex][1])
                previousIOV.insert(0, [newSince, payloads])
            else:
                payloads = previousIOV[previousIndex - 1][1][:nExistingDataProducts]
                payloads.append(newIOV[newIndex][1])
                previousIOV.insert(previousIndex, [newSince, payloads])
            newIndex += 1
            previousIndex += 1
            previousSize += 1
        elif newSince > previousSince:
            if newIndex == 0:
                previousIOV[previousIndex][1].append(None)
            else:
                if len(previousIOV[previousIndex][1]) == nExistingDataProducts:
                    previousIOV[previousIndex][1].append(newIOV[newIndex - 1][1])
            previousIndex += 1
    if newIndex != newSize:
        # Ran off the end of previousIOV: remaining new entries extend the merged
        # list, carrying forward the payloads of the last existing entry.
        previousPayloads = previousIOV[-1][1]
        while newIndex != newSize:
            newPayloads = previousPayloads[:]
            newPayloads[nExistingDataProducts] = newIOV[newIndex][1]
            previousIOV.append([newIOV[newIndex][0], newPayloads])
            newIndex += 1
    if previousIndex != previousSize:
        # Ran off the end of newIOV: remaining existing entries keep using the
        # last new payload.
        while previousIndex < previousSize:
            previousIOV[previousIndex][1].append(newIOV[-1][1])
            previousIndex += 1
    _checkMerge(previousIOV, newIOV, debugCopy, nExistingDataProducts)
    return previousIOV

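# Worked example (hypothetical payload hashes): merging newIOV = [(1, "B1"), (5, "B2")]
# into previousIOV = [[1, ["A1"]], [3, ["A2"]]] gives
#   [[1, ["A1", "B1"]], [3, ["A2", "B1"]], [5, ["A2", "B2"]]]
# i.e. every resulting entry carries one payload per data product merged so far,
# with payloads carried forward across sinces where a tag has no new entry.
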
def writeTagImpl(tagsGroup, name, recName, time_type, IOV_payloads, payloadToRefs, originalTagNames):
    tagGroup = tagsGroup.create_group(name)
    tagGroup.attrs["time_type"] = time_type.encode("ascii")
    tagGroup.attrs["db_tags"] = [x.encode("ascii") for x in originalTagNames]
    tagGroup.attrs["record"] = recName.encode("ascii")
    firstValues = [x[0] for x in IOV_payloads]
    lastValues = [x[1] for x in IOV_payloads]
    syncValueType = np.dtype([("high", np.uint32), ("low", np.uint32)])
    first_np = np.empty(shape=(len(IOV_payloads),), dtype=syncValueType)
    first_np['high'] = [x.high for x in firstValues]
    first_np['low'] = [x.low for x in firstValues]
    last_np = np.empty(shape=(len(lastValues),), dtype=syncValueType)
    last_np['high'] = [x.high for x in lastValues]
    last_np['low'] = [x.low for x in lastValues]

    # Payloads are stored as HDF5 object references to the payload datasets
    # already present in the file (via payloadToRefs).
    payloads = [[payloadToRefs[y] for y in x[2]] for x in IOV_payloads]
    compressor = None
    if len(first_np) > 100:
        compressor = 'gzip'
    tagGroup.create_dataset("first", data=first_np, compression=compressor)
    tagGroup.create_dataset("last", data=last_np, compression=compressor)
    tagGroup.create_dataset("payload", data=payloads, dtype=h5py.ref_dtype, compression=compressor)
    return tagGroup.ref


def writeTag(tagsGroup, time_type, IOV_payloads, payloadToRefs, originalTagNames, recName):
    name = originalTagNames[0]
    if len(originalTagNames) != 1:
        name = name + "@joined"
    return writeTagImpl(tagsGroup, name, recName, time_type, IOV_payloads, payloadToRefs, originalTagNames)

def recordToType(record):
    # Ask the framework utility condRecordToDataProduct which data product
    # type corresponds to the given record name.
    import subprocess
    return subprocess.run(["condRecordToDataProduct", record], capture_output=True, check=True, text=True).stdout

__typedefs = {b"ESCondObjectContainer<ESPedestal>": "ESPedestals",
              b"ESCondObjectContainer<float>": "ESFloatCondObjectContainer",
              b"ESCondObjectContainer<ESChannelStatusCode>": "ESChannelStatus",
              b"EcalCondObjectContainer<EcalPedestal>": "EcalPedestals",
              b"EcalCondObjectContainer<EcalXtalGroupId>": "EcalWeightXtalGroups",
              b"EcalCondObjectContainer<EcalMGPAGainRatio>": "EcalGainRatios",
              b"EcalCondObjectContainer<float>": "EcalFloatCondObjectContainer",
              b"EcalCondObjectContainer<EcalChannelStatusCode>": "EcalChannelStatus",
              b"EcalCondObjectContainer<EcalMappingElement>": "EcalMappingElectronics",
              b"EcalCondObjectContainer<EcalTPGPedestal>": "EcalTPGPedestals",
              b"EcalCondObjectContainer<EcalTPGLinearizationConstant>": "EcalTPGLinearizationConst",
              b"EcalCondObjectContainer<EcalTPGCrystalStatusCode>": "EcalTPGCrystalStatus",
              b"EcalCondTowerObjectContainer<EcalChannelStatusCode>": "EcalDCSTowerStatus",
              b"EcalCondTowerObjectContainer<EcalDAQStatusCode>": "EcalDAQTowerStatus",
              b"EcalCondObjectContainer<EcalDQMStatusCode>": "EcalDQMChannelStatus",
              b"EcalCondTowerObjectContainer<EcalDQMStatusCode>": "EcalDQMTowerStatus",
              b"EcalCondObjectContainer<EcalPulseShape>": "EcalPulseShapes",
              b"EcalCondObjectContainer<EcalPulseCovariance>": "EcalPulseCovariances",
              b"EcalCondObjectContainer<EcalPulseSymmCovariance>": "EcalPulseSymmCovariances",
              b"HcalItemCollById<HFPhase1PMTData>": "HFPhase1PMTParams",
              b"l1t::CaloParams": "CaloParams",
              b"StorableDoubleMap<AbsOOTPileupCorrection>": "OOTPileupCorrectionMapColl",
              b"PhysicsTools::Calibration::Histogram3D<double,double,double,double>": "PhysicsTools::Calibration::HistogramD3D",
              b"PhysicsTools::Calibration::MVAComputerContainer": "MVAComputerContainer"
              }

def canonicalProductName(product):
    return __typedefs.get(product, product)

def main():
    parser = argparse.ArgumentParser(description='Read from CMS Condition DB and write to HDF5 file')
    parser.add_argument('--db', '-d', default='pro', help='Database to run the command on. Run the help subcommand for more information: conddb help')
    parser.add_argument('name', nargs='+', help="Name of the global tag.")
    parser.add_argument('--verbose', '-v', action='count', help='Verbosity level. -v prints debugging information of this tool, like tracebacks in case of errors. -vv prints, in addition, all SQL statements issued. -vvv prints, in addition, all results returned by queries.')
    parser.add_argument('--authPath', '-a', default=None, help='Path of the authentication .netrc file. Default: the content of the COND_AUTH_PATH environment variable, when specified.')
    parser.add_argument('--snapshot', '-T', default=None, help="Snapshot time. If provided, the output will represent the state of the IOVs inserted into the database up to the given time. The format of the string must be one of the following: '2013-01-20', '2013-01-20 10:11:12' or '2013-01-20 10:11:12.123123'.")
    parser.add_argument('--exclude', '-e', nargs='*', help='List of records to exclude from the file (cannot be used with --include).')
    parser.add_argument('--include', '-i', nargs='*', help='List of the only records that should be included in the file (cannot be used with --exclude).')
    parser.add_argument('--output', '-o', default='test.h5cond', help='Name of the HDF5 output file to write.')
    parser.add_argument('--compressor', '-c', default='zlib', choices=['zlib', 'lzma', 'none'], help="Compress data using 'zlib', 'lzma' or 'none'.")
    args = parser.parse_args()

    if args.exclude and args.include:
        print("Cannot use --exclude and --include at the same time")
        sys.exit(-1)

    connection = connect(args)
    session = connection.session()

    excludeRecords = set()
    if args.exclude:
        excludeRecords = set(args.exclude)
    includeRecords = set()
    if args.include:
        includeRecords = set(args.include)

    writeH5File(args.output, args.name, excludeRecords, includeRecords, lambda x: DBGlobalTag(args, session, x), args.compressor)


if __name__ == '__main__':
    main()