Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-11-26 02:34:08

0001 #! /usr/bin/env python3
0002 
0003 from builtins import range
0004 import os,time,sys,zipfile,re,shutil,stat
0005 from fcntl import lockf, LOCK_EX, LOCK_UN
0006 from hashlib import md5
0007 from glob import glob
0008 from datetime import datetime
0009 
COLLECTING_DIR = sys.argv[1]   # Directory where to look for ROOT files
T_FILE_DONE_DIR = sys.argv[2]  # Directory where processed ROOT files are placed
DROPBOX = sys.argv[3]          # Directory the collected files are sent to

# abspath() so EXEDIR is never "" when the script is launched as
# "python3 <script>" from its own directory; otherwise helper paths such as
# EXEDIR + '/filechk.sh' would point at the filesystem root.
EXEDIR = os.path.dirname(os.path.abspath(__file__))
COLLECTOR_WAIT_TIME = 10  # seconds to sleep between collector cycles
WAIT_TIME_FILE_PT = 60 * 15  # seconds without uploads before orphan _T files are picked up
TMP_DROPBOX = os.path.join(DROPBOX, ".uploading")
KEEP = 2  # number of _d files to keep per subsystem/run
RETRIES = 3  # number of attempts to send a file
STOP_FILE = "%s/.stop" % EXEDIR
# Python 3 rejects unbuffered (buffering=0) text streams with ValueError;
# line buffering (1) still pushes out every log line immediately.
sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 1)
os.environ["WorkDir"] = EXEDIR

def logme(msg, *args):
  """Print a timestamped log line tagged with "[<script name>/<pid>]".

  msg is a %-format string applied to args. When no args are given, msg is
  printed verbatim, so already-formatted messages that contain a literal
  '%' (for example inside a file name) cannot raise a formatting error.
  """
  procid = "[%s/%d]" % (os.path.basename(__file__), os.getpid())
  text = msg % args if args else msg
  print(datetime.now(), procid, text)

def filecheck(rootfile):
  """Check a ROOT file with the filechk.sh helper found in EXEDIR.

  The last whitespace-separated token printed by the helper decides:
    '(int)(-1)' -> corrupted (zombie), logged, False
    '(int)0'    -> incomplete, logged, False
    '(int)1'    -> OK, True
  Any other token, no output at all, or a helper that cannot be started
  yields False.
  """
  import subprocess
  # Argument list instead of a shell string: file names containing spaces
  # or shell metacharacters are passed through verbatim.
  try:
    proc = subprocess.run([EXEDIR + '/filechk.sh', rootfile],
                          stdout=subprocess.PIPE, universal_newlines=True)
  except OSError as err:
    logme("ERROR: Cannot run file check on %s: %s", rootfile, err)
    return False

  tokens = proc.stdout.split()
  if not tokens:
    # Empty output used to crash the collector with IndexError on pop().
    logme("ERROR: File check produced no output for %s", rootfile)
    return False

  tag = tokens[-1]
  if tag == '(int)(-1)':
    logme("ERROR: File %s corrupted (isZombi)", rootfile)
    return False
  elif tag == '(int)0':
    logme("ERROR: File %s is incomplete", rootfile)
    return False
  elif tag == '(int)1':
    return True
  else:
    return False

def isFileOpen(fName):
  """Return True if a process owned by the current user has fName open.

  Scans /proc/<pid>/fd and compares each descriptor's link target with the
  resolved path of fName. Only processes whose /proc/<pid> and
  /proc/<pid>/fd are both owned by the current uid are inspected. Processes
  that disappear, or whose descriptors cannot be read mid-scan, are skipped.
  """
  target = os.path.realpath(fName)
  myUid = os.getuid()
  for pid in os.listdir('/proc'):
    if not pid.isdigit():
      continue

    procDir = os.path.join('/proc', pid)
    fdDir = os.path.join(procDir, 'fd')
    try:
      if os.stat(procDir).st_uid != myUid or os.stat(fdDir).st_uid != myUid:
        continue

      for fd in os.listdir(fdDir):
        fdPath = os.path.join(fdDir, fd)
        if os.path.islink(fdPath) and os.readlink(fdPath) == target:
          return True
    except OSError:
      # Process exited or descriptors vanished/unreadable: skip it. Only
      # OSError is caught so KeyboardInterrupt etc. still propagate.
      continue

  return False

