Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:01:55

0001 """
0002 
0003 This file contains all testing suites to be used to test the framework.
0004 
0005 Note: avoid checking for lengths of lists, or characteristics of data from db.
0006 The db may change + this shouldn't cause tests to fail.
0007 
0008 TODO: Change code so that all connections are used when testing queries - if this isn't too bad a thing to do with the DBs.
0009 
0010 """
0011 from __future__ import print_function
0012 
0013 import unittest
0014 import sys
0015 import datetime
0016 import pprint
0017 
0018 import CondCore.Utilities.CondDBFW.querying as querying
0019 import CondCore.Utilities.CondDBFW.data_sources as data_sources
0020 import CondCore.Utilities.CondDBFW.data_formats as data_formats
0021 import CondCore.Utilities.CondDBFW.shell as shell
0022 from CondCore.Utilities.CondDBFW import querying_framework_api as api
0023 
# Path to the credentials file; it is passed as the "secrets" entry of the
# connection dictionaries built by the test classes in this file that use host "oracle".
secrets_file = "/afs/cern.ch/cms/DB/conddb/.cms_cond/netrc"
0025 
class querying_tests(unittest.TestCase):
    """
    Tests for the objects handed out by a querying connection: tags, global tags,
    payloads, searches, and the factory behaviour of connection.tag().

    A new connection is opened in setUp before each test and its session is closed in tearDown.
    """

    def setUp(self):
        connection_data = {"db_alias" : "orapro", "host" : "oracle", "schema" : "cms_conditions", "secrets" : secrets_file}
        self.connection = querying.connect(connection_data)

    def test_check_connection(self):
        self.assertIsNotNone(self.connection)

    def test_get_tag(self):
        # hard code tag for now
        tag_name = "EBAlignment_measured_v01_express"
        tag = self.connection.tag(name=tag_name)
        # this tag exists, so shouldn't be NoneType
        self.assertIsNotNone(tag)
        # we gave the name, so that should at least be in the tag object
        self.assertIsNotNone(tag.name)
        self.assertEqual(tag.__class__.__name__.lower(), "tag")

    def test_get_empty_tag(self):
        tag = self.connection.tag()
        self.assertIsNotNone(tag)
        self.assertEqual(tag.__class__.__name__.lower(), "tag")

    def test_get_global_tag(self):
        # hard coded global tag for now
        global_tag_name = "74X_dataRun1_HLT_frozen_v2"
        global_tag = self.connection.global_tag(name=global_tag_name)
        self.assertIsNotNone(global_tag)
        self.assertIsNotNone(global_tag.name)
        self.assertIsNotNone(global_tag.tags())
        self.assertEqual(global_tag.__class__.__name__.lower(), "globaltag")

    def test_get_payload(self):
        # hard coded payload for now
        payload_hash = "00172cd62d8abae41915978d815ae62cc08ad8b9"
        payload = self.connection.payload(hash=payload_hash)
        self.assertIsNotNone(payload)
        self.assertEqual(payload.__class__.__name__.lower(), "payload")

    def test_get_empty_payload(self):
        payload = self.connection.payload()
        self.assertIsNotNone(payload)
        self.assertEqual(payload.__class__.__name__.lower(), "payload")

    def test_get_parent_tags_payload(self):
        payload_hash = "00172cd62d8abae41915978d815ae62cc08ad8b9"
        payload = self.connection.payload(hash=payload_hash)
        self.assertIsNotNone(payload)
        parent_tags = payload.parent_tags()
        self.assertIsNotNone(parent_tags)

    def test_get_parent_global_tags(self):
        tag_name = "EBAlignment_measured_v01_express"
        tag = self.connection.tag(name=tag_name)
        self.assertIsNotNone(tag)
        parent_global_tags = tag.parent_global_tags()
        self.assertIsNotNone(parent_global_tags)

    def tests_all_tags_empty_tag(self):
        empty_tag = self.connection.tag()
        all_tags = empty_tag.all(10)
        self.assertIsNotNone(all_tags)
        # there are always tags in the db
        self.assertNotEqual(len(all_tags.data()), 0)

    def tests_all_tags_non_empty_tag(self):
        # same as the test above, but all() is called on a tag looked up by name
        tag_name = "EBAlignment_measured_v01_express"
        named_tag = self.connection.tag(name=tag_name)
        all_tags = named_tag.all(10)
        self.assertIsNotNone(all_tags)
        # there are always tags in the db
        self.assertNotEqual(len(all_tags.data()), 0)

    def tests_all_global_tags_empty_gt(self):
        empty_gt = self.connection.global_tag()
        all_gts = empty_gt.all(10)
        self.assertIsNotNone(all_gts)
        self.assertNotEqual(len(all_gts.data()), 0)

    def test_search_everything(self):
        string_for_non_empty_result = "ecal"
        data = self.connection.search_everything(string_for_non_empty_result)
        # each of the three result sections must contain at least one entry
        for section in ("global_tags", "tags", "payloads"):
            self.assertNotEqual(len(data.get(section).data()), 0)

    def test_factory_multiple_results(self):
        tags = self.connection.tag(time_type="Run")
        self.assertGreater(len(tags.data()), 1)

    def test_factory_single_result(self):
        tag = self.connection.tag(name="EBAlignment_hlt")
        self.assertEqual(tag.__class__.__name__.lower(), "tag")

    def test_factory_empty_result(self):
        tag = self.connection.tag()
        self.assertTrue(tag.empty)

    def test_factory_no_result(self):
        # a name that matches nothing is expected to give None
        tag = self.connection.tag(name="dfasdf")
        self.assertIsNone(tag)

    def tearDown(self):
        self.connection.close_session()
