Line Code
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309
#! /usr/bin/env python3

import argparse
import glob
import json
import os
import re
import shutil
import socket
import subprocess
import sys

def cmsRun(config: str, verbose: bool, **args):
    """Run `cmsRun` on the given configuration file and stop the script on failure.

    The command line is `cmsRun <config> key=value ...`, with one `key=value`
    argument for each keyword argument. It is echoed to stdout before running.

    Parameters:
        config:  path of the configuration file passed to cmsRun
        verbose: if True the job inherits the stdout/stderr of this script;
                 otherwise its output is captured, and the captured stderr is
                 reported only if the job fails
        **args:  extra `key=value` command line arguments for the job

    Exits the script (via sys.exit, with the job's return code) if cmsRun is
    killed by a signal or exits with a non-zero error code.
    """
    cmd = ['cmsRun', config] + ['%s=%s' % (arg, val) for (arg, val) in args.items()]
    sys.stdout.write(' \\\n  '.join(cmd))
    sys.stdout.write('\n\n')
    # flush the buffered command line now, so that it cannot show up after the
    # output that the child process writes directly to the same stream
    sys.stdout.flush()

    if verbose:
        status = subprocess.run(cmd, stdout=None, stderr=None)
    else:
        status = subprocess.run(cmd, capture_output=True, text=True)

    if status.returncode == 0:
        return

    # handle error conditions: a negative return code means "killed by signal"
    if status.returncode < 0:
        sys.stderr.write('error: cmsRun was killed by signal %d\n' % -status.returncode)
    else:
        sys.stderr.write('error: cmsRun exited with error code %d\n' % status.returncode)
    if not verbose:
        # in verbose mode the job has already written its stderr to the terminal
        sys.stderr.write('\n')
        sys.stderr.write(status.stderr)
    sys.exit(status.returncode)


class LuminosityBlockRange:
    """Inclusive range of (run, lumisection) pairs, used to select what to process.

    The accepted syntax is `[RUN:]LUMI-[RUN:]LUMI`, a single `[RUN:]LUMI` value
    (used as both ends of the range), an empty string or `all`.
    A run or lumisection that is omitted, empty, `min` or `max` is stored as 0,
    which means "no constraint" on that field.
    """

    def __init__(self, value: str = '') -> None:
        """Parse `value`; an empty string or 'all' gives an unbounded range.

        Raises ValueError if `value` is malformed.
        """
        self.min_run = 0
        self.max_run = 0
        self.min_lumi = 0
        self.max_lumi = 0
        if value and value != 'all':
            ((self.min_run, self.min_lumi), (self.max_run, self.max_lumi)) = LuminosityBlockRange.parse_range(value)

    @staticmethod
    def parse_value(value: str) -> int:
        """Convert a single run or lumisection field to an int (0 means unconstrained)."""
        return 0 if value in ('', 'min', 'max') else int(value)

    @staticmethod
    def parse_value_pair(value: str) -> tuple[int, int]:
        """Parse `[RUN:]LUMI` into a (run, lumi) pair; a missing run is returned as 0."""
        if value.count(':') > 1:
            raise ValueError('invalid syntax')
        (first, second) = value.split(':') if ':' in value else ('', value)
        return LuminosityBlockRange.parse_value(first), LuminosityBlockRange.parse_value(second)

    @staticmethod
    def parse_range(value: str) -> tuple[tuple[int, int], tuple[int, int]]:
        """Parse `MIN-MAX` into ((min_run, min_lumi), (max_run, max_lumi)).

        A value without '-' is used for both ends of the range.
        """
        if value.count('-') > 1:
            raise ValueError('invalid syntax')
        (first, second) = value.split('-') if '-' in value else (value, value)
        return LuminosityBlockRange.parse_value_pair(first), LuminosityBlockRange.parse_value_pair(second)

    def is_in_range(self, run: int, lumi: int) -> bool:
        """Return True if the given run and lumisection are inside the range."""
        # lower bound: either the run is unconstrained or is the first run, and
        # then the lumisection must not be below the minimum; or the run is
        # strictly after the first run, and then any lumisection is accepted
        above_min = (
            ((self.min_run == 0 or self.min_run == run) and (self.min_lumi == 0 or self.min_lumi <= lumi)) or
            (self.min_run != 0 and self.min_run < run)
        )
        # upper bound: same logic, mirrored; the "strictly before the last run"
        # case must depend on max_run only, not on min_run
        below_max = (
            ((self.max_run == 0 or self.max_run == run) and (self.max_lumi == 0 or self.max_lumi >= lumi)) or
            (self.max_run != 0 and self.max_run > run)
        )
        return above_min and below_max


