Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-11-25 02:29:03

0001 #!/usr/bin/env python3
0002 from builtins import range
0003 import os
0004 import re
0005 import subprocess
0006 import Alignment.MillePedeAlignmentAlgorithm.mpslib.Mpslibclass as mpslib
0007 
0008 
def fill_time_info(mps_index, status, cpu_time):
    """Store the CPU-time bookkeeping of one job in the MPS database.

    Arguments:
    - `mps_index`: index of the job in the MPS database arrays
    - `status`: status string of the job
    - `cpu_time`: CPU time reported for the job

    Only the statuses "RUN" and "DONE" lead to an update; for any other
    status the database is left as it is.
    """

    # the database keeps whole seconds only
    seconds = int(round(cpu_time))

    if status not in ("RUN", "DONE"):
        return

    if seconds <= 0:
        # no usable timing yet: reset total runtime and increment
        lib.JOBRUNTIME[mps_index] = 0
        lib.JOBINCR[mps_index] = 0
        return

    # increment relative to the runtime stored by the previous update
    increment = seconds - lib.JOBRUNTIME[mps_index]
    lib.JOBRUNTIME[mps_index] = seconds
    lib.JOBHOST[mps_index] = "+" + str(increment)
    lib.JOBINCR[mps_index] = increment
0028 
0029 
0030 
0031 ################################################################################
0032 # mapping of HTCondor status codes to MPS status
# keys are the HTCondor JobStatus codes as strings, values the MPS status
htcondor_jobstatus = dict((
    ("1", "PEND"),  # Idle
    ("2", "RUN"),   # Running
    ("3", "EXIT"),  # Removed
    ("4", "DONE"),  # Completed
    ("5", "PEND"),  # Held
    ("6", "RUN"),   # Transferring output
    ("7", "PEND"),  # Suspended
))
0040 
0041 
0042 ################################################################################
0043 # collect submitted jobs (use 'in' to handle composites, e.g. DISABLEDFETCH)
lib = mpslib.jobdatabase()
lib.read_db()

# a job whose status contains any of these tags is not counted as submitted;
# substring matching also covers composite states such as DISABLEDFETCH
not_submitted_tags = ("SETUP", "OK", "DONE", "FETCH", "ABEND", "WARN", "FAIL")

# map batch job id -> index in the MPS database for all submitted jobs
submitted_jobs = {
    batch_id: index
    for index, batch_id in enumerate(lib.JOBID)
    if not any(tag in lib.JOBSTATUS[index] for tag in not_submitted_tags)
}
print("submitted jobs:", len(submitted_jobs))
0057 
0058 
0059 ################################################################################
0060 # deal with submitted jobs by looking into output of shell (condor_q)
if submitted_jobs:
    # Query the queue once; every output line is expected to hold
    # "<job id> <JobStatus> <RemoteSysCpu>".
    condor_q = subprocess.check_output(["condor_q", "-af:j",
                                        "JobStatus", "RemoteSysCpu"],
                                       stderr = subprocess.STDOUT).decode()

    job_status = {}
    for line in condor_q.splitlines():
        if not line.strip():    # ignore blank lines in the output
            continue
        job_id, status, cpu_time = line.split()
        job_status[job_id] = {"status": htcondor_jobstatus[status],
                              "cpu": float(cpu_time)}

    for job_id, job_info in job_status.items():
        # Look the job up (and drop it from the submitted jobs) *before*
        # touching the database arrays: indexing them with a "not found"
        # marker such as -1 would silently read the last database entry.
        mps_index = submitted_jobs.pop(job_id, None)

        # continue with next batch job if not found or not interesting
        if mps_index is None:
            print("mps_update.py - the job", job_id,
                  "was not found in the JOBID array")
            continue

        # keep the DISABLED prefix of disabled jobs
        disabled = "DISABLED" if "DISABLED" in lib.JOBSTATUS[mps_index] else ""

        # if found update Joblists for mps.db
        lib.JOBSTATUS[mps_index] = disabled+job_info["status"]
        fill_time_info(mps_index, job_info["status"], job_info["cpu"])
0088 
0089 
0090 ################################################################################
0091 # loop over remaining jobs to see whether they are done
# Iterate over a snapshot: entries are removed from `submitted_jobs` inside
# the loop, which is not allowed while iterating over the dict itself.
submitted_jobs_copy = dict(submitted_jobs)
for job_id, mps_index in submitted_jobs_copy.items():
    # check if current job is disabled. Print stuff.
    disabled = "DISABLED" if "DISABLED" in lib.JOBSTATUS[mps_index] else ""
    print(" DB job ", job_id, mps_index)

    # check if it is a HTCondor job already moved to "history"
    userlog = os.path.join("jobData", lib.JOBDIR[mps_index], "HTCJOB")
    condor_h = subprocess.check_output(["condor_history", job_id, "-limit", "1",
                                        "-userlog", userlog,
                                        "-af:j", "JobStatus", "RemoteSysCpu"],
                                       stderr = subprocess.STDOUT).decode()
    if condor_h.strip():
        # The first column repeats the job id; it is deliberately discarded
        # so that the database key `job_id` of this iteration is not
        # overwritten and the pop() below always removes the right entry.
        _, status, cpu_time = condor_h.split()
        status = htcondor_jobstatus[status]
        lib.JOBSTATUS[mps_index] = disabled + status
        fill_time_info(mps_index, status, float(cpu_time))
        submitted_jobs.pop(job_id)
        continue

    # not in the history either: warn if the database still claims RUN
    if "RUN" in lib.JOBSTATUS[mps_index]:
        print("WARNING: Job ", mps_index,
              "in state RUN, neither found by htcondor, nor bjobs, nor find",
              "LSFJOB directory!")
0116 
0117 
0118 ################################################################################
0119 # check for orphaned jobs
# jobs still listed as submitted at this point were found neither in the
# queue nor in the history; report those whose status contains one of these
suspicious_tags = ("SETUP", "DONE", "FETCH", "TIMEL", "SUBTD")
for mps_index in submitted_jobs.values():
    current_status = lib.JOBSTATUS[mps_index]
    for tag in suspicious_tags:
        if tag in current_status:
            print("Funny entry index", mps_index, " job", lib.JOBID[mps_index],
                  " status", current_status)


# persist all status and timing updates
lib.write_db()