File indexing completed on 2024-11-25 02:29:20
0001 import subprocess
0002 import json
0003 import netrc
0004 import sqlite3
0005 import os
0006 import sys
0007 import shutil
0008 import logging
0009 from datetime import datetime
0010
# Folder (relative to the working directory) that receives a copy of the files
# whose import failed, and the timestamp formats used for its sub-folders and
# for the label written in the upload metadata.
errorInImportFileFolder = 'import_errors'
dateformatForFolder = "%Y-%m-%d-%H-%M-%S"
dateformatForLabel = "%Y-%m-%d %H:%M:%S"

# Name of the environment variable in which run() stores the authentication path.
auth_path_key = 'COND_AUTH_PATH'

# Logging level: INFO unless the POPCON_LOG_LEVEL environment variable is set
# to exactly 'DEBUG'. Any other value leaves the default untouched.
messageLevelEnvVar = 'POPCON_LOG_LEVEL'
fmt_str = "[%(asctime)s] %(levelname)s: %(message)s"
logLevel = logging.INFO
if messageLevelEnvVar in os.environ:
    levStr = os.environ[messageLevelEnvVar]
    logLevel = logging.DEBUG if levStr == 'DEBUG' else logLevel

# The messages go to stdout through a handler attached to the root logger.
logFormatter = logging.Formatter(fmt_str)
consoleHandler = logging.StreamHandler(sys.stdout)
consoleHandler.setFormatter(logFormatter)
logger = logging.getLogger()
logger.setLevel(logLevel)
logger.addHandler(consoleHandler)
0030
def checkFile( dbName ):
    """Check that the sqlite file written by PopCon exists and holds data to import.

    Args:
        dbName: base name of the file; the '.db' extension is appended here.

    Returns:
        1 if the IOV table contains at least one row,
        0 if the IOV table exists but is empty (a warning is logged),
        -1 if the file, or the IOV table inside it, is missing,
        -2 if the check itself fails with an exception.
    """
    dbFileName = '%s.db' %dbName

    if not os.path.exists( dbFileName ):
        logger.error('The file generated by PopCon with the data to be imported %s has not been found.'%dbFileName )
        return -1

    dbcon = None
    try:
        dbcon = sqlite3.connect( dbFileName )
        dbcur = dbcon.cursor()
        dbcur.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='IOV'")
        if dbcur.fetchone() is None:
            logger.error('The condition database with the data to be imported has not been found in the file generated by PopCon: %s'%dbFileName )
            return -1
        # A single row is enough to know that the table is not empty:
        # there is no need to load the whole IOV table in memory.
        dbcur.execute('SELECT 1 FROM IOV LIMIT 1')
        empty = dbcur.fetchone() is None
    except Exception as e:
        logger.error('Check on input data failed: %s' %str(e))
        return -2
    finally:
        # Release the connection on every path, including the early returns above.
        if dbcon is not None:
            dbcon.close()

    if empty:
        logger.warning('The data set generated by PopCon contains no data to be imported. The import will be skipped.')
        return 0
    return 1
0061
def saveFileForImportErrors( datef, dbName, withMetadata=False ):
    """Keep a copy of the files of a failed import in a time-stamped folder.

    The files are copied to '<errorInImportFileFolder>/<datef formatted with
    dateformatForFolder>/', which is created when missing. A destination file
    that already exists is not overwritten.

    Args:
        datef: datetime used to name the destination folder.
        dbName: base name of the files; '<dbName>.db' is always saved.
        withMetadata: when True, '<dbName>.txt' is saved as well.
    """
    leafFolderName = datef.strftime(dateformatForFolder)
    fileFolder = os.path.join( errorInImportFileFolder, leafFolderName)
    if not os.path.exists(fileFolder):
        os.makedirs(fileFolder)

    df = '%s.db' %dbName
    dataDestFile = os.path.join( fileFolder, df)
    if not os.path.exists(dataDestFile):
        shutil.copy2(df, dataDestFile)

    if withMetadata:
        mf = '%s.txt' %dbName
        metadataDestFile = os.path.join( fileFolder, mf )
        if not os.path.exists(metadataDestFile):
            # The source must be the metadata file itself, not the data file.
            shutil.copy2(mf, metadataDestFile)

    logger.error("Upload failed. Data file and metadata saved in folder '%s'" %os.path.abspath(fileFolder))
