# Copyright 2020 by B. Knueven, D. Mildebrath, C. Muir, J-P Watson, and D.L. Woodruff
# This software is distributed under the 3-clause BSD License.
import numpy as np
import abc
import enum
import logging
import time
import os
import math
import mpisppy.utils.sputils as sputils
from pyomo.environ import ComponentMap, Var
from mpisppy import MPI
from mpisppy.cylinders.spcommunicator import SPCommunicator, communicator_array
[docs]
@enum.unique
class ConvergerSpokeType(enum.Enum):
    """Tags naming the roles a spoke can declare.

    Spoke classes in this module list the roles they fulfil in their
    ``converger_spoke_types`` class attribute.  ``enum.unique`` makes an
    accidental duplicate value fail at import time instead of silently
    aliasing an existing member.
    """
    OUTER_BOUND = 1
    INNER_BOUND = 2
    W_GETTER = 3
    NONANT_GETTER = 4
[docs]
class Spoke(SPCommunicator):
    """Base communicator for a spoke.

    Keeps the write ids and payload lengths of the buffer this spoke
    publishes ("local") and of the hub buffer it reads ("remote"), and
    offers the window transfer routines built on top of them.
    """

    def __init__(self, spbase_object, fullcomm, strata_comm, cylinder_comm, options=None):
        super().__init__(spbase_object, fullcomm, strata_comm, cylinder_comm, options)
        # Payload lengths; neither one counts the extra trailing "+ 1" slot.
        self.local_length = 0   # buffer published by this spoke
        self.remote_length = 0  # buffer that lives on the hub
        # Write ids: last one sent by us / last one accepted from the hub.
        self.local_write_id = 0
        self.remote_write_id = 0
        self.last_call_to_got_kill_signal = time.time()

    def _make_windows(self, local_length, remote_length):
        """Report both buffer lengths to the hub, then build the windows.

        Args:
            local_length: payload length of this spoke's buffer (no + 1).
            remote_length: payload length of the hub's buffer (no + 1).
        """
        # The spoke tells rank 0 of strata_comm how big the buffers are.
        lengths_msg = np.array([local_length, remote_length], dtype="i")
        self.strata_comm.Send((lengths_msg, MPI.INT), dest=0, tag=self.strata_rank)
        self.local_length = local_length
        self.remote_length = remote_length

        # One window per spoke is created; only the one matching this
        # spoke's strata rank gets a non-zero length.
        # To do?: a spoke should not need to know how many spokes exist.
        # ANSWER (dlw July 2020): the extra windows have zero length and the
        # number of spokes is not expected to be large, so this is probably
        # OK.  The (minor) benefit is that free_windows need not know whether
        # a hub or a spoke called it.  Dynamic spoke creation would require
        # rethinking this.
        spoke_count = self.n_spokes
        self.windows = [None] * spoke_count
        self.buffers = [None] * spoke_count
        for idx in range(spoke_count):
            owner_rank = idx + 1
            size = self.local_length if self.strata_rank == owner_rank else 0
            self.windows[idx], self.buffers[idx] = self._make_window(size)

        self._windows_constructed = True

    def spoke_to_hub(self, values):
        """Publish ``values`` in this spoke's own buffer for the hub to read.

        Notes:
            The window index is derived from the strata rank (rank - 1).
            ``values`` must carry one extra trailing slot; it is overwritten
            here with the incremented local write id.

        Raises:
            RuntimeError: if ``len(values)`` is not ``local_length + 1``.
        """
        required = self.local_length + 1
        supplied = len(values)
        if supplied != required:
            raise RuntimeError(
                f"Attempting to put array of length {supplied} "
                f"into local buffer of length {required}"
            )
        # All ranks of the cylinder line up before the write id advances.
        self.cylinder_comm.Barrier()
        self.local_write_id += 1
        values[-1] = self.local_write_id

        my_rank = self.strata_rank
        window = self.windows[my_rank - 1]
        window.Lock(my_rank)
        window.Put((values, len(values), MPI.DOUBLE), my_rank)
        window.Unlock(my_rank)

    def spoke_from_hub(self, values):
        """Read the hub's buffer into ``values`` and report whether it is new.

        The trailing entry of ``values`` is interpreted as the hub's write
        id.  The id is only accepted when every rank of ``cylinder_comm``
        read the same one, and when it is either larger than the last
        accepted id or negative.  On acceptance ``remote_write_id`` is
        updated.

        Returns:
            bool: True if a new id was accepted, False otherwise.

        Raises:
            RuntimeError: if ``len(values)`` is not ``remote_length + 1``.
        """
        required = self.remote_length + 1
        supplied = len(values)
        if supplied != required:
            raise RuntimeError(
                f"Spoke trying to get buffer of length {required} "
                f"from hub, but provided buffer has length {supplied}."
            )
        self.cylinder_comm.Barrier()
        window = self.windows[self.strata_rank - 1]
        window.Lock(0)
        window.Get((values, len(values), MPI.DOUBLE), 0)
        window.Unlock(0)

        new_id = int(values[-1])
        # A single MAX reduction over (id, -id) yields both the max and the
        # (negated) min of the ids seen across the cylinder.
        send_pair = np.array((new_id, -new_id), 'i')
        recv_pair = np.zeros(2, 'i')
        self.cylinder_comm.Allreduce((send_pair, MPI.INT),
                                     (recv_pair, MPI.INT),
                                     op=MPI.MAX)
        max_id = recv_pair[0]
        min_id = -recv_pair[1]

        # NOTE: nothing is accepted unless all the ranks agree on the id.
        if max_id != min_id:
            return False
        assert max_id == min_id == new_id

        # Neither newer than the last accepted id nor negative: stale.
        if new_id <= self.remote_write_id and new_id >= 0:
            return False
        self.remote_write_id = new_id
        return True

    def got_kill_signal(self):
        """Check whether the hub has terminated.

        Spokes should call this at least once per iteration.
        """
        return self._got_kill_signal()

    @abc.abstractmethod
    def main(self):
        """Entry point of the spoke.

        Derived classes should call the got_kill_signal method regularly to
        ensure all ranks terminate with the hub.
        """

    def get_serial_number(self):
        """Return the last write id accepted from the hub."""
        return self.remote_write_id

    @abc.abstractmethod
    def _got_kill_signal(self):
        """Every spoke needs its own way of receiving the hub's
        termination signal; subclasses implement it here.
        """
