# Source code for mpisppy.cylinders.hub

# Copyright 2020 by B. Knueven, D. Mildebrath, C. Muir, J-P Watson, and D.L. Woodruff
# This software is distributed under the 3-clause BSD License.
import numpy as np
import abc
import logging
import time
import mpisppy.log
from mpisppy.opt.aph import APH

from mpisppy import MPI
from mpisppy.cylinders.spcommunicator import SPCommunicator, communicator_array
from math import inf
from mpisppy.cylinders.spoke import ConvergerSpokeType

from mpisppy import global_toc

# Configure the dedicated hub logger before it is fetched below.
# The second argument is the log destination: a filename here, but a stream
# such as sys.stdout could be passed instead.
# NOTE(review): level=logging.CRITICAL presumably suppresses everything below
# CRITICAL for this logger -- confirm against mpisppy.log.setup_logger.
mpisppy.log.setup_logger("mpisppy.cylinders.Hub",
                         "hub.log",
                         level=logging.CRITICAL)
# Module-level logger shared by the hub classes defined in this file.
logger = logging.getLogger("mpisppy.cylinders.Hub")

class Hub(SPCommunicator):
    """Abstract base class for the hub cylinder.

    The hub keeps the best inner and outer bounds seen so far, exchanges
    buffers with its spokes through one-sided MPI windows (see
    ``hub_to_spoke`` / ``hub_from_spoke``), prints a progress trace, and
    decides on gap-based or stall-based termination.

    Subclasses must implement ``setup_hub``, ``sync``, ``is_converged``,
    ``current_iteration`` and ``main``.
    """

    def __init__(self, spbase_object, fullcomm, strata_comm, cylinder_comm, spokes, options=None):
        """Create the hub.

        Args:
            spbase_object: optimization object, forwarded to the base class.
            fullcomm: communicator forwarded to the base class; also used
                here to report the global rank in a debug message.
            strata_comm: communicator forwarded to the base class.
            cylinder_comm: communicator forwarded to the base class.
            spokes (list of dict): one entry per spoke; each dict must hold
                a ``"spoke_class"`` key (read in ``initialize_spoke_indices``).
            options (dict, optional): forwarded to the base class; the keys
                ``rel_gap``, ``abs_gap`` and ``max_stalled_iters`` are read
                by ``determine_termination``.
        """
        super().__init__(spbase_object, fullcomm, strata_comm, cylinder_comm, options=options)
        # NOTE(review): self.n_spokes is presumably set by the base class.
        assert len(spokes) == self.n_spokes
        # Write counters and buffer lengths, indexed by spoke strata rank - 1.
        self.local_write_ids = np.zeros(self.n_spokes, dtype=np.int64)
        self.remote_write_ids = np.zeros(self.n_spokes, dtype=np.int64)
        # Lengths do NOT include the extra trailing slot for the write id.
        self.local_lengths = np.zeros(self.n_spokes, dtype=np.int64)
        self.remote_lengths = np.zeros(self.n_spokes, dtype=np.int64)
        self.spokes = spokes  # List of dicts
        logger.debug("Built the hub object on global rank %s", fullcomm.Get_rank())
        # State for the screen trace.
        self.print_init = True
        self.latest_ib_char = None
        self.latest_ob_char = None
        self.last_ib_idx = None
        self.last_ob_idx = None
        # State for termination based on stalling out.
        self.stalled_iter_cnt = 0
        self.last_gap = float('inf')  # best (smallest) abs_gap seen so far

    @abc.abstractmethod
    def setup_hub(self):
        """Prepare bound values and communication buffers (subclass hook)."""

    @abc.abstractmethod
    def sync(self):
        """Exchange data with the spokes.

        To be called within whichever optimization algorithm is being run
        on the hub (e.g. PH).
        """

    @abc.abstractmethod
    def is_converged(self):
        """Return True if the hub algorithm should stop.

        The hub has the ability to halt the optimization algorithm on the
        hub before any local convergers.
        """

    @abc.abstractmethod
    def current_iteration(self):
        """Return the current iteration count, however the hub defines it."""

    @abc.abstractmethod
    def main(self):
        """Run the hub's optimization algorithm (subclass hook)."""

    def clear_latest_chars(self):
        """Forget which spokes most recently improved the bounds."""
        self.latest_ib_char = None
        self.latest_ob_char = None

    def compute_gaps(self):
        """Compute the current absolute and relative gaps.

        Uses the current ``self.BestInnerBound`` and ``self.BestOuterBound``.

        Returns:
            tuple: ``(abs_gap, rel_gap)``. ``rel_gap`` is
            ``abs_gap / abs(self.BestOuterBound)`` when ``abs_gap`` is finite
            and the outer bound is neither NaN nor zero; otherwise it is
            ``float("inf")``.
        """
        if self.opt.is_minimizing:
            abs_gap = self.BestInnerBound - self.BestOuterBound
        else:
            abs_gap = self.BestOuterBound - self.BestInnerBound

        outer_bound = self.BestOuterBound
        # NaN never compares equal to anything (not even itself), so an
        # inequality test against float("nan") cannot detect it; use the
        # dedicated predicates instead.
        if np.isfinite(abs_gap) and not np.isnan(outer_bound) and outer_bound != 0:
            rel_gap = abs_gap / abs(outer_bound)
        else:
            rel_gap = float("inf")
        return abs_gap, rel_gap

    def get_update_string(self):
        """Return a fixed-width tag naming the sources of the latest updates.

        The result is always three characters: the outer bound character,
        a space, and the inner bound character, with a blank standing in for
        a bound that was not updated. A constant width keeps the columns
        printed by ``screen_trace`` aligned.
        NOTE(review): assumes the spoke characters are single characters.
        """
        ob_char = ' ' if self.latest_ob_char is None else self.latest_ob_char
        ib_char = ' ' if self.latest_ib_char is None else self.latest_ib_char
        return ob_char + ' ' + ib_char

    def screen_trace(self):
        """Print one row of the progress table (plus the header on first use).

        Resets the latest-update characters after printing.
        """
        current_iteration = self.current_iteration()
        abs_gap, rel_gap = self.compute_gaps()
        best_solution = self.BestInnerBound
        best_bound = self.BestOuterBound
        update_source = self.get_update_string()
        if self.print_init:
            # The blank placeholder matches the width of get_update_string().
            row = f'{"Iter.":>5s} {"   "} {"Best Bound":>14s} {"Best Incumbent":>14s} {"Rel. Gap":>12s} {"Abs. Gap":>14s}'
            global_toc(row, True)
            self.print_init = False
        row = f"{current_iteration:5d} {update_source} {best_bound:14.4f} {best_solution:14.4f} {rel_gap*100:12.3f}% {abs_gap:14.4f}"
        global_toc(row, True)
        self.clear_latest_chars()

    def determine_termination(self):
        """Decide whether a gap-based or stall-based stop is indicated.

        Reads the optional keys ``rel_gap``, ``abs_gap`` and
        ``max_stalled_iters`` from ``self.options``. The stall counter is
        only maintained when ``max_stalled_iters`` is present.

        Returns:
            bool: True if termination is indicated, otherwise False.
        """
        options = getattr(self, "options", None)
        if options is None or not any(
            key in options for key in ("rel_gap", "abs_gap", "max_stalled_iters")
        ):
            return False  # no termination criterion was requested

        abs_gap, rel_gap = self.compute_gaps()
        rel_gap_satisfied = "rel_gap" in options and bool(rel_gap <= options["rel_gap"])
        abs_gap_satisfied = "abs_gap" in options and bool(abs_gap <= options["abs_gap"])
        max_stalled_satisfied = False

        if "max_stalled_iters" in options:
            if abs_gap < self.last_gap:  # liberal test (we could use an epsilon)
                self.last_gap = abs_gap
                self.stalled_iter_cnt = 0
            else:
                self.stalled_iter_cnt += 1
                if self.stalled_iter_cnt >= options["max_stalled_iters"]:
                    max_stalled_satisfied = True

        if abs_gap_satisfied:
            global_toc(f"Terminating based on inter-cylinder absolute gap {abs_gap:12.4f}")
        if rel_gap_satisfied:
            global_toc(f"Terminating based on inter-cylinder relative gap {rel_gap*100:12.3f}%")
        if max_stalled_satisfied:
            global_toc(f"Terminating based on max-stalled-iters {self.stalled_iter_cnt}")
        return abs_gap_satisfied or rel_gap_satisfied or max_stalled_satisfied

    def hub_finalize(self):
        """Collect the final bounds and print the closing statistics.

        Only global rank 0 prints; the header is forced to print again.
        """
        if self.has_outerbound_spokes:
            self.receive_outerbounds()
        if self.has_innerbound_spokes:
            self.receive_innerbounds()

        if self.global_rank == 0:
            self.print_init = True
            global_toc("Statistics at termination", True)
            self.screen_trace()

    def receive_innerbounds(self):
        """Get inner bounds from inner bound spokes.

        NOTE: Does not check if there _are_ innerbound spokes
        (but should be harmless to call if there are none).
        """
        logger.debug("Hub is trying to receive from InnerBounds")
        for idx in self.innerbound_spoke_indices:
            is_new = self.hub_from_spoke(self.innerbound_receive_buffers[idx], idx)
            if is_new:
                bound = self.innerbound_receive_buffers[idx][0]
                logger.debug("!! new InnerBound to opt %s", bound)
                self.BestInnerBound = self.InnerBoundUpdate(bound, idx)
        logger.debug("ph back from InnerBounds")

    def receive_outerbounds(self):
        """Get outer bounds from outer bound spokes.

        NOTE: Does not check if there _are_ outerbound spokes
        (but should be harmless to call if there are none).
        """
        logger.debug("Hub is trying to receive from OuterBounds")
        for idx in self.outerbound_spoke_indices:
            is_new = self.hub_from_spoke(self.outerbound_receive_buffers[idx], idx)
            if is_new:
                bound = self.outerbound_receive_buffers[idx][0]
                logger.debug("!! new OuterBound to opt %s", bound)
                self.BestOuterBound = self.OuterBoundUpdate(bound, idx)
        logger.debug("ph back from OuterBounds")

    def OuterBoundUpdate(self, new_bound, idx=None, char='*'):
        """Return the better of ``new_bound`` and the current outer bound.

        When ``new_bound`` is an improvement, also record who produced it
        for the screen trace.

        Args:
            new_bound: candidate outer bound.
            idx: strata rank of the spoke that produced the bound, or None
                when the hub itself produced it.
            char: trace character used when ``idx`` is None.

        Returns:
            ``new_bound`` if it improves on ``self.BestOuterBound``,
            otherwise the current bound. The caller stores the result.
        """
        current_bound = self.BestOuterBound
        if not self._outer_bound_update(new_bound, current_bound):
            return current_bound
        if idx is None:
            self.latest_ob_char = char
            self.last_ob_idx = 0
        else:
            self.latest_ob_char = self.outerbound_spoke_chars[idx]
            self.last_ob_idx = idx
        return new_bound

    def InnerBoundUpdate(self, new_bound, idx=None, char='*'):
        """Return the better of ``new_bound`` and the current inner bound.

        When ``new_bound`` is an improvement, also record who produced it
        for the screen trace.

        Args:
            new_bound: candidate inner bound.
            idx: strata rank of the spoke that produced the bound, or None
                when the hub itself produced it.
            char: trace character used when ``idx`` is None.

        Returns:
            ``new_bound`` if it improves on ``self.BestInnerBound``,
            otherwise the current bound. The caller stores the result.
        """
        current_bound = self.BestInnerBound
        if not self._inner_bound_update(new_bound, current_bound):
            return current_bound
        if idx is None:
            self.latest_ib_char = char
            self.last_ib_idx = 0
        else:
            self.latest_ib_char = self.innerbound_spoke_chars[idx]
            self.last_ib_idx = idx
        return new_bound

    def initialize_bound_values(self):
        """Set the starting bounds and the "is this better?" predicates.

        For minimization the inner bound starts at +inf and improves
        downward while the outer bound starts at -inf and improves upward;
        for maximization the roles are mirrored.
        """
        if self.opt.is_minimizing:
            self.BestInnerBound = inf
            self.BestOuterBound = -inf
            self._inner_bound_update = lambda new, old: (new < old)
            self._outer_bound_update = lambda new, old: (new > old)
        else:
            self.BestInnerBound = -inf
            self.BestOuterBound = inf
            self._inner_bound_update = lambda new, old: (new > old)
            self._outer_bound_update = lambda new, old: (new < old)

    def initialize_outer_bound_buffers(self):
        """Allocate one receive buffer per outer bound spoke.

        Buffers are keyed by spoke strata rank and sized from
        ``self.remote_lengths``.
        """
        self.outerbound_receive_buffers = dict()
        for idx in self.outerbound_spoke_indices:
            self.outerbound_receive_buffers[idx] = communicator_array(
                self.remote_lengths[idx - 1]
            )

    def initialize_inner_bound_buffers(self):
        """Allocate one receive buffer per inner bound spoke.

        Buffers are keyed by spoke strata rank and sized from
        ``self.remote_lengths``.
        """
        self.innerbound_receive_buffers = dict()
        for idx in self.innerbound_spoke_indices:
            self.innerbound_receive_buffers[idx] = communicator_array(
                self.remote_lengths[idx - 1]
            )

    def initialize_nonants(self):
        """Initialize the buffer for the hub to send nonants to the spokes.

        A single buffer is shared by all nonant spokes, so they must all
        have requested the same length.

        Raises:
            RuntimeError: if two nonant spokes disagree on the buffer size.
        """
        self.nonant_send_buffer = None
        for idx in self.nonant_spoke_indices:
            if self.nonant_send_buffer is None:
                # for hub outer/inner bounds and kill signal
                self.nonant_send_buffer = communicator_array(self.local_lengths[idx - 1])
            elif self.local_lengths[idx - 1] + 1 != len(self.nonant_send_buffer):
                raise RuntimeError("Nonant buffers disagree on size")

    def initialize_boundsout(self):
        """Initialize the buffer for the hub to send bounds to bounds-only spokes.

        Raises:
            RuntimeError: if a bounds-only spoke requested a local length
                other than 2 (outer bound, inner bound).
        """
        self.boundsout_send_buffer = None
        for idx in self.bounds_only_indices:
            if self.boundsout_send_buffer is None:
                self.boundsout_send_buffer = communicator_array(self.local_lengths[idx - 1])
            if self.local_lengths[idx - 1] != 2:
                raise RuntimeError(f'bounds only local length buffers must be 2 (bounds). Currently {self.local_lengths[idx - 1]}')

    def _populate_boundsout_cache(self, buf):
        """Write the current bounds into the tail of ``buf``.

        The outer bound goes to ``buf[-3]`` and the inner bound to
        ``buf[-2]``; the last slot is left for the write id filled in by
        ``hub_to_spoke``.
        """
        buf[-3] = self.BestOuterBound
        buf[-2] = self.BestInnerBound

    def send_boundsout(self):
        """Send bounds to the bounds-only spokes.

        This is called only for spokes which are bounds only; w and nonant
        spokes are passed bounds through the w and nonant buffers.
        """
        self._populate_boundsout_cache(self.boundsout_send_buffer)
        logger.debug("hub is sending bounds=%s", self.boundsout_send_buffer)
        for idx in self.bounds_only_indices:
            self.hub_to_spoke(self.boundsout_send_buffer, idx)

    def initialize_spoke_indices(self):
        """Figure out what types of spokes we have and sort them into classes.

        Spokes are identified by strata rank, i.e. position in
        ``self.spokes`` plus one.

        Note:
            Some spokes may be multiple types (e.g. outerbound and nonant),
            though not all combinations are supported.

        Raises:
            RuntimeError: on an unrecognized converger spoke type.
        """
        self.outerbound_spoke_indices = set()
        self.innerbound_spoke_indices = set()
        self.nonant_spoke_indices = set()
        self.w_spoke_indices = set()

        self.outerbound_spoke_chars = dict()
        self.innerbound_spoke_chars = dict()

        for strata_rank, spoke in enumerate(self.spokes, start=1):
            spoke_class = spoke["spoke_class"]
            if not hasattr(spoke_class, "converger_spoke_types"):
                ## this isn't necessarily wrong, i.e., cut generators
                logger.debug("Spoke class %s not recognized by hub", spoke_class)
                continue
            for cst in spoke_class.converger_spoke_types:
                if cst == ConvergerSpokeType.OUTER_BOUND:
                    self.outerbound_spoke_indices.add(strata_rank)
                    self.outerbound_spoke_chars[strata_rank] = spoke_class.converger_spoke_char
                elif cst == ConvergerSpokeType.INNER_BOUND:
                    self.innerbound_spoke_indices.add(strata_rank)
                    self.innerbound_spoke_chars[strata_rank] = spoke_class.converger_spoke_char
                elif cst == ConvergerSpokeType.W_GETTER:
                    self.w_spoke_indices.add(strata_rank)
                elif cst == ConvergerSpokeType.NONANT_GETTER:
                    self.nonant_spoke_indices.add(strata_rank)
                else:
                    raise RuntimeError(f"Unrecognized converger_spoke_type {cst}")

        # All bound spokes get hub bounds, so work out which spokes are
        # "bounds only" (they receive neither Ws nor nonants).
        self.bounds_only_indices = \
            (self.outerbound_spoke_indices | self.innerbound_spoke_indices) - \
            (self.w_spoke_indices | self.nonant_spoke_indices)

        self.has_outerbound_spokes = len(self.outerbound_spoke_indices) > 0
        self.has_innerbound_spokes = len(self.innerbound_spoke_indices) > 0
        self.has_nonant_spokes = len(self.nonant_spoke_indices) > 0
        self.has_w_spokes = len(self.w_spoke_indices) > 0
        self.has_bounds_only_spokes = len(self.bounds_only_indices) > 0

        # Not all opt classes may have extensions
        if getattr(self.opt, "extensions", None) is not None:
            self.opt.extobject.initialize_spoke_indices()

    def make_windows(self):
        """Receive buffer sizes from the spokes and build the MPI windows.

        Different parts of the hub may call this; only the first call does
        any work.
        NOTE(review): ``self._windows_constructed`` is presumably initialized
        by the base class.
        """
        if self._windows_constructed:
            return

        # Spokes notify the hub of the buffer sizes
        for i in range(self.n_spokes):
            pair_of_sizes = np.zeros(2, dtype="i")
            self.strata_comm.Recv((pair_of_sizes, MPI.INT), source=i + 1, tag=i + 1)
            self.remote_lengths[i] = pair_of_sizes[0]
            self.local_lengths[i] = pair_of_sizes[1]

        # Make the windows of the appropriate buffer sizes
        self.windows = [None for _ in range(self.n_spokes)]
        self.buffers = [None for _ in range(self.n_spokes)]
        for i in range(self.n_spokes):
            length = self.local_lengths[i]
            win, buff = self._make_window(length)
            self.windows[i] = win
            self.buffers[i] = buff

        # flag this for multiple calls from the hub
        self._windows_constructed = True

    def hub_to_spoke(self, values, spoke_strata_rank):
        """Put ``values`` into the locally-owned buffer for a spoke to pick up.

        Notes:
            This automatically does the -1 indexing.
            This assumes that ``values`` contains a slot at the end for the
            write id; that slot is overwritten here.

        Args:
            values: array to send, including the trailing write id slot.
            spoke_strata_rank: 1-based rank of the spoke in the strata
                communicator.

        Raises:
            RuntimeError: if ``values`` does not have the expected length.
        """
        expected_length = self.local_lengths[spoke_strata_rank - 1] + 1
        if len(values) != expected_length:
            raise RuntimeError(
                f"Attempting to put array of length {len(values)} "
                f"into local buffer of length {expected_length}"
            )
        # this is so the spoke ranks all get the same write_id at approximately the same time
        if not isinstance(self.opt, APH):
            self.cylinder_comm.Barrier()
        self.local_write_ids[spoke_strata_rank - 1] += 1
        values[-1] = self.local_write_ids[spoke_strata_rank - 1]
        window = self.windows[spoke_strata_rank - 1]
        window.Lock(self.strata_rank)
        window.Put((values, len(values), MPI.DOUBLE), self.strata_rank)
        window.Unlock(self.strata_rank)

    def hub_from_spoke(self, values, spoke_num):
        """Get a spoke's buffer into ``values`` and report whether it is new.

        Args:
            values: receive array, including the trailing write id slot.
            spoke_num: rank of the spoke in the strata communicator, so it
                is 1-based, not 0-based.

        Returns:
            is_new (bool): Indicates whether the "gotten" values are new,
            based on the write id.

        Raises:
            RuntimeError: if ``values`` does not have the expected length.
        """
        expected_length = self.remote_lengths[spoke_num - 1] + 1
        if len(values) != expected_length:
            raise RuntimeError(
                f"Hub trying to get buffer of length {expected_length} "
                f"from spoke, but provided buffer has length {len(values)}."
            )
        # so the window in each rank gets read at approximately the same time,
        # and so has the same write_id
        if not isinstance(self.opt, APH):
            self.cylinder_comm.Barrier()
        window = self.windows[spoke_num - 1]
        window.Lock(spoke_num)
        window.Get((values, len(values), MPI.DOUBLE), spoke_num)
        window.Unlock(spoke_num)

        if isinstance(self.opt, APH):
            # reverting part of changes from Ben getting rid of spoke sleep DLW jan 2023
            if values[-1] > self.remote_write_ids[spoke_num - 1]:
                self.remote_write_ids[spoke_num - 1] = values[-1]
                return True
        else:
            new_id = int(values[-1])
            local_val = np.array((new_id,), 'i')
            sum_ids = np.zeros(1, 'i')
            self.cylinder_comm.Allreduce((local_val, MPI.INT),
                                         (sum_ids, MPI.INT),
                                         op=MPI.SUM)
            # Only accept the data when every rank in the cylinder read the
            # same write id (the sum then equals size * new_id).
            if new_id != sum_ids[0] / self.cylinder_comm.size:
                return False
            # A negative id is also treated as new.
            if (new_id > self.remote_write_ids[spoke_num - 1]) or (new_id < 0):
                self.remote_write_ids[spoke_num - 1] = new_id
                return True
        return False

    def send_terminate(self):
        """Signal termination to every spoke.

        For each spoke, an array of zeros with a -1 in the write id slot is
        put into that spoke's window at rank 0, so the spokes do not need to
        be contacted one at a time.
        NOTE(review): rank 0 is presumably the hub's own strata rank, making
        this a write to the local buffer -- confirm.
        """
        for rank in range(1, self.n_spokes + 1):
            dummies = np.zeros(self.local_lengths[rank - 1] + 1)
            dummies[-1] = -1
            window = self.windows[rank - 1]
            window.Lock(0)
            window.Put((dummies, len(dummies), MPI.DOUBLE), 0)
            window.Unlock(0)
