"""
Parallel and serial mapper implementations.
The API is a bit crufty since interprocess communication has evolved from
the original implementation. And the names are misleading.
Available mappers:
- SerialMapper: Single-threaded execution
- MPMapper: Multi-process execution using multiprocessing
- ThreadPoolMapper: Multi-threaded execution using ThreadPoolExecutor
- MPIMapper: MPI-based distributed execution across cluster nodes
Usage::
Mapper.start_worker(problem)
mapper = Mapper.start_mapper(problem, None, cpus)
result = mapper(points)
...
mapper = Mapper.start_mapper(problem, None, cpus)
result = mapper(points)
Mapper.stop_mapper()
"""
import time
import sys
import os
import signal
import threading
import copy
from concurrent.futures import ThreadPoolExecutor
from cloudpickle import dumps, loads
# {{{ http://code.activestate.com/recipes/496767/ (r1)
# Converted to use ctypes by Paul Kienzle
# Access mask handed to kernel32.OpenProcess by setpriority() below.
PROCESS_ALL_ACCESS = 0x1F0FFF
[docs]
def can_pickle(problem, check=False):
    """
    Report whether *problem* survives a cloudpickle round trip.

    A False result means MPMapper cannot be used for this problem and
    SerialMapper should be used instead.

    When *check* is True, *nllf()* is also called on the reconstructed copy
    as a "smoke test" that the function still runs after copying. This is
    not foolproof. For example, access to a database may work in the
    duplicated object because the connection is open and available in the
    current process, but it will fail when trying to run on a remote machine.

    Any exception raised while pickling, unpickling or (optionally)
    evaluating the copy is treated as "cannot pickle".
    """
    try:
        clone = loads(dumps(problem))
        if check:
            clone.nllf()
    except Exception:
        # Deliberately broad: every failure mode means the same thing here.
        return False
    else:
        return True
[docs]
def setpriority(pid=None, priority=1):
    """
    Set the priority class of a Windows process.

    *priority* is an index from 0 to 5 into the Windows priority classes
    (idle, below normal, normal, above normal, high, realtime), so 2 is
    normal priority and 5 is the maximum. The default of 1 selects
    "below normal".

    *pid* is the id of the process to change; None (the default) selects
    the current python process.

    Raises IndexError if *priority* is outside the table. A failure to open
    the process or to change its class is silently ignored.
    """
    # import win32api,win32process,win32con
    from ctypes import windll

    priorityclasses = [
        0x40,  # IDLE_PRIORITY_CLASS,
        0x4000,  # BELOW_NORMAL_PRIORITY_CLASS,
        0x20,  # NORMAL_PRIORITY_CLASS,
        0x8000,  # ABOVE_NORMAL_PRIORITY_CLASS,
        0x80,  # HIGH_PRIORITY_CLASS,
        0x100,  # REALTIME_PRIORITY_CLASS
    ]
    # Look the class up before touching the OS so that a bad index fails
    # without leaving an open handle behind.
    priority_class = priorityclasses[priority]

    kernel32 = windll.kernel32
    if pid is None:
        pid = kernel32.GetCurrentProcessId()
    handle = kernel32.OpenProcess(PROCESS_ALL_ACCESS, True, pid)
    if not handle:
        # OpenProcess returned a null handle: nothing to adjust or close.
        return
    try:
        kernel32.SetPriorityClass(handle, priority_class)
    finally:
        # Handles returned by OpenProcess must be released explicitly.
        kernel32.CloseHandle(handle)
# end of http://code.activestate.com/recipes/496767/ }}}
[docs]
def nice():
    """
    Lower the scheduling priority of the current process.

    On Windows (``os.name == "nt"``) this selects priority index 1 through
    setpriority(); everywhere else it raises the niceness by 5.
    """
    if os.name != "nt":
        os.nice(5)
        return
    setpriority(priority=1)
[docs]
def pool_size(cpus=0):
    """
    Return the number of workers to use for parallel processing.

    A positive *cpus* is returned unchanged. Otherwise the count is detected:
    where os.sched_getaffinity exists (linux only) the number of cpus
    allocated to this process is used rather than multiprocessing.cpu_count,
    which reports every processor on the system. This restricts the amount
    of parallelism to the cpus allocated by slurm when running on a compute
    cluster with a partial node.
    """
    if cpus > 0:
        return cpus

    # Prefer the affinity mask of the current process when the OS exposes it.
    affinity = getattr(os, "sched_getaffinity", None)
    if affinity is not None:
        return len(affinity(0))

    from multiprocessing import cpu_count

    return cpu_count()