0131 
class data_sources_tests(unittest.TestCase):
    """
    Tests for data_sources.json_data_node (building and navigating wrapped lists and
    dictionaries) and for the wrapper type returned by queries on the connection.

    A new connection is opened in setUp before each test and its session is closed in tearDown.
    """

    def setUp(self):
        connection_data = {"db_alias" : "orapro", "host" : "oracle", "schema" : "cms_conditions", "secrets" : secrets_file}
        self.connection = querying.connect(connection_data)

    def test_make_json_list(self):
        test_list = list(range(0, 10))
        json_list_object = data_sources.json_data_node.make(test_list)
        self.assertIsNotNone(json_list_object)
        self.assertEqual(json_list_object.data(), test_list)
        # every element must be reachable by its index
        for index, expected in enumerate(test_list):
            self.assertEqual(json_list_object.get(index).data(), expected)

    def test_make_json_dict(self):
        test_dict = {"key1" : "value1", "key2" : "value2", "key3" : "value3"}
        json_dict_object = data_sources.json_data_node.make(test_dict)
        self.assertIsNotNone(json_dict_object)
        self.assertEqual(json_dict_object.data(), test_dict)
        # every value must be reachable by its key
        for key in test_dict:
            self.assertEqual(json_dict_object.get(key).data(), test_dict[key])

    def test_json_navigation(self):
        structure = {"key1" : [{"a" : 1, "b" : 3}, {"a" : 4, "b" : 8}], "key2" : ["header1", "header2", "header3"]}
        json_structure_object = data_sources.json_data_node.make(structure)
        self.assertEqual(json_structure_object.get("key1").data(), structure["key1"])
        self.assertEqual(json_structure_object.get("key2").data(), structure["key2"])

    def test_json_building(self):
        # build the same structure step by step and compare with the literal
        structure = {"key1" : [{"a" : 1, "b" : 3}, {"a" : 4, "b" : 8}], "key2" : ["header1", "header2", "header3"]}
        new_structure = data_sources.json_data_node.make({})
        new_structure.add_key([], "key1")
        new_structure.get("key1").add_child({"a" : 1, "b" : 3})
        new_structure.get("key1").add_child({"a" : 4, "b" : 8})
        new_structure.add_key([], "key2")
        new_structure.get("key2").add_child("header1")
        new_structure.get("key2").add_child("header2")
        new_structure.get("key2").add_child("header3")
        self.assertEqual(new_structure.data(), structure)

    def test_check_types(self):
        test_list = list(range(0, 10))
        test_dict = {"key1" : "value1", "key2" : "value2", "key3" : "value3"}
        json_list_object = data_sources.json_data_node.make(test_list)
        json_dict_object = data_sources.json_data_node.make(test_dict)
        self.assertEqual(json_list_object.__class__.__name__, "json_list")
        self.assertEqual(json_dict_object.__class__.__name__, "json_dict")

    def test_type_all_tags(self):
        all_tags = self.connection.tag().all(10)
        self.assertEqual(all_tags.__class__.__name__, "json_list")

    def test_type_parent_global_tags(self):
        tag_name = "EBAlignment_measured_v01_express"
        tag = self.connection.tag(name=tag_name)
        self.assertIsNotNone(tag)
        parent_gts = tag.parent_global_tags()
        self.assertEqual(parent_gts.__class__.__name__, "json_list")

    def test_type_parent_tags(self):
        payload_hash = "00172cd62d8abae41915978d815ae62cc08ad8b9"
        payload = self.connection.payload(hash=payload_hash)
        self.assertIsNotNone(payload)
        parent_tags = payload.parent_tags()
        self.assertEqual(parent_tags.__class__.__name__, "json_list")

    def test_type_iovs_of_global_tag(self):
        global_tag_name = "74X_dataRun1_HLT_frozen_v2"
        global_tag = self.connection.global_tag(name=global_tag_name)
        self.assertIsNotNone(global_tag)
        iovs = global_tag.iovs(valid=True)
        # the result used to be fetched and then ignored; check its type,
        # as test_validity_of_iovs_global_tag does for the same call
        self.assertEqual(iovs.__class__.__name__, "json_list")

    def test_validity_of_iovs_global_tag(self):
        global_tag_name = "74X_dataRun1_HLT_frozen_v2"
        global_tag = self.connection.global_tag(name=global_tag_name)
        self.assertIsNotNone(global_tag)
        iovs = global_tag.iovs(valid=True)
        self.assertEqual(iovs.__class__.__name__, "json_list")
        snapshot_time = global_tag.snapshot_time
        # every iov returned with valid=True must have been inserted before the snapshot time
        for iov in iovs:
            insertion_time_as_datetime = datetime.datetime.strptime(iov.insertion_time, '%Y-%m-%d %H:%M:%S,%f')
            self.assertLess(insertion_time_as_datetime, snapshot_time)

    def tearDown(self):
        self.connection.close_session()
