File indexing completed on 2024-11-25 02:29:20
0001 """
0002
0003 This file contains all testing suites to be used to test the framework.
0004
0005 Note: avoid checking for lengths of lists, or characteristics of data from db.
0006 The db may change + this shouldn't cause tests to fail.
0007
0008 TODO: Change code so that all connections are used when testing queries - if this isn't too bad a thing to do with the DBs.
0009
0010 """
0011
0012 import unittest
0013 import sys
0014 import datetime
0015 import pprint
0016
0017 import CondCore.Utilities.CondDBFW.querying as querying
0018 import CondCore.Utilities.CondDBFW.data_sources as data_sources
0019 import CondCore.Utilities.CondDBFW.data_formats as data_formats
0020 import CondCore.Utilities.CondDBFW.shell as shell
0021 from CondCore.Utilities.CondDBFW import querying_framework_api as api
0022
0023 secrets_file = "/afs/cern.ch/cms/DB/conddb/.cms_cond/netrc"
0024
0025 class querying_tests(unittest.TestCase):
0026
0027 def setUp(self):
0028 connection_data = {"db_alias" : "orapro", "host" : "oracle", "schema" : "cms_conditions", "secrets" : secrets_file}
0029 self.connection = querying.connect(connection_data)
0030
0031 def test_check_connection(self):
0032 self.assertTrue(self.connection != None)
0033
0034 def test_get_tag(self):
0035
0036 tag_name = "EBAlignment_measured_v01_express"
0037 tag = self.connection.tag(name=tag_name)
0038
0039 self.assertTrue(tag != None)
0040
0041 self.assertTrue(tag.name != None)
0042 self.assertEqual(tag.__class__.__name__.lower(), "tag")
0043
0044 def test_get_empty_tag(self):
0045 tag = self.connection.tag()
0046 self.assertTrue(tag != None)
0047 self.assertEqual(tag.__class__.__name__.lower(), "tag")
0048
0049 def test_get_global_tag(self):
0050
0051 global_tag_name = "74X_dataRun1_HLT_frozen_v2"
0052 global_tag = self.connection.global_tag(name=global_tag_name)
0053 self.assertTrue(global_tag != None)
0054 self.assertTrue(global_tag.name != None)
0055 self.assertTrue(global_tag.tags() != None)
0056 self.assertEqual(global_tag.__class__.__name__.lower(), "globaltag")
0057
0058 def test_get_payload(self):
0059
0060 payload_hash = "00172cd62d8abae41915978d815ae62cc08ad8b9"
0061 payload = self.connection.payload(hash=payload_hash)
0062 self.assertTrue(payload != None)
0063 self.assertEqual(payload.__class__.__name__.lower(), "payload")
0064
0065 def test_get_empty_payload(self):
0066 payload = self.connection.payload()
0067 self.assertTrue(payload != None)
0068 self.assertEqual(payload.__class__.__name__.lower(), "payload")
0069
0070 def test_get_parent_tags_payload(self):
0071 payload_hash = "00172cd62d8abae41915978d815ae62cc08ad8b9"
0072 payload = self.connection.payload(hash=payload_hash)
0073 self.assertTrue(payload != None)
0074 parent_tags = payload.parent_tags()
0075 self.assertTrue(parent_tags != None)
0076
0077 def test_get_parent_global_tags(self):
0078 tag_name = "EBAlignment_measured_v01_express"
0079 tag = self.connection.tag(name=tag_name)
0080 self.assertTrue(tag != None)
0081 parent_global_tags = tag.parent_global_tags()
0082 self.assertTrue(parent_global_tags != None)
0083
0084 def tests_all_tags_empty_tag(self):
0085 empty_tag = self.connection.tag()
0086 all_tags = empty_tag.all(10)
0087 self.assertTrue(all_tags != None)
0088
0089 self.assertTrue(len(all_tags.data()) != 0)
0090
0091 def tests_all_tags_non_empty_tag(self):
0092 tag_name = "EBAlignment_measured_v01_express"
0093 empty_tag = self.connection.tag(name=tag_name)
0094 all_tags = empty_tag.all(10)
0095 self.assertTrue(all_tags != None)
0096
0097 self.assertTrue(len(all_tags.data()) != 0)
0098
0099 def tests_all_global_tags_empty_gt(self):
0100 empty_gt = self.connection.global_tag()
0101 all_gts = empty_gt.all(10)
0102 self.assertTrue(all_gts != None)
0103 self.assertTrue(len(all_gts.data()) != 0)
0104
0105 def test_search_everything(self):
0106 string_for_non_empty_result = "ecal"
0107 data = self.connection.search_everything(string_for_non_empty_result)
0108 self.assertTrue(len(data.get("global_tags").data()) != 0)
0109 self.assertTrue(len(data.get("tags").data()) != 0)
0110 self.assertTrue(len(data.get("payloads").data()) != 0)
0111
0112 def test_factory_multiple_results(self):
0113 tags = self.connection.tag(time_type="Run")
0114 self.assertTrue(len(tags.data()) > 1)
0115
0116 def test_factory_single_result(self):
0117 tag = self.connection.tag(name="EBAlignment_hlt")
0118 self.assertEqual(tag.__class__.__name__.lower(), "tag")
0119
0120 def test_factory_empty_result(self):
0121 tag = self.connection.tag()
0122 self.assertTrue(tag.empty)
0123
0124 def test_factory_no_result(self):
0125 tag = self.connection.tag(name="dfasdf")
0126 self.assertTrue(tag == None)
0127
0128 def tearDown(self):
0129 self.connection.close_session()
0130
0131 class data_sources_tests(unittest.TestCase):
0132
0133 def setUp(self):
0134 connection_data = {"db_alias" : "orapro", "host" : "oracle", "schema" : "cms_conditions", "secrets" : secrets_file}
0135 self.connection = querying.connect(connection_data)
0136
0137 def test_make_json_list(self):
0138 test_list = list(range(0, 10))
0139 json_list_object = data_sources.json_data_node.make(test_list)
0140 self.assertTrue(json_list_object != None)
0141 self.assertEqual(json_list_object.data(), test_list)
0142 for n in range(0, len(test_list)):
0143 self.assertEqual(json_list_object.get(n).data(), test_list[n])
0144
0145 def test_make_json_dict(self):
0146 test_dict = {"key1" : "value1", "key2" : "value2", "key3" : "value3"}
0147 json_dict_object = data_sources.json_data_node.make(test_dict)
0148 self.assertTrue(json_dict_object != None)
0149 self.assertEqual(json_dict_object.data(), test_dict)
0150 for key in test_dict:
0151 self.assertEqual(json_dict_object.get(key).data(), test_dict[key])
0152
0153 def test_json_navigation(self):
0154 structure = {"key1" : [{"a" : 1, "b" : 3}, {"a" : 4, "b" : 8}], "key2" : ["header1", "header2", "header3"]}
0155 json_structure_object = data_sources.json_data_node.make(structure)
0156 self.assertEqual(json_structure_object.get("key1").data(), structure["key1"])
0157 self.assertEqual(json_structure_object.get("key2").data(), structure["key2"])
0158
0159 def test_json_building(self):
0160 structure = {"key1" : [{"a" : 1, "b" : 3}, {"a" : 4, "b" : 8}], "key2" : ["header1", "header2", "header3"]}
0161 new_structure = data_sources.json_data_node.make({})
0162 new_structure.add_key([], "key1")
0163 new_structure.get("key1").add_child({"a" : 1, "b" : 3})
0164 new_structure.get("key1").add_child({"a" : 4, "b" : 8})
0165 new_structure.add_key([], "key2")
0166 new_structure.get("key2").add_child("header1")
0167 new_structure.get("key2").add_child("header2")
0168 new_structure.get("key2").add_child("header3")
0169 self.assertEqual(new_structure.data(), structure)
0170
0171 def test_check_types(self):
0172 test_list = list(range(0, 10))
0173 test_dict = {"key1" : "value1", "key2" : "value2", "key3" : "value3"}
0174 json_list_object = data_sources.json_data_node.make(test_list)
0175 json_dict_object = data_sources.json_data_node.make(test_dict)
0176 self.assertEqual(json_list_object.__class__.__name__, "json_list")
0177 self.assertEqual(json_dict_object.__class__.__name__, "json_dict")
0178
0179 def test_type_all_tags(self):
0180 all_tags = self.connection.tag().all(10)
0181 self.assertEqual(all_tags.__class__.__name__, "json_list")
0182
0183 def test_type_parent_global_tags(self):
0184 tag_name = "EBAlignment_measured_v01_express"
0185 tag = self.connection.tag(name=tag_name)
0186 self.assertTrue(tag != None)
0187 parent_gts = tag.parent_global_tags()
0188 self.assertEqual(parent_gts.__class__.__name__, "json_list")
0189
0190 def test_type_parent_tags(self):
0191 payload_hash = "00172cd62d8abae41915978d815ae62cc08ad8b9"
0192 payload = self.connection.payload(hash=payload_hash)
0193 self.assertTrue(payload != None)
0194 parent_tags = payload.parent_tags()
0195 self.assertEqual(parent_tags.__class__.__name__, "json_list")
0196
0197 def test_type_iovs_of_global_tag(self):
0198 global_tag_name = "74X_dataRun1_HLT_frozen_v2"
0199 global_tag = self.connection.global_tag(name=global_tag_name)
0200 self.assertTrue(global_tag != None)
0201 iovs = global_tag.iovs(valid=True)
0202
0203 def test_validity_of_iovs_global_tag(self):
0204 global_tag_name = "74X_dataRun1_HLT_frozen_v2"
0205 global_tag = self.connection.global_tag(name=global_tag_name)
0206 self.assertTrue(global_tag != None)
0207 iovs = global_tag.iovs(valid=True)
0208 self.assertEqual(iovs.__class__.__name__, "json_list")
0209 snapshot_time = global_tag.snapshot_time
0210 for iov in iovs:
0211 insertion_time_as_datetime = datetime.datetime.strptime(iov.insertion_time, '%Y-%m-%d %H:%M:%S,%f')
0212 self.assertTrue(insertion_time_as_datetime < snapshot_time)
0213
0214 def tearDown(self):
0215 self.connection.close_session()
0216
0217 class data_formats_tests(unittest.TestCase):
0218
0219 def setUp(self):
0220 connection_data = {"db_alias" : "orapro", "host" : "oracle", "schema" : "cms_conditions", "secrets" : secrets_file}
0221 self.connection = querying.connect(connection_data)
0222
0223 def test_orm_objects_to_dicts(self):
0224 tags = self.connection.tag().all(10)
0225 list_of_dicts = data_formats._objects_to_dicts(tags)
0226 self.assertEqual(list_of_dicts.__class__.__name__, "json_list")
0227 for tag in tags:
0228 self.assertEqual(tag.__class__.__name__.lower(), "tag")
0229
0230 def test_dicts_to_orm_objects(self):
0231 models_to_test = map(self.connection.model, ["global_tag", "tag", "payload", "iov"])
0232 for model in models_to_test:
0233 model_name = self.connection.class_name_to_column(model).lower()
0234 objects = getattr(self.connection, model_name)().all(5)
0235 dicts = data_formats._objects_to_dicts(objects)
0236 orm_objects = data_formats._dicts_to_orm_objects(self.connection.model(model_name), dicts)
0237 self.assertTrue(dicts != None)
0238 for obj in orm_objects:
0239 self.assertEqual(self.connection.class_name_to_column(obj.__class__).lower(), model_name)
0240 headers = model.headers
0241 for header in headers:
0242 try:
0243 test = getattr(obj, header)
0244 header_exists = True
0245 except:
0246 print("'%s' doesn't exist." % header)
0247 header_exists = False
0248 self.assertTrue(header_exists)
0249
0250 def tearDown(self):
0251 self.connection.close_session()
0252
0253 class shell_tests(unittest.TestCase):
0254
0255 def setUp(self):
0256 connection_data = {"db_alias" : "orapro", "host" : "oracle", "schema" : "cms_conditions", "secrets" : secrets_file}
0257 self.connection = querying.connect(connection_data)
0258
0259 def test_init_shell(self):
0260 connection_data = {"db_alias" : "orapro", "host" : "oracle", "schema" : "cms_conditions", "secrets" : secrets_file}
0261 connection = shell.connect(connection_data)
0262 self.assertTrue(connection != None)
0263
0264 def tearDown(self):
0265 self.connection.close_session()
0266
0267 class connection_tests(unittest.TestCase):
0268
0269 def setUp(self):
0270 pass
0271
0272 def test_orapro_connect(self):
0273 connection_data = {"db_alias" : "orapro", "host" : "oracle", "schema" : "cms_conditions", "secrets" : secrets_file}
0274 connection = querying.connect(connection_data)
0275 self.assertTrue(connection != None)
0276
0277 close_session_result = connection.close_session()
0278 self.assertEqual(close_session_result, True)
0279
0280 """def test_oradev_connect(self):
0281 connection_data = {"db_alias" : "oradev", "host" : "oracle", "schema" : "", "secrets" : secrets_file_1}
0282 connection = querying.connect(connection_data)
0283 self.assertTrue(connection != None)
0284 # can cause failure if the database is down
0285 close_session_result = connection.close_session()
0286 self.assertEqual(close_session_result, True)"""
0287
0288 def test_frontier_connect(self):
0289 for alias in ["pro", "arc", "int", "dev"]:
0290 connection_data = {"db_alias" : alias, "host" : "frontier", "schema" : "cms_conditions"}
0291 connection = querying.connect(connection_data)
0292
0293 close_session_result = connection.close_session()
0294 self.assertEqual(close_session_result, True)
0295
0296 def test_frontier_query(self):
0297 connection_data = {"db_alias" : "pro", "host" : "frontier", "schema" : "cms_conditions"}
0298 connection = querying.connect(connection_data)
0299
0300 tag = connection.tag(name="EBAlignment_measured_v06_offline")
0301
0302 close_session_result = connection.close_session()
0303 self.assertEqual(close_session_result, True)
0304
0305 def tearDown(self):
0306 pass
0307
0308 class script_tests(unittest.TestCase):
0309
0310 def setUp(self):
0311 self.connection_data = {"db_alias" : "orapro", "host" : "oracle", "schema" : "cms_conditions", "secrets" : secrets_file}
0312 self.connection = querying.connect(self.connection_data)
0313
0314 def test_script(self):
0315 class script():
0316 def script(self_instance, connection):
0317 tag = self.connection.tag(name="EBAlignment_hlt")
0318 valid_iovs = tag.iovs()
0319 return valid_iovs
0320 api_obj = api(self.connection_data)
0321 data = api_obj.run_script(script(), output=False)
0322
0323 self.assertEqual(data.get(0).data().payload_hash, "1480c559bbbdacfec514c3cbcf2eb978403efd74")
0324
0325 def test_script_with_decorator(self):
0326 class script():
0327 @data_formats.objects_to_dicts
0328 def script(self_instance, connection):
0329 tag = self.connection.tag(name="EBAlignment_hlt")
0330 valid_iovs = tag.iovs()
0331 return valid_iovs
0332 api_obj = api(self.connection_data)
0333 data = api_obj.run_script(script(), output=False)
0334
0335 self.assertEqual(data.get(0, "payload_hash").data(), "1480c559bbbdacfec514c3cbcf2eb978403efd74")
0336
0337 def tearDown(self):
0338 self.connection.close_session()