class _BoundSpoke(Spoke):
    """A base class for bound spokes.

    Holds a one-value bound buffer that is pushed to the hub whenever
    ``bound`` is assigned, plus a local copy of the hub's buffer that is
    refreshed on each kill-signal check.  Cylinder rank 0 can additionally
    log every bound it sends to a CSV trace file.
    """

    def __init__(self, spbase_object, fullcomm, strata_comm, cylinder_comm, options=None):
        """Set up the optional trace file and the (still empty) buffers.

        Raises:
            RuntimeError: if tracing is requested and the trace file already
                exists.
        """
        super().__init__(spbase_object, fullcomm, strata_comm, cylinder_comm, options)

        # Only cylinder rank 0 ever looks at the trace option.
        trace_prefix = None
        if self.cylinder_rank == 0:
            trace_prefix = spbase_object.options.get('trace_prefix')

        if trace_prefix is not None:
            filen = trace_prefix+self.__class__.__name__+'.csv'
            try:
                # Mode 'x' creates the file exclusively, so an existing trace
                # is never truncated and there is no gap between the
                # existence check and the creation.
                with open(filen, 'x') as f:
                    f.write("time,bound\n")
            except FileExistsError as err:
                raise RuntimeError(f"Spoke trace file {filen} already exists!") from err
            self.trace_filen = filen
            self.start_time = spbase_object.start_time
        else:
            self.trace_filen = None

        self._new_locals = False
        self._bound = None
        self._locals = None

    def make_windows(self):
        """ Makes the bound window and a remote window to
            look for a kill signal
        """
        self._make_windows(1, 2) # kill signals are accounted for in _make_window
        self._bound = communicator_array(1) # spoke bound + kill signal
        self._locals = communicator_array(2) # hub outer/inner bounds and kill signal

    @property
    def bound(self):
        """The bound value currently stored in the outgoing buffer."""
        return self._bound[0]

    @bound.setter
    def bound(self, value):
        # Log first (no-op unless tracing on rank 0), then store and send.
        self._append_trace(value)
        self._bound[0] = value
        self.spoke_to_hub(self._bound)

    @property
    def hub_inner_bound(self):
        """Returns the local copy of the inner bound from the hub"""
        return self._locals[-2]

    @property
    def hub_outer_bound(self):
        """Returns the local copy of the outer bound from the hub"""
        return self._locals[-3]

    def _got_kill_signal(self):
        """Looks for the kill signal and returns True if sent.

        Refreshes the local copy of the hub buffer as a side effect and
        records in ``_new_locals`` whether that copy was new.  A remote
        write id of -1 is the kill signal.
        """
        self._new_locals = self.spoke_from_hub(self._locals)
        return self.remote_write_id == -1

    def _append_trace(self, value):
        """Append ``elapsed,value`` to the trace file, if tracing is active."""
        if self.cylinder_rank != 0 or self.trace_filen is None:
            return
        with open(self.trace_filen, 'a') as f:
            f.write(f"{time.perf_counter()-self.start_time},{value}\n")