class PHHub(Hub):
    """Hub that runs PH (``self.opt.ph_main``) and serves W and nonant spokes."""

    def setup_hub(self):
        """Initialize spoke bookkeeping, bound values and all send/receive buffers.

        Must be called after make_windows(), so that the hub knows the sizes
        of all the spokes' windows.

        Raises:
            RuntimeError: if the windows have not been constructed, if a
                spoke provides both inner and outer bounds, or if a spoke
                needs both Ws and nonants.
        """
        if not self._windows_constructed:
            raise RuntimeError(
                "Cannot call setup_hub before memory windows are constructed"
            )

        self.initialize_spoke_indices()
        self.initialize_bound_values()

        if self.has_outerbound_spokes:
            self.initialize_outer_bound_buffers()
        if self.has_innerbound_spokes:
            self.initialize_inner_bound_buffers()
        if self.has_w_spokes:
            self.initialize_ws()
        if self.has_nonant_spokes:
            self.initialize_nonants()
        if self.has_bounds_only_spokes:
            self.initialize_boundsout()  # bounds going out

        ## Do some checking for things we currently don't support
        if len(self.outerbound_spoke_indices & self.innerbound_spoke_indices) > 0:
            raise RuntimeError(
                "A Spoke providing both inner and outer "
                "bounds is currently unsupported"
            )
        if len(self.w_spoke_indices & self.nonant_spoke_indices) > 0:
            raise RuntimeError(
                "A Spoke needing both Ws and nonants is currently unsupported"
            )

        ## Generate some warnings if nothing is giving bounds
        # Logger.warn is a deprecated alias; use Logger.warning.
        if not self.has_outerbound_spokes:
            logger.warning(
                "No OuterBound Spokes defined, this converger "
                "will not cause the hub to terminate"
            )

        if not self.has_innerbound_spokes:
            logger.warning(
                "No InnerBound Spokes defined, this converger "
                "will not cause the hub to terminate"
            )
        if self.opt.extensions is not None:
            self.opt.extobject.setup_hub()

    def sync(self):
        """Manage one round of communication with the spokes.

        Sends Ws, nonants and bounds to the spokes that want them, then
        receives outer and inner bounds, and finally lets the extension
        object (if any) sync as well.
        """
        if self.has_w_spokes:
            self.send_ws()
        if self.has_nonant_spokes:
            self.send_nonants()
        if self.has_bounds_only_spokes:
            self.send_boundsout()
        if self.has_outerbound_spokes:
            self.receive_outerbounds()
        if self.has_innerbound_spokes:
            self.receive_innerbounds()
        if self.opt.extensions is not None:
            self.opt.extobject.sync_with_spokes()

    def sync_with_spokes(self):
        """Alias for ``sync``."""
        self.sync()

    def is_converged(self):
        """Update the trace and report whether PH should stop.

        On the first PH iteration the trivial bound is offered as an outer
        bound. Without inner bound spokes this always returns False (after
        printing the trace on global rank 0).

        Returns:
            bool: result of ``determine_termination`` when inner bound
            spokes exist, otherwise False.
        """
        first_iteration = self.opt._PHIter == 1

        ## might as well get a bound, in this case
        if first_iteration:
            self.BestOuterBound = self.OuterBoundUpdate(self.opt.trivial_bound)

        if not self.has_innerbound_spokes:
            if first_iteration:
                logger.warning(
                    "PHHub cannot compute convergence without "
                    "inner bound spokes."
                )

            ## you still want to output status, even without inner bounders configured
            if self.global_rank == 0:
                self.screen_trace()

            return False

        if not self.has_outerbound_spokes and first_iteration:
            global_toc(
                "Without outer bound spokes, no progress "
                "will be made on the Best Bound")

        ## log some output
        if self.global_rank == 0:
            self.screen_trace()

        return self.determine_termination()

    def current_iteration(self):
        """Return the current PH iteration."""
        return self.opt._PHIter

    def main(self):
        """Run PH without finalizing. SPComm gets attached in self.__init__."""
        self.opt.ph_main(finalize=False)

    def finalize(self):
        """Run ``PH.post_loops`` with the extensions and return Eobj."""
        Eobj = self.opt.post_loops(self.opt.extensions)
        return Eobj

    def send_nonants(self):
        """Gather nonants and send them to the appropriate spokes.

        The nonant values of every local scenario are written consecutively
        into the shared send buffer, followed by the current bounds.

        TODO: Will likely fail with bundling
        """
        self.opt._save_nonants()
        ci = 0  ## index to self.nonant_send_buffer
        nonant_send_buffer = self.nonant_send_buffer
        for s in self.opt.local_scenarios.values():
            for xvar in s._mpisppy_data.nonant_indices.values():
                nonant_send_buffer[ci] = xvar._value
                ci += 1
        # Lazy %-formatting: the (possibly large) buffer is only rendered
        # when debug logging is actually enabled.
        logger.debug("hub is sending X nonants=%s", nonant_send_buffer)

        self._populate_boundsout_cache(nonant_send_buffer)

        for idx in self.nonant_spoke_indices:
            self.hub_to_spoke(nonant_send_buffer, idx)

    def initialize_ws(self):
        """Initialize the buffer for the hub to send dual weights to the spokes.

        A single buffer is shared by all W spokes, so they must all have
        requested the same length.

        Raises:
            RuntimeError: if two W spokes disagree on the buffer size.
        """
        self.w_send_buffer = None
        for idx in self.w_spoke_indices:
            if self.w_send_buffer is None:
                self.w_send_buffer = communicator_array(self.local_lengths[idx - 1])
            elif self.local_lengths[idx - 1] + 1 != len(self.w_send_buffer):
                raise RuntimeError("W buffers disagree on size")

    def send_ws(self):
        """Send dual weights (and the current bounds) to the W spokes."""
        # NOTE(review): padding=3 presumably reserves the trailing slots for
        # the two bounds and the write id -- confirm in _populate_W_cache.
        self.opt._populate_W_cache(self.w_send_buffer, padding=3)
        logger.debug("hub is sending Ws=%s", self.w_send_buffer)

        self._populate_boundsout_cache(self.w_send_buffer)

        for idx in self.w_spoke_indices:
            self.hub_to_spoke(self.w_send_buffer, idx)