# For debugging parallelism it is handy to know which core the process is using
[docs]
def cpu_id(num_sockets=2):
    """
    Return the id of the processor the current process is running on.

    The value comes from psutil's ``Process.cpu_num()`` for the pid of
    ``multiprocessing.current_process()``. *num_sockets* is accepted but
    not used.
    """
    import multiprocessing
    import psutil

    pid = multiprocessing.current_process().pid
    return psutil.Process(pid).cpu_num()
# True when the BUMPS_SHOW_PERFORMANCE environment variable is set to
# "1", "true" or "on" (compared case-insensitively); False otherwise,
# including when the variable is unset.
SHOW_PERFORMANCE = os.environ.get("BUMPS_SHOW_PERFORMANCE", "0").upper() in ("1", "TRUE", "ON")
# Noise so that the type checker is happy
[docs]
class BaseMapper:
    """
    Interface shared by the mapper implementations in this module.

    Every method is a static method that raises NotImplementedError here;
    the concrete mappers override all three.
    """

    # Class-level flag; MPIMapper overrides it.
    has_problem = False

    @staticmethod
    def start_worker(problem):
        """Called with the problem to initialize the worker."""
        raise NotImplementedError()

    @staticmethod
    def start_mapper(problem, modelargs=None, cpus=0):
        """Called with the problem on a new fit."""
        raise NotImplementedError()

    # TODO: deprecate mapper parameter
    @staticmethod
    def stop_mapper(mapper=None):
        """Called to stop the mapper; *mapper* is accepted for compatibility."""
        raise NotImplementedError()
[docs]
class SerialMapper(BaseMapper):
    """
    Mapper that evaluates every point in the calling thread.

    The (start, stop) perf_counter_ns pair of each mapper call is appended
    to the class attribute *timestamps*, which is reset by start_mapper.
    """

    timestamps = []

    @staticmethod
    def start_worker(problem):
        """Nothing to initialize for serial execution."""
        pass

    @staticmethod
    def start_mapper(problem, modelargs=None, cpus=0):
        """
        Return a function mapping a sequence of points to a list of
        ``problem.nllf(point)`` values.

        *modelargs* and *cpus* are accepted for interface compatibility and
        are not used.
        """
        # Note: map is an iterator in python 3.x
        # return lambda points: list(map(problem.nllf, points))
        SerialMapper.timestamps = []

        def mapper(points):
            tstart = time.perf_counter_ns()
            result = list(map(problem.nllf, points))
            tstop = time.perf_counter_ns()
            SerialMapper.timestamps.append((tstart, tstop))
            return result

        return mapper

    @staticmethod
    def stop_mapper(mapper=None):
        """Report the timings collected by this mapper."""
        # Report this mapper's own timings (previously MPMapper.timestamps
        # was passed here by mistake).
        show_performance(SerialMapper.timestamps)
def _MP_setup():
    """
    Initializer for pool worker processes (passed to multiprocessing.Pool
    by MPMapper.start_mapper).

    Worker globals are kept in MPMapper class variables. It doesn't matter
    if they conflict with the controller values since they live in a
    different process.
    """
    # Workers ignore SIGINT; MPMapper's mapper handles KeyboardInterrupt
    # on the controller side.
    sigint, ignore = signal.SIGINT, signal.SIG_IGN
    signal.signal(sigint, ignore)
    # Lower the priority of the worker process.
    nice()
    # print(f"starting pool worker on {cpu_id()}")
def _MP_run_problem(problem_point_tuple):
    """
    Pool worker function: evaluate nllf for one point.

    *problem_point_tuple* is ``(problem_id, point, shared_pickled_problem)``.
    The unpickled problem is cached in ``MPMapper.problem`` and is reloaded
    from the shared byte array only when *problem_id* differs from the
    cached ``MPMapper.problem_id``.
    """
    job_id, point, shared_blob = problem_point_tuple
    if MPMapper.problem_id != job_id:
        # print(f"Fetching problem {job_id} from namespace")
        # Pull the full byte array and rebuild the problem from it.
        raw = shared_blob[:].tobytes()
        MPMapper.problem = loads(raw)
        MPMapper.problem_id = job_id
    return MPMapper.problem.nllf(point)