class _BoundNonantLenSpoke(_BoundSpoke):
    """Base class for bound spokes that also receive, from OPT, an array
    as long as the nonants.
    """

    def make_windows(self):
        """Create the bound window plus a remote buffer sized for the nonants.

        The remote length is 2 plus the total number of nonant indices over
        all local scenarios, so ``self.opt`` must already have its
        ``local_scenarios`` attached.

        Raises:
            RuntimeError: if ``self.opt`` has no ``local_scenarios``
                attribute, or if it is empty.
        """
        opt = self.opt
        if not hasattr(opt, "local_scenarios"):
            raise RuntimeError("Provided SPBase object does not have local_scenarios attribute")

        scenarios = opt.local_scenarios
        if len(scenarios) == 0:
            raise RuntimeError("Rank has zero local_scenarios")

        nonant_total = sum(
            len(scen._mpisppy_data.nonant_indices) for scen in scenarios.values()
        )
        vbuflen = 2 + nonant_total

        self._make_windows(1, vbuflen)
        self._bound = communicator_array(1)
        self._locals = communicator_array(vbuflen)
[docs]
class InnerBoundSpoke(_BoundSpoke):
    """Spoke that hands an inner bound to the Hub through ``self.bound``
    and needs no information from the main PH OPT hub.
    """

    # Single-character tag and declared roles for this spoke.
    converger_spoke_char = 'I'
    converger_spoke_types = (ConvergerSpokeType.INNER_BOUND,)
[docs]
class OuterBoundSpoke(_BoundSpoke):
    """Spoke that hands an outer bound to the Hub through ``self.bound``
    and needs no information from the main PH OPT hub.
    """

    # Single-character tag and declared roles for this spoke.
    converger_spoke_char = 'O'
    converger_spoke_types = (ConvergerSpokeType.OUTER_BOUND,)
class _BoundWSpoke(_BoundNonantLenSpoke):
    """Base class for bound spokes that additionally receive the W's from
    the OPT threads.
    """

    @property
    def localWs(self):
        """Local copy of the weights.

        The last three entries of the hub buffer (the two bounds and the
        kill signal) are excluded.
        """
        return self._locals[:-3]

    @property
    def new_Ws(self):
        """True when the most recent call to got_kill_signal brought in an
        updated copy of the weights.
        """
        return self._new_locals
