# Source code for mpisppy.cylinders.spcommunicator

# Copyright 2020 by B. Knueven, D. Mildebrath, C. Muir, J-P Watson, and D.L. Woodruff
# This software is distributed under the 3-clause BSD License.
""" Conventional wisdom seems to be that we should use Put calls locally (i.e.
    a process should Put() into its own buffer), and Get calls for
    communication (i.e. call Get on a remote target, rather than your local
    buffer). The following implementation uses this paradigm.

    The communication in this paradigm is a star graph, with the hub at the
    center and the spokes on the outside. Each spoke is concerned only
    with the hub, but the hub must track information about all of the
    spokes.

    Separate hub and spoke classes for memory/window management?
"""
import numpy as np
import abc
import time
from mpisppy import MPI


def communicator_array(size):
    """ Build the standard cylinder communication buffer.

        Returns a float64 array of length ``size + 1``: the first ``size``
        entries are data slots initialized to NaN, and the final entry is a
        write counter initialized to zero.
    """
    arr = np.full(size + 1, np.nan)
    arr[-1] = 0.0
    return arr
class SPCommunicator:
    """ Base class for hub and spoke communicators in the star-graph
        cylinder architecture.

        Wraps an SPBase object together with the three MPI communicators it
        participates in, and defines the lifecycle hooks (``main``, ``sync``,
        ``is_converged``, ``finalize``) that concrete hubs and spokes
        override.

        NOTE(review): ``main`` carries ``@abc.abstractmethod`` but the class
        does not use ``abc.ABC``/``ABCMeta``, so instantiation is not
        actually blocked -- the decorator is documentation only here.
    """
    def __init__(self, spbase_object, fullcomm, strata_comm, cylinder_comm, options=None):
        # No RMA windows exist until _make_window() is called.
        self._windows_constructed = False
        # The three communicators this process belongs to.
        self.fullcomm = fullcomm
        self.strata_comm = strata_comm
        self.cylinder_comm = cylinder_comm
        # This process's rank within each communicator.
        self.global_rank = fullcomm.Get_rank()
        self.strata_rank = strata_comm.Get_rank()
        self.cylinder_rank = cylinder_comm.Get_rank()
        # One rank per cylinder on strata_comm; all but the hub are spokes.
        self.n_spokes = strata_comm.Get_size() - 1
        self.opt = spbase_object
        self.inst_time = time.time()  # construction timestamp, for diagnostics
        self.options = dict() if options is None else options
        # Give the wrapped SPBase object a back-pointer to this communicator.
        self.opt.spcomm = self

    @abc.abstractmethod
    def main(self):
        """ Entry point; every hub/spoke must provide an implementation. """
        pass

    def sync(self):
        """ Optional hook for exchanging state with other cylinders. """
        pass

    def is_converged(self):
        """ Optional convergence check; the base class never converges. """
        return False

    def finalize(self):
        """ Optional hook for final calculations/flushing to disk after
            convergence.
        """
        pass

    def hub_finalize(self):
        """ Optional hub-only hook that collects any results from
            ``finalize``.
        """
        pass

    def allreduce_or(self, val):
        """ Logical-or allreduce, delegated to the wrapped SPBase object. """
        return self.opt.allreduce_or(val)

    def free_windows(self):
        """ Free every MPI window created via ``_make_window``, if any. """
        if self._windows_constructed:
            for spoke_idx in range(self.n_spokes):
                self.windows[spoke_idx].Free()
            del self.buffers
            self._windows_constructed = False

    def _make_window(self, length, comm=None):
        """ Create a local window object and its corresponding memory buffer
            using MPI.Win.Allocate().

            Args:
                length (int): length of the buffer to create
                comm (MPI Communicator, optional): MPI communicator object to
                    create the window over. Default is ``self.strata_comm``.

            Returns:
                window (MPI.Win object): The created window
                buff (ndarray): Pointer to corresponding memory

            Notes:
                The created buffer will actually be +1 longer than length;
                the last entry is a write number used to detect new info.

                Assumes the caller supplies the correct window size for the
                local buffer based on whether this process is a hub or a
                spoke, etc.
        """
        if comm is None:
            comm = self.strata_comm
        # One extra double for the trailing write counter.
        bytes_needed = MPI.DOUBLE.size * (length + 1)
        window = MPI.Win.Allocate(bytes_needed, MPI.DOUBLE.size, comm=comm)
        buff = np.ndarray(dtype="d", shape=(length + 1,), buffer=window.tomemory())
        buff[:] = np.nan
        buff[-1] = 0.  # write number starts at zero
        return window, buff