[docs]
class MPMapper(BaseMapper):
    """
    Multi-process mapper built on multiprocessing.Pool.

    The problem is pickled with cloudpickle and published as a byte array
    through a multiprocessing.Manager. Each task carries the current
    problem id and a handle to that array; a worker unpickles the problem
    only when the id differs from the one it has cached (see
    _MP_run_problem).
    """

    # Note: subprocesses are using the same variables
    pool = None
    manager = None
    problem_id = 0
    pickled_problem = None
    shared_pickled_problem = None
    problem = None
    timestamps = []

    @staticmethod
    def start_worker(problem):
        """Nothing to do; workers are set up when the pool is created."""
        pass

    @staticmethod
    def start_mapper(problem, modelargs=None, cpus=0):
        """
        Publish *problem* to the workers and return a mapper function.

        The pool and manager are created on the first call and reused
        afterwards, so *cpus* only matters when the pool is created.
        *modelargs* is accepted for interface compatibility and not used.

        The returned function maps a sequence of points to the list of
        nllf values. On KeyboardInterrupt it stops the mapper and returns
        None.
        """
        import multiprocessing

        # Set up the process pool on the first call.
        if MPMapper.pool is None:
            # Create a sync manager to distribute the problem description.
            MPMapper.manager = multiprocessing.Manager()
            # Start the process pool; _MP_setup runs once in each worker.
            MPMapper.pool = multiprocessing.Pool(pool_size(cpus), _MP_setup)

        # Increment the problem number and publish the problem. Plain pickle
        # may fail for lambdas and for functions defined within the model
        # file, so the problem is pickled with cloudpickle before storing.
        MPMapper.problem_id += 1
        MPMapper.pickled_problem = dumps(problem)
        MPMapper.shared_pickled_problem = MPMapper.manager.Array("B", MPMapper.pickled_problem)

        # Set the mapper to send problem_id/point/shared_pickled_problem value triples
        MPMapper.timestamps = []

        def mapper(points):
            try:
                tstart = time.perf_counter_ns()
                args = ((MPMapper.problem_id, p, MPMapper.shared_pickled_problem) for p in points)
                result = MPMapper.pool.map(_MP_run_problem, args)
                tstop = time.perf_counter_ns()
                MPMapper.timestamps.append((tstart, tstop))
                return result
            except KeyboardInterrupt:
                # NOTE(review): unlike ThreadPoolMapper this does not
                # re-raise, so the caller receives None. Kept as is because
                # callers may rely on it; confirm before changing.
                MPMapper.stop_mapper()

        return mapper

    @staticmethod
    def stop_mapper(mapper=None):
        """
        Terminate the pool, report timings and shut down the manager.

        Safe to call more than once, and safe to call when the pool was
        never (or only partially) created.
        """
        if MPMapper.pool is not None:
            MPMapper.pool.terminate()
            MPMapper.pool = None
            show_performance(MPMapper.timestamps)
        # Guarded separately so a manager left over from a failed pool
        # creation is still shut down, and a repeated call does not fail.
        if MPMapper.manager is not None:
            MPMapper.manager.shutdown()
            MPMapper.manager = None
        # Don't reset problem id; it keeps count even when mapper is restarted.
        ##MPMapper.problem_id = 0