# default values
events_per_file = 100
events_per_lumi = 11655
output_directory = os.getcwd()

# command line interface; the defaults are shown automatically in the --help output
parser = argparse.ArgumentParser(
    description='Convert RAW data from .root format to .raw format.',
    formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument(
    'files',
    type=str, metavar='FILES', nargs='+',
    help='input files in .root format',
)
parser.add_argument(
    '-s', '--source',
    type=str, dest='raw_data_collection', metavar='TAG', default='rawDataCollector',
    help='name of the FEDRawDataCollection to be repacked into RAW format',
)
parser.add_argument(
    '-o', '--output',
    type=str, dest='output_directory', metavar='PATH', default=os.getcwd(),
    help='base path to store the output files; subdirectories based on the run number are automatically created',
)
parser.add_argument(
    '-f', '--events_per_file',
    type=int, dest='events_per_file', metavar='EVENTS', default=events_per_file,
    help='split the output into files with at most EVENTS events',
)
parser.add_argument(
    '-l', '--events_per_lumi',
    type=int, dest='events_per_lumi', metavar='EVENTS', default=events_per_lumi,
    help='process at most EVENTS events in each lumisection',
)
parser.add_argument(
    '-r', '--range',
    type=LuminosityBlockRange, dest='range', metavar='[RUN:LUMI-RUN:LUMI]', default='all',
    help='process only the runs and lumisections in the given range',
)
parser.add_argument(
    '-v', '--verbose',
    dest='verbose', action='store_true', default=False,
    help='print additional information while processing the input files',
)
parser.add_argument(
    '-1', '--one-file-per-lumi',
    action='store_true', dest='one_file_per_lumi', default=False,
    help='assume that lumisections are not split across files (and disable --events_per_lumi)',
)

# parse the command line arguments and options,
# and drop a single trailing '/' from the output directory
args = parser.parse_args()
args.output_directory = args.output_directory.removesuffix('/')

# read the list of input files from the command line arguments:
# names without a ':' that do not start with '/store/' and that exist
# on the local filesystem are prefixed with 'file:', the others are used as they are
files = []
for name in args.files:
    is_local_path = ':' not in name and not name.startswith('/store/') and os.path.exists(name)
    files.append('file:' + name if is_local_path else name)
# extract the list of runs and lumisections in the input files
class FileInfo:
    """Per-lumisection bookkeeping: an event counter and the set of files contributing to it."""

    def __init__(self) -> None:
        # both start empty, and are filled while parsing the edmFileUtil output
        self.files = set()
        self.events = 0

# the table printed by `edmFileUtil --eventsInLumis` starts after a header line
# and ends at the first empty line
header  = re.compile(r'^ +Run +Lumi +# Events$')
empty   = re.compile(r'^ *$')

# content[run][lumi] is a FileInfo with the number of events and the files for that lumisection
content = {}

for f in files:

    # run edmFileUtil --eventsInLumis ...
    print(f'preprocessing input file {f}')
    output = subprocess.run(['edmFileUtil', '--eventsInLumis', f], capture_output=True, text=True)
    if args.verbose:
        print(output.stdout)

    # handle error conditions: a negative return code means "killed by signal"
    if output.returncode != 0:
        if output.returncode < 0:
            sys.stderr.write('error: edmFileUtil was killed by signal %d\n' % -output.returncode)
        else:
            sys.stderr.write('error: edmFileUtil exited with error code %d\n' % output.returncode)
        # the stderr of edmFileUtil is always captured and never shown otherwise,
        # so report it whether or not --verbose was given
        sys.stderr.write('\n')
        sys.stderr.write(output.stderr)
        sys.exit(output.returncode)

    # parse the output of edmFileUtil
    parsing = False
    for line in output.stdout.splitlines():
        if not parsing and header.match(line):
            # start parsing
            parsing = True
            continue

        if parsing and empty.match(line):
            # stop parsing
            parsing = False
            continue

        if parsing:
            # each row of the table is: run, lumisection, number of events
            run, lumi, events = tuple(map(int, line.split()))
            if not args.range.is_in_range(run, lumi):
                print(f'  run {run}, lumisection {lumi} is outside of the given range and will be skipped')
                continue
            if events == 0:
                print(f'  run {run}, lumisection {lumi} is empty and will be skipped')
                continue
            print(f'  run {run}, lumisection {lumi} with {events} events will be processed')
            if run not in content:
                content[run] = {}
            if lumi not in content[run]:
                content[run][lumi] = FileInfo()
            # the same lumisection may be split across multiple input files
            content[run][lumi].events += events
            content[run][lumi].files.add(f)
    print()

# remove the lumisections that have no events
# note: this may no longer be needed, but is left as a cross check
for run, lumisections in content.items():
    # collect the keys first, as the dictionary cannot be modified while iterating over it
    empty_lumis = [ lumi for lumi, info in lumisections.items() if info.events == 0 ]
    for lumi in empty_lumis:
        lumisections.pop(lumi)

# remove the runs that are left without any lumisection
empty_runs = [ run for run, lumisections in content.items() if not lumisections ]
for run in empty_runs:
    content.pop(run)

# locate the CMSSW configuration file: look in the area pointed to by CMSSW_BASE
# first, then in the one pointed to by CMSSW_RELEASE_BASE
config_name = 'HLTrigger/Tools/python/convertToRaw.py'
# use .get() so that an environment without these variables results in a clear
# error message below, instead of a KeyError traceback
current_area = os.environ.get('CMSSW_BASE')
release_area = os.environ.get('CMSSW_RELEASE_BASE')

if not current_area and not release_area:
    sys.stderr.write('error: CMSSW_BASE and CMSSW_RELEASE_BASE are not set, cannot look for the configuration file %s\n' % config_name)
    sys.exit(1)

config_py = None
for area in (current_area, release_area):
    if not area:
        continue
    candidate = area + '/src/' + config_name
    if os.path.exists(candidate):
        config_py = candidate
        break

if config_py is None:
    sys.stderr.write('error: cannot find the configuration file %s\n' % config_name)
    sys.exit(1)

# convert the input data to FED RAW data format
converted_files = []

# process each run
for run in sorted(content):

    # .raw files produced for this run; reset for every run, so that neither the
    # EoR cross check nor the cff file of this run include files from a previous run
    converted_files = []

    # create the output directory structure
    run_path = args.output_directory + f'/run{run:06d}'
    shutil.rmtree(run_path, ignore_errors=True)
    os.makedirs(run_path)

    if args.one_file_per_lumi:
        # process the whole run
        lumis = sorted(content[run])
        print('found run %d, lumis %d-%d, with %d events' % (run, min(lumis), max(lumis), sum(content[run][lumi].events for lumi in lumis)))
        cmsRun(config_py, args.verbose, inputFiles = ','.join(files), runNumber = run, eventsPerFile = args.events_per_file, rawDataCollection = args.raw_data_collection, outputPath = args.output_directory)
        # collect the .raw files of all the selected lumisections of this run
        for lumi in lumis:
            converted_files.extend(sorted(glob.glob(run_path + f'/run{run:06d}_ls{lumi:04d}_*.raw')))

    else:
        # process lumisections individually, then merge the output
        summary = {
            'data': [0, 0, 0, 0],   # [ 'events', 'files', 'lumisections', 'last lumisection' ]
            'definition': run_path + '/jsd/EoR.jsd',
            'source': socket.getfqdn() + '_' + str(os.getpid())
        }

        for lumi in sorted(content[run]):

            # process individual lumisections, each in its own temporary directory
            print('found run %d, lumi %d, with %d events' % (run, lumi, content[run][lumi].events))
            lumi_base_path = args.output_directory + f'/run{run:06d}_ls{lumi:04d}'
            shutil.rmtree(lumi_base_path, ignore_errors=True)
            os.makedirs(lumi_base_path)
            cmsRun(config_py, args.verbose, inputFiles = ','.join(content[run][lumi].files), runNumber = run, lumiNumber = lumi, eventsPerLumi = args.events_per_lumi, eventsPerFile = args.events_per_file, rawDataCollection = args.raw_data_collection, outputPath = lumi_base_path)

            # merge all lumisections data

            # number of events expected to be processed
            # (a negative --events_per_lumi is treated as "no limit")
            if args.events_per_lumi < 0:
                expected_events = content[run][lumi].events
            else:
                expected_events = min(args.events_per_lumi, content[run][lumi].events)

            # number of files expected to be created (rounded up)
            expected_files = (expected_events + args.events_per_file - 1) // args.events_per_file

            # find the files produced by the conversion job and move them to the per-run path
            lumi_path = lumi_base_path + f'/run{run:06d}'

            # jsd files: keep the first copy, discard those from the other lumisections
            jsd_path = lumi_path + '/jsd'
            if not os.path.exists(run_path + '/jsd'):
                shutil.move(jsd_path, run_path)
            else:
                shutil.rmtree(jsd_path)

            # lumisection data and EoLS files
            # (sorted, as glob returns the files in arbitrary order)
            for lumi_file in sorted(glob.glob(lumi_path + f'/run{run:06d}_ls{lumi:04d}_*')):
                target = run_path + lumi_file.removeprefix(lumi_path)
                shutil.move(lumi_file, target)
                if lumi_file.endswith('.raw'):
                    converted_files.append(target)

            # read the partial EoR file and cross check it against the expectations
            eor_file = lumi_path + f'/run{run:06d}_ls0000_EoR.jsn'
            with open(eor_file) as f:
                eor = json.load(f)
            produced_events = int(eor['data'][0])
            produced_files = int(eor['data'][1])
            produced_lumis = int(eor['data'][2])
            produced_last_lumi = int(eor['data'][3])
            assert produced_events == expected_events, f'{eor_file}: found {produced_events} events, expected {expected_events}'
            assert produced_files == expected_files, f'{eor_file}: found {produced_files} files, expected {expected_files}'
            assert produced_lumis == 1, f'{eor_file}: found {produced_lumis} lumisections, expected 1'
            assert produced_last_lumi == lumi, f'{eor_file}: found last lumisection {produced_last_lumi}, expected {lumi}'
            summary['data'][0] += expected_events
            summary['data'][1] += expected_files
            summary['data'][2] += 1
            summary['data'][3] = lumi
            os.remove(eor_file)

            # remove the intermediate directory
            shutil.rmtree(lumi_base_path, ignore_errors=True)

        # write the final EoR file
        # implemented by hand instead of using json.dump() to match the style used by the DAQ tools
        assert len(converted_files) == summary['data'][1], f'run {run}: found {len(converted_files)} .raw files, expected {summary["data"][1]}'
        eor_file = run_path + f'/run{run:06d}_ls0000_EoR.jsn'
        with open(eor_file, 'w') as file:
            file.write('{\n   "data" : [ "%d", "%d", "%d", "%d" ],\n   "definition" : "%s",\n   "source" : "%s"\n}\n' % (summary['data'][0], summary['data'][1], summary['data'][2], summary['data'][3], summary['definition'], summary['source']))

    # mark the .raw files as not executable
    for f in converted_files:
        os.chmod(f, 0o644)

    # write a cff file for processing the converted files
    cff_file = args.output_directory + f'/run{run:06d}_cff.py'
    with open(cff_file, 'w') as file:
        file.write("""import FWCore.ParameterSet.Config as cms

from EventFilter.Utilities.FedRawDataInputSource_cfi import source as _source
source = _source.clone(
    eventChunkSize = 200,   # MB
    eventChunkBlock = 200,  # MB
    numBuffers = 4,
    maxBufferedFiles = 4,
    fileListMode = True,
    fileNames = (
%s
    )
)

from EventFilter.Utilities.EvFDaqDirector_cfi import EvFDaqDirector as _EvFDaqDirector
EvFDaqDirector = _EvFDaqDirector.clone(
    buBaseDir = '%s',
    runNumber = %d
)

from EventFilter.Utilities.FastMonitoringService_cfi import FastMonitoringService as _FastMonitoringService
FastMonitoringService = _FastMonitoringService.clone()
""" % ('\n'.join("        '" + f + "'," for f in converted_files), args.output_directory, run))

    # all done