def convert(infile, ofile):
  """Run EXEDIR/convert.sh to convert infile into ofile.

  Returns the helper's exit status, or -1 if it could not be started
  (callers check for the output file rather than this value).
  """
  import subprocess
  # Argument list instead of a shell string: file names are not re-parsed
  # by a shell, so spaces or metacharacters cannot break or inject commands.
  try:
    return subprocess.call([EXEDIR + '/convert.sh', infile, ofile])
  except OSError as err:
    logme("ERROR: Cannot run convert.sh on %s: %s", infile, err)
    return -1

def uploadFile(fName, subsystem, run):
  """Publish fName into DROPBOX as DQM_V0001_<subsystem>_R<run>.root.

  1. Copy fName into TMP_DROPBOX under a host-specific temporary name and
     write a companion '.origin' file "md5:<hex> <size> <source path>".
  2. Holding an exclusive lockf() lock on TMP_DROPBOX/lock, choose version
     directory DROPBOX/NNNN, where NNNN is 1 + the number of directories
     under DROPBOX (recursively) already containing this file name, and
     rename both files into it.

  Returns True on success; False if the temporary copy has the wrong size
  or the final rename/chmod fails.
  """
  import socket
  # HOSTNAME is not always exported (e.g. in daemon/cron environments);
  # fall back to the system host name instead of crashing on None.
  hname = os.getenv("HOSTNAME") or socket.gethostname()
  seed = hname.replace("-", "t")[-6:]
  baseName = "DQM_V0001_%s_R%s.root" % (subsystem, run)
  finalTMPfile = "%s/%s.%s" % (TMP_DROPBOX, baseName, seed)
  if os.path.exists(finalTMPfile):
    os.remove(finalTMPfile)

  # Chunked hashing with a properly closed handle; the Python 2 builtin
  # file() used before does not exist in Python 3.
  md5Digest = md5()
  with open(fName, "rb") as src:
    for chunk in iter(lambda: src.read(1 << 20), b""):
      md5Digest.update(chunk)
  originStr = "md5:%s %d %s" % (md5Digest.hexdigest(), os.stat(fName).st_size, fName)
  originTMPFile = "%s.origin" % finalTMPfile
  with open(originTMPFile, "w") as originFile:
    originFile.write(originStr)

  shutil.copy(fName, finalTMPfile)
  if not os.path.exists(finalTMPfile) or os.stat(finalTMPfile).st_size != os.stat(fName).st_size:
    return False

  filePerms = (stat.S_IREAD | stat.S_IRGRP | stat.S_IROTH |
               stat.S_IWRITE | stat.S_IWGRP | stat.S_IWOTH)
  with open("%s/lock" % TMP_DROPBOX, "a") as lFile:
    lockf(lFile, LOCK_EX)
    try:
      version = 1 + sum(1 for _vdir, _vsubdirs, vfiles in os.walk(DROPBOX)
                        if baseName in vfiles)
      versionDir = "%s/%04d" % (DROPBOX, version)
      if not os.path.exists(versionDir):
        os.makedirs(versionDir)
        # Octal setgid + rwxrwxr-x; the decimal literal 2775 used before
        # is actually mode 0o5327.
        os.chmod(versionDir, 0o2775)

      finalfile = "%s/%s" % (versionDir, baseName)
      originFileName = "%s.origin" % finalfile
      try:
        os.rename(finalTMPfile, finalfile)
        os.rename(originTMPFile, originFileName)
        os.chmod(finalfile, filePerms)
        os.chmod(originFileName, filePerms)
      except OSError:
        logme("ERROR: File %s upload failed to the DROPBOX", fName)
        return False

      logme("INFO: File %s has been successfully sent to the DROPBOX", fName)
      return True
    finally:
      # Always release the lock, even if makedirs/chmod above raised.
      lockf(lFile, LOCK_UN)

def processSiStrip(fName, finalTfile, subsystem=None):
  """Convert a SiStrip 'Playback' file into its 'DQM' counterpart.

  Conversion (via convert()) happens only when the file's base name
  contains 'Playback' and the subsystem is 'SiStrip'.
  - On success, the Playback file is renamed next to finalTfile with
    'Playback' replaced by 'Playback_full'.
  - If the converted file does not appear, the Playback file is moved to
    finalTfile + "_d".

  subsystem defaults to NEW[rFile]["subSystem"], i.e. the entry the main
  loop is currently handling (module-level rFile).

  Returns (file_to_upload, ok); ok is False only when conversion failed.
  """
  srcDir, srcName = os.path.split(fName)
  # Test and rewrite the base name only: a directory whose name contains
  # 'Playback' must neither trigger a conversion nor be rewritten.
  if "Playback" not in srcName:
    return (fName, True)
  if subsystem is None:
    subsystem = NEW[rFile]["subSystem"]
  if subsystem != "SiStrip":
    return (fName, True)

  dqmfile = os.path.join(srcDir, srcName.replace('Playback', 'DQM'))
  convert(fName, dqmfile)
  if not os.path.exists(dqmfile):
    # Previously logged an undefined/stale global 'Tfile' (NameError).
    logme("ERROR: Problem converting %s skiping", fName)
    shutil.move(fName, finalTfile + "_d")
    return (dqmfile, False)

  doneDir, doneName = os.path.split(finalTfile)
  os.rename(fName, os.path.join(doneDir, doneName.replace('Playback', 'Playback_full')))
  return (dqmfile, True)