class LShapedHub(Hub):
    """Hub that runs the L-shaped method (``self.opt.lshaped_algorithm``)."""

    def setup_hub(self):
        """Initialize spoke bookkeeping, bound values and buffers.

        Must be called after make_windows(), so that the hub knows the sizes
        of all the spokes' windows.

        Raises:
            RuntimeError: if the windows have not been constructed, if any
                spoke wants dual weights (Ws), or if a spoke provides both
                inner and outer bounds.
        """
        if not self._windows_constructed:
            raise RuntimeError(
                "Cannot call setup_hub before memory windows are constructed"
            )

        self.initialize_spoke_indices()
        self.initialize_bound_values()

        if self.has_outerbound_spokes:
            self.initialize_outer_bound_buffers()
        if self.has_innerbound_spokes:
            self.initialize_inner_bound_buffers()

        ## Do some checking for things we currently
        ## do not support
        if self.has_w_spokes:
            raise RuntimeError("LShaped hub does not compute dual weights (Ws)")
        if self.has_nonant_spokes:
            self.initialize_nonants()
        if len(self.outerbound_spoke_indices & self.innerbound_spoke_indices) > 0:
            raise RuntimeError(
                "A Spoke providing both inner and outer "
                "bounds is currently unsupported"
            )

        ## Generate some warnings if nothing is giving bounds
        # Logger.warn is a deprecated alias; use Logger.warning.
        if not self.has_innerbound_spokes:
            logger.warning(
                "No InnerBound Spokes defined, this converger "
                "will not cause the hub to terminate"
            )

    def sync(self, send_nonants=True):
        """Manage one round of communication with the bound spokes.

        Args:
            send_nonants (bool): when True (the default) and nonant spokes
                exist, send the current nonants before receiving bounds.
        """
        if send_nonants and self.has_nonant_spokes:
            self.send_nonants()
        if self.has_outerbound_spokes:
            self.receive_outerbounds()
        if self.has_innerbound_spokes:
            self.receive_innerbounds()
        # in case LShaped ever gets extensions
        if getattr(self.opt, "extensions", None) is not None:
            self.opt.extobject.sync_with_spokes()

    def is_converged(self):
        """Return a boolean; if True, then LShaped will terminate.

        Side-effects:
            The L-shaped method produces outer bounds during execution,
            so its current bound is offered as an outer bound here, and the
            trace is printed on global rank 0.
        """
        bound = self.opt._LShaped_bound
        self.BestOuterBound = self.OuterBoundUpdate(bound)

        ## log some output
        if self.global_rank == 0:
            self.screen_trace()

        return self.determine_termination()

    def current_iteration(self):
        """Return the current L-shaped iteration."""
        return self.opt.iter

    def main(self):
        """Run the L-shaped algorithm. SPComm gets attached in self.__init__."""
        self.opt.lshaped_algorithm()

    def send_nonants(self):
        """Gather nonants and send them to the appropriate spokes.

        For every nonant of every local scenario, the value is taken from
        the associated root variable and written consecutively into the
        shared send buffer, followed by the current bounds.

        TODO: Will likely fail with bundling
        """
        ci = 0  ## index to self.nonant_send_buffer
        nonant_send_buffer = self.nonant_send_buffer
        for s in self.opt.local_scenarios.values():
            nonant_to_root_var_map = s._mpisppy_model.subproblem_to_root_vars_map
            for xvar in s._mpisppy_data.nonant_indices.values():
                ## Grab the value from the associated root variable
                nonant_send_buffer[ci] = nonant_to_root_var_map[xvar]._value
                ci += 1
        # Lazy %-formatting: the (possibly large) buffer is only rendered
        # when debug logging is actually enabled.
        logger.debug("hub is sending X nonants=%s", nonant_send_buffer)

        self._populate_boundsout_cache(nonant_send_buffer)

        for idx in self.nonant_spoke_indices:
            self.hub_to_spoke(nonant_send_buffer, idx)
class APHHub(PHHub):
    """PH-style hub whose main loop is APH (``self.opt.APH_main``)."""

    def main(self):
        """Run APH without finalizing, passing this hub in as ``spcomm``.

        SPComm gets attached by self.__init__; holding APH harmless.
        """
        logger.critical("aph debug main in hub.py")
        self.opt.APH_main(spcomm=self, finalize=False)

    def finalize(self):
        """Run ``post_loops`` on the opt object and return its result (Eobj).

        NOTE: APH_main does NOT pass in extensions to APH.post_loops, so
        none are passed here either.
        """
        return self.opt.post_loops()