#!/usr/bin/env python3
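"""Check the integrity of the EDM files of a dataset on mass storage and publish a report.

Datasets are resolved through DAS; see the usage string in the option parser below.
"""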

from __future__ import print_function
from PhysicsTools.HeppyCore.utils.edmIntegrityCheck import PublishToFileSystem, IntegrityCheck
import das

import copy, os, sys

if __name__ == '__main__':

    from optparse import OptionGroup

    usage = """usage: %prog [options] /Sample/Name/On/Castor

e.g.: %prog -u wreece -p -w 'PFAOD_*.root' /MultiJet/Run2011A-05Aug2011-v1/AOD/V2
    """
    dasParser = das.DASOptionParser(usage=usage)
    group = OptionGroup(dasParser.parser, 'edmIntegrityCheck Options', 'Options related to checking files on CASTOR')

    group.add_option("-d", "--device", dest="device", default='cmst3', help="The storage device to write to, e.g. 'cmst3'")
    group.add_option("-n", "--name", dest="name", default=None, help='The name of the dataset in DAS. Will be guessed if not specified')
    group.add_option("-p", "--printout", dest="printout", default=False, action='store_true', help='Print a report to stdout')
    group.add_option("-r", "--recursive", dest="recursive", default=False, action='store_true', help='Walk the mass storage device recursively')
    group.add_option("-u", "--user", dest="user", default=os.environ['USER'], help='The username to use when looking at mass storage devices')
    group.add_option("-w", "--wildcard", dest="wildcard", default=None, help='A UNIX style wildcard to specify which files to check')
    group.add_option("--update", dest="update", default=False, action='store_true', help='Only update the status of corrupted files')
    group.add_option("-t", "--timeout", dest="timeout", default=-1, type=int, help='Set a timeout on the edmFileUtil calls')
    group.add_option("--min-run", dest="min_run", default=-1, type=int, help='When querying DBS, require runs >= this run')
    group.add_option("--max-run", dest="max_run", default=-1, type=int, help='When querying DBS, require runs <= this run')
    group.add_option("--max_threads", dest="max_threads", default=None, type=int, help='The maximum number of threads to use')
    dasParser.parser.add_option_group(group)
    (opts, datasets) = dasParser.get_opt()

    if len(datasets) == 0:
        # print_help() already writes to stdout; exit since there is nothing to check
        dasParser.parser.print_help()
        print()
        print('need to provide a dataset in argument')
        sys.exit(1)

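    # Check a single dataset and publish the resulting report.
    # A dataset may be given as 'user%/Sample/Name/...' to override the --user option.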
    def work(d, op):
        tokens = d.split('%')
        if len(tokens) == 2:
            op.user = tokens[0]
            d = tokens[1]

        check = IntegrityCheck(d, op)
        pub = PublishToFileSystem(check)

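        # in update mode, start from the previously published report so that
        # only files flagged as corrupted are re-checked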
        previous = None
        if op.update:
            previous = pub.get(check.directory)

        check.test(previous=previous, timeout=op.timeout)
        if op.printout:
            check.report()
        report = check.structured()
        pub.publish(report)

        return d

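    # called in the parent process each time a worker finishes checking a dataset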
    def callback(result):
        print('Checking thread done:', str(result))

    # submit the main work in a multi-threaded way

    if len(datasets) == 1:
        d = datasets[0]
        work(d, copy.deepcopy(opts))
    else:
        import multiprocessing
        # max_threads=None lets the Pool default to the machine's CPU count
        pool = multiprocessing.Pool(processes=opts.max_threads)

        for d in datasets:
            # each task gets its own copy of the options, since work() may modify them
            pool.apply_async(work, args=(d, copy.deepcopy(opts)), callback=callback)
        pool.close()
        pool.join()