0078
def upload( args, dbName ):
    """Write the upload metadata file and run uploadConditions.py on the PopCon file.

    The metadata are written as JSON in '<dbName>.txt' (any previous file with
    that name is removed first), then 'uploadConditions.py <dbName>' is run
    through the shell and its output is printed.

    Args:
        args: object providing the attributes destDb, destTag and comment
            (comment may be None).
        dbName: base name shared by the data file and the metadata file.

    Returns:
        0 when uploadConditions.py exits with 0, its return code otherwise
        (in that case the files are saved with saveFileForImportErrors),
        or 1 when an exception is raised while running the command.
    """
    destDb = args.destDb
    destTag = args.destTag
    comment = args.comment

    datef = datetime.now()
    metadataFileName = '%s.txt' %dbName

    # first remove any existing metadata file...
    if os.path.exists( metadataFileName ):
        logger.debug('Removing already existing file %s' %dbName)
        os.remove( metadataFileName )

    # dump Metadata for the Upload
    datelabel = datef.strftime(dateformatForLabel)
    commentStr = comment if comment is not None else ''
    uploadMd = {
        'destinationDatabase': destDb,
        'destinationTags': { destTag: {} },
        'inputTag': destTag,
        'since': None,
        'userText': '%s : %s' %(datelabel,commentStr),
    }
    # Text mode: json.dumps() returns a str, which cannot be written to a file
    # opened in binary mode under Python 3.
    with open( metadataFileName, 'w') as jf:
        jf.write( json.dumps( uploadMd, sort_keys=True, indent = 2 ) )
        jf.write('\n')

    # run the upload
    uploadCommand = 'uploadConditions.py %s' %dbName
    ret = 0
    try:
        pipe = subprocess.Popen( uploadCommand, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT )
        output = pipe.communicate()[0]
        # Under Python 3 the captured output is bytes: decode it so that it is
        # printed as text instead of as a b'...' literal.
        if not isinstance(output, str):
            output = output.decode('utf-8', 'replace')
        print(output)
        retCode = pipe.returncode
        if retCode != 0:
            saveFileForImportErrors( datef, dbName, True )
        ret |= retCode
    except Exception as e:
        ret |= 1
        logger.error(str(e))
    return ret
0124
def copy( args, dbName ):
    """Copy the tag from the local PopCon file to the destination with 'conddb copy'.

    The destination connection string is first translated into the form passed
    to 'conddb --destdb': the two known Oracle connection strings are mapped to
    their aliases, and for a connection starting with 'sqlite' everything after
    the first ':' is used. Any other destination is rejected.

    Args:
        args: object providing the attributes destDb and destTag.
        dbName: base name of the local file; '<dbName>.db' is the copy source.

    Returns:
        0 when conddb exits with 0, its return code otherwise (in that case the
        data file is saved with saveFileForImportErrors), 1 when an exception
        is raised while running the command, or -1 when the destination is not
        supported (nothing is executed).
    """
    dbFileName = '%s.db' %dbName
    destDb = args.destDb
    destTag = args.destTag

    datef = datetime.now()
    destMap = { "oracle://cms_orcoff_prep/cms_conditions": "oradev", "oracle://cms_orcon_prod/cms_conditions": "onlineorapro" }
    if destDb.lower() in destMap:
        destDb = destMap[destDb.lower()]
    elif destDb.startswith('sqlite') and ':' in destDb:
        # Split on the first ':' only, so that a path containing ':' is kept whole.
        destDb = destDb.split(':', 1)[1]
    else:
        logger.error( 'Destination connection %s is not supported.' %destDb )
        # An explicit negative code (rather than None) lets the caller compare
        # the result with 0 and keep the local file.
        return -1

    # run the copy
    note = '"Importing data with O2O execution"'
    commandOptions = '--force --yes --db %s copy %s %s --destdb %s --synchronize --note %s' %(dbFileName,destTag,destTag,destDb,note)
    copyCommand = 'conddb %s' %commandOptions
    logger.info( 'Executing command: %s' %copyCommand )
    try:
        pipe = subprocess.Popen( copyCommand, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT )
        output = pipe.communicate()[0]
        # Under Python 3 the captured output is bytes: decode it so that it is
        # printed as text instead of as a b'...' literal.
        if not isinstance(output, str):
            output = output.decode('utf-8', 'replace')
        print(output)
        retCode = pipe.returncode
        if retCode != 0:
            saveFileForImportErrors( datef, dbName )
        ret = retCode
    except Exception as e:
        ret = 1
        logger.error( str(e) )
    return ret
