# Copyright 2020 by B. Knueven, D. Mildebrath, C. Muir, J-P Watson, and D.L. Woodruff
# This software is distributed under the 3-clause BSD License.
import numpy as np
import abc
import logging
import time
import mpisppy.log
from mpisppy.opt.aph import APH
from mpisppy import MPI
from mpisppy.cylinders.spcommunicator import SPCommunicator, communicator_array
from math import inf
from mpisppy.cylinders.spoke import ConvergerSpokeType
from mpisppy import global_toc
# Module logger for the hub. Output goes to "hub.log" and only CRITICAL
# records are kept by default.
# Could also pass, e.g., sys.stdout instead of a filename
mpisppy.log.setup_logger(
    "mpisppy.cylinders.Hub",
    "hub.log",
    level=logging.CRITICAL,
)
logger = logging.getLogger("mpisppy.cylinders.Hub")
class Hub(SPCommunicator):
    """ Base class for hub cylinders.

    The hub runs the main algorithm (e.g. PH or L-shaped). It exchanges data
    with the spoke cylinders through MPI windows: it receives inner/outer
    bounds from bound spokes, and it sends nonants, dual weights (Ws) and/or
    its current best bounds to the spokes that consume them. Subclasses
    implement the algorithm-specific abstract methods.
    """

    def __init__(self, spbase_object, fullcomm, strata_comm, cylinder_comm, spokes, options=None):
        super().__init__(spbase_object, fullcomm, strata_comm, cylinder_comm, options=options)
        assert len(spokes) == self.n_spokes
        # Every per-spoke array below is indexed by (spoke strata rank - 1).
        # local_write_ids: ids the hub stamps on data it writes for spokes.
        # remote_write_ids: latest ids the hub has accepted from spokes.
        self.local_write_ids = np.zeros(self.n_spokes, dtype=np.int64)
        self.remote_write_ids = np.zeros(self.n_spokes, dtype=np.int64)
        self.local_lengths = np.zeros(self.n_spokes, dtype=np.int64)
        self.remote_lengths = np.zeros(self.n_spokes, dtype=np.int64)
        # ^^^ Does NOT include +1
        self.spokes = spokes  # List of dicts
        logger.debug(f"Built the hub object on global rank {fullcomm.Get_rank()}")

        # for logging
        self.print_init = True
        self.latest_ib_char = None
        self.latest_ob_char = None
        self.last_ib_idx = None
        self.last_ob_idx = None

        # for termination based on stalling out
        self.stalled_iter_cnt = 0
        self.last_gap = float('inf')  # abs_gap tracker

    @abc.abstractmethod
    def setup_hub(self):
        """ Prepare the hub's buffers and spoke bookkeeping (subclass hook). """
        pass

    @abc.abstractmethod
    def sync(self):
        """ To be called from within whichever optimization algorithm
        is being run on the hub (e.g. PH)
        """
        pass

    @abc.abstractmethod
    def is_converged(self):
        """ The hub has the ability to halt the optimization algorithm on the
        hub before any local convergers.
        """
        pass

    @abc.abstractmethod
    def current_iteration(self):
        """ Returns the current iteration count - however the hub defines it.
        """
        pass

    @abc.abstractmethod
    def main(self):
        """ Run the hub's optimization algorithm (subclass hook). """
        pass

    def clear_latest_chars(self):
        """ Forget which spokes produced the most recent bound updates. """
        self.latest_ib_char = None
        self.latest_ob_char = None

    def compute_gaps(self):
        """ Compute the current absolute and relative gaps,
        using the current self.BestInnerBound and self.BestOuterBound

        Returns:
            tuple: ``(abs_gap, rel_gap)``.

            ``abs_gap`` is the incumbent-minus-bound difference, oriented by
            the objective sense.

            ``rel_gap`` is ``abs_gap / |BestOuterBound|``. It is ``inf`` when
            that ratio is undefined: a non-finite ``abs_gap`` (including NaN)
            or a zero outer bound.
        """
        if self.opt.is_minimizing:
            abs_gap = self.BestInnerBound - self.BestOuterBound
        else:
            abs_gap = self.BestOuterBound - self.BestInnerBound

        ## relative gap is measured against the best (outer) bound
        # NaN never compares equal to anything (not even itself), so it must
        # be detected with isfinite rather than ``!= nan``.
        # A finite abs_gap implies both bounds are finite, because
        # inf - x is inf and inf - inf is nan. So the outer bound only needs
        # the zero check.
        if np.isfinite(abs_gap) and self.BestOuterBound != 0:
            rel_gap = abs_gap / abs(self.BestOuterBound)
        else:
            rel_gap = float("inf")
        return abs_gap, rel_gap

    def get_update_string(self):
        """ Return the two-character trace column marking which spoke (if any)
        improved the outer bound (left) and the inner bound (right) since the
        trace was last printed.
        """
        if self.latest_ib_char is None and \
                self.latest_ob_char is None:
            return '  '
        if self.latest_ib_char is None:
            return self.latest_ob_char + ' '
        if self.latest_ob_char is None:
            return ' ' + self.latest_ib_char
        return self.latest_ob_char + self.latest_ib_char

    def screen_trace(self):
        """ Print one row of the bound/gap trace (and the header on first use),
        then reset the update markers.
        """
        current_iteration = self.current_iteration()
        abs_gap, rel_gap = self.compute_gaps()
        best_solution = self.BestInnerBound
        best_bound = self.BestOuterBound
        update_source = self.get_update_string()
        if self.print_init:
            row = f'{"Iter.":>5s}  {"   "}  {"Best Bound":>14s}  {"Best Incumbent":>14s}  {"Rel. Gap":>12s}  {"Abs. Gap":>14s}'
            global_toc(row, True)
            self.print_init = False
        row = f"{current_iteration:5d}  {update_source}  {best_bound:14.4f}  {best_solution:14.4f}  {rel_gap*100:12.3f}%  {abs_gap:14.4f}"
        global_toc(row, True)
        self.clear_latest_chars()

    def determine_termination(self):
        """ Decide whether the hub should stop, based on the ``rel_gap``,
        ``abs_gap`` and ``max_stalled_iters`` options.

        Returns:
            bool: True if termination is indicated, otherwise False.

        Side-effects:
            When ``max_stalled_iters`` is set, this updates the stall counter
            and ``self.last_gap``. It also prints every termination reason
            that is satisfied.
        """
        if (not hasattr(self, "options") or self.options is None
                or ("rel_gap" not in self.options
                    and "abs_gap" not in self.options
                    and "max_stalled_iters" not in self.options)):
            return False  # Nothing to see here folks...

        # If we are still here, there is some option for termination
        abs_gap, rel_gap = self.compute_gaps()

        abs_gap_satisfied = False
        rel_gap_satisfied = False
        max_stalled_satisfied = False
        if "rel_gap" in self.options and rel_gap <= self.options["rel_gap"]:
            rel_gap_satisfied = True
        if "abs_gap" in self.options and abs_gap <= self.options["abs_gap"]:
            abs_gap_satisfied = True

        if "max_stalled_iters" in self.options:
            if abs_gap < self.last_gap:  # liberal test (we could use an epsilon)
                self.last_gap = abs_gap
                self.stalled_iter_cnt = 0
            else:
                self.stalled_iter_cnt += 1
                if self.stalled_iter_cnt >= self.options["max_stalled_iters"]:
                    max_stalled_satisfied = True

        if abs_gap_satisfied:
            global_toc(f"Terminating based on inter-cylinder absolute gap {abs_gap:12.4f}")
        if rel_gap_satisfied:
            global_toc(f"Terminating based on inter-cylinder relative gap {rel_gap*100:12.3f}%")
        if max_stalled_satisfied:
            global_toc(f"Terminating based on max-stalled-iters {self.stalled_iter_cnt}")

        return abs_gap_satisfied or rel_gap_satisfied or max_stalled_satisfied

    def hub_finalize(self):
        """ Collect the last bounds from the spokes and, on global rank 0,
        print the final trace row under a fresh header.
        """
        if self.has_outerbound_spokes:
            self.receive_outerbounds()
        if self.has_innerbound_spokes:
            self.receive_innerbounds()

        if self.global_rank == 0:
            self.print_init = True
            global_toc("Statistics at termination", True)
            self.screen_trace()

    def receive_innerbounds(self):
        """ Get inner bounds from inner bound spokes
            NOTE: Does not check if there _are_ innerbound spokes
            (but should be harmless to call if there are none)
        """
        logger.debug("Hub is trying to receive from InnerBounds")
        for idx in self.innerbound_spoke_indices:
            is_new = self.hub_from_spoke(self.innerbound_receive_buffers[idx], idx)
            if is_new:
                bound = self.innerbound_receive_buffers[idx][0]
                logger.debug("!! new InnerBound to opt %s", bound)
                self.BestInnerBound = self.InnerBoundUpdate(bound, idx)
        logger.debug("ph back from InnerBounds")

    def receive_outerbounds(self):
        """ Get outer bounds from outer bound spokes
            NOTE: Does not check if there _are_ outerbound spokes
            (but should be harmless to call if there are none)
        """
        logger.debug("Hub is trying to receive from OuterBounds")
        for idx in self.outerbound_spoke_indices:
            is_new = self.hub_from_spoke(self.outerbound_receive_buffers[idx], idx)
            if is_new:
                bound = self.outerbound_receive_buffers[idx][0]
                logger.debug("!! new OuterBound to opt %s", bound)
                self.BestOuterBound = self.OuterBoundUpdate(bound, idx)
        logger.debug("ph back from OuterBounds")

    def OuterBoundUpdate(self, new_bound, idx=None, char='*'):
        """ Return the better of ``new_bound`` and the current BestOuterBound.

        If ``new_bound`` wins, record its source for the trace. The source is
        spoke ``idx``'s character, or ``char`` when ``idx`` is None (meaning
        the hub itself produced the bound).
        """
        current_bound = self.BestOuterBound
        if self._outer_bound_update(new_bound, current_bound):
            if idx is None:
                self.latest_ob_char = char
                self.last_ob_idx = 0
            else:
                self.latest_ob_char = self.outerbound_spoke_chars[idx]
                self.last_ob_idx = idx
            return new_bound
        else:
            return current_bound

    def InnerBoundUpdate(self, new_bound, idx=None, char='*'):
        """ Return the better of ``new_bound`` and the current BestInnerBound.

        If ``new_bound`` wins, record its source for the trace. The source is
        spoke ``idx``'s character, or ``char`` when ``idx`` is None (meaning
        the hub itself produced the bound).
        """
        current_bound = self.BestInnerBound
        if self._inner_bound_update(new_bound, current_bound):
            if idx is None:
                self.latest_ib_char = char
                self.last_ib_idx = 0
            else:
                self.latest_ib_char = self.innerbound_spoke_chars[idx]
                self.last_ib_idx = idx
            return new_bound
        else:
            return current_bound

    def initialize_bound_values(self):
        """ Set the best bounds to their worst possible values, and choose the
        "is improvement" comparisons according to the objective sense.
        """
        if self.opt.is_minimizing:
            self.BestInnerBound = inf
            self.BestOuterBound = -inf
            self._inner_bound_update = lambda new, old: (new < old)
            self._outer_bound_update = lambda new, old: (new > old)
        else:
            self.BestInnerBound = -inf
            self.BestOuterBound = inf
            self._inner_bound_update = lambda new, old: (new > old)
            self._outer_bound_update = lambda new, old: (new < old)

    def initialize_outer_bound_buffers(self):
        """ Allocate one receive buffer per outer bound spoke, sized by that
        spoke's remote buffer length. BestOuterBound is set separately, by
        initialize_bound_values.
        """
        self.outerbound_receive_buffers = dict()
        for idx in self.outerbound_spoke_indices:
            self.outerbound_receive_buffers[idx] = communicator_array(
                self.remote_lengths[idx - 1]
            )

    def initialize_inner_bound_buffers(self):
        """ Allocate one receive buffer per inner bound spoke, sized by that
        spoke's remote buffer length. BestInnerBound is set separately, by
        initialize_bound_values.
        """
        self.innerbound_receive_buffers = dict()
        for idx in self.innerbound_spoke_indices:
            self.innerbound_receive_buffers[idx] = communicator_array(
                self.remote_lengths[idx - 1]
            )

    def initialize_nonants(self):
        """ Initialize the buffer for the hub to send nonants
            to the appropriate spokes

        Raises:
            RuntimeError: if the nonant spokes expect different buffer sizes.
        """
        self.nonant_send_buffer = None
        for idx in self.nonant_spoke_indices:
            if self.nonant_send_buffer is None:
                # for hub outer/inner bounds and kill signal
                self.nonant_send_buffer = communicator_array(self.local_lengths[idx - 1])
            elif self.local_lengths[idx - 1] + 1 != len(self.nonant_send_buffer):
                # the allocated buffer carries one extra slot (write id)
                raise RuntimeError("Nonant buffers disagree on size")

    def initialize_boundsout(self):
        """ Initialize the buffer for the hub to send bounds
            to bounds only spokes

        Raises:
            RuntimeError: if a bounds-only spoke's local length is not 2.
        """
        self.boundsout_send_buffer = None
        for idx in self.bounds_only_indices:
            if self.boundsout_send_buffer is None:
                self.boundsout_send_buffer = communicator_array(self.local_lengths[idx - 1])
            if self.local_lengths[idx - 1] != 2:
                raise RuntimeError(f'bounds only local length buffers must be 2 (bounds). Currently {self.local_lengths[idx - 1]}')

    def _populate_boundsout_cache(self, buf):
        """ Populate a given buffer with the current bounds

        The last slot (buf[-1]) is reserved for the write id, so the bounds
        go into the two slots just before it.
        """
        buf[-3] = self.BestOuterBound
        buf[-2] = self.BestInnerBound

    def send_boundsout(self):
        """ Send bounds to the appropriate spokes
        This is called only for spokes which are bounds only.
        w and nonant spokes are passed bounds through the w and nonant buffers
        """
        self._populate_boundsout_cache(self.boundsout_send_buffer)
        logger.debug("hub is sending bounds=%s", self.boundsout_send_buffer)
        for idx in self.bounds_only_indices:
            self.hub_to_spoke(self.boundsout_send_buffer, idx)

    def initialize_spoke_indices(self):
        """ Figure out what types of spokes we have,
        and sort them into the appropriate classes.

        Note:
            Some spokes may be multiple types (e.g. outerbound and nonant),
            though not all combinations are supported.
        """
        self.outerbound_spoke_indices = set()
        self.innerbound_spoke_indices = set()
        self.nonant_spoke_indices = set()
        self.w_spoke_indices = set()

        self.outerbound_spoke_chars = dict()
        self.innerbound_spoke_chars = dict()

        # Spoke indices are strata ranks, hence i + 1 (the hub is rank 0).
        for (i, spoke) in enumerate(self.spokes):
            spoke_class = spoke["spoke_class"]
            if hasattr(spoke_class, "converger_spoke_types"):
                for cst in spoke_class.converger_spoke_types:
                    if cst == ConvergerSpokeType.OUTER_BOUND:
                        self.outerbound_spoke_indices.add(i + 1)
                        self.outerbound_spoke_chars[i + 1] = spoke_class.converger_spoke_char
                    elif cst == ConvergerSpokeType.INNER_BOUND:
                        self.innerbound_spoke_indices.add(i + 1)
                        self.innerbound_spoke_chars[i + 1] = spoke_class.converger_spoke_char
                    elif cst == ConvergerSpokeType.W_GETTER:
                        self.w_spoke_indices.add(i + 1)
                    elif cst == ConvergerSpokeType.NONANT_GETTER:
                        self.nonant_spoke_indices.add(i + 1)
                    else:
                        raise RuntimeError(f"Unrecognized converger_spoke_type {cst}")

            else:  ##this isn't necessarily wrong, i.e., cut generators
                logger.debug(f"Spoke class {spoke_class} not recognized by hub")

        # all _BoundSpoke spokes get hub bounds so we determine which spokes
        # are "bounds only"
        self.bounds_only_indices = \
            (self.outerbound_spoke_indices | self.innerbound_spoke_indices) - \
            (self.w_spoke_indices | self.nonant_spoke_indices)

        self.has_outerbound_spokes = len(self.outerbound_spoke_indices) > 0
        self.has_innerbound_spokes = len(self.innerbound_spoke_indices) > 0
        self.has_nonant_spokes = len(self.nonant_spoke_indices) > 0
        self.has_w_spokes = len(self.w_spoke_indices) > 0
        self.has_bounds_only_spokes = len(self.bounds_only_indices) > 0

        # Not all opt classes may have extensions
        if getattr(self.opt, "extensions", None) is not None:
            self.opt.extobject.initialize_spoke_indices()

    def make_windows(self):
        """ Receive each spoke's buffer sizes and build one MPI window per spoke.

        Idempotent: only the first call does any work.
        """
        if self._windows_constructed:
            # different parts of the hub may call make_windows,
            # we just care about the first call
            return

        # Spokes notify the hub of the buffer sizes
        for i in range(self.n_spokes):
            pair_of_sizes = np.zeros(2, dtype="i")
            self.strata_comm.Recv((pair_of_sizes, MPI.INT), source=i + 1, tag=i + 1)
            self.remote_lengths[i] = pair_of_sizes[0]
            self.local_lengths[i] = pair_of_sizes[1]

        # Make the windows of the appropriate buffer sizes
        self.windows = [None for _ in range(self.n_spokes)]
        self.buffers = [None for _ in range(self.n_spokes)]
        for i in range(self.n_spokes):
            length = self.local_lengths[i]
            win, buff = self._make_window(length)
            self.windows[i] = win
            self.buffers[i] = buff

        # flag this for multiple calls from the hub
        self._windows_constructed = True

    def hub_to_spoke(self, values, spoke_strata_rank):
        """ Put the specified values into the specified locally-owned buffer
            for the spoke to pick up.

            Notes:
                This automatically does the -1 indexing

                This assumes that values contains a slot at the end for the
                write_id

            Raises:
                RuntimeError: if ``values`` does not match the buffer length.
        """
        expected_length = self.local_lengths[spoke_strata_rank - 1] + 1
        if len(values) != expected_length:
            raise RuntimeError(
                f"Attempting to put array of length {len(values)} "
                f"into local buffer of length {expected_length}"
            )
        # this is so the spoke ranks all get the same write_id at approximately the same time
        if not isinstance(self.opt, APH):
            self.cylinder_comm.Barrier()
        self.local_write_ids[spoke_strata_rank - 1] += 1
        values[-1] = self.local_write_ids[spoke_strata_rank - 1]
        window = self.windows[spoke_strata_rank - 1]
        window.Lock(self.strata_rank)
        window.Put((values, len(values), MPI.DOUBLE), self.strata_rank)
        window.Unlock(self.strata_rank)

    def hub_from_spoke(self, values, spoke_num):
        """ spoke_num is the rank in the strata_comm, so it is 1-based not 0-based

            Returns:
                is_new (bool): Indicates whether the "gotten" values are new,
                    based on the write_id.

            Raises:
                RuntimeError: if ``values`` does not match the remote length.
        """
        expected_length = self.remote_lengths[spoke_num - 1] + 1
        if len(values) != expected_length:
            raise RuntimeError(
                f"Hub trying to get buffer of length {expected_length} "
                f"from spoke, but provided buffer has length {len(values)}."
            )
        # so the window in each rank gets read at approximately the same time,
        # and so has the same write_id
        if not isinstance(self.opt, APH):
            self.cylinder_comm.Barrier()
        window = self.windows[spoke_num - 1]
        window.Lock(spoke_num)
        window.Get((values, len(values), MPI.DOUBLE), spoke_num)
        window.Unlock(spoke_num)

        if isinstance(self.opt, APH):
            # reverting part of changes from Ben getting rid of spoke sleep DLW jan 2023
            if values[-1] > self.remote_write_ids[spoke_num - 1]:
                self.remote_write_ids[spoke_num - 1] = values[-1]
                return True
        else:
            # Only accept the data if every rank in the cylinder read the
            # same write id: the id equals the mean over ranks only when
            # all ranks agree.
            new_id = int(values[-1])
            local_val = np.array((new_id,), 'i')
            sum_ids = np.zeros(1, 'i')
            self.cylinder_comm.Allreduce((local_val, MPI.INT),
                                         (sum_ids, MPI.INT),
                                         op=MPI.SUM)
            if new_id != sum_ids[0] / self.cylinder_comm.size:
                return False

            # a negative id is also reported as new
            if (new_id > self.remote_write_ids[spoke_num - 1]) or (new_id < 0):
                self.remote_write_ids[spoke_num - 1] = new_id
                return True
        return False

    def send_terminate(self):
        """ Send an array of zeros with a -1 appended to the
            end to indicate termination. This function puts to the local
            buffer, so every spoke will see it simultaneously, without the
            hub having to signal the spoke processes one at a time.
        """
        for rank in range(1, self.n_spokes + 1):
            dummies = np.zeros(self.local_lengths[rank - 1] + 1)
            dummies[-1] = -1
            window = self.windows[rank - 1]
            # Same locally-owned target as hub_to_spoke uses.
            window.Lock(self.strata_rank)
            window.Put((dummies, len(dummies), MPI.DOUBLE), self.strata_rank)
            window.Unlock(self.strata_rank)
