1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
|
#!/usr/bin/env python3
from builtins import range
import os
import re
import subprocess
import Alignment.MillePedeAlignmentAlgorithm.mpslib.Mpslibclass as mpslib
def fill_time_info(mps_index, status, cpu_time):
"""Fill timing info in the database for `mps_index`.
Arguments:
- `mps_index`: index in the MPS database
- `status`: job status
- `cpu_time`: extracted CPU timing information
"""
cpu_time = int(round(cpu_time)) # care only about seconds for now
if status in ("RUN", "DONE"):
if cpu_time > 0:
diff = cpu_time - lib.JOBRUNTIME[mps_index]
lib.JOBRUNTIME[mps_index] = cpu_time
lib.JOBHOST[mps_index] = "+"+str(diff)
lib.JOBINCR[mps_index] = diff
else:
lib.JOBRUNTIME[mps_index] = 0
lib.JOBINCR[mps_index] = 0
################################################################################
# mapping of HTCondor status codes to MPS status
htcondor_jobstatus = {"1": "PEND", # Idle
"2": "RUN", # Running
"3": "EXIT", # Removed
"4": "DONE", # Completed
"5": "PEND", # Held
"6": "RUN", # Transferring output
"7": "PEND"} # Suspended
################################################################################
# collect submitted jobs (use 'in' to handle composites, e.g. DISABLEDFETCH)
lib = mpslib.jobdatabase()
lib.read_db()
submitted_jobs = {}
for i in range(len(lib.JOBID)):
submitted = True
for status in ("SETUP", "OK", "DONE", "FETCH", "ABEND", "WARN", "FAIL"):
if status in lib.JOBSTATUS[i]:
submitted = False
break
if submitted:
submitted_jobs[lib.JOBID[i]] = i
print("submitted jobs:", len(submitted_jobs))
################################################################################
# deal with submitted jobs by looking into output of shell (condor_q)
if len(submitted_jobs) > 0:
job_status = {}
condor_q = subprocess.check_output(["condor_q", "-af:j",
"JobStatus", "RemoteSysCpu"],
stderr = subprocess.STDOUT).decode()
for line in condor_q.splitlines():
job_id, status, cpu_time = line.split()
job_status[job_id] = {"status": htcondor_jobstatus[status],
"cpu": float(cpu_time)}
for job_id, job_info in job_status.items():
mps_index = submitted_jobs.get(job_id, -1)
# check for disabled Jobs
disabled = "DISABLED" if "DISABLED" in lib.JOBSTATUS[mps_index] else ""
# continue with next batch job if not found or not interesting
if mps_index == -1:
print("mps_update.py - the job", job_id, end=' ')
print("was not found in the JOBID array")
continue
else: # pop entry from submitted jobs
submitted_jobs.pop(job_id)
# if found update Joblists for mps.db
lib.JOBSTATUS[mps_index] = disabled+job_info["status"]
fill_time_info(mps_index, job_info["status"], job_info["cpu"])
################################################################################
# loop over remaining jobs to see whether they are done
submitted_jobs_copy = { k:v for k,v in submitted_jobs.items() }
for job_id, mps_index in submitted_jobs_copy.items(): # IMPORTANT to copy here (no iterator!)
# check if current job is disabled. Print stuff.
disabled = "DISABLED" if "DISABLED" in lib.JOBSTATUS[mps_index] else ""
print(" DB job ", job_id, mps_index)
# check if it is a HTCondor job already moved to "history"
userlog = os.path.join("jobData", lib.JOBDIR[mps_index], "HTCJOB")
condor_h = subprocess.check_output(["condor_history", job_id, "-limit", "1",
"-userlog", userlog,
"-af:j", "JobStatus", "RemoteSysCpu"],
stderr = subprocess.STDOUT).decode()
if len(condor_h.strip()) > 0:
job_id, status, cpu_time = condor_h.split()
status = htcondor_jobstatus[status]
lib.JOBSTATUS[mps_index] = disabled + status
fill_time_info(mps_index, status, float(cpu_time))
submitted_jobs.pop(job_id)
continue
if "RUN" in lib.JOBSTATUS[mps_index]:
print("WARNING: Job ", mps_index, end=' ')
print("in state RUN, neither found by htcondor, nor bjobs, nor find", end=' ')
print("LSFJOB directory!")
################################################################################
# check for orphaned jobs
for job_id, mps_index in submitted_jobs.items():
for status in ("SETUP", "DONE", "FETCH", "TIMEL", "SUBTD"):
if status in lib.JOBSTATUS[mps_index]:
print("Funny entry index", mps_index, " job", lib.JOBID[mps_index], end=' ')
print(" status", lib.JOBSTATUS[mps_index])
lib.write_db()
|