# File indexing completed on 2023-03-17 10:46:31
0002 """
0003
0004 Joshua Dawes - CERN, CMS - The University of Manchester
0005
0006 This module holds classes to help with uploading conditions to the drop box web service, which also uses CondDBFW to read and write data.
0007
0008 """
0009
0010 import os
0011 import json
0012 import base64
0013 from datetime import datetime
0014 from urllib.parse import urlencode
0015 import math
0016 import sys
0017 import traceback
0018 import netrc
0019
0020 from .url_query import url_query
0021 from . import models
0022 from . import errors
0023 from . import data_sources
0024 from . import querying
0025 from .errors import *
0026 from .utils import to_timestamp, to_datetime, friendly_since
0027
def friendly_since(time_type, since):
    """
    Takes a since and, if it is Run-based expressed as Lumi-based (the run number
    packed into the upper 32 bits with a zero lumi section), returns the run number.
    Otherwise, returns the since without transformations.

    Note: this definition intentionally overrides (shadows) the identically-named
    import from .utils at the top of this module.
    """
    # A Run-based since encoded as run << 32 has ALL of its lower 32 bits zero,
    # so mask with 0xffffffff.  The previous mask (0xffffff) only checked the low
    # 24 bits, so a Lumi-based since with a non-zero lumi section in bits 24-31
    # would have been wrongly collapsed to a run number.
    if time_type == "Run" and (since & 0xffffffff) == 0:
        return since >> 32
    else:
        return since
0037
0038
def log(file_handle, message):
    """
    Minimal logging helper used by the output class: append a single
    timestamped line for *message* to the given open file handle.
    """
    timestamp = to_timestamp(datetime.utcnow())
    file_handle.write("[%s] %s\n" % (timestamp, message))
0044
def new_log_file_id():
    """
    Work out a fresh client-side log file id.

    Note: This cannot use the upload session token since logs need to be written before this is opened.
    However, this can be changed so that the filename that uses the token is written to once
    it is obtained.
    """
    # Count the upload logs already present under ./upload_logs and take the next id.
    upload_logs_dir = os.path.join(os.getcwd(), "upload_logs")
    existing_logs = [name for name in os.listdir(upload_logs_dir) if "upload_log" in name]
    return len(existing_logs) + 1
0058
class output():
    """
    Controls where messages go: the console (subject to verbosity) and an
    optional client-side log file handle.
    """

    def __init__(self, log_handle=None, verbose=False):
        # State used by write() to decide the destination of each message.
        self.current_output_length = 0
        self._verbose = verbose
        self._log_handle = log_handle

    def write(self, message="", ignore_verbose=False):
        """
        Print the message to the console when verbosity allows it (or when
        ignore_verbose forces it), and always mirror it to the log file if
        a handle is held.
        """
        should_print = ignore_verbose or self._verbose
        if should_print:
            print(message)
        if self._log_handle is not None:
            log(self._log_handle, message)
