import FWCore.ParameterSet.Config as cms
import os
from HeterogeneousCore.Common.PlatformStatus import PlatformStatus
class ModuleTypeResolverAlpaka:
    """Resolve the concrete Alpaka backend of modules whose C++ type ends with '@alpaka'.

    Constructed with the list of accelerators available to the job, an
    optional explicitly-requested backend, and the default value for the
    modules' untracked 'alpaka.synchronize' parameter.
    """
    def __init__(self, accelerators, backend, synchronize):
        # The first element of self._valid_backends is used as the default
        # backend if a module does not set one explicitly
        self._valid_backends = []
        if "gpu-nvidia" in accelerators:
            self._valid_backends.append("cuda_async")
        if "gpu-amd" in accelerators:
            self._valid_backends.append("rocm_async")
        if "cpu" in accelerators:
            self._valid_backends.append("serial_sync")
        if not self._valid_backends:
            raise cms.EDMException(cms.edm.errors.UnavailableAccelerator, "ModuleTypeResolverAlpaka had no backends available because of the combination of the job configuration and accelerator availability on the machine. The job sees {} accelerators.".format(", ".join(accelerators)))
        if backend is not None:
            if backend not in self._valid_backends:
                raise cms.EDMException(cms.edm.errors.UnavailableAccelerator, "The ProcessAcceleratorAlpaka was configured to use {} backend, but that backend is not available because of the combination of the job configuration and accelerator availability on the machine. The job was configured to use {} accelerators, which translates to {} Alpaka backends.".format(
                    backend, ", ".join(accelerators), ", ".join(self._valid_backends)))
            # Move the explicitly-requested backend to the front of the list
            # so that it becomes the default for all modules
            if backend != self._valid_backends[0]:
                self._valid_backends.remove(backend)
                self._valid_backends.insert(0, backend)
        self._synchronize = synchronize
    def plugin(self):
        # Name of the corresponding C++ ModuleTypeResolver plugin
        return "ModuleTypeResolverAlpaka"
    def setModuleVariant(self, module):
        """Fill in the untracked 'alpaka' PSet of 'module' with the resolved
        backend and synchronize settings, validating any values the user set
        explicitly.

        Raises cms.EDMException on a tracked 'alpaka' PSet or on an
        explicitly-set backend that is not available for this job.
        """
        if module.type_().endswith("@alpaka"):
            defaultBackend = self._valid_backends[0]
            if hasattr(module, "alpaka"):
                # Ensure the untrackedness already here, because the
                # C++ ModuleTypeResolverAlpaka relies on the
                # untrackedness (before the configuration validation)
                if module.alpaka.isTracked():
                    # Use label_() (not the deprecated label()) consistently
                    # with the other error message below
                    raise cms.EDMException(cms.edm.errors.Configuration, "The 'alpaka' PSet in module '{}' is tracked, but it should be untracked".format(module.label_()))
                if hasattr(module.alpaka, "backend"):
                    if module.alpaka.backend == "":
                        # Empty string means "use the job default"
                        module.alpaka.backend = defaultBackend
                    elif module.alpaka.backend.value() not in self._valid_backends:
                        raise cms.EDMException(cms.edm.errors.UnavailableAccelerator, "Module {} has the Alpaka backend set explicitly, but its accelerator is not available for the job because of the combination of the job configuration and accelerator availability on the machine. The following Alpaka backends are available for the job {}.".format(module.label_(), ", ".join(self._valid_backends)))
                else:
                    module.alpaka.backend = cms.untracked.string(defaultBackend)
            else:
                module.alpaka = cms.untracked.PSet(
                    backend = cms.untracked.string(defaultBackend)
                )
            # True when 'synchronize' is still the cms.optional.untracked.bool
            # placeholder, i.e. the user did not set an explicit value
            isDefaultValue = lambda v: \
                isinstance(v, type(cms.optional.untracked.bool)) \
                and not v.isTracked() \
                and v.isCompatibleCMSType(cms.bool)
            if not hasattr(module.alpaka, "synchronize") or isDefaultValue(module.alpaka.synchronize):
                module.alpaka.synchronize = cms.untracked.bool(self._synchronize)
