Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-11-25 02:29:20

0001 """
0002 
0003 This file contains all testing suites to be used to test the framework.
0004 
0005 Note: avoid asserting on the lengths of lists, or on other characteristics of data read from the db.
0006 The db contents may change, and that shouldn't cause tests to fail.
0007 
0008 TODO: Change code so that all connections are used when testing queries - if this isn't too bad a thing to do with the DBs.
0009 
0010 """
0011 
0012 import unittest
0013 import sys
0014 import datetime
0015 import pprint
0016 
0017 import CondCore.Utilities.CondDBFW.querying as querying
0018 import CondCore.Utilities.CondDBFW.data_sources as data_sources
0019 import CondCore.Utilities.CondDBFW.data_formats as data_formats
0020 import CondCore.Utilities.CondDBFW.shell as shell
0021 from CondCore.Utilities.CondDBFW import querying_framework_api as api
0022 
# Path handed to querying.connect / shell.connect as the "secrets" entry of
# the connection data built by the test classes below.
secrets_file = "/afs/cern.ch/cms/DB/conddb/.cms_cond/netrc"
0024 
0025 class querying_tests(unittest.TestCase):
0026 
0027     def setUp(self):
0028         connection_data = {"db_alias" : "orapro", "host" : "oracle", "schema" : "cms_conditions", "secrets" : secrets_file}
0029         self.connection = querying.connect(connection_data)
0030 
0031     def test_check_connection(self):
0032         self.assertTrue(self.connection != None)
0033 
0034     def test_get_tag(self):
0035         # hard code tag for now
0036         tag_name = "EBAlignment_measured_v01_express"
0037         tag = self.connection.tag(name=tag_name)
0038         # this tag exists, so shouldn't be NoneType
0039         self.assertTrue(tag != None)
0040         # we gave the name, so that should at least be in the tag object
0041         self.assertTrue(tag.name != None)
0042         self.assertEqual(tag.__class__.__name__.lower(), "tag")
0043 
0044     def test_get_empty_tag(self):
0045         tag = self.connection.tag()
0046         self.assertTrue(tag != None)
0047         self.assertEqual(tag.__class__.__name__.lower(), "tag")
0048 
0049     def test_get_global_tag(self):
0050         # hard coded global tag for now
0051         global_tag_name = "74X_dataRun1_HLT_frozen_v2"
0052         global_tag = self.connection.global_tag(name=global_tag_name)
0053         self.assertTrue(global_tag != None)
0054         self.assertTrue(global_tag.name != None)
0055         self.assertTrue(global_tag.tags() != None)
0056         self.assertEqual(global_tag.__class__.__name__.lower(), "globaltag")
0057 
0058     def test_get_payload(self):
0059         # hard coded payload for now
0060         payload_hash = "00172cd62d8abae41915978d815ae62cc08ad8b9"
0061         payload = self.connection.payload(hash=payload_hash)
0062         self.assertTrue(payload != None)
0063         self.assertEqual(payload.__class__.__name__.lower(), "payload")
0064 
0065     def test_get_empty_payload(self):
0066         payload = self.connection.payload()
0067         self.assertTrue(payload != None)
0068         self.assertEqual(payload.__class__.__name__.lower(), "payload")
0069 
0070     def test_get_parent_tags_payload(self):
0071         payload_hash = "00172cd62d8abae41915978d815ae62cc08ad8b9"
0072         payload = self.connection.payload(hash=payload_hash)
0073         self.assertTrue(payload != None)
0074         parent_tags = payload.parent_tags()
0075         self.assertTrue(parent_tags != None)
0076 
0077     def test_get_parent_global_tags(self):
0078         tag_name = "EBAlignment_measured_v01_express"
0079         tag = self.connection.tag(name=tag_name)
0080         self.assertTrue(tag != None)
0081         parent_global_tags = tag.parent_global_tags()
0082         self.assertTrue(parent_global_tags != None)
0083 
0084     def tests_all_tags_empty_tag(self):
0085         empty_tag = self.connection.tag()
0086         all_tags = empty_tag.all(10)
0087         self.assertTrue(all_tags != None)
0088         # there are always tags in the db
0089         self.assertTrue(len(all_tags.data()) != 0)
0090 
0091     def tests_all_tags_non_empty_tag(self):
0092         tag_name = "EBAlignment_measured_v01_express"
0093         empty_tag = self.connection.tag(name=tag_name)
0094         all_tags = empty_tag.all(10)
0095         self.assertTrue(all_tags != None)
0096         # there are always tags in the db
0097         self.assertTrue(len(all_tags.data()) != 0)      
0098 
0099     def tests_all_global_tags_empty_gt(self):
0100         empty_gt = self.connection.global_tag()
0101         all_gts = empty_gt.all(10)
0102         self.assertTrue(all_gts != None)
0103         self.assertTrue(len(all_gts.data()) != 0)
0104 
0105     def test_search_everything(self):
0106         string_for_non_empty_result = "ecal"
0107         data = self.connection.search_everything(string_for_non_empty_result)
0108         self.assertTrue(len(data.get("global_tags").data()) != 0)
0109         self.assertTrue(len(data.get("tags").data()) != 0)
0110         self.assertTrue(len(data.get("payloads").data()) != 0)
0111 
0112     def test_factory_multiple_results(self):
0113         tags = self.connection.tag(time_type="Run")
0114         self.assertTrue(len(tags.data()) > 1)
0115 
0116     def test_factory_single_result(self):
0117         tag = self.connection.tag(name="EBAlignment_hlt")
0118         self.assertEqual(tag.__class__.__name__.lower(), "tag")
0119 
0120     def test_factory_empty_result(self):
0121         tag = self.connection.tag()
0122         self.assertTrue(tag.empty)
0123 
0124     def test_factory_no_result(self):
0125         tag = self.connection.tag(name="dfasdf")
0126         self.assertTrue(tag == None)
0127 
0128     def tearDown(self):
0129         self.connection.close_session()
0130 
0131 class data_sources_tests(unittest.TestCase):
0132 
0133     def setUp(self):
0134         connection_data = {"db_alias" : "orapro", "host" : "oracle", "schema" : "cms_conditions", "secrets" : secrets_file}
0135         self.connection = querying.connect(connection_data)
0136 
0137     def test_make_json_list(self):
0138         test_list = list(range(0, 10))
0139         json_list_object = data_sources.json_data_node.make(test_list)
0140         self.assertTrue(json_list_object != None)
0141         self.assertEqual(json_list_object.data(), test_list)
0142         for n in range(0, len(test_list)):
0143             self.assertEqual(json_list_object.get(n).data(), test_list[n])
0144 
0145     def test_make_json_dict(self):
0146         test_dict = {"key1" : "value1", "key2" : "value2", "key3" : "value3"}
0147         json_dict_object = data_sources.json_data_node.make(test_dict)
0148         self.assertTrue(json_dict_object != None)
0149         self.assertEqual(json_dict_object.data(), test_dict)
0150         for key in test_dict:
0151             self.assertEqual(json_dict_object.get(key).data(), test_dict[key])
0152 
0153     def test_json_navigation(self):
0154         structure = {"key1" : [{"a" : 1, "b" : 3}, {"a" : 4, "b" : 8}], "key2" : ["header1", "header2", "header3"]}
0155         json_structure_object = data_sources.json_data_node.make(structure)
0156         self.assertEqual(json_structure_object.get("key1").data(), structure["key1"])
0157         self.assertEqual(json_structure_object.get("key2").data(), structure["key2"])
0158 
0159     def test_json_building(self):
0160         structure = {"key1" : [{"a" : 1, "b" : 3}, {"a" : 4, "b" : 8}], "key2" : ["header1", "header2", "header3"]}
0161         new_structure = data_sources.json_data_node.make({})
0162         new_structure.add_key([], "key1")
0163         new_structure.get("key1").add_child({"a" : 1, "b" : 3})
0164         new_structure.get("key1").add_child({"a" : 4, "b" : 8})
0165         new_structure.add_key([], "key2")
0166         new_structure.get("key2").add_child("header1")
0167         new_structure.get("key2").add_child("header2")
0168         new_structure.get("key2").add_child("header3")
0169         self.assertEqual(new_structure.data(), structure)
0170 
0171     def test_check_types(self):
0172         test_list = list(range(0, 10))
0173         test_dict = {"key1" : "value1", "key2" : "value2", "key3" : "value3"}
0174         json_list_object = data_sources.json_data_node.make(test_list)
0175         json_dict_object = data_sources.json_data_node.make(test_dict)
0176         self.assertEqual(json_list_object.__class__.__name__, "json_list")
0177         self.assertEqual(json_dict_object.__class__.__name__, "json_dict")
0178 
0179     def test_type_all_tags(self):
0180         all_tags = self.connection.tag().all(10)
0181         self.assertEqual(all_tags.__class__.__name__, "json_list")
0182 
0183     def test_type_parent_global_tags(self):
0184         tag_name = "EBAlignment_measured_v01_express"
0185         tag = self.connection.tag(name=tag_name)
0186         self.assertTrue(tag != None)
0187         parent_gts = tag.parent_global_tags()
0188         self.assertEqual(parent_gts.__class__.__name__, "json_list")
0189 
0190     def test_type_parent_tags(self):
0191         payload_hash = "00172cd62d8abae41915978d815ae62cc08ad8b9"
0192         payload = self.connection.payload(hash=payload_hash)
0193         self.assertTrue(payload != None)
0194         parent_tags = payload.parent_tags()
0195         self.assertEqual(parent_tags.__class__.__name__, "json_list")
0196 
0197     def test_type_iovs_of_global_tag(self):
0198         global_tag_name = "74X_dataRun1_HLT_frozen_v2"
0199         global_tag = self.connection.global_tag(name=global_tag_name)
0200         self.assertTrue(global_tag != None)
0201         iovs = global_tag.iovs(valid=True)
0202 
0203     def test_validity_of_iovs_global_tag(self):
0204         global_tag_name = "74X_dataRun1_HLT_frozen_v2"
0205         global_tag = self.connection.global_tag(name=global_tag_name)
0206         self.assertTrue(global_tag != None)
0207         iovs = global_tag.iovs(valid=True)
0208         self.assertEqual(iovs.__class__.__name__, "json_list")
0209         snapshot_time = global_tag.snapshot_time
0210         for iov in iovs:
0211             insertion_time_as_datetime = datetime.datetime.strptime(iov.insertion_time, '%Y-%m-%d %H:%M:%S,%f')
0212             self.assertTrue(insertion_time_as_datetime < snapshot_time)
0213 
0214     def tearDown(self):
0215         self.connection.close_session()
0216 
0217 class data_formats_tests(unittest.TestCase):
0218 
0219     def setUp(self):
0220         connection_data = {"db_alias" : "orapro", "host" : "oracle", "schema" : "cms_conditions", "secrets" : secrets_file}
0221         self.connection = querying.connect(connection_data)
0222 
0223     def test_orm_objects_to_dicts(self):
0224         tags = self.connection.tag().all(10)
0225         list_of_dicts = data_formats._objects_to_dicts(tags)
0226         self.assertEqual(list_of_dicts.__class__.__name__, "json_list")
0227         for tag in tags:
0228             self.assertEqual(tag.__class__.__name__.lower(), "tag")
0229 
0230     def test_dicts_to_orm_objects(self):
0231         models_to_test = map(self.connection.model, ["global_tag", "tag", "payload", "iov"])
0232         for model in models_to_test:
0233             model_name = self.connection.class_name_to_column(model).lower()
0234             objects = getattr(self.connection, model_name)().all(5)
0235             dicts = data_formats._objects_to_dicts(objects)
0236             orm_objects = data_formats._dicts_to_orm_objects(self.connection.model(model_name), dicts)
0237             self.assertTrue(dicts != None)
0238             for obj in orm_objects:
0239                 self.assertEqual(self.connection.class_name_to_column(obj.__class__).lower(), model_name)
0240                 headers = model.headers
0241                 for header in headers:
0242                     try:
0243                         test = getattr(obj, header)
0244                         header_exists = True
0245                     except:
0246                         print("'%s' doesn't exist." % header)
0247                         header_exists = False
0248                     self.assertTrue(header_exists)
0249 
0250     def tearDown(self):
0251         self.connection.close_session()
0252 
0253 class shell_tests(unittest.TestCase):
0254 
0255     def setUp(self):
0256         connection_data = {"db_alias" : "orapro", "host" : "oracle", "schema" : "cms_conditions", "secrets" : secrets_file}
0257         self.connection = querying.connect(connection_data)
0258 
0259     def test_init_shell(self):
0260         connection_data = {"db_alias" : "orapro", "host" : "oracle", "schema" : "cms_conditions", "secrets" : secrets_file}
0261         connection = shell.connect(connection_data)
0262         self.assertTrue(connection != None)
0263 
0264     def tearDown(self):
0265         self.connection.close_session()
0266 
0267 class connection_tests(unittest.TestCase):
0268 
0269     def setUp(self):
0270         pass
0271 
0272     def test_orapro_connect(self):
0273         connection_data = {"db_alias" : "orapro", "host" : "oracle", "schema" : "cms_conditions", "secrets" : secrets_file}
0274         connection = querying.connect(connection_data)
0275         self.assertTrue(connection != None)
0276         # can cause failure if the database is down
0277         close_session_result = connection.close_session()
0278         self.assertEqual(close_session_result, True)
0279 
0280     """def test_oradev_connect(self):
0281         connection_data = {"db_alias" : "oradev", "host" : "oracle", "schema" : "", "secrets" : secrets_file_1}
0282         connection = querying.connect(connection_data)
0283         self.assertTrue(connection != None)
0284         # can cause failure if the database is down
0285         close_session_result = connection.close_session()
0286         self.assertEqual(close_session_result, True)"""
0287 
0288     def test_frontier_connect(self):
0289         for alias in ["pro", "arc", "int", "dev"]:
0290             connection_data = {"db_alias" : alias, "host" : "frontier", "schema" : "cms_conditions"}
0291             connection = querying.connect(connection_data)
0292             # can cause failure if the database is down
0293             close_session_result = connection.close_session()
0294             self.assertEqual(close_session_result, True)
0295 
0296     def test_frontier_query(self):
0297             connection_data = {"db_alias" : "pro", "host" : "frontier", "schema" : "cms_conditions"}
0298             connection = querying.connect(connection_data)
0299             # query for a tag
0300             tag = connection.tag(name="EBAlignment_measured_v06_offline")
0301             # can cause failure if the database is down
0302             close_session_result = connection.close_session()
0303             self.assertEqual(close_session_result, True)
0304 
0305     def tearDown(self):
0306         pass
0307 
class script_tests(unittest.TestCase):
    """Run small script objects through the API and check the first IOV.

    ``setUp`` stores the connection data and opens a querying connection;
    ``tearDown`` closes that connection's session.
    """

    def setUp(self):
        """Keep the connection data around and open the test's connection."""
        self.connection_data = {"db_alias" : "orapro", "host" : "oracle", "schema" : "cms_conditions", "secrets" : secrets_file}
        self.connection = querying.connect(self.connection_data)

    def tearDown(self):
        """Close the session opened in setUp."""
        self.connection.close_session()

    def _run_through_api(self, script_object):
        """Build an API object from the stored connection data and run the script."""
        # NOTE(review): ``api`` is bound by ``from CondCore.Utilities.CondDBFW
        # import querying_framework_api as api``; if that name is a module
        # rather than a callable, this call fails - confirm.
        runner = api(self.connection_data)
        return runner.run_script(script_object, output=False)

    def test_script(self):
        """An undecorated script returns objects carrying ``payload_hash``."""
        test_case = self

        class script():
            def script(self_instance, connection):
                # The ``connection`` argument is not used: the query goes
                # through the test case's own connection.
                named_tag = test_case.connection.tag(name="EBAlignment_hlt")
                return named_tag.iovs()

        result = self._run_through_api(script())
        first_iov = result.get(0).data()
        self.assertEqual(first_iov.payload_hash, "1480c559bbbdacfec514c3cbcf2eb978403efd74")

    def test_script_with_decorator(self):
        """A script decorated with objects_to_dicts returns keyed data."""
        test_case = self

        class script():
            @data_formats.objects_to_dicts
            def script(self_instance, connection):
                # The ``connection`` argument is not used here either.
                named_tag = test_case.connection.tag(name="EBAlignment_hlt")
                return named_tag.iovs()

        result = self._run_through_api(script())
        first_hash = result.get(0, "payload_hash").data()
        self.assertEqual(first_hash, "1480c559bbbdacfec514c3cbcf2eb978403efd74")