def _TP_run_problem(problem_point_tuple):
"""Thread pool worker function with thread-local problem copy."""
problem_id, point, original_problem = problem_point_tuple
# Get or create thread-local problem copy
thread_local = threading.current_thread()
if getattr(thread_local, "problem_id", None) != problem_id:
thread_local.problem_id = problem_id
thread_local.problem_copy = copy.deepcopy(original_problem)
return thread_local.problem_copy.nllf(point)
[docs]
class ThreadPoolMapper(BaseMapper):
    """
    Thread-based parallel mapper using concurrent.futures.ThreadPoolExecutor.

    Each thread maintains its own copy of the problem object for independent
    calculations of nllf.

    This mapper will only be efficient when using a free-threaded python
    interpreter (otherwise the GIL will prevent true parallelism).
    """

    pool = None
    problem_id = 0
    timestamps = []

    @staticmethod
    def start_worker(problem):
        """Nothing to do; threads copy the problem lazily on first use."""

    @staticmethod
    def start_mapper(problem, modelargs=None, cpus=0):
        """
        Return a function mapping a sequence of points to a list of nllf
        values, evaluated on the shared thread pool.

        The executor is created on the first call and reused afterwards.
        Each call bumps the problem id, which makes the worker threads
        refresh their problem copies. *modelargs* is not used.
        """
        # Set up the thread pool on the first call.
        if ThreadPoolMapper.pool is None:
            ThreadPoolMapper.pool = ThreadPoolExecutor(max_workers=pool_size(cpus))
        ThreadPoolMapper.problem_id += 1
        ThreadPoolMapper.timestamps = []

        def mapper(points):
            try:
                started = time.perf_counter_ns()
                # Submit one task per point, keeping the futures in order.
                pending = []
                for point in points:
                    job = (ThreadPoolMapper.problem_id, point, problem)
                    pending.append(ThreadPoolMapper.pool.submit(_TP_run_problem, job))
                # Collect results in submission order.
                values = [task.result() for task in pending]
                finished = time.perf_counter_ns()
                ThreadPoolMapper.timestamps.append((started, finished))
                return values
            except KeyboardInterrupt:
                ThreadPoolMapper.stop_mapper()
                raise

        return mapper

    @staticmethod
    def stop_mapper(mapper=None):
        """Report timings and shut the executor down, if one is running."""
        executor = ThreadPoolMapper.pool
        if executor is None:
            return
        show_performance(ThreadPoolMapper.timestamps)
        executor.shutdown(wait=True)
        ThreadPoolMapper.pool = None
        # Thread-local copies will be automatically garbage collected
        # when threads are destroyed
def _MPI_set_problem(problem, comm, root=0):
    """
    Broadcast *problem* from the *root* rank to every rank in *comm*.

    The root pickles the problem with cloudpickle and broadcasts the bytes.
    The root returns its own *problem* object unchanged; every other rank
    returns the unpickled copy. The *problem* argument is ignored on
    non-root ranks.
    """
    is_root = comm.rank == root
    outgoing = dumps(problem) if is_root else None
    payload = comm.bcast(outgoing, root=root)
    if is_root:
        return problem
    return loads(payload)