class ProcessAcceleratorAlpaka(cms.ProcessAccelerator):
    """ProcessAcceleratorAlpaka itself does not define or inspect
    availability of any accelerator devices. It merely sets up
    necessary Alpaka infrastructure based on the availability of
    accelerators that the concrete ProcessAccelerators (like
    ProcessAcceleratorCUDA) define.
    """
    def __init__(self):
        super(ProcessAcceleratorAlpaka, self).__init__()
        # Backend to enforce for all Alpaka modules; None means automatic
        self._backend = None
        # Default value for the modules' 'alpaka.synchronize' parameter
        self._synchronize = False
    # User-facing interface
    def setBackend(self, backend):
        self._backend = backend
    def setSynchronize(self, synchronize):
        self._synchronize = synchronize
    # Framework-facing interface
    def moduleTypeResolver(self, accelerators):
        return ModuleTypeResolverAlpaka(accelerators, self._backend, self._synchronize)
    def apply(self, process, accelerators):
        """Add the AlpakaServices for the available backends to 'process',
        and remove the services of unavailable backends."""
        # Propagate the AlpakaService messages through the MessageLogger
        if not hasattr(process.MessageLogger, "AlpakaService"):
            process.MessageLogger.AlpakaService = cms.untracked.PSet()
        # The CPU backend is effectively always available, ensure the AlpakaServiceSerialSync is loaded
        if not hasattr(process, "AlpakaServiceSerialSync"):
            from HeterogeneousCore.AlpakaServices.AlpakaServiceSerialSync_cfi import AlpakaServiceSerialSync
            process.add_(AlpakaServiceSerialSync)
        # Check if CUDA is available, and if the system has at least one usable NVIDIA GPU
        self._setupBackendService(process, accelerators, "gpu-nvidia",
                                  "AlpakaServiceCudaAsync",
                                  "HeterogeneousCore.AlpakaServices.AlpakaServiceCudaAsync_cfi")
        # Check if ROCm is available, and if the system has at least one usable AMD GPU
        self._setupBackendService(process, accelerators, "gpu-amd",
                                  "AlpakaServiceROCmAsync",
                                  "HeterogeneousCore.AlpakaServices.AlpakaServiceROCmAsync_cfi")
    @staticmethod
    def _setupBackendService(process, accelerators, accelerator, serviceName, cfiModule):
        """Load the service 'serviceName' from the _cfi module 'cfiModule'
        into 'process' if 'accelerator' is among the job's accelerators and
        the _cfi module is importable; otherwise remove the service.

        The previous version signalled "accelerator not available" with
        'raise False' (a TypeError) caught by a bare 'except:'; a narrow
        ImportError handler plus an explicit availability flag replaces that.
        """
        import importlib
        service = None
        if accelerator in accelerators:
            try:
                service = getattr(importlib.import_module(cfiModule), serviceName)
            except ImportError:
                # The backend is not built in this release; fall through to
                # removing the service below
                service = None
        if service is None:
            # The backend is not available, do not load its AlpakaService
            if hasattr(process, serviceName):
                delattr(process, serviceName)
        else:
            # The backend is available, ensure its AlpakaService is loaded
            if not hasattr(process, serviceName):
                process.add_(service)
# Ensure this module is kept in the configuration when dumping it:
# registering the special import makes process.dumpPython() emit the
# import line below whenever a ProcessAcceleratorAlpaka is present.
cms.specialImportRegistry.registerSpecialImportForType(ProcessAcceleratorAlpaka, "from HeterogeneousCore.AlpakaCore.ProcessAcceleratorAlpaka import ProcessAcceleratorAlpaka")