Line Code
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339
#!/usr/bin/env python3

from builtins import range
import os
import re
import sys
import shutil
import tarfile
import argparse
import subprocess
import Alignment.MillePedeAlignmentAlgorithm.mpslib.tools as mps_tools
import Alignment.MillePedeAlignmentAlgorithm.mpslib.Mpslibclass as mpslib

parser = argparse.ArgumentParser(description = "Setup local mps database")
parser.add_argument("-m", "--setup-merge", dest = "setup_merge",
                    action = "store_true", default = False,
                    help = "setup pede merge job")
parser.add_argument("-a", "--append", action = "store_true", default = False,
                    help = "append jobs to existing list")
parser.add_argument("-M", "--memory", type = int, # seems to be obsolete
                    help = "memory (MB) to be allocated for pede")
parser.add_argument("-N", "--name", # remove restrictions on job name?
                    help = ("name to be assigned to the jobs; Whitespaces and "
                            "colons are not allowed"))
parser.add_argument("-w", "--weight", type = float,
                    help = "assign statistical weight")
parser.add_argument("-e", "--max-events", dest = "max_events", type = int,
                    help = "maximum number of events to process")

parser.add_argument("batch_script",
                    help = "path to the mille batch script template")
parser.add_argument("config_template",
                    help = "path to the config template")
parser.add_argument("input_file_list",
                    help = "path to the input file list")
parser.add_argument("n_jobs", type = int,
                    help = "number of jobs assigned to this dataset")
parser.add_argument("job_class",
                    help=("if it contains a ':' the part before ':' defines "
                    "the class for mille jobs and the part after defines the "
                    "pede job class. Accepted for both: lxplus, the LSF "
                    "queues (8nm, 1nh, 8nh, 1nd, 2nd, 1nw, 2nw) and the CAF "
                    "queues (cmscaf1nh, cmscaf1nd, cmscaf1nw, cmscafspec1nh, "
                    "cmscafspec1nd, cmscafspec1nw); mille only: cmsexpress, "
                    "htcondor_<flavour> and htcondor_cafalca_<flavour>; pede "
                    "only: htcondor_bigmem_<flavour>, where <flavour> is one "
                    "of espresso, microcentury, longlunch, workday, tomorrow, "
                    "testmatch, nextweek"))
parser.add_argument("job_name",
                    help = "name assigned to batch jobs")
parser.add_argument("merge_script",
                    help = "path to the pede batch script template")
parser.add_argument("mss_dir",
                    help = "name of the mass storage directory")

args = parser.parse_args(sys.argv[1:])

# The help text of -N/--name forbids whitespace and colons, but nothing
# enforced it; reject such names up front (parser.error exits with status 2).
if args.name is not None and (":" in args.name or re.search(r"\s", args.name)):
    parser.error("argument -N/--name: whitespaces and colons are not allowed")


# Set up the local mps job database. Its attributes are initialised from the
# command-line arguments; some of them are adjusted further down the script.
lib = mpslib.jobdatabase()
_initial_db_settings = (
    ("batchScript", args.batch_script),
    ("cfgTemplate", args.config_template),
    ("infiList", args.input_file_list),
    ("nJobs", args.n_jobs),
    ("classInf", args.job_class),
    ("addFiles", args.job_name),
    ("driver", "merge" if args.setup_merge else ""),
    ("mergeScript", args.merge_script),
    ("mssDirPool", ""),
    ("mssDir", args.mss_dir),
    ("pedeMem", args.memory),
)
for _attr_name, _attr_value in _initial_db_settings:
    setattr(lib, _attr_name, _attr_value)


# Each template / list file given on the command line must be readable;
# the first unreadable one aborts the setup.
_required_inputs = (
    (args.batch_script, "Bad 'batch_script' script name"),
    (args.config_template, "Bad 'config_template' file name"),
    (args.input_file_list, "Bad input list file"),
)
for _input_path, _complaint in _required_inputs:
    if os.access(_input_path, os.R_OK):
        continue
    print(_complaint, _input_path)
    sys.exit(1)

# ignore 'append' flag if mps database is not yet created
if not os.access("mps.db", os.R_OK):
    args.append = False

