Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 12:07:42

0001 #! /usr/bin/env python3
0002 
0003 from __future__ import print_function
0004 from builtins import range
0005 import os,time,sys,zipfile,re,shutil,stat
0006 from fcntl import lockf, LOCK_EX, LOCK_UN
0007 from hashlib import md5
0008 from glob import glob
0009 from datetime import datetime
0010 
# Command-line configuration: three positional arguments are required.
COLLECTING_DIR = sys.argv[1]  # Directory where to look for root files
T_FILE_DONE_DIR = sys.argv[2]  # Directory where to place processed root files
DROPBOX = sys.argv[3]  # Directory where the collected files are sent.

# Resolve to an absolute path: when the script is started with a bare relative
# name, dirname(__file__) would be '' and the helper script paths built from
# EXEDIR would wrongly point at the filesystem root.
EXEDIR = os.path.dirname(os.path.abspath(__file__))
COLLECTOR_WAIT_TIME = 10  # seconds between collector cycles
WAIT_TIME_FILE_PT = 60 * 15  # seconds to wait before picking up lost files
TMP_DROPBOX = os.path.join(DROPBOX, ".uploading")
KEEP = 2  # number of _d files to keep
RETRIES = 3  # number of retries to send a file
STOP_FILE = "%s/.stop" % EXEDIR
# Python 3 rejects unbuffered text streams (buffering=0 raises ValueError), so
# use line buffering to keep log lines appearing promptly.
sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 1)
os.environ["WorkDir"] = EXEDIR
0024 
def logme(msg, *args):
  """Print a timestamped log line tagged with the script name and PID.

  msg  -- the message; treated as a %-style format string only when args
          are supplied.
  args -- values interpolated into msg.

  A message passed without args is printed verbatim, so an already formatted
  string containing a literal '%' (e.g. from a file name) cannot raise a
  formatting error.
  """
  procid = "[%s/%d]" % (__file__.rsplit("/", 1)[-1], os.getpid())
  text = msg % args if args else msg
  print(datetime.now(), procid, text)
0028   
def filecheck(rootfile):
  """Run EXEDIR/filechk.sh on rootfile and report whether the file is usable.

  The last whitespace-separated token printed by the script is interpreted:
    '(int)(-1)' -> file corrupted (logged), returns False
    '(int)0'    -> file incomplete (logged), returns False
    '(int)1'    -> returns True
  Any other token, or no output at all, returns False.
  """
  from shlex import quote

  # Quote both the script path and the file name so that spaces or shell
  # metacharacters in either cannot break or alter the command line.
  cmd = "%s %s" % (quote(EXEDIR + '/filechk.sh'), quote(rootfile))
  with os.popen(cmd) as pipe:
    tokens = pipe.read().split()

  if not tokens:
    # The script printed nothing (e.g. it is missing or crashed).
    logme("ERROR: File check of %s produced no output", rootfile)
    return False

  tag = tokens[-1]
  if tag == '(int)(-1)':
    logme("ERROR: File %s corrupted (isZombi)", rootfile)
    return False
  if tag == '(int)0':
    logme("ERROR: File %s is incomplete", rootfile)
    return False
  return tag == '(int)1'
0043 
def isFileOpen(fName):
  """Return True if a process owned by the current user has fName open.

  Scans /proc/<pid>/fd for every numeric entry in /proc whose directory and
  fd directory are owned by the current uid, and compares each descriptor's
  link target with the resolved (realpath) name of fName.
  Processes that cannot be inspected are skipped.
  """
  fName = os.path.realpath(fName)
  myUid = os.getuid()
  for pid in os.listdir('/proc'):
    if not pid.isdigit():
      continue

    try:
      if os.stat(os.path.join('/proc', pid)).st_uid != myUid:
        continue

      fd_dir = os.path.join('/proc', pid, 'fd')
      if os.stat(fd_dir).st_uid != myUid:
        continue

      for f in os.listdir(fd_dir):
        fdName = os.path.join(fd_dir, f)
        if os.path.islink(fdName) and os.readlink(fdName) == fName:
          return True
    except OSError:
      # The process may have exited, or the descriptor been closed, between
      # listing and inspection; only filesystem errors are ignored so that
      # KeyboardInterrupt/SystemExit still propagate.
      continue

  return False
