Back to home page

Project CMSSW displayed by LXR

 
 

    


File indexing completed on 2024-04-06 11:56:36

0001 #!/usr/bin/env python3
0002 from __future__ import print_function
0003 from builtins import range
0004 import os
0005 import re
0006 import subprocess
0007 import Alignment.MillePedeAlignmentAlgorithm.mpslib.Mpslibclass as mpslib
0008 
0009 
def fill_time_info(mps_index, status, cpu_time):
    """Record the CPU time of one job in the module-level job database `lib`.

    Only jobs whose `status` is "RUN" or "DONE" are touched.  For a positive
    CPU time the total runtime is replaced, and the increment relative to the
    previously stored runtime is written to both `JOBINCR` and (as a
    "+<increment>" string) `JOBHOST`.  For a non-positive CPU time the
    runtime and increment are reset to zero and `JOBHOST` is left alone.

    Arguments:
    - `mps_index`: index in the MPS database
    - `status`: job status
    - `cpu_time`: extracted CPU timing information
    """

    # Only whole seconds are stored; the conversion happens before the status
    # check, exactly as before.
    seconds = int(round(cpu_time))

    if status not in ("RUN", "DONE"):
        return

    if seconds <= 0:
        lib.JOBRUNTIME[mps_index] = 0
        lib.JOBINCR[mps_index] = 0
        return

    # Increment since the runtime that was last stored for this job.
    increment = seconds - lib.JOBRUNTIME[mps_index]
    lib.JOBRUNTIME[mps_index] = seconds
    lib.JOBHOST[mps_index] = "+" + str(increment)
    lib.JOBINCR[mps_index] = increment
0029 
0030 
0031 
################################################################################
# Translation table from the HTCondor `JobStatus` code (kept as a string, the
# way it is read from the command output) to the MPS status name.
htcondor_jobstatus = {
    "1": "PEND",  # Idle
    "2": "RUN",   # Running
    "3": "EXIT",  # Removed
    "4": "DONE",  # Completed
    "5": "PEND",  # Held
    "6": "RUN",   # Transferring output
    "7": "PEND",  # Suspended
}
0041 
0042 
################################################################################
# Read the MPS database and build a map {batch job id: database index} of all
# jobs that still count as submitted.  A job is skipped when its status
# contains one of the names below; a substring test ('in') is used so that
# composite statuses such as DISABLEDFETCH are handled as well.
lib = mpslib.jobdatabase()
lib.read_db()

_not_submitted_states = ("SETUP", "OK", "DONE", "FETCH", "ABEND", "WARN", "FAIL")

submitted_jobs = {}
for db_index, batch_id in enumerate(lib.JOBID):
    if not any(state in lib.JOBSTATUS[db_index]
               for state in _not_submitted_states):
        submitted_jobs[batch_id] = db_index
print("submitted jobs:", len(submitted_jobs))
0058 
0059 
################################################################################
# deal with submitted jobs by looking into output of shell (condor_q)
if len(submitted_jobs) > 0:
    # Every output line is unpacked into exactly three whitespace-separated
    # fields: job id, numeric JobStatus code and RemoteSysCpu.  The status code
    # is translated to the MPS status name right away.
    job_status = {}
    condor_q = subprocess.check_output(["condor_q", "-af:j",
                                        "JobStatus", "RemoteSysCpu"],
                                       stderr = subprocess.STDOUT).decode()
    for line in condor_q.splitlines():
        job_id, status, cpu_time = line.split()
        job_status[job_id] = {"status": htcondor_jobstatus[status],
                              "cpu": float(cpu_time)}

    for job_id, job_info in job_status.items():
        # Look the job up and remove it from the pending map in one step.
        # The "not found" case is handled *before* the index is used, so an
        # unknown job never causes an unrelated database entry to be read.
        mps_index = submitted_jobs.pop(job_id, None)

        # continue with next batch job if not found or not interesting
        if mps_index is None:
            print("mps_update.py - the job", job_id, end=' ')
            print("was not found in the JOBID array")
            continue

        # keep the DISABLED prefix of disabled jobs when the status changes
        disabled = "DISABLED" if "DISABLED" in lib.JOBSTATUS[mps_index] else ""

        # if found update Joblists for mps.db
        lib.JOBSTATUS[mps_index] = disabled+job_info["status"]
        fill_time_info(mps_index, job_info["status"], job_info["cpu"])
0089 
0090 
################################################################################
# loop over remaining jobs to see whether they are done
# IMPORTANT: iterate over a snapshot, because entries are removed from
# `submitted_jobs` inside the loop.
submitted_jobs_copy = dict(submitted_jobs)
for job_id, mps_index in submitted_jobs_copy.items():
    # check if current job is disabled. Print stuff.
    disabled = "DISABLED" if "DISABLED" in lib.JOBSTATUS[mps_index] else ""
    print(" DB job ", job_id, mps_index)

    # check if it is a HTCondor job already moved to "history"
    userlog = os.path.join("jobData", lib.JOBDIR[mps_index], "HTCJOB")
    condor_h = subprocess.check_output(["condor_history", job_id, "-limit", "1",
                                        "-userlog", userlog,
                                        "-af:j", "JobStatus", "RemoteSysCpu"],
                                       stderr = subprocess.STDOUT).decode()
    if len(condor_h.strip()) > 0:
        # The id printed by condor_history is deliberately not assigned to
        # `job_id`: the entry must be removed with the key under which it is
        # stored in `submitted_jobs`.
        _history_id, status, cpu_time = condor_h.split()
        status = htcondor_jobstatus[status]
        lib.JOBSTATUS[mps_index] = disabled + status
        fill_time_info(mps_index, status, float(cpu_time))
        submitted_jobs.pop(job_id)
        continue

    # NOTE(review): the wording below still refers to LSF tools (bjobs,
    # LSFJOB) although only condor_history is queried in this loop; the text
    # is kept unchanged.
    if "RUN" in lib.JOBSTATUS[mps_index]:
        print("WARNING: Job ", mps_index, end=' ')
        print("in state RUN, neither found by htcondor, nor bjobs, nor find", end=' ')
        print("LSFJOB directory!")
0117 
0118 
################################################################################
# Report jobs that are still in the pending map although their database status
# contains one of the states listed below, then write the database back.
_orphan_states = ("SETUP", "DONE", "FETCH", "TIMEL", "SUBTD")
for mps_index in submitted_jobs.values():
    current_status = lib.JOBSTATUS[mps_index]
    for state in _orphan_states:
        if state not in current_status:
            continue
        print("Funny entry index", mps_index, " job", lib.JOBID[mps_index], end=' ')
        print(" status", lib.JOBSTATUS[mps_index])


lib.write_db()