0217 
class data_formats_tests(unittest.TestCase):
    """
    Tests for the conversions in data_formats between ORM objects and dictionaries.

    A new connection is opened in setUp before each test and its session is closed in tearDown.
    """

    def setUp(self):
        connection_data = {"db_alias" : "orapro", "host" : "oracle", "schema" : "cms_conditions", "secrets" : secrets_file}
        self.connection = querying.connect(connection_data)

    def test_orm_objects_to_dicts(self):
        tags = self.connection.tag().all(10)
        list_of_dicts = data_formats._objects_to_dicts(tags)
        self.assertEqual(list_of_dicts.__class__.__name__, "json_list")
        # the objects that were converted must still be tag objects
        for tag in tags:
            self.assertEqual(tag.__class__.__name__.lower(), "tag")

    def test_dicts_to_orm_objects(self):
        models_to_test = map(self.connection.model, ["global_tag", "tag", "payload", "iov"])
        for model in models_to_test:
            model_name = self.connection.class_name_to_column(model).lower()
            objects = getattr(self.connection, model_name)().all(5)
            dicts = data_formats._objects_to_dicts(objects)
            # check the dictionaries before they are used for the round trip
            self.assertIsNotNone(dicts)
            orm_objects = data_formats._dicts_to_orm_objects(self.connection.model(model_name), dicts)
            for obj in orm_objects:
                self.assertEqual(self.connection.class_name_to_column(obj.__class__).lower(), model_name)
                for header in model.headers:
                    # only a missing attribute counts as a missing header;
                    # any other exception propagates and is reported as an error
                    try:
                        getattr(obj, header)
                    except AttributeError:
                        self.fail("'%s' doesn't exist." % header)

    def tearDown(self):
        self.connection.close_session()