0070      
def convert(infile, ofile):
  """Run EXEDIR/convert.sh with infile and ofile as its two arguments.

  The script's exit status is ignored; callers check for ofile afterwards.
  """
  from shlex import quote

  # Quote every word so paths containing spaces or shell metacharacters are
  # passed to the script as single, literal arguments.
  cmd = ' '.join(quote(word) for word in (EXEDIR + '/convert.sh', infile, ofile))
  os.system(cmd)
0074   
def uploadFile(fName, subsystem, run):
  """Copy fName into the DROPBOX as the next version of the run's DQM file.

  Steps:
    1. Copy fName to a host-specific temporary name in TMP_DROPBOX and write
       a companion '.origin' file holding "md5:<digest> <size> <path>".
    2. Under an exclusive lock on TMP_DROPBOX/lock, count the directories in
       DROPBOX that already contain DQM_V0001_<subsystem>_R<run>.root; the
       new file goes into DROPBOX/<count + 1, zero padded to 4 digits>.
    3. Rename the temporary files into place and make them read/write for
       everybody.

  Returns True on success, False when the copy is incomplete or the final
  rename/chmod fails.
  """
  # HOSTNAME is not always exported to child processes; fall back to uname.
  hname = os.getenv("HOSTNAME") or os.uname()[1]
  seed = hname.replace("-", "t")[-6:]
  finalTMPfile = "%s/DQM_V0001_%s_R%s.root.%s" % (TMP_DROPBOX, subsystem, run, seed)
  if os.path.exists(finalTMPfile):
    os.remove(finalTMPfile)

  # Hash in chunks, in binary mode, so large root files are not loaded into
  # memory at once.
  md5Digest = md5()
  with open(fName, "rb") as src:
    for chunk in iter(lambda: src.read(1 << 20), b""):
      md5Digest.update(chunk)

  srcSize = os.stat(fName).st_size
  originStr = "md5:%s %d %s" % (md5Digest.hexdigest(), srcSize, fName)
  originTMPFile = "%s.origin" % finalTMPfile
  with open(originTMPFile, "w") as originFile:
    originFile.write(originStr)

  shutil.copy(fName, finalTMPfile)
  if not os.path.exists(finalTMPfile) or os.stat(finalTMPfile).st_size != srcSize:
    return False

  rwForAll = (stat.S_IREAD | stat.S_IRGRP | stat.S_IROTH |
              stat.S_IWRITE | stat.S_IWGRP | stat.S_IWOTH)
  targetName = 'DQM_V0001_%s_R%s.root' % (subsystem, run)

  lFile = open("%s/lock" % TMP_DROPBOX, "a")
  try:
    lockf(lFile, LOCK_EX)

    # One more than the number of directories already holding this file.
    version = 1
    for vdir, vsubdir, vfiles in os.walk(DROPBOX):
      if targetName in vfiles:
        version += 1

    versionDir = "%s/%04d" % (DROPBOX, version)
    if not os.path.exists(versionDir):
      os.makedirs(versionDir)
      # Octal mode: setgid + rwxrwxr-x (the decimal literal 2775 set
      # unrelated permission bits).
      os.chmod(versionDir, 0o2775)

    finalfile = "%s/%s" % (versionDir, targetName)
    originFileName = "%s.origin" % finalfile
    try:
      os.rename(finalTMPfile, finalfile)
      os.rename(originTMPFile, originFileName)
      os.chmod(finalfile, rwForAll)
      os.chmod(originFileName, rwForAll)
    except OSError:
      logme("ERROR: File %s upload failed to the DROPBOX", fName)
      return False
  finally:
    # Always release the lock, even when an unexpected error propagates.
    lockf(lFile, LOCK_UN)
    lFile.close()

  logme("INFO: File %s has been successfully sent to the DROPBOX", fName)
  return True