allowed_mille_classes = ("lxplus", "cmscaf1nh", "cmscaf1nd", "cmscaf1nw",
                         "cmscafspec1nh", "cmscafspec1nd", "cmscafspec1nw",
                         "8nm", "1nh", "8nh", "1nd", "2nd", "1nw", "2nw",
                         "cmsexpress","htcondor_cafalca_espresso","htcondor_espresso",
                         "htcondor_cafalca_microcentury","htcondor_microcentury",
                         "htcondor_cafalca_longlunch", "htcondor_longlunch",
                         "htcondor_cafalca_workday", "htcondor_workday",
                         "htcondor_cafalca_tomorrow", "htcondor_tomorrow",
                         "htcondor_cafalca_testmatch", "htcondor_testmatch",
                         "htcondor_cafalca_nextweek", "htcondor_nextweek")

allowed_pede_classes = ("lxplus", "cmscaf1nh", "cmscaf1nd", "cmscaf1nw",
                        "cmscafspec1nh", "cmscafspec1nd", "cmscafspec1nw",
                        "8nm", "1nh", "8nh", "1nd", "2nd", "1nw", "2nw",
                        "htcondor_bigmem_espresso",
                        "htcondor_bigmem_microcentury",
                        "htcondor_bigmem_longlunch",
                        "htcondor_bigmem_workday",
                        "htcondor_bigmem_tomorrow",
                        "htcondor_bigmem_testmatch",
                        "htcondor_bigmem_nextweek")

# Validate the job class of the mille jobs, then of the pede job. On the first
# mismatch, print the accepted classes and abort.
_class_checks = (
    ("mille", allowed_mille_classes, "Bad job class for mille in class"),
    ("pede", allowed_pede_classes, "Bad job class for pede in class"),
)
for _job_type, _accepted, _complaint in _class_checks:
    if lib.get_class(_job_type) in _accepted:
        continue
    print(_complaint, args.job_class)
    print("Allowed classes:")
    for _accepted_class in _accepted:
        print(" -", _accepted_class)
    sys.exit(1)

# In merge mode the pede (merge) batch script must be readable. An empty
# merge_script argument falls back to "<batch_script>merge".
if args.setup_merge:
    if args.merge_script == "":
        args.merge_script = args.batch_script + "merge"
        # lib.mergeScript was initialised from the (empty) argument before this
        # default was chosen; keep it in sync so the database records the
        # script that is actually used.
        lib.mergeScript = args.merge_script
    if not os.access(args.merge_script, os.R_OK):
        print("Bad merge script file name", args.merge_script)
        sys.exit(1)

# A mass-storage directory of the form "<pool>:<dir>" carries a pool name in
# front of the first ':'; split it off and keep the remainder as the directory.
if args.mss_dir.strip() and ":" in args.mss_dir:
    lib.mssDirPool, _separator, args.mss_dir = args.mss_dir.partition(":")
    lib.mssDir = args.mss_dir

pedeMemMin = 1024 # Minimum memory allocated for pede: 1024MB=1GB

# Try to guess the memory requirements from the pede executable name
# (e.g. "pede_4GB" -> 4096 MB). 2.5GB is used as default otherwise.
# AP - 23.03.2010
cms_process = mps_tools.get_process_object(args.config_template)
pedeMemDef = cms_process.AlignmentProducer.algoConfig.pedeSteerer.pedeCommand.value()
pedeMemDef = os.path.basename(pedeMemDef) # This is the pede executable (only the file name, eg "pede_4GB").
pedeMemDef = pedeMemDef.split("_")[-1].replace("GB", "")
try:
    # Whole MB, consistent with the integer -M option and the fallback below;
    # never less than pedeMemMin. int() also rejects "nan" (ValueError) and
    # "inf" (OverflowError) suffixes.
    pedeMemDef = max(int(1024*float(pedeMemDef)), pedeMemMin)
except (ValueError, OverflowError):
    pedeMemDef = int(1024*2.5)


# Allocate memory for the pede job.
# The value specified by the user (-M option) prevails on the one evinced from the executable name,
# unless it is missing, zero or below pedeMemMin.
# AP - 23.03.2010
if not args.memory or args.memory < pedeMemMin:
    # Terminate the line: previously end=' ' glued the next output onto it.
    print("Memory request ({}) is < {}, using {}.".format(args.memory, pedeMemMin, pedeMemDef))
    lib.pedeMem = args.memory = pedeMemDef