class PHHub(Hub):
    """ Hub for PH-based algorithms.

    It sends Ws, nonants and/or bounds to the spokes that need them, and it
    receives inner/outer bounds from the bound spokes.
    """

    def setup_hub(self):
        """ Must be called after make_windows(), so that
            the hub knows the sizes of all the spokes windows

        Raises:
            RuntimeError: if called before the windows exist, or if a spoke
                combination that is not supported is configured.
        """
        if not self._windows_constructed:
            raise RuntimeError(
                "Cannot call setup_hub before memory windows are constructed"
            )

        self.initialize_spoke_indices()
        self.initialize_bound_values()

        if self.has_outerbound_spokes:
            self.initialize_outer_bound_buffers()
        if self.has_innerbound_spokes:
            self.initialize_inner_bound_buffers()
        if self.has_w_spokes:
            self.initialize_ws()
        if self.has_nonant_spokes:
            self.initialize_nonants()
        if self.has_bounds_only_spokes:
            self.initialize_boundsout()  # bounds going out

        ## Do some checking for things we currently don't support
        if len(self.outerbound_spoke_indices & self.innerbound_spoke_indices) > 0:
            raise RuntimeError(
                "A Spoke providing both inner and outer "
                "bounds is currently unsupported"
            )
        if len(self.w_spoke_indices & self.nonant_spoke_indices) > 0:
            raise RuntimeError(
                "A Spoke needing both Ws and nonants is currently unsupported"
            )

        ## Generate some warnings if nothing is giving bounds
        # (Logger.warn is a deprecated alias of Logger.warning)
        if not self.has_outerbound_spokes:
            logger.warning(
                "No OuterBound Spokes defined, this converger "
                "will not cause the hub to terminate"
            )

        if not self.has_innerbound_spokes:
            logger.warning(
                "No InnerBound Spokes defined, this converger "
                "will not cause the hub to terminate"
            )

        if self.opt.extensions is not None:
            self.opt.extobject.setup_hub()

    def sync(self):
        """
            Manages communication with Spokes
        """
        # send first, then receive
        if self.has_w_spokes:
            self.send_ws()
        if self.has_nonant_spokes:
            self.send_nonants()
        if self.has_bounds_only_spokes:
            self.send_boundsout()
        if self.has_outerbound_spokes:
            self.receive_outerbounds()
        if self.has_innerbound_spokes:
            self.receive_innerbounds()
        if self.opt.extensions is not None:
            self.opt.extobject.sync_with_spokes()

    def sync_with_spokes(self):
        """ Alias for sync(). """
        self.sync()

    def is_converged(self):
        """ Print a trace row and report whether termination is indicated.

        On the first PH iteration this also folds the trivial bound into
        BestOuterBound. Without inner bound spokes it always returns False.
        """
        ## might as well get a bound, in this case
        if self.opt._PHIter == 1:
            self.BestOuterBound = self.OuterBoundUpdate(self.opt.trivial_bound)

        if not self.has_innerbound_spokes:
            if self.opt._PHIter == 1:
                logger.warning(
                    "PHHub cannot compute convergence without "
                    "inner bound spokes."
                )

            ## you still want to output status, even without inner bounders configured
            if self.global_rank == 0:
                self.screen_trace()

            return False

        if not self.has_outerbound_spokes:
            if self.opt._PHIter == 1:
                global_toc(
                    "Without outer bound spokes, no progress "
                    "will be made on the Best Bound")

        ## log some output
        if self.global_rank == 0:
            self.screen_trace()

        return self.determine_termination()

    def current_iteration(self):
        """ Return the current PH iteration."""
        return self.opt._PHIter

    def main(self):
        """ SPComm gets attached in self.__init__ """
        self.opt.ph_main(finalize=False)

    def finalize(self):
        """ does PH.post_loops, returns Eobj """
        Eobj = self.opt.post_loops(self.opt.extensions)
        return Eobj

    def send_nonants(self):
        """ Gather nonants and send them to the appropriate spokes
            TODO: Will likely fail with bundling
        """
        self.opt._save_nonants()
        ci = 0  ## index to self.nonant_send_buffer
        nonant_send_buffer = self.nonant_send_buffer
        for s in self.opt.local_scenarios.values():
            for xvar in s._mpisppy_data.nonant_indices.values():
                nonant_send_buffer[ci] = xvar._value
                ci += 1
        # lazy formatting: the buffer is only stringified if DEBUG is enabled
        logger.debug("hub is sending X nonants=%s", nonant_send_buffer)

        # the trailing slots carry the hub's current bounds
        self._populate_boundsout_cache(nonant_send_buffer)

        for idx in self.nonant_spoke_indices:
            self.hub_to_spoke(nonant_send_buffer, idx)

    def initialize_ws(self):
        """ Initialize the buffer for the hub to send dual weights
            to the appropriate spokes

        Raises:
            RuntimeError: if the W spokes expect different buffer sizes.
        """
        self.w_send_buffer = None
        for idx in self.w_spoke_indices:
            if self.w_send_buffer is None:
                self.w_send_buffer = communicator_array(self.local_lengths[idx - 1])
            elif self.local_lengths[idx - 1] + 1 != len(self.w_send_buffer):
                raise RuntimeError("W buffers disagree on size")

    def send_ws(self):
        """ Send dual weights to the appropriate spokes
        """
        # padding=3 leaves room for the two bounds plus the write id
        self.opt._populate_W_cache(self.w_send_buffer, padding=3)
        logger.debug("hub is sending Ws=%s", self.w_send_buffer)
        self._populate_boundsout_cache(self.w_send_buffer)

        for idx in self.w_spoke_indices:
            self.hub_to_spoke(self.w_send_buffer, idx)