####### ENDLESS LOOP WITH SLEEP
# NEW maps the path of every final ROOT file found in COLLECTING_DIR to:
#   runNumber - 9-digit run number (string)
#   subSystem - subsystem name parsed from the file name
#   Processed - True once the file (or a fallback _T file) has been handled
#   TFiles    - matching _T files, sorted by name in reverse (highest _T index first)
NEW = {}
LAST_SEEN_RUN = "0"
LAST_FILE_UPLOADED = time.time()
if not os.path.exists(TMP_DROPBOX):
  os.makedirs(TMP_DROPBOX)

# Compiled once: matched against every file in COLLECTING_DIR on each cycle.
FINAL_FILE_RE = re.compile(
  r'^(DQM|Playback)_V[0-9]{4}_(?P<subsys>.*)_R(?P<runnr>[0-9]{9})\.root$')
T_FILE_RE = re.compile(
  r'^(DQM|Playback)_V[0-9]{4}_(?P<subsys>.*)_R(?P<runnr>[0-9]{9})_T[0-9]{8}\.root$')


def _uploadWithRetries(rFile, fToUpload, subsystem, run, finalTdir):
  """Try uploadFile() up to RETRIES times.

  On success, mark NEW[rFile] processed, record the upload time and move
  fToUpload into finalTdir. Returns True if an attempt succeeded.
  """
  global LAST_FILE_UPLOADED
  for _attempt in range(RETRIES):
    if uploadFile(fToUpload, subsystem, run):
      NEW[rFile]["Processed"] = True
      LAST_FILE_UPLOADED = time.time()
      os.rename(fToUpload, "%s/%s" % (finalTdir, os.path.basename(fToUpload)))
      return True
  return False


def _isTFile(path):
  """Return True if path's base name ends in _T<8 digits>.root_d."""
  # Checks the base name precisely; a bare "_T" substring test also matched
  # directory names and subsystems such as "Tracking".
  return re.search(r'_T[0-9]{8}\.root_d$', os.path.basename(path)) is not None


