from copy import deepcopy
import logging
import os
import shutil
import subprocess
import timeit
from multiprocessing import Pool
from enum import IntEnum
from pymchelper.input_output import frompattern
class OutputDataType(IntEnum):
"""
    Output type requested by the user: plots (e.g. .png files) or text data.
"""
plot = 1
txt = 2
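
# A minimal usage sketch of OutputDataType (illustrative only): as an IntEnum,
# its members compare equal to plain integers, which makes command-line parsing easy:
#
#   assert OutputDataType.plot == 1
#   assert OutputDataType['txt'] is OutputDataType.txt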


class Runner:
"""
    Main class responsible for configuring and starting multiple parallel MC simulation processes.
    It can also be used to access the combined (averaged) results of the simulation.
"""
def __init__(self, jobs=None, keep_workspace_after_run=False, output_directory='.'):
        # create a pool of processes, waiting to be started by the run method
        # if jobs is not specified, os.cpu_count() will be used
self._pool = Pool(processes=jobs)
        # The user of the runner has two options: either specify the number of parallel jobs
        # via the `jobs` argument, or leave it as None. If `jobs` is None,
        # the multiprocessing library will automatically set the number of parallel jobs to `os.cpu_count()`.
        # Therefore we cannot rely on `self.jobs` as the actual count of parallel processes.
        # Instead we use an undocumented feature of the multiprocessing module,
        # extracting the actual number of allocated processes from the `_processes` attribute of the Pool class.
        # To see how it is used internally in the CPython (v3.9) source code, take a look at:
# https://github.com/python/cpython/blob/3.9/Lib/multiprocessing/pool.py#L210
self.jobs = self._pool._processes
# workspace is a collection of working directories
# this manager is responsible for creating and cleaning working directories
self.workspace_manager = WorkspaceManager(output_directory=output_directory,
keep_workspace_after_run=keep_workspace_after_run)

    def run(self, settings):
"""
        Execute parallel simulation processes, creating a workspace (and working directories) in `output_directory`.
        Return True on successful execution, False otherwise.
"""
start_time = timeit.default_timer()
        # SHIELD-HIT12A and Fluka require RNG seeds to be integers greater than or equal to 1
        # each of the workers needs to have its own distinct RNG seed
rng_seeds = range(1, self.jobs + 1)
# create working directories
self.workspace_manager.create_working_directories(simulation_input_path=settings.input_path,
rng_seeds=rng_seeds)
        # inject the RNG seeds into the settings for each SingleSimulationExecutor call
# TODO consider better way of doing it # skipcq: PYL-W0511
settings_list = []
for rng_seed in rng_seeds:
current_settings = deepcopy(settings) # do not modify original arguments
current_settings.set_rng_seed(rng_seed)
settings_list.append(current_settings)
# create executor callable object for current run
executor = SingleSimulationExecutor()
try:
# start execution using pool of workers, mapping the executor callable object to the different settings
# and workspaces
self._pool.map(executor, zip(settings_list, self.workspace_manager.working_directories_abs_paths))
except KeyboardInterrupt:
logging.info('Terminating the pool')
self._pool.terminate()
logging.info('Pool is terminated')
self.workspace_manager.clean()
return False
elapsed = timeit.default_timer() - start_time
logging.info("run elapsed time {:.3f} seconds".format(elapsed))
return True

    def get_data(self):
"""
        Scan the output directory for the working directories (like run_1, run_2).
        Take all files from all working directories in `output_dir`
        and merge their content to form pymchelper Estimator objects.
        For each output file a single Estimator object is created, holding numpy arrays with the results.
        Return a dictionary with output filenames as keys and Estimator objects as values.
"""
start_time = timeit.default_timer()
# TODO line below is specific to SHIELD-HIT12A, should be generalised # skipcq: PYL-W0511
# scans output directory for MC simulation output files
output_files_pattern = os.path.join(self.workspace_manager.output_dir_absolute_path, "run_*", "*.bdo")
logging.debug("Files to merge {:s}".format(output_files_pattern))
estimators_dict = {}
# convert output files to list of estimator objects
estimators_list = frompattern(output_files_pattern)
for estimator in estimators_list:
logging.debug("Appending estimator for {:s}".format(estimator.file_corename))
estimators_dict[estimator.file_corename] = estimator
elapsed = timeit.default_timer() - start_time
logging.info("Workspace reading {:.3f} seconds".format(elapsed))
return estimators_dict
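
    # A sketch of consuming the result of get_data() (the estimator names are
    # hypothetical and depend on the detectors defined in the simulation input):
    #
    #   for core_name, estimator in runner.get_data().items():
    #       logging.info("output file %s -> estimator %s", core_name, estimator)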

    def clean(self):
"""
        Remove all working directories (if they exist).
"""
self.workspace_manager.clean()


class SingleSimulationExecutor:
"""
    Callable class responsible for the execution of a single MC simulation process.
"""
def __call__(self, settings_and_working_dir, **kwargs):
        # we deliberately combine the settings and the working directory path
        # into the single argument `settings_and_working_dir`,
        # as this simplifies using this class with the multiprocessing module (Pool.map passes one argument)
settings, working_dir_abs_path = settings_and_working_dir
try:
            # combine the MC engine executable with its command line options to form the core of the command string
            # this will form the basis of the command, like:
# /usr/local/bin/shieldhit --time 00:30:50 -v -N 3
core_command_string = str(settings)
            # for easier digestion by the subprocess module, convert the command string to a list
            # and append the location of the input files,
            # finally obtaining a list like:
            # ['/usr/local/bin/shieldhit', '--time', '00:30:50', '-v', '-N', '3', '/data/my/simulation/input']
command_as_list = core_command_string.split()
command_as_list.append(working_dir_abs_path)
            # execute the MC simulation in a spawned process
# TODO handle this differently, i.e. redirect it to file or save in some variable # skipcq: PYL-W0511
logging.debug('working directory {:s}, command {:s}'.format(working_dir_abs_path,
' '.join(command_as_list)))
            # use subprocess.DEVNULL to discard the output without leaking an open file handle
            subprocess.check_call(command_as_list, cwd=working_dir_abs_path,
                                  stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        except KeyboardInterrupt:
            logging.debug("KeyboardInterrupt")
            # a bare `raise` re-raises the exception and preserves the original traceback
            raise
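
    # A direct-call sketch (normally this object is invoked via Pool.map;
    # the settings object and the path below are hypothetical):
    #
    #   executor = SingleSimulationExecutor()
    #   executor((settings, '/tmp/sim_output/run_1'))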


class WorkspaceManager:
"""
    A workspace consists of multiple working directories (i.e. run_1, run_2),
    one for each of the parallel simulation runs.
"""
def __init__(self, output_directory='.', keep_workspace_after_run=False):
self.output_dir_absolute_path = os.path.abspath(output_directory)
self.keep_workspace_after_run = keep_workspace_after_run
self.working_directories_abs_paths = []

    def create_working_directories(self, simulation_input_path, rng_seeds=()):
"""
Create working directories and fill `self.working_directories_abs_paths`
"""
self.working_directories_abs_paths = []
for rng_seed in rng_seeds:
# set workspace to a subdirectory of the output_directory, with run_* pattern
# i.e. /home/user/output/run_3
working_dir_abs_path = os.path.join(self.output_dir_absolute_path, 'run_{:d}'.format(rng_seed))
            logging.info("Working directory {:s}".format(working_dir_abs_path))
if os.path.isdir(simulation_input_path):
                # if the working directory already exists, remove it before copying the input files into it
if os.path.exists(working_dir_abs_path):
shutil.rmtree(working_dir_abs_path)
                # create the working directory if it does not exist (freshly cleaned or never created)
if not os.path.exists(working_dir_abs_path):
os.makedirs(working_dir_abs_path)
# copy all files from the directory
for directory_entry in os.listdir(simulation_input_path):
path_to_directory_entry = os.path.join(simulation_input_path, directory_entry)
if os.path.isfile(path_to_directory_entry):
shutil.copy2(path_to_directory_entry, working_dir_abs_path)
logging.debug("Copying input files into {:s}".format(working_dir_abs_path))
elif os.path.isfile(simulation_input_path):
if not os.path.exists(working_dir_abs_path):
os.makedirs(working_dir_abs_path)
shutil.copy2(simulation_input_path, working_dir_abs_path)
logging.debug("Copying input files into {:s}".format(working_dir_abs_path))
else:
                logging.error("Input path {:s} is neither a directory nor a file".format(simulation_input_path))
self.working_directories_abs_paths.append(working_dir_abs_path)
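
    # A standalone usage sketch of WorkspaceManager (paths are hypothetical):
    #
    #   manager = WorkspaceManager(output_directory='/tmp/sim_output')
    #   manager.create_working_directories('/data/input.dat', rng_seeds=range(1, 5))
    #   # ... run the simulations inside manager.working_directories_abs_paths ...
    #   manager.clean()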

    def clean(self):
"""
        Clean the workspace by removing all working directories
        (skipped if the `keep_workspace_after_run` flag is set).
"""
if not self.keep_workspace_after_run:
start_time = timeit.default_timer()
for working_dir_abs_path in self.working_directories_abs_paths:
                # shutil.rmtree will throw an exception if the directory we are trying to remove doesn't exist,
                # hence we only remove existing directories;
                # this allows the `clean` method to be safely called multiple times
if os.path.exists(working_dir_abs_path):
shutil.rmtree(working_dir_abs_path)
elapsed = timeit.default_timer() - start_time
print("Cleaning {:.3f} seconds".format(elapsed))