# Create the job directories.
# In append mode the new jobs are numbered after the highest existing
# "job<N>" directory found in jobData.
nJobExist = 0
if args.append and os.path.isdir("jobData"):
    job_regex = re.compile(r"job(\d+)") # can have any number of digits
    existing_jobs_set = set()
    for item in os.listdir("jobData"):
        x = job_regex.search(item)
        if x:
            existing_jobs_set.add(int(x.group(1)))
    # default=0: a jobData directory without any job<N> entries must not
    # crash max() on an empty set.
    nJobExist = max(existing_jobs_set, default=0)

for j in range(1, args.n_jobs + 1):
    i = j+nJobExist
    jobdir = "job{0:03d}".format(i)
    os.makedirs(os.path.join("jobData", jobdir))

# build the absolute job directory path (needed by mps_script)
theJobData = os.path.abspath("jobData")

if args.append:
    # Attributes set up from the command line. They are saved here and put back
    # after read_db(), which presumably reloads them from mps.db.
    _preserved_attrs = ("batchScript", "cfgTemplate", "infiList", "nJobs",
                        "classInf", "mergeScript", "driver")
    _saved_values = {name: getattr(lib, name) for name in _preserved_attrs}

    # Read DB file
    lib.read_db()

    # If the entry right after the mille jobs is the merge job, drop the last
    # row of every per-job column (a merge entry is appended again later).
    if lib.JOBDIR[lib.nJobs] == "jobm":
        _job_columns = ("JOBDIR", "JOBID", "JOBSTATUS", "JOBNTRY",
                        "JOBRUNTIME", "JOBNEVT", "JOBHOST", "JOBINCR",
                        "JOBREMARK", "JOBSP1", "JOBSP2", "JOBSP3")
        for _column in _job_columns:
            getattr(lib, _column).pop()

    # Put the command-line values back in place.
    for _name, _saved in _saved_values.items():
        setattr(lib, _name, _saved)


# Create (update) the local database: for every new mille job append one row
# to the per-job lists, then create its split file, cfg file and run script.
for j in range(1, args.n_jobs + 1):
    i = j+nJobExist # job number, continuing after existing jobs in append mode
    jobdir = "job{0:03d}".format(i)
    lib.JOBDIR.append(jobdir)
    lib.JOBID.append("")
    lib.JOBSTATUS.append("SETUP")
    lib.JOBNTRY.append(0)
    lib.JOBRUNTIME.append(0)
    lib.JOBNEVT.append(0)
    lib.JOBHOST.append("")
    lib.JOBINCR.append(0)
    lib.JOBREMARK.append("")
    lib.JOBSP1.append("")
    lib.JOBSP2.append("" if args.weight is None else str(args.weight))
    lib.JOBSP3.append(args.name)

    split_file = "jobData/{}/theSplit".format(jobdir)

    # create the split card files; with --max-events every job gets the full
    # input list (part 1 of 1) instead of part j of n_jobs
    cmd = ["mps_split.pl", args.input_file_list,
           str(j if args.max_events is None else 1),
           str(args.n_jobs if args.max_events is None else 1)]
    with open(split_file, "w") as f:
        try:
            subprocess.check_call(cmd, stdout = f)
        except subprocess.CalledProcessError:
            print("              split failed")
            # Mark the row appended above. [-1] instead of [i-1]: in append
            # mode i is derived from the job directories, whose count need not
            # match the number of rows read from mps.db.
            lib.JOBSTATUS[-1] = "FAIL"
    theIsn = "{0:03d}".format(i)

    # Event range of this job when --max-events is given; the last job also
    # takes the remainder of the integer division.
    skip_events = 0
    max_events = 0
    if args.max_events is not None:
        chunk_size = int(args.max_events/args.n_jobs)
        skip_events = chunk_size*(j-1)
        max_events = (args.max_events - (args.n_jobs-1)*chunk_size
                      if j == args.n_jobs    # last job gets the remaining events
                      else chunk_size)
    # NOTE(review): skip_events / max_events are not passed to mps_splice
    # below, so with --max-events every job currently reads the full input
    # list. Confirm whether lib.mps_splice should receive them.

    # create the cfg file
    lib.mps_splice(args.config_template,
                   split_file,
                   "jobData/{}/the.py".format(jobdir),
                   theIsn)

    # create the run script
    print("mps_script.pl {}  jobData/{}/theScript.sh {}/{} the.py jobData/{}/theSplit {} {} {}".format(args.batch_script, jobdir, theJobData, jobdir, jobdir, theIsn, args.mss_dir, lib.mssDirPool))
    mps_tools.run_checked(["mps_script.pl", args.batch_script,
                           "jobData/{}/theScript.sh".format(jobdir),
                           os.path.join(theJobData, jobdir), "the.py",
                           split_file, theIsn,
                           args.mss_dir, lib.mssDirPool])