0158
def run( args ):
    """Run the PopCon job with cmsRun and export the data it produced.

    Steps:
      1. export args.auth in the COND_AUTH_PATH environment variable, unless
         that variable is already set;
      2. run 'cmsRun <args.job_file>' with a time-stamped target file and print
         its output;
      3. validate the produced file with checkFile();
      4. when it holds data, export it with copy() (if args.copy is set) or
         with upload();
      5. delete the local '.db' file when the final code is not negative.

    Args:
        args: object providing the attributes auth, job_file, destDb, destTag
            and copy, plus whatever copy()/upload() read from it.

    Returns:
        The cmsRun return code when it is not 0; otherwise the checkFile()
        result when it is not positive; otherwise the copy()/upload() result.
    """
    dbName = datetime.utcnow().strftime('%Y-%m-%d_%H-%M-%S-%f')
    dbFileName = '%s.db' %dbName
    metadataFileName = '%s.txt' %dbName

    if args.auth is not None and args.auth != '':
        if auth_path_key in os.environ:
            logger.warning("Cannot set authentication path to %s in the environment, since it is already set." %args.auth)
        else:
            logger.info("Setting the authentication path to %s in the environment." %args.auth)
            os.environ[auth_path_key]=args.auth

    # Start from a clean state: drop leftovers carrying the same name.
    if os.path.exists( dbFileName ):
        logger.info("Removing files with name %s" %dbName )
        os.remove( dbFileName )
    if os.path.exists( metadataFileName ):
        os.remove( metadataFileName )

    command = 'cmsRun %s ' %args.job_file
    command += ' targetFile=%s' %dbFileName
    command += ' destinationDatabase=%s' %args.destDb
    command += ' destinationTag=%s' %args.destTag
    command += ' 2>&1'
    logger.info( 'Executing command: %s' %command )
    pipe = subprocess.Popen( command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT )
    output = pipe.communicate()[0]
    retCode = pipe.returncode
    # Under Python 3 the captured output is bytes: decode it so that it is
    # printed as text instead of as a b'...' literal.
    if not isinstance(output, str):
        output = output.decode('utf-8', 'replace')
    print(output)
    logger.info('PopCon Analyzer return code is: %s' %retCode )
    if retCode!=0:
        logger.error( 'O2O job failed. Skipping upload.' )
        return retCode

    ret = checkFile( dbName )
    if ret > 0:
        if args.copy:
            ret = copy( args, dbName )
        else:
            ret = upload( args, dbName )
        if ret is None:
            # A step that hands back no code did not complete: report it as a
            # failure, so that the comparison below cannot raise and the local
            # file is kept.
            ret = -1
    if ret >=0:
        logger.info('Deleting local file %s.db' %dbName )
        os.remove( dbFileName )
    return ret