[docs]
class OuterBoundWSpoke(_BoundWSpoke):
    """Spoke that hands an outer bound to the Hub through ``self.bound``
    and receives the Ws (weights) from the main PH OPT hub.
    """

    # Single-character tag and declared roles for this spoke.
    converger_spoke_char = 'O'
    converger_spoke_types = (ConvergerSpokeType.OUTER_BOUND, ConvergerSpokeType.W_GETTER)
class _BoundNonantSpoke(_BoundNonantLenSpoke):
    """Base class for bound spokes that additionally receive the xhat's
    from the OPT threads.
    """

    @property
    def localnonants(self):
        """Local copy of the nonants: the hub buffer without its last three
        entries.
        """
        return self._locals[:-3]

    @property
    def new_nonants(self):
        """True when the most recent call to got_kill_signal brought in an
        updated copy of the nonants.
        """
        return self._new_locals
[docs]
class InnerBoundNonantSpoke(_BoundNonantSpoke):
    """Spoke that hands an inner (incumbent) bound to the Hub through
    ``self.bound`` and receives the nonants from the main SPOpt hub.

    Also provides helpers for saving the best solution found so far and
    restoring it at the end.
    """

    converger_spoke_char = 'I'
    converger_spoke_types = (
        ConvergerSpokeType.INNER_BOUND,
        ConvergerSpokeType.NONANT_GETTER,
    )

    def __init__(self, spbase_object, fullcomm, strata_comm, cylinder_comm, options=None):
        super().__init__(spbase_object, fullcomm, strata_comm, cylinder_comm, options)
        self.is_minimizing = self.opt.is_minimizing
        # Start from the worst possible bound for the optimization sense.
        if self.is_minimizing:
            self.best_inner_bound = math.inf
        else:
            self.best_inner_bound = -math.inf
        self.solver_options = None  # can be overwritten by derived classes

        # Every local scenario starts without a cached best solution.
        for scenario in self.opt.local_scenarios.values():
            scenario._mpisppy_data.best_solution_cache = None

    def update_if_improving(self, candidate_inner_bound):
        """Adopt ``candidate_inner_bound`` if it strictly beats the best so far.

        On improvement the bound is recorded, sent to the hub, and the
        current variable values are cached as the best solution.

        Returns:
            bool: True if the candidate was adopted; False if it was None or
            not an improvement.
        """
        if candidate_inner_bound is None:
            return False

        if self.is_minimizing:
            improved = candidate_inner_bound < self.best_inner_bound
        else:
            improved = self.best_inner_bound < candidate_inner_bound
        if not improved:
            return False

        self.best_inner_bound = candidate_inner_bound
        # Assigning to self.bound sends the value to the hub.
        self.bound = candidate_inner_bound
        self._cache_best_solution()
        return True

    def finalize(self):
        """Load the cached best solution back into the scenario variables.

        Returns:
            The final bound (also stored in ``self.final_bound``), or None
            as soon as a local scenario without a cached solution is met.
        """
        for scenario in self.opt.local_scenarios.values():
            cached = scenario._mpisppy_data.best_solution_cache
            if cached is None:
                return None
            for var, value in cached.items():
                var.set_value(value, skip_validation=True)

        self.opt.first_stage_solution_available = True
        self.opt.tree_solution_available = True
        self.final_bound = self.bound
        return self.final_bound

    def _cache_best_solution(self):
        """Snapshot the current value of every Var of each local scenario."""
        for scenario in self.opt.local_scenarios.values():
            snapshot = ComponentMap()
            for var in scenario.component_data_objects(Var):
                snapshot[var] = var.value
            scenario._mpisppy_data.best_solution_cache = snapshot
[docs]
class OuterBoundNonantSpoke(_BoundNonantSpoke):
    """Spoke that hands an outer bound to the Hub through ``self.bound``
    and receives the nonants from the main OPT hub.
    """

    converger_spoke_char = 'A'  # probably Lagrangian
    converger_spoke_types = (ConvergerSpokeType.OUTER_BOUND, ConvergerSpokeType.NONANT_GETTER)