class LShapedHub(Hub):
    """ Hub for the L-shaped method.

    The L-shaped method produces its own outer bounds. Spokes may supply
    inner/outer bounds, and the hub can send root nonant values and its
    current bounds to the spokes that need them.
    """

    def setup_hub(self):
        """ Must be called after make_windows(), so that
            the hub knows the sizes of all the spokes windows

        Raises:
            RuntimeError: if called before the windows exist, if a W spoke is
                configured, or if a spoke provides both bound types.
        """
        if not self._windows_constructed:
            raise RuntimeError(
                "Cannot call setup_hub before memory windows are constructed"
            )

        self.initialize_spoke_indices()
        self.initialize_bound_values()

        if self.has_outerbound_spokes:
            self.initialize_outer_bound_buffers()
        if self.has_innerbound_spokes:
            self.initialize_inner_bound_buffers()

        ## Do some checking for things we currently
        ## do not support
        if self.has_w_spokes:
            raise RuntimeError("LShaped hub does not compute dual weights (Ws)")
        if self.has_nonant_spokes:
            self.initialize_nonants()
        # bounds-only spokes get hub bounds through their own buffer,
        # as in PHHub
        if self.has_bounds_only_spokes:
            self.initialize_boundsout()  # bounds going out
        if len(self.outerbound_spoke_indices & self.innerbound_spoke_indices) > 0:
            raise RuntimeError(
                "A Spoke providing both inner and outer "
                "bounds is currently unsupported"
            )

        ## Generate some warnings if nothing is giving bounds
        # (Logger.warn is a deprecated alias of Logger.warning)
        if not self.has_innerbound_spokes:
            logger.warning(
                "No InnerBound Spokes defined, this converger "
                "will not cause the hub to terminate"
            )

    def sync(self, send_nonants=True):
        """
        Manages communication with Bound Spokes

        Args:
            send_nonants (bool): if False, skip sending nonants this call.
        """
        if send_nonants and self.has_nonant_spokes:
            self.send_nonants()
        if self.has_bounds_only_spokes:
            self.send_boundsout()
        if self.has_outerbound_spokes:
            self.receive_outerbounds()
        if self.has_innerbound_spokes:
            self.receive_innerbounds()
        # in case LShaped ever gets extensions
        if getattr(self.opt, "extensions", None) is not None:
            self.opt.extobject.sync_with_spokes()

    def is_converged(self):
        """ Returns a boolean. If True, then LShaped will terminate

        Side-effects:
            The L-shaped method produces outer bounds during execution,
            so we will check it as well.
        """
        bound = self.opt._LShaped_bound
        self.BestOuterBound = self.OuterBoundUpdate(bound)

        ## log some output
        if self.global_rank == 0:
            self.screen_trace()

        return self.determine_termination()

    def current_iteration(self):
        """ Return the current L-shaped iteration."""
        return self.opt.iter

    def main(self):
        """ SPComm gets attached in self.__init__ """
        self.opt.lshaped_algorithm()

    def send_nonants(self):
        """ Gather nonants and send them to the appropriate spokes
            TODO: Will likely fail with bundling
        """
        ci = 0  ## index to self.nonant_send_buffer
        nonant_send_buffer = self.nonant_send_buffer
        for s in self.opt.local_scenarios.values():
            nonant_to_root_var_map = s._mpisppy_model.subproblem_to_root_vars_map
            for xvar in s._mpisppy_data.nonant_indices.values():
                ## Grab the value from the associated root variable
                nonant_send_buffer[ci] = nonant_to_root_var_map[xvar]._value
                ci += 1
        # lazy formatting: the buffer is only stringified if DEBUG is enabled
        logger.debug("hub is sending X nonants=%s", nonant_send_buffer)

        # the trailing slots carry the hub's current bounds
        self._populate_boundsout_cache(nonant_send_buffer)

        for idx in self.nonant_spoke_indices:
            self.hub_to_spoke(nonant_send_buffer, idx)
class APHHub(PHHub):
    """ Hub for APH; reuses PHHub's spoke communication. """

    def main(self):
        """ SPComm gets attached by self.__init__; holding APH harmless """
        # Debug trace only; this previously used CRITICAL severity.
        logger.debug("aph debug main in hub.py")
        self.opt.APH_main(spcomm=self, finalize=False)

    def finalize(self):
        """ does APH.post_loops, returns Eobj """
        # NOTE: APH_main does NOT pass in extensions
        # to APH.post_loops
        Eobj = self.opt.post_loops()
        return Eobj