import FWCore.ParameterSet.Config as cms

import os

from HeterogeneousCore.Common.PlatformStatus import PlatformStatus

class ModuleTypeResolverAlpaka:
    """Resolve the concrete backend variant of '@alpaka' modules.

    The resolver translates the job's accelerators into the list of valid
    Alpaka backends and stamps each '@alpaka' module with an untracked
    'alpaka' PSet carrying the chosen 'backend' and 'synchronize' settings.
    """
    def __init__(self, accelerators, backend, synchronize):
        # The first element of the list is used as the default backend
        # if a module does not set one explicitly.
        self._valid_backends = []
        if "gpu-nvidia" in accelerators:
            self._valid_backends.append("cuda_async")
        if "gpu-amd" in accelerators:
            self._valid_backends.append("rocm_async")
        if "cpu" in accelerators:
            self._valid_backends.append("serial_sync")
        if not self._valid_backends:
            raise cms.EDMException(cms.edm.errors.UnavailableAccelerator, "ModuleTypeResolverAlpaka had no backends available because of the combination of the job configuration and accelerator availability on the machine. The job sees {} accelerators.".format(", ".join(accelerators)))
        if backend is not None:
            if backend not in self._valid_backends:
                raise cms.EDMException(cms.edm.errors.UnavailableAccelerator, "The ProcessAcceleratorAlpaka was configured to use {} backend, but that backend is not available because of the combination of the job configuration and accelerator availability on the machine. The job was configured to use {} accelerators, which translates to {} Alpaka backends.".format(
                    backend, ", ".join(accelerators), ", ".join(self._valid_backends)))
            # Move the explicitly requested backend to the front so that
            # it becomes the default for all '@alpaka' modules.
            if backend != self._valid_backends[0]:
                self._valid_backends.remove(backend)
                self._valid_backends.insert(0, backend)
        self._synchronize = synchronize

    def plugin(self):
        # Name of the C++ ModuleTypeResolver plugin this class corresponds to
        return "ModuleTypeResolverAlpaka"

    def setModuleVariant(self, module):
        """Fill in the 'alpaka' PSet of an '@alpaka' module with defaults.

        Non-'@alpaka' modules are left untouched.
        """
        if module.type_().endswith("@alpaka"):
            defaultBackend = self._valid_backends[0]
            if hasattr(module, "alpaka"):
                # Ensure the untrackedness already here, because the
                # C++ ModuleTypeResolverAlpaka relies on the
                # untrackedness (before the configuration validation)
                if module.alpaka.isTracked():
                    # Use label_() (the cms config accessor), consistent with the
                    # UnavailableAccelerator message below.
                    raise cms.EDMException(cms.edm.errors.Configuration, "The 'alpaka' PSet in module '{}' is tracked, but it should be untracked".format(module.label_()))
                if hasattr(module.alpaka, "backend"):
                    if module.alpaka.backend == "":
                        module.alpaka.backend = defaultBackend
                    elif module.alpaka.backend.value() not in self._valid_backends:
                        raise cms.EDMException(cms.edm.errors.UnavailableAccelerator, "Module {} has the Alpaka backend set explicitly, but its accelerator is not available for the job because of the combination of the job configuration and accelerator availability on the machine. The following Alpaka backends are available for the job {}.".format(module.label_(), ", ".join(self._valid_backends)))
                else:
                    module.alpaka.backend = cms.untracked.string(defaultBackend)
            else:
                module.alpaka = cms.untracked.PSet(
                    backend = cms.untracked.string(defaultBackend)
                )
            # An untracked cms.optional bool is the "not set by the user"
            # sentinel; only then apply the process-level synchronize default.
            isDefaultValue = lambda v: \
                isinstance(v, type(cms.optional.untracked.bool)) \
                and not v.isTracked() \
                and v.isCompatibleCMSType(cms.bool)
            if not hasattr(module.alpaka, "synchronize") or isDefaultValue(module.alpaka.synchronize):
                module.alpaka.synchronize = cms.untracked.bool(self._synchronize)

class ProcessAcceleratorAlpaka(cms.ProcessAccelerator):
    """ProcessAcceleratorAlpaka itself does not define or inspect
    availability of any accelerator devices. It merely sets up
    necessary Alpaka infrastructure based on the availability of
    accelerators that the concrete ProcessAccelerators (like
    ProcessAcceleratorCUDA) define.
    """
    def __init__(self):
        super(ProcessAcceleratorAlpaka, self).__init__()
        self._backend = None       # explicit backend choice; None means "use the default"
        self._synchronize = False  # default 'synchronize' for '@alpaka' modules

    # User-facing interface
    def setBackend(self, backend):
        """Request a specific Alpaka backend (e.g. 'serial_sync', 'cuda_async', 'rocm_async')."""
        self._backend = backend

    def setSynchronize(self, synchronize):
        """Set the default 'synchronize' value for '@alpaka' modules."""
        self._synchronize = synchronize

    # Framework-facing interface
    def moduleTypeResolver(self, accelerators):
        return ModuleTypeResolverAlpaka(accelerators, self._backend, self._synchronize)

    def _ensureBackendService(self, process, accelerators, accelerator, cfiModule, serviceName):
        """Add the given AlpakaService to the process if its accelerator is
        usable for this job, otherwise remove any previously loaded instance.

        Replaces the earlier 'raise False' + bare 'except:' construct, which
        worked only because raising a non-exception itself raises TypeError.
        """
        import importlib
        service = None
        if accelerator in accelerators:
            try:
                service = getattr(importlib.import_module(cfiModule), serviceName)
            except Exception:
                # The _cfi module is not importable, i.e. the corresponding
                # toolchain is not available in this release/architecture.
                service = None
        if service is None:
            # Backend not usable: make sure the service is not in the process
            if hasattr(process, serviceName):
                delattr(process, serviceName)
        elif not hasattr(process, serviceName):
            # Backend usable: ensure the service is loaded exactly once
            process.add_(service)

    def apply(self, process, accelerators):
        # Propagate the AlpakaService messages through the MessageLogger
        if not hasattr(process.MessageLogger, "AlpakaService"):
            process.MessageLogger.AlpakaService = cms.untracked.PSet()

        # The CPU backend is effectively always available, ensure the AlpakaServiceSerialSync is loaded
        if not hasattr(process, "AlpakaServiceSerialSync"):
            from HeterogeneousCore.AlpakaServices.AlpakaServiceSerialSync_cfi import AlpakaServiceSerialSync
            process.add_(AlpakaServiceSerialSync)

        # Load AlpakaServiceCudaAsync only if CUDA is available and the job
        # may use at least one NVIDIA GPU
        self._ensureBackendService(process, accelerators, "gpu-nvidia",
                                   "HeterogeneousCore.AlpakaServices.AlpakaServiceCudaAsync_cfi",
                                   "AlpakaServiceCudaAsync")
        # Load AlpakaServiceROCmAsync only if ROCm is available and the job
        # may use at least one AMD GPU
        self._ensureBackendService(process, accelerators, "gpu-amd",
                                   "HeterogeneousCore.AlpakaServices.AlpakaServiceROCmAsync_cfi",
                                   "AlpakaServiceROCmAsync")


# Register the import statement needed to reconstruct ProcessAcceleratorAlpaka,
# so that it is kept in the configuration when dumping it
cms.specialImportRegistry.registerSpecialImportForType(ProcessAcceleratorAlpaka, "from HeterogeneousCore.AlpakaCore.ProcessAcceleratorAlpaka import ProcessAcceleratorAlpaka")