0122   
def processSiStrip(fName, finalTfile):
  """Convert a SiStrip Playback file to a DQM file before upload.

  fName      -- path of the file about to be uploaded.
  finalTfile -- path the file would get in the processed-files area.

  Returns (path_to_upload, ok).  For any file that is not a Playback file of
  the SiStrip subsystem this is (fName, True) and nothing is touched.
  Otherwise convert() is run to produce the 'DQM' named file; on success the
  original is moved to the 'Playback_full' variant of finalTfile, on failure
  it is moved to finalTfile + "_d" and ok is False.

  NOTE: the subsystem is read from the module-level NEW[rFile], i.e. from the
  entry the main loop is currently processing.
  """
  if "Playback" not in fName or NEW[rFile]["subSystem"] != "SiStrip":
    return (fName, True)

  dqmfile = fName.replace('Playback', 'DQM')
  convert(fName, dqmfile)
  if not os.path.exists(dqmfile):
    # Report the file actually being converted (the global Tfile previously
    # used here could be stale or undefined).
    logme("ERROR: Problem converting %s skiping", fName)
    shutil.move(fName, finalTfile + "_d")
    return (dqmfile, False)

  os.rename(fName, finalTfile.replace('Playback', 'Playback_full'))
  return (dqmfile, True)
0136   
####### ENDLESS LOOP WITH SLEEP
# NEW maps the full path of every final root file seen in COLLECTING_DIR to
# its record: {"runNumber", "subSystem", "Processed", "TFiles"}.
NEW = {}
LAST_SEEN_RUN = "0"
LAST_FILE_UPLOADED = time.time()
if not os.path.exists(TMP_DROPBOX):
  os.makedirs(TMP_DROPBOX)

# Final root file of a run, and the intermediate "_T" snapshot of a run.
_ROOT_FILE_RE = re.compile(
    r'^(DQM|Playback)_V[0-9]{4}_(?P<subsys>.*)_R(?P<runnr>[0-9]{9})\.root$')
_T_FILE_RE = re.compile(
    r'^(DQM|Playback)_V[0-9]{4}_(?P<subsys>.*)_R(?P<runnr>[0-9]{9})_T[0-9]{8}\.root$')


def _uploadAndArchive(fToUpload, subsystem, run, finalTdir):
  """Upload fToUpload (up to RETRIES attempts); on success move it to finalTdir.

  Returns True when an upload attempt succeeded, False otherwise.
  """
  for _ in range(RETRIES):
    if uploadFile(fToUpload, subsystem, run):
      os.rename(fToUpload, "%s/%s" % (finalTdir, os.path.basename(fToUpload)))
      return True
  return False