while True:
  # Check if you need to stop.
  if os.path.exists(STOP_FILE):
    logme("INFO: Stop file found, quitting")
    sys.exit(0)

  # Clean up tagfile_runend files; their use is deprecated and this cleanup
  # should eventually be removed.
  for tag in glob('%s/tagfile_runend_*' % COLLECTING_DIR):
    os.remove(tag)

  # Register every final ROOT file found in COLLECTING_DIR.
  for dirPath, _subdirs, files in os.walk(COLLECTING_DIR):
    for f in files:
      fMatch = FINAL_FILE_RE.match(f)
      if not fMatch:
        continue
      runnr = fMatch.group("runnr")
      subsystem = fMatch.group("subsys")
      NEW.setdefault("%s/%s" % (dirPath, f), {"runNumber": runnr,
                                               "subSystem": subsystem,
                                               "Processed": False,
                                               "TFiles": []})
      if int(runnr) > int(LAST_SEEN_RUN):
        LAST_SEEN_RUN = runnr

  # Add respective T files just in case the final root file is damaged.
  for rFile in NEW:
    if NEW[rFile]["TFiles"]:
      continue

    runnr = NEW[rFile]["runNumber"]
    subsystem = NEW[rFile]["subSystem"]
    tFileRe = re.compile(r'^(DQM|Playback)_V[0-9]{4}_%s_R%s_T[0-9]{8}\.root$'
                         % (re.escape(subsystem), runnr))
    for dirPath, _subdirs, files in os.walk(COLLECTING_DIR):
      for f in files:
        if tFileRe.match(f):
          NEW[rFile]["TFiles"].append("%s/%s" % (dirPath, f))
    NEW[rFile]["TFiles"].sort(reverse=True)

  # Process files.
  for rFile in NEW:
    if isFileOpen(rFile):
      logme("INFO: File %s is open", rFile)
      continue

    run = NEW[rFile]["runNumber"]
    subsystem = NEW[rFile]["subSystem"]
    finalTdir = "%s/%s/%s" % (T_FILE_DONE_DIR, run[0:3], run[3:6])
    if not os.path.exists(finalTdir):
      os.makedirs(finalTdir)

    if not filecheck(rFile):
      # Final file failed the check: park it as _d, then try the _T files in
      # order until one passes the check and uploads successfully.
      os.rename(rFile, "%s/%s_d" % (finalTdir, os.path.basename(rFile)))
      for Tfile in NEW[rFile]["TFiles"]:
        finalTfile = "%s/%s" % (finalTdir, os.path.basename(Tfile))
        if not filecheck(Tfile):
          if os.path.exists(Tfile):
            shutil.move(Tfile, finalTfile + "_d")
          continue

        fToUpload, converted = processSiStrip(Tfile, finalTfile)
        if not converted:
          continue

        if _uploadWithRetries(rFile, fToUpload, subsystem, run, finalTdir):
          break

      # Marked done whether or not a _T file could be sent.
      NEW[rFile]['Processed'] = True
      continue

    finalTfile = "%s/%s" % (finalTdir, os.path.basename(rFile))
    fToUpload, converted = processSiStrip(rFile, finalTfile)
    if not converted:
      continue

    _uploadWithRetries(rFile, fToUpload, subsystem, run, finalTdir)

  # Clean up COLLECTING_DIR: archive leftover _T files of processed entries.
  for rFile in NEW:
    if not NEW[rFile]["Processed"]:
      continue

    run = NEW[rFile]["runNumber"]
    subsystem = NEW[rFile]["subSystem"]
    finalTdir = "%s/%s/%s" % (T_FILE_DONE_DIR, run[0:3], run[3:6])
    for Tfile in NEW[rFile]["TFiles"]:
      if os.path.exists(Tfile):
        os.rename(Tfile, "%s/%s_d" % (finalTdir, os.path.basename(Tfile)))

    # Enforce KEEP: keep the KEEP highest-named _T _d files. Any other _d
    # file (e.g. a damaged final file) is ordered last and pruned first.
    dFiles = glob("%s/*_%s_R%s*_d" % (finalTdir, subsystem, run))
    tFiles = sorted((p for p in dFiles if _isTFile(p)), reverse=True)
    otherFiles = sorted(p for p in dFiles if not _isTFile(p))
    for f in (tFiles + otherFiles)[KEEP:]:
      os.remove(f)

  # Forget processed entries. Iterate over a snapshot: deleting from a dict
  # while iterating its live key view raises RuntimeError in Python 3.
  for rFile in list(NEW):
    if NEW[rFile]['Processed']:
      del NEW[rFile]

  # Find and process orphan _T files. After WAIT_TIME_FILE_PT seconds
  # without an upload, create an empty final file for each _T file whose
  # final file is missing. The next cycle then registers it and (presumably
  # failing filecheck) falls back to its _T files.
  if LAST_FILE_UPLOADED < time.time() - WAIT_TIME_FILE_PT:
    for dirPath, _subdirs, files in os.walk(COLLECTING_DIR):
      for f in files:
        fMatch = T_FILE_RE.match(f)
        if not fMatch:
          continue

        runnr = fMatch.group("runnr")
        subsystem = fMatch.group("subsys")
        # Numeric comparison, consistent with how LAST_SEEN_RUN is updated.
        if int(runnr) > int(LAST_SEEN_RUN):
          continue

        tmpFName = "%s/%s.root" % (dirPath, f.rsplit("_", 1)[0])
        if os.path.exists(tmpFName):
          continue

        finalTdir = "%s/%s/%s" % (T_FILE_DONE_DIR, runnr[0:3], runnr[3:6])
        fList = sorted(glob("%s/*_%s_R%s*" % (finalTdir, subsystem, runnr)),
                       key=lambda p: os.stat(p).st_mtime)
        fName = "%s/%s" % (dirPath, f)
        # Something for this run was archived after this _T file was last
        # modified: drop the _T file instead of resurrecting the run.
        if fList and os.stat(fList[-1]).st_mtime > os.stat(fName).st_mtime:
          os.remove(fName)
          continue

        logme("INFO: Creating dummy file %s to pick up Orphan _T files", tmpFName)
        with open(tmpFName, "w+"):
          pass

  time.sleep(COLLECTOR_WAIT_TIME)