#!/usr/bin/env python3
"""

Joshua Dawes - CERN, CMS - The University of Manchester

This module holds classes to help with uploading conditions to the drop box web service, which also uses CondDBFW to read and write data.

"""

import os
import json
import base64
from datetime import datetime
from urllib.parse import urlencode
import math
import sys
import traceback
import netrc

from .url_query import url_query
from . import models
from . import errors
from . import data_sources
from . import querying
from .errors import *
from .utils import to_timestamp, to_datetime, friendly_since

def friendly_since(time_type, since):
    """
    Takes a since and, if it is a Run-based since expressed in the packed Lumi-based format, returns the run number.
    Otherwise, returns the since unchanged.
    """
    if time_type == "Run" and (since & 0xffffff) == 0:
        return since >> 32
    else:
        return since

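# A minimal illustration of the packed since format assumed by friendly_since:
# for Run-based Tags, the run number lives in the upper 32 bits of a 64-bit
# since, with the lower bits reserved for the lumi section (the run number
# here is hypothetical, purely for illustration):
#
#   packed = 301000 << 32           # run 301000, lumi section 0
#   friendly_since("Run", packed)   # -> 301000
#   friendly_since("Time", packed)  # -> returned unchanged
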
# this is simple, and works for now - if logging requirements change, I will write a logging class to manage logging
def log(file_handle, message):
    """
    Very simple logging function, used by the output class.
    """
    file_handle.write("[%s] %s\n" % (to_timestamp(datetime.utcnow()), message))

def new_log_file_id():
    """
    Find a new client-side log file name.

    Note: This cannot use the upload session token since logs need to be written before this is opened.
    However, this can be changed so that the filename that uses the token is written to once
    it is obtained.
    """
    # new id = number of log files + 1
    # (primitive - matching the hash of the upload session may be a better idea)
    log_files = [file for file in os.listdir(os.path.join(os.getcwd(), "upload_logs")) if "upload_log" in file]
    new_id = len(log_files) + 1
    return new_id

class output():
    """
    Used to control output to the console and to the client-side log.
    """

    def __init__(self, log_handle=None, verbose=False):
        # first time writing progress bar, don't need to go back along the line
        self.current_output_length = 0
        self._verbose = verbose
        self._log_handle = log_handle

    def write(self, message="", ignore_verbose=False):
        """
        Write to the console and to the log file held by self.
        """
        if ignore_verbose or self._verbose:
            print(message)
        if self._log_handle is not None:
            log(self._log_handle, message)

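# Example usage of the output class (hypothetical file name, for illustration
# only - real log handles are created by the uploader below):
#
#   with open("upload_logs/example_log", "a") as handle:
#       out = output(log_handle=handle, verbose=True)
#       out.write("Upload starting.")                    # printed and logged
#       out.write("Always shown.", ignore_verbose=True)  # printed even when verbose=False
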
class uploader(object):
    """
    Upload session controller - creates, tracks, and deletes upload sessions on the server.
    """

    def __init__(self, metadata_source=None, debug=False, verbose=False, testing=False, server="https://cms-conddb-dev.cern.ch/cmsDbCondUpload/", **kwargs):
        """
        Upload constructor:
        Given an SQLite file and a metadata source, reads the data into a dictionary ready to be encoded and uploaded.

        Note: kwargs is used to capture stray arguments - arguments that do not match keywords will not be used.

        Note: the default value of server should be changed for production.
        """
        # set private variables
        self._debug = debug
        self._verbose = verbose
        self._testing = testing
        # initialise server-side log data as an empty string - it will be replaced when we get a response back from the server
        self._log_data = ""
        self._SERVICE_URL = server
        self.upload_session_id = None

        # set up client-side log file
        self.upload_log_file_name = "upload_logs/upload_log_%d" % new_log_file_id()
        self._handle = open(self.upload_log_file_name, "a")

        # set up client-side logging object
        self._outputter = output(verbose=verbose, log_handle=self._handle)
        self._outputter.write("Using server instance at '%s'." % self._SERVICE_URL)

        # expect a CondDBFW data_source object for metadata_source
        if metadata_source is None:
            # no upload metadata has been given - we cannot continue with the upload
            self.exit_upload("A source of metadata must be given so CondDBFW knows how to upload conditions.")
        else:
            # set up global metadata source variable
            self.metadata_source = metadata_source.data()

        # check for the destination tag
        # this is required whatever type of upload we're performing
        if self.metadata_source.get("destinationTags") is None:
            self.exit_upload("No destination Tag was given.")
        else:
            if type(self.metadata_source.get("destinationTags")) == dict and list(self.metadata_source.get("destinationTags").keys())[0] is None:
                self.exit_upload("No destination Tag was given.")

        # make sure a destination database was given
        if self.metadata_source.get("destinationDatabase") is None:
            self.exit_upload("No destination database was given.")

        # get Conditions metadata
        if self.metadata_source.get("sourceDB") is None and self.metadata_source.get("hashToUse") is None:
            """
            We have neither an SQLite file nor the command line data.
            """
            self.exit_upload("You must give either an SQLite database file, or the necessary command line arguments to replace one."\
                            + "\nSee --help for command line argument information.")
        elif self.metadata_source.get("sourceDB") is not None:
            """
            We've been given an SQLite file, so try to extract Conditions Metadata based on that and the Upload Metadata in metadata_source.
            We now extract the Tag and IOV data from SQLite.  It is added to the dictionary for sending over HTTPS later.
            """

            # make sure we have an input tag to look for in the source db
            self.input_tag = metadata_source.data().get("inputTag")
            if self.input_tag is None:
                self.exit_upload("No input Tag name was given.")

            # set empty dictionary to contain Tag and IOV data from SQLite
            result_dictionary = {}
            self.sqlite_file_name = self.metadata_source["sourceDB"]
            if not os.path.isfile(self.sqlite_file_name):
                self.exit_upload("The SQLite file '%s' given doesn't exist." % self.sqlite_file_name)
            sqlite_con = querying.connect("sqlite://%s" % os.path.abspath(self.sqlite_file_name))

            self._outputter.write("Getting Tag and IOVs from SQLite database.")

            # query for the Tag, check for existence, then convert to a dictionary
            tag = sqlite_con.tag(name=self.input_tag)
            if tag is None:
                self.exit_upload("The source Tag '%s' you gave was not found in the SQLite file." % self.input_tag)
            tag = tag.as_dicts(convert_timestamps=True)

            # query for IOVs, check for existence, then convert to dictionaries
            iovs = sqlite_con.iov(tag_name=self.input_tag)
            if iovs is None:
                self.exit_upload("No IOVs found in the SQLite file given for Tag '%s'." % self.input_tag)
            iovs = iovs.as_dicts(convert_timestamps=True)
            iovs = [iovs] if type(iovs) != list else iovs

            """
            Finally, get the list of all Payload hashes used by the IOVs,
            then compute the list of hashes for which there is no local Payload;
            this is used later to decide whether the upload can continue if a Payload is not found on the server.
            """
            iovs_for_hashes = sqlite_con.iov(tag_name=self.input_tag)
            if iovs_for_hashes.__class__ == data_sources.json_list:
                hashes_of_iovs = iovs_for_hashes.get_members("payload_hash").data()
            else:
                hashes_of_iovs = [iovs_for_hashes.payload_hash]
            self.hashes_with_no_local_payload = [payload_hash for payload_hash in hashes_of_iovs if sqlite_con.payload(hash=payload_hash) is None]

            # close the session open on the SQLite database file
            sqlite_con.close_session()

        elif metadata_source.data().get("hashToUse") is not None:
            """
            Assume we've been given metadata on the command line (since no SQLite file is there, and we have command line arguments).
            We now use Tag and IOV data from the command line.  It is added to the dictionary for sending over HTTPS later.
            """

            # set empty dictionary to contain Tag and IOV data from the command line
            result_dictionary = {}

            now = to_timestamp(datetime.utcnow())
            # the tag dictionary will be taken from the server
            # this does not require any authentication
            tag = self.get_tag_dictionary()
            self.check_response_for_error_key(tag)
            iovs = [{"tag_name" : self.metadata_source["destinationTag"], "since" : self.metadata_source["since"], "payload_hash" : self.metadata_source["hashToUse"],\
                    "insertion_time" : now}]

            # hashToUse cannot be stored locally (no SQLite file is given), so register it as not found
            self.hashes_with_no_local_payload = [self.metadata_source["hashToUse"]]

            # Note: normal optimisations will still take place - since the hash checking stage can tell if hashToUse does not exist on the server side

        # if the source Tag is run-based, convert sinces to lumi-based sinces with lumi-section = 0
        if tag["time_type"] == "Run":
            for (i, iov) in enumerate(iovs):
                iovs[i]["since"] = iovs[i]["since"] << 32

        result_dictionary = {"inputTagData" : tag, "iovs" : iovs}

        # add command line arguments to the dictionary
        # remembering that metadata_source is a json_dict object
        result_dictionary.update(metadata_source.data())

        # store in an instance variable
        self.data_to_send = result_dictionary

        # if the since doesn't exist, take the first since from the list of IOVs
        if result_dictionary.get("since") is None:
            result_dictionary["since"] = sorted(iovs, key=lambda iov : iov["since"])[0]["since"]
        elif self.data_to_send["inputTagData"]["time_type"] == "Run":
            # the Tag time_type says IOVs use Runs for sinces, so we convert to Lumi-based for uniform processing
            self.data_to_send["since"] = self.data_to_send["since"] << 32

        """
        TODO - Settle on a single destination tag format.
        """
        # look for deprecated metadata entries - give warnings
        # Note - we only really support this format
        try:
            if type(result_dictionary["destinationTags"]) == dict:
                self._outputter.write("WARNING: Multiple destination tags in a single metadata source is deprecated.")
        except Exception as e:
            self._outputter.write("ERROR: %s" % str(e))

    @check_response(check="json")
    def get_tag_dictionary(self):
        url_data = {"tag_name" : self.metadata_source["destinationTag"], "database" : self.metadata_source["destinationDatabase"]}
        request = url_query(url=self._SERVICE_URL + "get_tag_dictionary/", url_data=url_data)
        response = request.send()
        return response

    def check_response_for_error_key(self, response_dict, exit_if_error=True):
        """
        Checks the decoded response of an HTTP request to the server.
        If it is a dictionary, and one of its keys is "error", the server returned an error.
        """
        # if the decoded response data is a dictionary and has an error key in it, we should display an error and its traceback
        if type(response_dict) == dict and "error" in response_dict:
            splitter_string = "\n%s\n" % ("-"*50)
            self._outputter.write("\nERROR: %s" % splitter_string, ignore_verbose=True)
            self._outputter.write(response_dict["error"], ignore_verbose=True)

            # if the user has given the --debug flag, show the traceback as well
            if self._debug:
                # suggest to the user to email this to db upload experts
                self._outputter.write("\nTRACEBACK (since --debug is set):%s" % splitter_string, ignore_verbose=True)
                if response_dict.get("traceback") is not None:
                    self._outputter.write(response_dict["traceback"], ignore_verbose=True)
                else:
                    self._outputter.write("No traceback was returned from the server.", ignore_verbose=True)
            else:
                self._outputter.write("Use the --debug option to show the traceback of this error.", ignore_verbose=True)

            # write the server-side log to the client side (if we have an error from creating an upload session, the log is in its initial state (""))
            # if an error has occurred on the server side, a log will have been written
            self.write_server_side_log(response_dict.get("log_data"))

            if exit_if_error:
                if self._testing:
                    return False
                else:
                    exit()
        elif type(response_dict) == dict and "error" not in response_dict and "log_data" in response_dict:
            # store the log data, if it's there, in memory - this is used if a request times out and we don't get any log data back
            self._log_data = response_dict["log_data"]
            return True

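    # The response shapes assumed by the check above (values are illustrative):
    #
    #   {"error": "...", "traceback": "...", "log_data": "<base64>"}   # failure
    #   {"log_data": "<base64>"}                                       # success
    #
    # so, for example, a successful response makes the method return True:
    #   self.check_response_for_error_key({"log_data": "Zm9v"})   # -> True
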
    def write_server_side_log(self, log_data):
        """
        Given the log data from the server, write it to a client-side log file.
        """
        # if the server_side_logs directory doesn't exist, create it
        # without it we can't write the log when we download it from the server
        if not os.path.exists(os.path.join(os.getcwd(), "server_side_logs/")):
            os.makedirs("server_side_logs/")

        # the directory exists now, so write to the client-side log file
        server_log_file_name = None
        try:
            # if the upload session does not exist yet, don't try to write the log file
            if self.upload_session_id is None:
                raise Exception("No upload session")
            # create a write handle to the file, decode the log data from base64, write and close
            # (the decoded data is bytes, so the file must be opened in binary mode)
            server_log_file_name = "server_side_logs/upload_log_%s" % str(self.upload_session_id)
            handle = open(server_log_file_name, "wb")
            handle.write(base64.b64decode(log_data))
            handle.close()
        except Exception as e:
            # reset the log file name to None so we don't try to write it later
            server_log_file_name = None
            #self._outputter.write("Couldn't write the server-side log file.\nThis may be because no upload session could be opened.")

        # tell the user where the log files are
        # in the next iteration we may just merge the log files and store one log (how it's done in the plotter module)
        if server_log_file_name is not None:
            print("Log file from server written to '%s'." % server_log_file_name)
        else:
            print("No server log file could be written locally.")

        print("Log file from CondDBFW written to '%s'." % self.upload_log_file_name)

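    # For reference, log_data is assumed to arrive base64-encoded, so decoding
    # simply reverses a plain encode step, e.g.:
    #   base64.b64decode(b"SGVsbG8=")   # -> b"Hello"
    # which is why the log file above is opened in binary mode ("wb").
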
    def exit_upload(self, message=None):
        """
        Used to exit the script - which only happens if an error has occurred.
        If the --testing flag was passed by the user, we should return False for failure, and not exit.
        """
        if self.upload_session_id is not None:
            # only try to close the upload session if an upload session has been obtained
            response = self.close_upload_session(self.upload_session_id)
            no_error = self.check_response_for_error_key(response)
            # if no error was found in the upload session closure request,
            # we still have to write the server-side log
            if no_error:
                self.write_server_side_log(self._log_data)
        # close the client-side log handle
        self._handle.close()
        if message is not None:
            print("\n%s\n" % message)
        if self._testing:
            return False
        else:
            exit()

    def upload(self):
        """
        Calls methods that send HTTP requests to the upload server.
        """

        """
        Open an upload session on the server - this also gives us a tag lock on the tag being uploaded, if it is available.
        """
        try:

            # get an upload session, and check the response for an error key
            upload_session_data = self.get_upload_session_id()
            no_error = self.check_response_for_error_key(upload_session_data)

            # if there was an error and we're testing, return False for the testing module
            if not no_error and self._testing:
                return False

            self.upload_session_id = upload_session_data["id"]
            self._outputter.write("Upload session obtained with token '%s'." % self.upload_session_id)
            self.server_side_log_file = upload_session_data["log_file"]

        except errors.NoMoreRetriesException as no_more_retries:
            return self.exit_upload("Ran out of retries opening an upload session, where the limit was 3.")
        except Exception as e:
            # something went wrong that we have no specific exception for, so print the traceback and exit
            self._outputter.write(traceback.format_exc(), ignore_verbose=True)

            if not self._verbose:
                self._outputter.write("Something went wrong that isn't handled by code - to get the traceback, run again with --verbose.")
            else:
                self._outputter.write("Something went wrong that isn't handled by code - the traceback is above.")

            return self.exit_upload()

        """
        Only if a value is given for --fcsr-filter, run FCSR filtering on the IOVs locally.
        """
        if self.data_to_send["fcsr_filter"] is not None:
            """
            FCSR Filtering:
            Filter the IOVs before we send them, by getting the First Conditions Safe Run
            from the server based on the target synchronization type.
            """
            if self.data_to_send["inputTagData"]["time_type"] != "Time":
                # time-based tags can't be FCSR-validated (the same holds on the server side), so only filter run- and lumi-based tags
                try:
                    self.filter_iovs_by_fcsr(self.upload_session_id)
                    # this function does not return a value, since it just operates on data - so there is no point checking for an error key
                    # the error key check is done inside the function on the response from the server
                except errors.NoMoreRetriesException as no_more_retries:
                    return self.exit_upload("Ran out of retries trying to filter IOVs by FCSR from server, where the limit was 3.")
                except Exception as e:
                    # something went wrong that we have no specific exception for, so print the traceback and exit
                    self._outputter.write(traceback.format_exc(), ignore_verbose=True)

                    if not self._verbose:
                        self._outputter.write("Something went wrong that isn't handled by code - to get the traceback, run again with --verbose.")
                    else:
                        self._outputter.write("Something went wrong that isn't handled by code - the traceback is above.")

                    return self.exit_upload()
            else:
                self._outputter.write("The Tag you're uploading is time-based, so we can't do any FCSR-based validation.  FCSR filtering is being skipped.")

        """
        Check for the hashes that the server doesn't have - only send these (but in the next step).
        """
        try:

            check_hashes_response = self.get_hashes_to_send(self.upload_session_id)
            # check for an error key in the response
            no_error = self.check_response_for_error_key(check_hashes_response)

            # if there was an error and we're testing, return False for the testing module
            if not no_error and self._testing:
                return False

            # finally, intersect hashes_not_found with the hashes not found locally - if there is an intersection, we stop the upload,
            # because if a hash is not found locally and is not on the server, there is no data to upload
            all_hashes = [iov["payload_hash"] for iov in self.data_to_send["iovs"]]
            hashes_not_found = check_hashes_response["hashes_not_found"]
            hashes_found = list(set(all_hashes) - set(hashes_not_found))
            self._outputter.write("Checking for IOVs that have no Payload locally or on the server.")
            # check if any hash not found on the server is also missing from the local SQLite database
            for hash_not_found in hashes_not_found:
                if hash_not_found in self.hashes_with_no_local_payload:
                    return self.exit_upload("IOV with hash '%s' does not have a Payload locally or on the server.  Cannot continue." % hash_not_found)

            for hash_found in hashes_found:
                if hash_found in self.hashes_with_no_local_payload:
                    self._outputter.write("Payload with hash %s is on the server, so we can upload the IOV." % hash_found)

            self._outputter.write("All IOVs either come with Payloads or point to a Payload already on the server.")

        except errors.NoMoreRetriesException as no_more_retries:
            # for now, just write the log if we get a NoMoreRetriesException
            return self.exit_upload("Ran out of retries trying to check hashes of payloads to send, where the limit was 3.")
        except Exception as e:
            # something went wrong that we have no specific exception for, so print the traceback and exit
            self._outputter.write(traceback.format_exc(), ignore_verbose=True)

            if not self._verbose:
                self._outputter.write("Something went wrong that isn't handled by code - to get the traceback, run again with --verbose.")
            else:
                self._outputter.write("Something went wrong that isn't handled by code - the traceback is above.")

            return self.exit_upload()

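        # To illustrate the hash bookkeeping above with hypothetical values:
        #   all_hashes       = ["a1", "b2", "c3"]   # hashes referenced by the IOVs
        #   hashes_not_found = ["b2"]               # server is missing this Payload
        #   hashes_found     = ["a1", "c3"]         # set difference
        # The upload only aborts if a hash missing on the server is also in
        # self.hashes_with_no_local_payload, i.e. there is no copy anywhere.
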
        """
        Send the payloads the server told us about in the previous step (returned from get_hashes_to_send).
        Exception handling is done inside this method, since it calls a method itself for each payload.
        """
        send_payloads_response = self.send_payloads(check_hashes_response["hashes_not_found"], self.upload_session_id)
        if self._testing and not send_payloads_response:
            return False

        """
        Final stage - send metadata to the server (since the payloads are there now).
        If this is successful, the upload session is closed on the server and the tag lock is released.
        """
        try:

            # note that the response (in send_metadata_response) is already decoded from base64 by the response check decorator
            send_metadata_response = self.send_metadata(self.upload_session_id)
            no_error = self.check_response_for_error_key(send_metadata_response)
            if not no_error and self._testing:
                return False

            # we have to call this explicitly here since check_response_for_error_key only writes the log file
            # if an error has occurred, whereas it should always be written here
            self.write_server_side_log(self._log_data)

        except errors.NoMoreRetriesException as no_more_retries:
            return self.exit_upload("Ran out of retries trying to send metadata, where the limit was 3.")
        except Exception as e:
            # something went wrong that we have no specific exception for, so print the traceback and exit
            self._outputter.write(traceback.format_exc(), ignore_verbose=True)

            if not self._verbose:
                self._outputter.write("Something went wrong that isn't handled by code - to get the traceback, run again with --verbose.")
            else:
                self._outputter.write("Something went wrong that isn't handled by code - the traceback is above.")

            return self.exit_upload()

        # close the client-side log handle
        self._handle.close()

        # if we're running the testing script, return True to say the upload has worked
        if self._testing:
            return True

    @check_response(check="json")
    def get_upload_session_id(self):
        """
        Open an upload session on the server, and get a unique token back that we can use to authenticate all future requests,
        as long as the upload session is still open.
        """
        self._outputter.write("Getting upload session.")

        # send the password in the body so it can be encrypted over HTTPS
        # username and password are taken from the netrc file
        # at this point, the value in username_or_token is always a username, since
        # this method's end result is obtaining a token.
        body_data = base64.b64encode(json.dumps(
                {
                    "destinationTag" : list(self.data_to_send["destinationTags"].keys())[0],
                    "username_or_token" : self.data_to_send["username"],
                    "password" : self.data_to_send["password"]
                }
            ).encode('UTF-8'))

        url_data = {"database" : self.data_to_send["destinationDatabase"]}

        query = url_query(url=self._SERVICE_URL + "get_upload_session/", body=body_data, url_data=url_data)
        response = query.send()
        return response

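    # The body built above is a base64-encoded JSON object, so the server side
    # is assumed to reverse the two steps, e.g.:
    #   json.loads(base64.b64decode(body_data).decode("UTF-8"))
    #   # -> {"destinationTag": ..., "username_or_token": ..., "password": ...}
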
    @check_response(check="json")
    def close_upload_session(self, upload_session_id):
        """
        Close an upload session on the server by calling its close_upload_session end-point.
        This is done if there is an error on the client-side.
        """
        self._outputter.write("An error occurred - closing the upload session on the server.")
        url_data = {"database" : self.data_to_send["destinationDatabase"], "upload_session_id" : upload_session_id}
        query = url_query(url=self._SERVICE_URL + "close_upload_session/", url_data=url_data)
        response = query.send()
        return response

    @check_response(check="json")
    def get_fcsr_from_server(self, upload_session_id):
        """
        Execute the HTTPS request to ask the server for the FCSR.

        Note: we do this in a separate function so we can do the decoding check for json data with check_response.
        """
        # tiny amount of client-side logic here - all of the work is done on the server
        # tier0_response uses get() so if the key isn't present, we default to None
        # tier0_response is for replaying uploads from the old upload service, with knowledge of the tier0 response
        # when those uploads happened.
        url_data = {
                        "database" : self.data_to_send["destinationDatabase"],
                        "upload_session_id" : upload_session_id,
                        "destinationTag" : list(self.data_to_send["destinationTags"].keys())[0],
                        "sourceTagSync" : self.data_to_send["fcsr_filter"],
                        "tier0_response" : self.data_to_send.get("tier0_response")
                    }
        query = url_query(url=self._SERVICE_URL + "get_fcsr/", url_data=url_data)
        result = query.send()
        return result

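    # Based on how the result is consumed in filter_iovs_by_fcsr below, the
    # get_fcsr/ response is expected to contain at least:
    #   {"fcsr": <int>, "fcsr_changed": <bool>, "new_sync": <str>}
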
    def filter_iovs_by_fcsr(self, upload_session_id):
        """
        Ask the server for the FCSR based on the synchronization type of the source Tag.
        Then, modify the IOVs (possibly remove some) based on the FCSR we received.
        This is useful in the case that most IOVs have different payloads, and our FCSR is close to the end of the range the IOVs cover.
        """
        self._outputter.write("Getting the First Condition Safe Run for the current sync type.")

        fcsr_data = self.get_fcsr_from_server(upload_session_id)
        fcsr = fcsr_data["fcsr"]
        fcsr_changed = fcsr_data["fcsr_changed"]
        new_sync = fcsr_data["new_sync"]

        if fcsr_changed:
            self._outputter.write("Synchronization '%s' given was changed to '%s' to match destination Tag." % (self.data_to_send["fcsr_filter"], new_sync))

        self._outputter.write("Synchronization '%s' gave FCSR %d for FCSR Filtering."\
                            % (self.data_to_send["fcsr_filter"], friendly_since(self.data_to_send["inputTagData"]["time_type"], fcsr)))

        """
        There may be cases where this assumption is not correct (that we can reassign the since if fcsr > since).
        Only set the since to the fcsr from the server if the fcsr is further along than the since the user is trying to upload to.
        Note: this applies to run, lumi and timestamp time_types.
        """

        # if the fcsr is above the since given by the user, we need to set the user since to the fcsr
        if fcsr > self.data_to_send["since"]:
            # check if we're uploading to offline sync - if so, the user since must be >= fcsr, so we should report an error
            if self.data_to_send["fcsr_filter"].lower() == "offline":
                self._outputter.write("If you're uploading to offline, you can't upload to a since < FCSR.\nNo upload has been processed.")
                self.exit_upload()
            self.data_to_send["since"] = fcsr

        self._outputter.write("Final FCSR after comparison with FCSR received from server is %d."\
                                % friendly_since(self.data_to_send["inputTagData"]["time_type"], int(self.data_to_send["since"])))

        """
        Post-validation processing, assuming the destination since is now valid.

        Because we don't have an sqlite database to query (everything's in a dictionary),
        we have to go through the IOVs manually to find the greatest since that's less than
        or equal to the destination since.

        Purpose of this algorithm: move any IOV sinces that we can up to the fcsr without leaving a hole in the Conditions coverage.
        """

        max_since_below_dest = self.data_to_send["iovs"][0]["since"]
        for (i, iov) in enumerate(self.data_to_send["iovs"]):
            if self.data_to_send["iovs"][i]["since"] <= self.data_to_send["since"] and self.data_to_send["iovs"][i]["since"] > max_since_below_dest:
                max_since_below_dest = self.data_to_send["iovs"][i]["since"]

        # only select IOVs that have sinces >= max_since_below_dest
        # and then shift any IOVs left to the destination since
        self.data_to_send["iovs"] = [iov for iov in self.data_to_send["iovs"] if iov["since"] >= max_since_below_dest]
        for (i, iov) in enumerate(self.data_to_send["iovs"]):
            if self.data_to_send["iovs"][i]["since"] < self.data_to_send["since"]:
                self.data_to_send["iovs"][i]["since"] = self.data_to_send["since"]

        # modify the insertion_time of the IOVs
        new_time = to_timestamp(datetime.utcnow())
        for (i, iov) in enumerate(self.data_to_send["iovs"]):
            self.data_to_send["iovs"][i]["insertion_time"] = new_time

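    # A worked sketch of the shifting step above, with hypothetical sinces and
    # a destination since (fcsr) of 30:
    #   iovs before: [{"since": 10}, {"since": 20}, {"since": 40}]
    #   max_since_below_dest = 20     (greatest since <= 30)
    #   iovs kept:   [{"since": 20}, {"since": 40}]
    #   iovs after:  [{"since": 30}, {"since": 40}]
    # so the payload that applied at since 20 still covers the destination
    # since, and no hole is left in the Conditions coverage.
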
    def get_all_hashes(self):
        """
        Get all the hashes from the dictionary of IOVs we have from the SQLite file.
        """
        self._outputter.write("\tGetting list of all hashes found in SQLite database.")
        hashes = [iov["payload_hash"] for iov in self.data_to_send["iovs"]]
        return hashes

    @check_response(check="json")
    def get_hashes_to_send(self, upload_session_id):
        """
        Get the hashes of the payloads we want to send that the server doesn't have yet.
        """
        self._outputter.write("Getting list of hashes that the server does not have Payloads for, to send to server.")
        post_data = json.dumps(self.get_all_hashes())
        url_data = {"database" : self.data_to_send["destinationDatabase"], "upload_session_id" : upload_session_id}
        query = url_query(url=self._SERVICE_URL + "check_hashes/", url_data=url_data, body=post_data)
        response = query.send()
        return response

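    # Based on its use in upload(), the check_hashes/ endpoint is expected to
    # respond with at least:
    #   {"hashes_not_found": ["<payload hash>", ...], "log_data": "<base64>"}
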
    def send_payloads(self, hashes, upload_session_id):
        """
        Send a list of payloads corresponding to hashes we got from the SQLite file and filtered by asking the server.
        """
        # if we have no hashes, we can't send anything
        # but don't exit, since it might mean all the Payloads were already on the server
        if len(hashes) == 0:
            self._outputter.write("No hashes to send - moving to metadata upload.")
            return True
        else:
            self._outputter.write("Sending payloads of hashes not found:")
            # construct the connection string for the local SQLite database file
            database = ("sqlite://%s" % os.path.abspath(self.sqlite_file_name)) if type(self.sqlite_file_name) == str else self.sqlite_file_name
            # create a CondDBFW connection that maps blobs - we need to query for payload BLOBs (disabled by default in CondDBFW)
            self._outputter.write("\tConnecting to input SQLite database.")
            con = querying.connect(database, map_blobs=True)

            # query for the Payloads
            self._outputter.write("\tGetting Payloads from SQLite database based on list of hashes.")
            byte_hashes = [bytes(h, 'utf-8') for h in hashes]
            payloads = con.payload(hash=byte_hashes)
            # if we get a single Payload back, put it in a list and turn it into a json_list
            if payloads and payloads.__class__ != data_sources.json_list:
                payloads = data_sources.json_data_node.make([payloads])

            # close the session with the SQLite database file - we won't use it again
            con.close_session()

            # if we found some Payloads, send them
            if payloads:
                # Note: there is an edge case in which the Payloads could have been deleted from the SQLite file
                # since we queried it for IOV hashes.  This may be handled in the next iteration.
                # send an HTTP POST with the data blob in the body, and everything else as URL parameters
                # convert each Payload to a dictionary - we can put most of this into the URL of the HTTPS request
                dicts = payloads.as_dicts()
                self._outputter.write("Uploading Payload BLOBs:")

                # for each payload, send the BLOB to the server
                for n, payload in enumerate(dicts):
                    self._outputter.write("\t(%d/%d) Sending payload with hash '%s'." % (n+1, len(dicts), payload["hash"]))
                    response = self.send_blob(payload, upload_session_id)
                    # check the response for errors
                    no_error = self.check_response_for_error_key(response, exit_if_error=True)
                    if not no_error:
                        return False
                    self._outputter.write("\tPayload sent - moving to next one.")
                self._outputter.write("All Payloads uploaded.")
                return True
            else:
                return False

    @check_response(check="json")
    def send_blob(self, payload, upload_session_id):
        """
        Send the BLOB of a payload over HTTP.
        The BLOB is put in the request body, so no additional processing has to be done on the server side, apart from decoding from base64.
        """
        # encode the BLOB data of the Payload to make sure we don't send a character that will influence the HTTPS request
        blob_data = base64.b64encode(payload["data"])

        url_data = {"database" : self.data_to_send["destinationDatabase"], "upload_session_id" : upload_session_id}

        # construct the data to send in the body and header of the HTTPS request
        for key in list(payload.keys()):
            # skip the blob
            if key != "data":
                if key == "insertion_time":
                    url_data[key] = to_timestamp(payload[key])
                else:
                    url_data[key] = payload[key]

        request = url_query(url=self._SERVICE_URL + "store_payload/", url_data=url_data, body=blob_data)

        # send the request and return the response
        # Note - the url_query module will handle retries, and will throw a NoMoreRetriesException if it runs out
        try:
            request_response = request.send()
            return request_response
        except Exception as e:
            # make sure we don't try again - if a NoMoreRetriesException has been thrown, retries have run out
            if type(e) == errors.NoMoreRetriesException:
                self._outputter.write("\t\t\tPayload with hash '%s' was not uploaded because the maximum number of retries was exceeded." % payload["hash"])
            return json.dumps({"error" : str(e), "traceback" : traceback.format_exc()})

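    # For a hypothetical payload row (column names illustrative only), the
    # request built above separates the BLOB from the other columns:
    #   payload  = {"hash": "abc", "data": b"...", "insertion_time": <datetime>}
    #   body     -> base64.b64encode(b"...")
    #   url_data -> {"database": ..., "upload_session_id": ..., "hash": "abc",
    #                "insertion_time": "<timestamp string>"}
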
    @check_response(check="json")
    def send_metadata(self, upload_session_id):
        """
        Final part of the upload process - send the Conditions metadata (Tag, IOVs - not the upload metadata).
        The server closes the session (and releases the tag lock) after processing has been completed.
        """

        # set the user text if it's empty
        if self.data_to_send["userText"] in ["", None]:
            self.data_to_send["userText"] = "Tag '%s' uploaded from CondDBFW client." % list(self.data_to_send["destinationTags"].keys())[0]

        self._outputter.write("Sending metadata to server - see server_side_log at server_side_logs/upload_log_%s for details on metadata processing on server side."\
                            % self.upload_session_id)

        # send the HTTPS request to the server
        url_data = {"database" : self.data_to_send["destinationDatabase"], "upload_session_id" : upload_session_id, "tier0_response" : self.data_to_send.get("tier0_response")}
        request = url_query(url=self._SERVICE_URL + "upload_metadata/", url_data=url_data, body=json.dumps(self.data_to_send))
        response = request.send()
        self._outputter.write("Response received - conditions upload process complete.")
        return response

if __name__ == "__main__":
    """
    This code should only be executed for testing.
    """
    from .uploadConditions import parse_arguments

    print(
"""
This code should only be executed for testing.
Any uploads done by the user should be done by calling the uploadConditions.py script.
See https://cms-conddb-dev.cern.ch/cmsDbCondUpload for information on how to obtain the correct version.
"""
    )

    upload_metadata = parse_arguments()

    upload_metadata["sqlite_file"] = upload_metadata.get("sourceDB")

    # make a new dictionary, and copy over everything except "metadata_source"
    upload_metadata_argument = {}
    for (key, value) in list(upload_metadata.items()):
        if key != "metadata_source":
            upload_metadata_argument[key] = value

    upload_metadata["metadata_source"] = data_sources.json_data_node.make(upload_metadata_argument)

    upload_controller = uploader(**upload_metadata)

    result = upload_controller.upload()