File indexing completed on 2023-03-17 10:48:51
0001 import sqlalchemy
0002 import sqlalchemy.ext.declarative
0003 import logging
0004
0005 import CondCore.Utilities.credentials as auth
0006
# Each *_db_service entry is [service name, credentials entry].
# DbManager.get_url() puts the first item into the connection URL and passes
# the second one to auth.get_credentials().
prod_db_service = [
    'cms_orcon_prod',
    'cms_orcon_prod/cms_cond_general_w',
]
dev_db_service = [
    'cms_orcoff_prep',
    'cms_orcoff_prep/cms_cond_general_w',
]

# Service name -> schema; DbManager looks this up with db_service[0].
schema_dict = {
    'cms_orcon_prod': 'cms_cond_o2o',
    'cms_orcoff_prep': 'cms_cond_strip',
}

# Filled by DbManager.get_url() with (username, password, service name).
sqlalchemy_tpl = 'oracle://%s:%s@%s'
# Two-slot Oracle connection template; not referenced in the code shown here.
coral_tpl = 'oracle://%s/%s'
# URL used when DbManager is created with db='private'.
private_db = 'sqlite:///post_o2o.db'
0013
# Declarative base shared by every class that make_dbtype() creates; it is
# placed first in the bases tuple of each generated type.
_Base = sqlalchemy.ext.declarative.declarative_base()
0015
def make_dbtype(base_class, schema=None):
    """Create a mapped class that combines ``_Base`` with ``base_class``.

    The new type is named after ``base_class`` with a trailing ``Def``
    removed; when ``schema`` is truthy the schema string is appended to that
    name and stored under ``__table_args__`` as ``{'schema': schema}``.
    Without a schema ``__table_args__`` is set to ``None``.

    Args:
        base_class: class providing ``__name__`` and ``__tablename__``.
        schema: optional schema name for the table.

    Returns:
        The newly created type, with bases ``(_Base, base_class)``.
    """
    import re

    type_name = re.sub('Def$', '', base_class.__name__)
    table_args = None
    if schema:
        type_name += schema
        table_args = {'schema': schema}

    attrs = {
        '__tablename__': base_class.__tablename__,
        '__table_args__': table_args,
    }
    return type(type_name, (_Base, base_class), attrs)
0027
0028
class DbManager(object):
    """Resolve connection settings for a named database and create sessions.

    The ``db`` option selects the target: ``'prod'`` and ``'dev'`` use the
    module-level ``prod_db_service`` / ``dev_db_service`` entries, while
    ``'private'`` uses the SQLite URL stored in ``private_db``.

    Attributes set by ``__init__``: ``authPath``, ``db_service`` (``None`` for
    the private database) and ``schema`` (``None`` for the private database).
    ``engine`` and ``session`` are only set by ``check_table()``.
    """

    def __init__(self, db, authPath=None):
        """Select the database target.

        Args:
            db: one of ``'prod'``, ``'dev'`` or ``'private'``.
            authPath: forwarded unchanged to ``auth.get_credentials``.

        Raises:
            RuntimeError: if ``db`` is not one of the supported options.
        """
        self.authPath = authPath
        if db == 'prod':
            self.db_service = prod_db_service
        elif db == 'dev':
            self.db_service = dev_db_service
        elif db == 'private':
            self.db_service = None
        else:
            raise RuntimeError('Option db(=%s) is not in the supported database list: [prod, dev, private]' % db)

        # The target is resolved before formatting. Applying ``%`` directly to
        # a conditional expression binds to the first operand only, which made
        # the private case log the bare URL instead of the full sentence.
        logging.info('Connecting to %s database', self._target_name())

        self.schema = schema_dict[self.db_service[0]] if self.db_service else None

    def _target_name(self):
        """Return the service name, or the private SQLite URL if no service is set."""
        return self.db_service[0] if self.db_service else private_db

    def get_url(self, force_schema=False):
        """Build the SQLAlchemy connection URL for the selected database.

        Args:
            force_schema: when true and a schema is configured, credentials
                are looked up under ``'<service>/<schema>'`` instead of the
                default credentials entry of the service.

        Returns:
            ``private_db`` for the private database, otherwise
            ``sqlalchemy_tpl`` filled with user name, password and service.
        """
        if self.db_service is None:
            return private_db

        service = self.db_service[0]
        auth_entry = self.db_service[1]
        if force_schema and self.schema:
            auth_entry = '%s/%s' % (service, self.schema)
        username, _, pwd = auth.get_credentials(auth_entry, self.authPath)
        # NOTE(review): user name and password are interpolated as-is; values
        # containing URL-reserved characters would presumably need quoting.
        # Confirm against the stored credentials before changing this.
        return sqlalchemy_tpl % (username, pwd, service)

    def check_table(self, table_def, table_class):
        """Create the table if it is missing, then set up ``self.session``.

        An engine is first created from ``get_url()``. If the table named by
        ``table_def.__tablename__`` is not found in ``self.schema``, the
        engine is replaced by one created from ``get_url(True)`` and
        ``table_class.__table__`` is created through it. ``self.session`` is
        bound to whichever engine is current afterwards.

        Args:
            table_def: object providing ``__tablename__``.
            table_class: mapped class whose ``__table__`` is created.
        """
        self.engine = sqlalchemy.create_engine(self.get_url())
        if not self.engine.has_table(table_def.__tablename__, self.schema):
            logging.info('Creating table %s on %s',
                         table_def.__tablename__, self._target_name())
            # Reconnect with the schema-specific credentials entry before
            # issuing the CREATE TABLE.
            self.engine = sqlalchemy.create_engine(self.get_url(True))
            table_class.__table__.create(bind=self.engine)
        self.session = sqlalchemy.orm.scoped_session(sqlalchemy.orm.sessionmaker(bind=self.engine))

    def connect(self, url=None):
        """Return a new scoped session.

        Args:
            url: optional connection URL. When given, a fresh engine is
                created for it; otherwise ``self.engine`` is used, which only
                exists after ``check_table()`` has run (``AttributeError``
                before that).

        Returns:
            A ``scoped_session`` bound to the chosen engine.
        """
        engine = sqlalchemy.create_engine(url) if url else self.engine
        return sqlalchemy.orm.scoped_session(sqlalchemy.orm.sessionmaker(bind=engine))
0069