0253 
class shell_tests(unittest.TestCase):
    """
    Test that a connection can be obtained through the shell module.

    setUp also opens a querying connection, whose session is closed in tearDown.
    """

    def setUp(self):
        # kept on the instance so the test can reuse the same parameters
        self.connection_data = {"db_alias" : "orapro", "host" : "oracle", "schema" : "cms_conditions", "secrets" : secrets_file}
        self.connection = querying.connect(self.connection_data)

    def test_init_shell(self):
        # pass a copy, so shell.connect gets its own dictionary as it did before
        connection = shell.connect(dict(self.connection_data))
        # NOTE(review): this connection is never closed here - confirm whether it needs to be
        self.assertIsNotNone(connection)

    def tearDown(self):
        self.connection.close_session()
0267 
class connection_tests(unittest.TestCase):
    """
    Tests that open and close connections to the "oracle" and "frontier" hosts.

    Each test creates its own connection, so no setUp or tearDown is needed.
    """

    def test_orapro_connect(self):
        connection_data = {"db_alias" : "orapro", "host" : "oracle", "schema" : "cms_conditions", "secrets" : secrets_file}
        connection = querying.connect(connection_data)
        self.assertIsNotNone(connection)
        # can cause failure if the database is down
        close_session_result = connection.close_session()
        self.assertEqual(close_session_result, True)

    # Disabled test, kept for reference.
    # NOTE(review): it refers to "secrets_file_1", which is not defined in the visible source.
    #
    # def test_oradev_connect(self):
    #     connection_data = {"db_alias" : "oradev", "host" : "oracle", "schema" : "", "secrets" : secrets_file_1}
    #     connection = querying.connect(connection_data)
    #     self.assertTrue(connection != None)
    #     # can cause failure if the database is down
    #     close_session_result = connection.close_session()
    #     self.assertEqual(close_session_result, True)

    def test_frontier_connect(self):
        for alias in ["pro", "arc", "int", "dev"]:
            connection_data = {"db_alias" : alias, "host" : "frontier", "schema" : "cms_conditions"}
            connection = querying.connect(connection_data)
            # can cause failure if the database is down
            close_session_result = connection.close_session()
            self.assertEqual(close_session_result, True)

    def test_frontier_query(self):
        connection_data = {"db_alias" : "pro", "host" : "frontier", "schema" : "cms_conditions"}
        connection = querying.connect(connection_data)
        # query for a tag; the result is not inspected, the test only requires
        # that the query runs and the session can be closed afterwards
        connection.tag(name="EBAlignment_measured_v06_offline")
        # can cause failure if the database is down
        close_session_result = connection.close_session()
        self.assertEqual(close_session_result, True)
0308 
class script_tests(unittest.TestCase):
    """
    Tests that run small script objects through the querying framework api,
    once with a plain script method and once with the objects_to_dicts decorator.

    NOTE(review): both scripts use the connection opened in setUp and ignore the
    "connection" argument they receive - confirm that this is intended.
    """

    def setUp(self):
        self.connection_data = {
            "db_alias" : "orapro",
            "host" : "oracle",
            "schema" : "cms_conditions",
            "secrets" : secrets_file,
        }
        self.connection = querying.connect(self.connection_data)

    def tearDown(self):
        self.connection.close_session()

    def test_script(self):
        test_case = self

        class script():
            def script(self_instance, connection):
                # iovs of a fixed tag, returned as they come from the query
                return test_case.connection.tag(name="EBAlignment_hlt").iovs()

        result = api(self.connection_data).run_script(script(), output=False)
        # for debugging: pprint.pprint(result.data())
        first_iov = result.get(0).data()
        self.assertEqual(first_iov.payload_hash, "1480c559bbbdacfec514c3cbcf2eb978403efd74")

    def test_script_with_decorator(self):
        test_case = self

        class script():
            @data_formats.objects_to_dicts
            def script(self_instance, connection):
                # same query as in test_script; the decorator is applied to the return value
                return test_case.connection.tag(name="EBAlignment_hlt").iovs()

        result = api(self.connection_data).run_script(script(), output=False)
        # for debugging: pprint.pprint(result.data())
        first_payload_hash = result.get(0, "payload_hash").data()
        self.assertEqual(first_payload_hash, "1480c559bbbdacfec514c3cbcf2eb978403efd74")