0080
class uploader(object):
    """
    Upload session controller - creates, tracks, and deletes upload sessions on the server.
    """

    def __init__(self, metadata_source=None, debug=False, verbose=False, testing=False, server="https://cms-conddb-dev.cern.ch/cmsDbCondUpload/", **kwargs):
        """
        Upload constructor:
        Given an SQLite file and a Metadata source, reads into a dictionary ready for it to be encoded and uploaded.

        Note: kwargs is used to capture stray arguments - arguments that do not match keywords will not be used.

        Note: default value of service_url should be changed for production.
        """

        self._debug = debug
        self._verbose = verbose
        self._testing = testing

        # Server-side log data (base64-encoded) is stored here once a response carries it.
        self._log_data = ""
        self._SERVICE_URL = server
        # Set by upload() once a session has been opened on the server.
        self.upload_session_id = None

        # Open a client-side log file.  This happens before any session token exists,
        # hence the locally-generated id (see new_log_file_id()).
        self.upload_log_file_name = "upload_logs/upload_log_%d" % new_log_file_id()
        self._handle = open(self.upload_log_file_name, "a")

        # All console/log output is funnelled through this single outputter.
        self._outputter = output(verbose=verbose, log_handle=self._handle)
        self._outputter.write("Using server instance at '%s'." % self._SERVICE_URL)

        # Validate the metadata source - without it we cannot continue with the upload.
        # NOTE(review): exit_upload() returns (instead of exiting) when --testing is set,
        # so execution can fall through these checks in testing mode - confirm intended.
        if metadata_source == None:
            self.exit_upload("A source of metadata must be given so CondDBFW knows how to upload conditions.")
        else:
            # Keep the decoded metadata dictionary on the instance.
            self.metadata_source = metadata_source.data()

        # Check for the destination Tag - required whatever type of upload we're performing.
        if self.metadata_source.get("destinationTags") == None:
            self.exit_upload("No destination Tag was given.")
        else:
            # A dict whose first key is None also counts as "no destination Tag".
            if type(self.metadata_source.get("destinationTags")) == dict and list(self.metadata_source.get("destinationTags").keys())[0] == None:
                self.exit_upload("No destination Tag was given.")

        # Make sure a destination database was given.
        if self.metadata_source.get("destinationDatabase") == None:
            self.exit_upload("No destination database was given.")

        # Get Conditions metadata, either from an SQLite file or from command line arguments.
        if self.metadata_source.get("sourceDB") == None and self.metadata_source.get("hashToUse") == None:
            # If we have neither an sqlite file nor the command line data
            self.exit_upload("You must give either an SQLite database file, or the necessary command line arguments to replace one."\
                + "\nSee --help for command line argument information.")
        elif self.metadata_source.get("sourceDB") != None:
            # We've been given an SQLite file, so try to extract Conditions Metadata based on
            # that and the Upload Metadata in metadata_source.
            # We now extract the Tag and IOV data from SQLite.  It is added to the dictionary
            # for sending over HTTPs later.

            # The name of the Tag to read from the SQLite file.
            self.input_tag = metadata_source.data().get("inputTag")
            if self.input_tag == None:
                self.exit_upload("No input Tag name was given.")

            # Dictionary that will hold Tag and IOV data extracted from SQLite.
            result_dictionary = {}
            self.sqlite_file_name = self.metadata_source["sourceDB"]
            if not(os.path.isfile(self.sqlite_file_name)):
                self.exit_upload("SQLite file '%s' given doesn't exist." % self.sqlite_file_name)
            sqlite_con = querying.connect("sqlite://%s" % os.path.abspath(self.sqlite_file_name))

            self._outputter.write("Getting Tag and IOVs from SQLite database.")

            # Get the Tag object and convert it to a plain dictionary with string timestamps.
            tag = sqlite_con.tag(name=self.input_tag)
            if tag == None:
                self.exit_upload("The source Tag '%s' you gave was not found in the SQLite file." % self.input_tag)
            tag = tag.as_dicts(convert_timestamps=True)

            # Get the IOVs for the Tag; normalise a single IOV into a one-element list.
            iovs = sqlite_con.iov(tag_name=self.input_tag)
            if iovs == None:
                self.exit_upload("No IOVs found in the SQLite file given for Tag '%s'." % self.input_tag)
            iovs = iovs.as_dicts(convert_timestamps=True)
            iovs = [iovs] if type(iovs) != list else iovs

            # Finally, get the list of all Payload hashes of IOVs, then compute the list of
            # hashes for which there is no local Payload.  This is used later to decide if we
            # can continue the upload if the Payload was not found on the server.
            iovs_for_hashes = sqlite_con.iov(tag_name=self.input_tag)
            if iovs_for_hashes.__class__ == data_sources.json_list:
                hashes_of_iovs = iovs_for_hashes.get_members("payload_hash").data()
            else:
                hashes_of_iovs = [iovs_for_hashes.payload_hash]
            self.hashes_with_no_local_payload = [payload_hash for payload_hash in hashes_of_iovs if sqlite_con.payload(hash=payload_hash) == None]

            # Done reading - close the session with the SQLite database file.
            sqlite_con.close_session()

        elif metadata_source.data().get("hashToUse") != None:
            # Assume we've been given metadata in the command line (since no sqlite file is
            # there, and we have command line arguments).
            # We now use Tag and IOV data from the command line.  It is added to the
            # dictionary for sending over HTTPs later.

            # Dictionary that will hold Tag and IOV data taken from the command line.
            result_dictionary = {}

            now = to_timestamp(datetime.utcnow())

            # The Tag dictionary is fetched from the server (destination Tag must exist there).
            tag = self.get_tag_dictionary()
            self.check_response_for_error_key(tag)
            iovs = [{"tag_name" : self.metadata_source["destinationTag"], "since" : self.metadata_source["since"], "payload_hash" : self.metadata_source["hashToUse"],\
                    "insertion_time" : now}]

            # hashToUse cannot be stored locally (no SQLite file was given),
            # so register it as having no local Payload.
            self.hashes_with_no_local_payload = [self.metadata_source["hashToUse"]]

        # If the source Tag is Run-based, convert the IOV sinces to the packed
        # (run << 32) format used for Lumi-based sinces.
        # NOTE(review): if exit_upload() returned above (testing mode), `tag`/`iovs`
        # may be unbound here - confirm callers only reach this with valid metadata.
        if tag["time_type"] == "Run":
            for (i, iov) in enumerate(iovs):
                iovs[i]["since"] = iovs[i]["since"] << 32

        result_dictionary = {"inputTagData" : tag, "iovs" : iovs}

        # Add the command line arguments to the dictionary -
        # metadata_source still contains the universal command line arguments.
        result_dictionary.update(metadata_source.data())

        # The dictionary that will eventually be sent to the server as upload metadata.
        self.data_to_send = result_dictionary

        # If the user gave no since, default to the lowest IOV since; otherwise, for
        # Run-based Tags, pack the user's since into the (run << 32) format as well.
        if result_dictionary.get("since") == None:
            result_dictionary["since"] = sorted(iovs, key=lambda iov : iov["since"])[0]["since"]
        elif self.data_to_send["inputTagData"]["time_type"] == "Run":
            self.data_to_send["since"] = self.data_to_send["since"] << 32

        # TODO - Settle on a single destination tag format.

        # Warn about the deprecated multiple-destination-tags dictionary format.
        try:
            if type(result_dictionary["destinationTags"]) == dict:
                self._outputter.write("WARNING: Multiple destination tags in a single metadata source is deprecated.")
        except Exception as e:
            self._outputter.write("ERROR: %s" % str(e))

    @check_response(check="json")
    def get_tag_dictionary(self):
        """
        Ask the server for the dictionary describing the destination Tag.
        The @check_response decorator decodes the JSON response (or reports the failure).
        """
        url_data = {"tag_name" : self.metadata_source["destinationTag"], "database" : self.metadata_source["destinationDatabase"]}
        request = url_query(url=self._SERVICE_URL + "get_tag_dictionary/", url_data=url_data)
        response = request.send()
        return response

    def check_response_for_error_key(self, response_dict, exit_if_error=True):
        """
        Checks the decoded response of an HTTP request to the server.
        If it is a dictionary, and one of its keys is "error", the server returned an error.

        Returns True when the response carried no error (and stashes any log data);
        returns False (instead of exiting) on error when --testing is set.
        """
        if type(response_dict) == dict and "error" in list(response_dict.keys()):
            splitter_string = "\n%s\n" % ("-"*50)
            self._outputter.write("\nERROR: %s" % splitter_string, ignore_verbose=True)
            self._outputter.write(response_dict["error"], ignore_verbose=True)

            # Only show the server-side traceback when --debug was given.
            if self._debug:
                self._outputter.write("\nTRACEBACK (since --debug is set):%s" % splitter_string, ignore_verbose=True)
                if response_dict.get("traceback") != None:
                    self._outputter.write(response_dict["traceback"], ignore_verbose=True)
                else:
                    self._outputter.write("No traceback was returned from the server.", ignore_verbose=True)
            else:
                self._outputter.write("Use the --debug option to show the traceback of this error.", ignore_verbose=True)

            # Write any server-side log that came with the error response.
            self.write_server_side_log(response_dict.get("log_data"))

            if exit_if_error:
                if self._testing:
                    return False
                else:
                    exit()
        elif not("error" in list(response_dict.keys())) and "log_data" in list(response_dict.keys()):
            # No error - keep the server-side log data for writing at the end of the upload.
            # NOTE(review): this branch assumes response_dict is a dict; a non-dict
            # response would raise here - confirm callers always pass decoded JSON dicts.
            self._log_data = response_dict["log_data"]
            return True

    def write_server_side_log(self, log_data):
        """
        Given the log data from the server, write it to a client-side log file.
        """
        # Create the server_side_logs directory if it doesn't exist yet.
        if not(os.path.exists(os.path.join(os.getcwd(), "server_side_logs/"))):
            os.makedirs("server_side_logs/")

        # Directory exists now - try to write the server log file.
        server_log_file_name = None
        try:
            # If no upload session exists, there is no server-side log to name the file after.
            if self.upload_session_id == None:
                raise Exception("No upload session")

            # Decode the base64 log data and write it out.
            # NOTE(review): b64decode returns bytes but the file is opened in text
            # mode - on Python 3 this write raises and is swallowed by the except
            # below, so no server log is written.  Confirm whether "wb" was intended.
            server_log_file_name = "server_side_logs/upload_log_%s" % str(self.upload_session_id)
            handle = open(server_log_file_name, "w")
            handle.write(base64.b64decode(log_data))
            handle.close()
        except Exception as e:
            # Reset the name so the failure is reported below instead of a bogus path.
            server_log_file_name = None

        # Tell the user where the log files ended up.
        if server_log_file_name != None:
            print("Log file from server written to '%s'." % server_log_file_name)
        else:
            print("No server log file could be written locally.")

        print("Log file from CondDBFW written to '%s'." % self.upload_log_file_name)

    def exit_upload(self, message=None):
        """
        Used to exit the script - which only happens if an error has occurred.
        If the --testing flag was passed by the user, we should return False for failure, and not exit.
        """
        if self.upload_session_id != None:
            # Only attempt to close the upload session if one was actually obtained.
            response = self.close_upload_session(self.upload_session_id)
            no_error = self.check_response_for_error_key(response)

            # If closing succeeded, we still have the server-side log - write it now.
            if no_error:
                self.write_server_side_log(self._log_data)

        # Close the client-side log handle before leaving.
        self._handle.close()
        if message != None:
            print("\n%s\n" % message)
        if self._testing:
            return False
        else:
            exit()

    def upload(self):
        """
        Calls methods that send HTTP requests to the upload server.
        Returns True/False only in testing mode; otherwise errors terminate the process.
        """

        # Step 1: open an upload session on the server - this also gives us a tag lock
        # on the tag being uploaded, if it is available.
        try:
            upload_session_data = self.get_upload_session_id()
            no_error = self.check_response_for_error_key(upload_session_data)

            # In testing mode an error response returns False instead of exiting.
            if not(no_error) and self._testing:
                return False

            self.upload_session_id = upload_session_data["id"]
            self._outputter.write("Upload session obtained with token '%s'." % self.upload_session_id)
            self.server_side_log_file = upload_session_data["log_file"]

        except errors.NoMoreRetriesException as no_more_retries:
            return self.exit_upload("Ran out of retries opening an upload session, where the limit was 3.")
        except Exception as e:
            # Unexpected failure - log the full traceback and bail out.
            self._outputter.write(traceback.format_exc(), ignore_verbose=True)

            if not(self._verbose):
                self._outputter.write("Something went wrong that isn't handled by code - to get the traceback, run again with --verbose.")
            else:
                self._outputter.write("Something went wrong that isn't handled by code - the traceback is above.")

            return self.exit_upload()

        # Step 2: only if a value is given for --fcsr-filter, run FCSR filtering on the IOVs locally.
        if self.data_to_send["fcsr_filter"] != None:
            # FCSR Filtering:
            # Filter the IOVs before we send them by getting the First Conditions Safe Run
            # from the server based on the target synchronization type.
            if self.data_to_send["inputTagData"]["time_type"] != "Time":
                # Time-based Tags cannot be validated against an FCSR.
                try:
                    self.filter_iovs_by_fcsr(self.upload_session_id)
                    # filter_iovs_by_fcsr modifies self.data_to_send in place.

                except errors.NoMoreRetriesException as no_more_retries:
                    return self.exit_upload("Ran out of retries trying to filter IOVs by FCSR from server, where the limit was 3.")
                except Exception as e:
                    # Unexpected failure - log the full traceback and bail out.
                    self._outputter.write(traceback.format_exc(), ignore_verbose=True)

                    if not(self._verbose):
                        self._outputter.write("Something went wrong that isn't handled by code - to get the traceback, run again with --verbose.")
                    else:
                        self._outputter.write("Something went wrong that isn't handled by code - the traceback is above.")

                    return self.exit_upload()
            else:
                self._outputter.write("The Tag you're uploading is time-based, so we can't do any FCSR-based validation. FCSR filtering is being skipped.")

        # Step 3: check for the hashes that the server doesn't have - only those Payloads
        # will be sent (in the next step).
        try:
            check_hashes_response = self.get_hashes_to_send(self.upload_session_id)
            # Check for an error key in the response.
            no_error = self.check_response_for_error_key(check_hashes_response)

            # In testing mode an error response returns False instead of exiting.
            if not(no_error) and self._testing:
                return False

            # An IOV whose Payload is neither local nor on the server is fatal;
            # an IOV with no local Payload whose hash IS on the server is fine.
            all_hashes = [iov["payload_hash"] for iov in self.data_to_send["iovs"]]
            hashes_not_found = check_hashes_response["hashes_not_found"]
            hashes_found = list(set(all_hashes) - set(hashes_not_found))
            self._outputter.write("Checking for IOVs that have no Payload locally or on the server.")

            for hash_not_found in hashes_not_found:
                if hash_not_found in self.hashes_with_no_local_payload:
                    return self.exit_upload("IOV with hash '%s' does not have a Payload locally or on the server. Cannot continue." % hash_not_found)

            for hash_found in hashes_found:
                if hash_found in self.hashes_with_no_local_payload:
                    self._outputter.write("Payload with hash %s on server, so can upload IOV." % hash_found)

            self._outputter.write("All IOVs either come with Payloads or point to a Payload already on the server.")

        except errors.NoMoreRetriesException as no_more_retries:
            # Checking hashes failed too many times - abort.
            return self.exit_upload("Ran out of retries trying to check hashes of payloads to send, where the limit was 3.")
        except Exception as e:
            # Unexpected failure - log the full traceback and bail out.
            self._outputter.write(traceback.format_exc(), ignore_verbose=True)

            if not(self._verbose):
                self._outputter.write("Something went wrong that isn't handled by code - to get the traceback, run again with --verbose.")
            else:
                self._outputter.write("Something went wrong that isn't handled by code - the traceback is above.")

            return self.exit_upload()

        # Step 4: send the payloads the server told us about in the previous step
        # (returned from get_hashes_to_send).  Exception handling is done inside this
        # method, since it calls a method itself for each payload.
        send_payloads_response = self.send_payloads(check_hashes_response["hashes_not_found"], self.upload_session_id)
        if self._testing and not(send_payloads_response):
            return False

        # Step 5 (final stage): send metadata to the server (the payloads are there now).
        # If this is successful, the upload session is closed on the server and the
        # tag lock is released.
        try:
            send_metadata_response = self.send_metadata(self.upload_session_id)
            no_error = self.check_response_for_error_key(send_metadata_response)
            if not(no_error) and self._testing:
                return False

            # The server-side log was stashed by check_response_for_error_key - write it out.
            self.write_server_side_log(self._log_data)

        except errors.NoMoreRetriesException as no_more_retries:
            return self.exit_upload("Ran out of retries trying to send metadata, where the limit was 3.")
        except Exception as e:
            # Unexpected failure - log the full traceback and bail out.
            self._outputter.write(traceback.format_exc(), ignore_verbose=True)

            if not(self._verbose):
                self._outputter.write("Something went wrong that isn't handled by code - to get the traceback, run again with --verbose.")
            else:
                self._outputter.write("Something went wrong that isn't handled by code - the traceback is above.")

            return self.exit_upload()

        # Close the client-side log handle.
        self._handle.close()

        # If testing, report success explicitly.
        if self._testing:
            return True

    @check_response(check="json")
    def get_upload_session_id(self):
        """
        Open an upload session on the server, and get a unique token back that we can use to authenticate for all future requests,
        as long as the upload session is still open.
        """
        self._outputter.write("Getting upload session.")

        # The credentials travel in the request body (base64-encoded JSON) so they go
        # over HTTPS rather than in the URL.  data_to_send must already contain
        # "username" and "password" entries supplied by the caller.
        body_data = base64.b64encode(json.dumps(
                {
                    "destinationTag" : list(self.data_to_send["destinationTags"].keys())[0],
                    "username_or_token" : self.data_to_send["username"],
                    "password" : self.data_to_send["password"]
                }
            ).encode('UTF-8'))

        url_data = {"database" : self.data_to_send["destinationDatabase"]}

        query = url_query(url=self._SERVICE_URL + "get_upload_session/", body=body_data, url_data=url_data)
        response = query.send()
        return response

    @check_response(check="json")
    def close_upload_session(self, upload_session_id):
        """
        Close an upload session on the server by calling its close_upload_session end-point.
        This is done if there is an error on the client-side.
        """
        self._outputter.write("An error occurred - closing the upload session on the server.")
        url_data = {"database" : self.data_to_send["destinationDatabase"], "upload_session_id" : upload_session_id}
        query = url_query(url=self._SERVICE_URL + "close_upload_session/", url_data=url_data)
        response = query.send()
        return response

    @check_response(check="json")
    def get_fcsr_from_server(self, upload_session_id):
        """
        Execute the HTTPs request to ask the server for the FCSR.

        Note: we do this in a separate function so we can do the decoding check for json data with check_response.
        """
        # The server decides the FCSR from the destination Tag and the requested
        # synchronization type (plus any Tier0 response override).
        url_data = {
            "database" : self.data_to_send["destinationDatabase"],
            "upload_session_id" : upload_session_id,
            "destinationTag" : list(self.data_to_send["destinationTags"].keys())[0],
            "sourceTagSync" : self.data_to_send["fcsr_filter"],
            "tier0_response" : self.data_to_send.get("tier0_response")
        }
        query = url_query(url=self._SERVICE_URL + "get_fcsr/", url_data=url_data)
        result = query.send()
        return result

    def filter_iovs_by_fcsr(self, upload_session_id):
        """
        Ask the server for the FCSR based on the synchronization type of the source Tag.
        Then, modify the IOVs (possibly remove some) based on the FCSR we received.
        This is useful in the case that most IOVs have different payloads, and our FCSR is close to the end of the range the IOVs cover.

        Modifies self.data_to_send in place; returns nothing.
        """
        self._outputter.write("Getting the First Condition Safe Run for the current sync type.")

        fcsr_data = self.get_fcsr_from_server(upload_session_id)
        fcsr = fcsr_data["fcsr"]
        fcsr_changed = fcsr_data["fcsr_changed"]
        new_sync = fcsr_data["new_sync"]

        if fcsr_changed:
            self._outputter.write("Synchronization '%s' given was changed to '%s' to match destination Tag." % (self.data_to_send["fcsr_filter"], new_sync))

        self._outputter.write("Synchronization '%s' gave FCSR %d for FCSR Filtering."\
                % (self.data_to_send["fcsr_filter"], friendly_since(self.data_to_send["inputTagData"]["time_type"], fcsr)))

        # There may be cases where this assumption is not correct (that we can reassign since if fcsr > since).
        # Only set since to fcsr from server if the fcsr is further along than the user is trying to upload to.
        # Note: this applies to run, lumi and timestamp run_types.
        if fcsr > self.data_to_send["since"]:
            # For offline synchronization the user since must be >= FCSR, so abort.
            if self.data_to_send["fcsr_filter"].lower() == "offline":
                self._outputter.write("If you're uploading to offline, you can't upload to a since < FCSR.\nNo upload has been processed.")
                self.exit_upload()
            self.data_to_send["since"] = fcsr

        self._outputter.write("Final FCSR after comparison with FCSR received from server is %d."\
                % friendly_since(self.data_to_send["inputTagData"]["time_type"], int(self.data_to_send["since"])))

        # Post validation processing assuming destination since is now valid.
        #
        # Because we don't have an sqlite database to query (everything's in a dictionary),
        # we have to go through the IOVs manually to find the greatest since that's less than
        # the destination since.
        #
        # Purpose of this algorithm: move any IOV sinces that we can use up to the fcsr
        # without leaving a hole in the Conditions coverage.
        max_since_below_dest = self.data_to_send["iovs"][0]["since"]
        for (i, iov) in enumerate(self.data_to_send["iovs"]):
            if self.data_to_send["iovs"][i]["since"] <= self.data_to_send["since"] and self.data_to_send["iovs"][i]["since"] > max_since_below_dest:
                max_since_below_dest = self.data_to_send["iovs"][i]["since"]

        # Only keep IOVs with sinces >= max_since_below_dest, then raise any remaining
        # sinces below the destination since up to the destination since.
        self.data_to_send["iovs"] = [iov for iov in self.data_to_send["iovs"] if iov["since"] >= max_since_below_dest]
        for (i, iov) in enumerate(self.data_to_send["iovs"]):
            if self.data_to_send["iovs"][i]["since"] < self.data_to_send["since"]:
                self.data_to_send["iovs"][i]["since"] = self.data_to_send["since"]

        # Stamp all surviving IOVs with a fresh insertion time.
        new_time = to_timestamp(datetime.utcnow())
        for (i, iov) in enumerate(self.data_to_send["iovs"]):
            self.data_to_send["iovs"][i]["insertion_time"] = new_time

    def get_all_hashes(self):
        """
        Get all the hashes from the dictionary of IOVs we have from the SQLite file.
        """
        self._outputter.write("\tGetting list of all hashes found in SQLite database.")
        hashes = [iov["payload_hash"] for iov in self.data_to_send["iovs"]]
        return hashes

    @check_response(check="json")
    def get_hashes_to_send(self, upload_session_id):
        """
        Get the hashes of the payloads we want to send that the server doesn't have yet.
        """
        self._outputter.write("Getting list of hashes that the server does not have Payloads for, to send to server.")
        post_data = json.dumps(self.get_all_hashes())
        url_data = {"database" : self.data_to_send["destinationDatabase"], "upload_session_id" : upload_session_id}
        query = url_query(url=self._SERVICE_URL + "check_hashes/", url_data=url_data, body=post_data)
        response = query.send()
        return response

    def send_payloads(self, hashes, upload_session_id):
        """
        Send a list of payloads corresponding to hashes we got from the SQLite file and filtered by asking the server.

        Returns True on success (or when nothing needed sending), False otherwise.
        """
        # No hashes to send is not an error - it means every Payload is already on
        # the server, so we can proceed straight to the metadata upload.
        if len(hashes) == 0:
            self._outputter.write("No hashes to send - moving to metadata upload.")
            return True
        else:
            self._outputter.write("Sending payloads of hashes not found:")
            # Build the connection string for the local SQLite file (unless a ready
            # connection object was stored instead of a file name).
            database = ("sqlite://%s" % os.path.abspath(self.sqlite_file_name)) if type(self.sqlite_file_name) == str else self.sqlite_file_name

            # map_blobs=True so that Payload BLOB columns are actually fetched.
            self._outputter.write("\tConnecting to input SQLite database.")
            con = querying.connect(database, map_blobs=True)

            # Query for the Payloads by hash (hashes are stored as bytes in SQLite).
            self._outputter.write("\tGetting Payloads from SQLite database based on list of hashes.")
            byte_hashes = [bytes(h, 'utf-8') for h in hashes]
            payloads = con.payload(hash=byte_hashes)

            # A single Payload result is wrapped into a json_list so iteration is uniform.
            if payloads and payloads.__class__ != data_sources.json_list:
                payloads = data_sources.json_data_node.make([payloads])

            # Done with the SQLite file - close the session.
            con.close_session()

            # If we found Payloads, send each one; otherwise report failure.
            if payloads:
                # Convert the Payloads to dictionaries - most fields go into the URL of
                # the HTTPs request, and the BLOB goes into the body (see send_blob()).
                dicts = payloads.as_dicts()
                self._outputter.write("Uploading Payload BLOBs:")

                # Send each Payload's BLOB to the server, stopping on the first error.
                for n, payload in enumerate(dicts):
                    self._outputter.write("\t(%d/%d) Sending payload with hash '%s'." % (n+1, len(dicts), payload["hash"]))
                    response = self.send_blob(payload, upload_session_id)
                    # Check response for errors.
                    no_error = self.check_response_for_error_key(response, exit_if_error=True)
                    if not(no_error):
                        return False
                    self._outputter.write("\tPayload sent - moving to next one.")
                self._outputter.write("All Payloads uploaded.")
                return True
            else:
                return False

    @check_response(check="json")
    def send_blob(self, payload, upload_session_id):
        """
        Send the BLOB of a payload over HTTP.
        The BLOB is put in the request body, so no additional processing has to be done on the server side, apart from decoding from base64.
        """
        # Encode the BLOB data of the Payload to make sure we don't send a character
        # the server can't parse.
        blob_data = base64.b64encode(payload["data"])

        url_data = {"database" : self.data_to_send["destinationDatabase"], "upload_session_id" : upload_session_id}

        # Every Payload field except the BLOB itself travels as a URL parameter;
        # timestamps are serialised to strings.
        for key in list(payload.keys()):
            # skip blob
            if key != "data":
                if key == "insertion_time":
                    url_data[key] = to_timestamp(payload[key])
                else:
                    url_data[key] = payload[key]

        request = url_query(url=self._SERVICE_URL + "store_payload/", url_data=url_data, body=blob_data)

        # Send the request and return the response - on a retry exhaustion, return a
        # JSON error string so the caller's error check reports it.
        try:
            request_response = request.send()
            return request_response
        except Exception as e:
            # Make sure we don't try to write the Payload again (log to console and file).
            if type(e) == errors.NoMoreRetriesException:
                self._outputter.write("\t\t\tPayload with hash '%s' was not uploaded because the maximum number of retries was exceeded." % payload["hash"])
                self._outputter.write("Payload with hash '%s' was not uploaded because the maximum number of retries was exceeded." % payload["hash"])
            return json.dumps({"error" : str(e), "traceback" : traceback.format_exc()})

    @check_response(check="json")
    def send_metadata(self, upload_session_id):
        """
        Final part of the upload process - send the Conditions metadata (Tag, IOVs - not upload metadata).
        The server closes the session (and releases the tag lock) after processing has been completed.
        """
        # Set a default userText if the user gave none.
        if self.data_to_send["userText"] in ["", None]:
            self.data_to_send["userText"] = "Tag '%s' uploaded from CondDBFW client." % list(self.data_to_send["destinationTags"].keys())[0]

        self._outputter.write("Sending metadata to server - see server_side_log at server_side_logs/upload_log_%s for details on metadata processing on server side."\
                    % self.upload_session_id)

        # The whole data_to_send dictionary travels as the request body.
        url_data = {"database" : self.data_to_send["destinationDatabase"], "upload_session_id" : upload_session_id, "tier0_response" : self.data_to_send.get("tier0_response")}
        request = url_query(url=self._SERVICE_URL + "upload_metadata/", url_data=url_data, body=json.dumps(self.data_to_send))
        response = request.send()
        self._outputter.write("Response received - conditions upload process complete.")
        return response
0741
if __name__ == "__main__":
    """
    This code should only be executed for testing.
    """
    import sys
    from .uploadConditions import parse_arguments

    print(
        """
        This code should only be executed for testing.
        Any uploads done by the user should be done by calling the uploadConditions.py script.
        See https://cms-conddb-dev.cern.ch/cmsDbCondUpload for information on how to obtain the correct version.
        """
    )

    # Parse the command line into the upload metadata dictionary.
    upload_metadata = parse_arguments()

    # Mirror the source database under the key the uploader expects.
    upload_metadata["sqlite_file"] = upload_metadata.get("sourceDB")

    # Build the metadata-source payload from everything except "metadata_source" itself.
    upload_metadata_argument = {key: value for (key, value) in upload_metadata.items() if key != "metadata_source"}

    upload_metadata["metadata_source"] = data_sources.json_data_node.make(upload_metadata_argument)

    # Run the upload with the assembled metadata.
    upload_controller = uploader(**upload_metadata)

    result = upload_controller.upload()