def _MPI_map(problem, points, comm, root=0):
    """
    Evaluate ``problem.nllf`` for a batch of points, split across all ranks
    of the communicator *comm*.

    The function uses collective operations (bcast, Scatterv, Gatherv,
    Barrier), so every rank is expected to call it for the same batch.
    *points* is only read on the *root* rank: its shape is broadcast from
    there and its rows are scattered from there.

    Returns None on every rank when the batch contains zero points.
    Otherwise the root rank returns an array of *npoints* nllf values and
    every other rank returns True.
    """
    import numpy as np
    from mpi4py import MPI
    # print(f"{comm.rank}: mapping points")
    # Send number of points and number of variables per point.
    # All ranks: return None if there are no points.
    # root: otherwise return the array of results.
    # worker: otherwise return True.
    npoints, nvars = comm.bcast(points.shape if comm.rank == root else None, root=root)
    if npoints == 0:
        return None
    # Divvy points equally across all processes
    # NOTE(review): the scatter below sends raw doubles, so this presumably
    # needs points to be a 2-D C-contiguous float64 array — confirm against callers.
    whole = points if comm.rank == root else None
    idx = np.arange(comm.size)
    # Points per rank: npoints // size each, with the first npoints % size
    # ranks taking one extra point.
    size = np.ones(comm.size, idx.dtype) * (npoints // comm.size) + (idx < npoints % comm.size)
    # Index of the first point assigned to each rank.
    offset = np.cumsum(np.hstack((0, size[:-1])))
    part = np.empty((size[comm.rank], nvars), dtype="d")
    # Counts and displacements are in elements, hence the factor of nvars.
    comm.Scatterv((whole, (size * nvars, offset * nvars), MPI.DOUBLE), (part, MPI.DOUBLE), root=root)
    # Evaluate models assigned to each processor
    partial_result = np.array([problem.nllf(pk) for pk in part], dtype="d")
    # Collect results
    # Only the root needs a receive buffer; the True placeholder on the
    # other ranks doubles as their return value.
    result = np.empty(npoints, dtype="d") if comm.rank == root else True
    comm.Barrier()
    comm.Gatherv((partial_result, MPI.DOUBLE), (result, (size, offset), MPI.DOUBLE), root=root)
    comm.Barrier()
    return result
[docs]
def using_mpi():
    """
    Return True when the process appears to have been launched by mpirun
    with more than one rank.

    Detection looks at environment variables defined by the launcher:

    - mpich: PMI_SIZE, PMI_*, MPI_*
    - Open MPI: OMPI_COMM_WORLD_SIZE, OMPI_* PMIX_*
    - impi_rt (intel): PMI_SIZE I_MPI_* HYDRA_*
    - msmpi (microsoft): PMI_SIZE PMI_* MSMPI_*

    Only PMI_SIZE and OMPI_COMM_WORLD_SIZE are consulted, in that order;
    the first one that is set decides the answer.
    """
    for name in ("PMI_SIZE", "OMPI_COMM_WORLD_SIZE"):
        value = os.environ.get(name)
        if value is not None:
            return int(value) > 1
    return False
[docs]
class MPIMapper(BaseMapper):
    """
    MPI-based mapper that splits each batch of points across all ranks.

    For MPIMapper only the worker is initialized with the fit problem.

    Rank 0 is the root: it returns from start_worker and drives the fit.
    All other ranks stay inside start_worker, serving batches until they
    are told to stop.
    """

    # True while the ranks still hold the problem given to start_worker;
    # cleared by start_mapper so that later problems are broadcast.
    has_problem = True
    timestamps = []

    @staticmethod
    def start_worker(problem):
        """
        Start the worker process.

        For the main (root) process this only records whether a problem
        was supplied and returns immediately. The worker processes never
        return.

        Each worker sits in a loop waiting for the next batch of points
        for the problem. An empty batch signals that a new problem is
        about to be broadcast. If that new problem is None the worker
        finalizes MPI and exits the process.
        """
        from mpi4py import MPI

        comm, root = MPI.COMM_WORLD, 0
        MPIMapper.rank = comm.rank
        rank = comm.rank
        # print(f"MPI {rank} of {comm.size} initializing")

        # If worker, sit in a loop waiting for the next point.
        # If the point is empty, then wait for a new problem.
        # If the problem is None then we are done, otherwise wait for next point.
        if rank != root:
            # print(f"{rank}: looping")
            while True:
                result = _MPI_map(problem, None, comm, root)
                if result is None:
                    problem = _MPI_set_problem(None, comm, root)
                    if problem is None:
                        break
                    # print(f"{rank}: changing problem")
            # print(f"{rank}: finalizing")
            MPI.Finalize()

            # Exit the program after the worker is done. Don't return
            # to the caller since that is continuing on with the main
            # thread, and in particular, attempting to rerun the fit on
            # each worker.
            # print(f"{rank}: Worker exiting")
            sys.exit(0)
        else:
            # Root initialization:
            MPIMapper.has_problem = problem is not None
            # print("mapper has problem", MPIMapper.has_problem)

    @staticmethod
    def start_mapper(problem, modelargs=None, cpus=0):
        """
        Return a function mapping an array of points to nllf values using
        all MPI ranks.

        Passing ``problem=None`` tells the workers to stop and finalizes
        MPI on the root. *modelargs* and *cpus* are not used.
        """
        # Only root can get here---worker is stuck in start_worker
        from mpi4py import MPI
        import numpy as np

        comm, root = MPI.COMM_WORLD, 0

        # Signal new problem then send it, but not on the first fit. We do this
        # so that we can still run MPI fits even if the problem itself cannot
        # be pickled, but only the first one. (You can still fit a series even
        # if the problem can't be pickled, but you will need to restart the
        # MPI job separately for each fit.)
        # Note: setting problem to None stops the program, so call finalize().
        MPIMapper.timestamps = []

        def mapper(points):
            tstart = time.perf_counter_ns()
            result = _MPI_map(problem, points, comm, root)
            tstop = time.perf_counter_ns()
            MPIMapper.timestamps.append((tstart, tstop))
            return result

        # has_problem is True only while the workers still hold the problem
        # passed to start_worker; in that case nothing needs to be sent.
        if not MPIMapper.has_problem:
            # print(f"*** {comm.rank}: replacing problem")
            # Send an empty set of points to signal a new problem is coming.
            mapper(np.empty((0, 0), "d"))
            _MPI_set_problem(problem, comm, root)
            if problem is None:
                # print(f"{comm.rank}: finalizing root")
                MPI.Finalize()
        MPIMapper.has_problem = False
        return mapper

    @staticmethod
    def stop_mapper(mapper=None):
        """Report timings, then stop the workers by sending them problem None."""
        # print("stopping mapper")
        # Set problem=None to stop the program.
        show_performance(MPIMapper.timestamps)
        MPIMapper.start_mapper(None, None)