# Append the merge job entry as the last row of every per-job column. This is
# always done; whether the entry is used depends on the "merge" option.
jobdir = "jobm"
_merge_row = (
    ("JOBDIR", jobdir),
    ("JOBID", ""),
    ("JOBSTATUS", "SETUP"),
    ("JOBNTRY", 0),
    ("JOBRUNTIME", 0),
    ("JOBNEVT", 0),
    ("JOBHOST", ""),
    ("JOBINCR", 0),
    ("JOBREMARK", ""),
    ("JOBSP1", ""),
    ("JOBSP2", ""),
    ("JOBSP3", ""),
)
for _column, _cell in _merge_row:
    getattr(lib, _column).append(_cell)

lib.write_db()

# In merge mode, recreate jobData/jobm from scratch and generate the merge
# job's cfg (mps_merge.py) and run script (mps_scriptm.pl).
if args.setup_merge:
    shutil.rmtree("jobData/jobm", ignore_errors = True)
    os.makedirs("jobData/jobm")
    print("Create dir jobData/jobm")

    # the merge covers the previously existing and the newly created jobs
    nJobsMerge = nJobExist + args.n_jobs
    merge_dir = os.path.join(theJobData, "jobm")

    merge_cfg_cmd = ["mps_merge.py", "-w", args.config_template,
                     "jobData/jobm/alignment_merge.py",
                     merge_dir, str(nJobsMerge)]
    print("mps_merge.py -w {} jobData/jobm/alignment_merge.py {}/jobm {}".format(args.config_template, theJobData, nJobsMerge))
    mps_tools.run_checked(merge_cfg_cmd)

    merge_script_cmd = ["mps_scriptm.pl", args.merge_script,
                        "jobData/jobm/theScript.sh", merge_dir,
                        "alignment_merge.py", str(nJobsMerge),
                        args.mss_dir, lib.mssDirPool]
    print("mps_scriptm.pl {} jobData/jobm/theScript.sh {}/jobm alignment_merge.py {} {} {}".format(args.merge_script, theJobData, nJobsMerge, args.mss_dir, lib.mssDirPool))
    mps_tools.run_checked(merge_script_cmd)


# Create a backup of batchScript, cfgTemplate, infiList (and mergeScript)
#   in jobData, as the next free jobData/ScriptsAndCfg<NNN>.tar
bu_regex = re.compile(r"ScriptsAndCfg([0-9]{3,})\.tar") # 3+ digits: "{:03d}" grows past 999
existing_backups = [int(bu.group(1))
                    for bu in map(bu_regex.search, os.listdir("jobData"))
                    if bu is not None]
i = max(existing_backups, default=0) + 1
ScriptCfg = os.path.join("jobData", "ScriptsAndCfg{0:03d}".format(i))
os.makedirs(ScriptCfg)
for f in (args.batch_script, args.config_template, args.input_file_list):
    shutil.copy2(f, ScriptCfg)
if args.setup_merge:
    shutil.copy2(args.merge_script, ScriptCfg)

# pack the staging directory into the tarball, then remove the directory
with tarfile.open(ScriptCfg+".tar", "w") as tar:
    tar.add(ScriptCfg)
shutil.rmtree(ScriptCfg)


# Persist the final database, read it back from disk and print its contents.
lib.write_db()
lib.read_db()
lib.print_memdb()