# NOTE: the loop variables rFile and Tfile are deliberately module-level
# names; processSiStrip() reads NEW[rFile] through the globals.
while True:
  #Check if you need to stop.
  if os.path.exists(STOP_FILE):
    logme("INFO: Stop file found, quitting")
    sys.exit(0)

  #clean up tagfile_runend files, this should be removed as its use is deprecated
  for tag in glob('%s/tagfile_runend_*' % COLLECTING_DIR):
    os.remove(tag)

  #Register every final root file currently present.
  for dirPath, subdirs, files in os.walk(COLLECTING_DIR):
    for f in files:
      fMatch = _ROOT_FILE_RE.match(f)
      if not fMatch:
        continue

      runnr = fMatch.group("runnr")
      subsystem = fMatch.group("subsys")
      NEW.setdefault("%s/%s" % (dirPath, f), {"runNumber": runnr,
                                              "subSystem": subsystem,
                                              "Processed": False,
                                              "TFiles": []})
      if int(runnr) > int(LAST_SEEN_RUN):
        LAST_SEEN_RUN = runnr

  # Add respective T files just in case the final root file is damaged
  for rFile in list(NEW):
    if NEW[rFile]["TFiles"]:
      continue

    # Escape the subsystem so it is matched literally, and match a real dot.
    tFileRe = re.compile(r'^(DQM|Playback)_V[0-9]{4}_%s_R%s_T[0-9]{8}\.root$' % (
        re.escape(NEW[rFile]["subSystem"]), NEW[rFile]["runNumber"]))
    for dirPath, subdirs, files in os.walk(COLLECTING_DIR):
      for f in files:
        if tFileRe.match(f):
          NEW[rFile]["TFiles"].append("%s/%s" % (dirPath, f))

    # Reverse order so the highest _T stamp is tried first.
    NEW[rFile]["TFiles"].sort(reverse=True)

  #Process files
  for rFile in list(NEW):
    if isFileOpen(rFile):
      logme("INFO: File %s is open", rFile)
      continue

    run = NEW[rFile]["runNumber"]
    subsystem = NEW[rFile]["subSystem"]
    finalTdir = "%s/%s/%s" % (T_FILE_DONE_DIR, run[0:3], run[3:6])
    if not os.path.exists(finalTdir):
      os.makedirs(finalTdir)

    if not filecheck(rFile):
      # The final file is unusable: set it aside and fall back to the first
      # T file that passes the check, converts and uploads.
      # NOTE(review): rFile may be missing here if an earlier cycle already
      # moved it (e.g. a Playback file whose upload failed) - confirm whether
      # that case needs handling.
      os.rename(rFile, "%s/%s_d" % (finalTdir, os.path.basename(rFile)))
      for Tfile in NEW[rFile]["TFiles"]:
        finalTfile = "%s/%s" % (finalTdir, os.path.basename(Tfile))
        if not filecheck(Tfile):
          if os.path.exists(Tfile):
            shutil.move(Tfile, finalTfile + "_d")
          continue

        fToUpload, converted = processSiStrip(Tfile, finalTfile)
        if not converted:
          continue

        if _uploadAndArchive(fToUpload, subsystem, run, finalTdir):
          LAST_FILE_UPLOADED = time.time()
          break

      # Whether or not a T file could be sent, this entry is finished.
      NEW[rFile]['Processed'] = True
      continue

    finalTfile = "%s/%s" % (finalTdir, os.path.basename(rFile))
    fToUpload, converted = processSiStrip(rFile, finalTfile)
    if not converted:
      continue

    if _uploadAndArchive(fToUpload, subsystem, run, finalTdir):
      NEW[rFile]["Processed"] = True
      LAST_FILE_UPLOADED = time.time()

  #Clean up COLLECTING_DIR
  for rFile in list(NEW):
    if not NEW[rFile]["Processed"]:
      continue

    run = NEW[rFile]["runNumber"]
    subsystem = NEW[rFile]["subSystem"]
    finalTdir = "%s/%s/%s" % (T_FILE_DONE_DIR, run[0:3], run[3:6])
    for Tfile in NEW[rFile]["TFiles"]:
      if os.path.exists(Tfile):
        os.rename(Tfile, "%s/%s_d" % (finalTdir, os.path.basename(Tfile)))

    #Enforce KEEPS
    # Order the set-aside files: _T files first, highest stamp first, then
    # everything else; only the first KEEP entries survive.
    discarded = glob("%s/*_%s_R%s*_d" % (finalTdir, subsystem, run))
    tDiscarded = sorted((p for p in discarded if "_T" in os.path.basename(p)),
                        reverse=True)
    otherDiscarded = sorted(p for p in discarded
                            if "_T" not in os.path.basename(p))
    for stale in (tDiscarded + otherDiscarded)[KEEP:]:
      os.remove(stale)

  #Forget the entries that have been fully processed.
  # Collect the keys first: deleting while iterating the dict raises
  # RuntimeError on Python 3.
  for rFile in [k for k in NEW if NEW[k]['Processed']]:
    del NEW[rFile]

  #Find and process orphan _T files.
  if LAST_FILE_UPLOADED < time.time() - WAIT_TIME_FILE_PT:
    for dirPath, subdirs, files in os.walk(COLLECTING_DIR):
      for f in files:
        fMatch = _T_FILE_RE.match(f)
        if not fMatch:
          continue

        runnr = fMatch.group("runnr")
        subsystem = fMatch.group("subsys")
        # Compare numerically, consistent with how LAST_SEEN_RUN is updated.
        if int(runnr) > int(LAST_SEEN_RUN):
          continue

        tmpFName = "%s/%s.root" % (dirPath, f.rsplit("_", 1)[0])
        if os.path.exists(tmpFName):
          continue

        finalTdir = "%s/%s/%s" % (T_FILE_DONE_DIR, runnr[0:3], runnr[3:6])
        fName = "%s/%s" % (dirPath, f)
        archived = glob("%s/*_%s_R%s*" % (finalTdir, subsystem, runnr))
        # Something newer than this T file was already archived for the run:
        # the orphan is obsolete.
        if archived and max(os.stat(p).st_mtime for p in archived) > os.stat(fName).st_mtime:
          os.remove(fName)
          continue

        # An empty stand-in for the final file makes the next cycle register
        # the run, fail the file check, and fall back to its T files.
        logme("INFO: Creating dummy file %s to pick up Orphan _T files", tmpFName)
        open(tmpFName, "w+").close()

  time.sleep(COLLECTOR_WAIT_TIME)