###############################################################################
# mpi-sppy: MPI-based Stochastic Programming in PYthon
#
# Copyright (c) 2024, Lawrence Livermore National Security, LLC, Alliance for
# Sustainable Energy, LLC, The Regents of the University of California, et al.
# All rights reserved. Please see the files COPYRIGHT.md and LICENSE.md for
# full copyright and license information.
###############################################################################
# base class for hub and for spoke strata
import os
import time
import hashlib
import logging
import weakref
import numpy as np
import pyomo.environ as pyo
from mpisppy.utils import nice_join, sputils
from mpisppy import global_toc
from pyomo.common.collections import ComponentSet
from mpisppy import MPI
logger = logging.getLogger("SPBase")
logger.setLevel(logging.WARN)
[docs]
class SPBase:
""" Defines an interface to all strata (hubs and spokes)
Args:
options (dict): options
all_scenario_names (list): all scenario names
scenario_creator (fct): returns a concrete model with special things
scenario_denouement (fct): for post processing and reporting
all_nodenames (list): all node names (including leaves); can be None for 2 Stage
mpicomm (MPI comm): if not given, use the global fullcomm
scenario_creator_kwargs (dict): kwargs passed directly to
scenario_creator.
variable_probability (fct): returns a list of tuples of (id(var), prob)
to set variable-specific probability (similar to PHBase.rho_setter).
Attributes:
local_scenarios (dict of scenario objects): concrete models with
extra data, key is name
comms (dict): keys are node names values are comm objects.
local_scenario_names (list): names of locals
"""
def __init__(
self,
options,
all_scenario_names,
scenario_creator,
scenario_denouement=None,
all_nodenames=None,
mpicomm=None,
scenario_creator_kwargs=None,
variable_probability=None,
E1_tolerance=1e-5,
):
# TODO add missing and private attributes (JP)
# TODO add a class attribute called ROOTNODENAME = "ROOT"
# TODO? add decorators to the class attributes
self.start_time = time.perf_counter()
self.options = options
self.all_scenario_names = all_scenario_names
self.scenario_creator = scenario_creator
self.scenario_denouement = scenario_denouement
self.comms = dict()
self.local_scenarios = dict()
self.local_scenario_names = list()
self.E1_tolerance = E1_tolerance # probs must sum to almost 1
self.scenarios_constructed = False
if all_nodenames is None:
self.all_nodenames = ["ROOT"]
elif "ROOT" in all_nodenames:
self.all_nodenames = all_nodenames
self._check_nodenames()
else:
raise RuntimeError("'ROOT' must be in the list of node names")
self.variable_probability = variable_probability
self.multistage = (len(self.all_nodenames) > 1)
# Set up MPI communicator and rank
if mpicomm is not None:
self.mpicomm = mpicomm
else:
self.mpicomm = MPI.COMM_WORLD
self.cylinder_rank = self.mpicomm.Get_rank()
self.n_proc = self.mpicomm.Get_size()
self.global_rank = MPI.COMM_WORLD.Get_rank()
# for writers, if the appropriate
# solution is loaded into the subproblems
self.tree_solution_available = False
self.first_stage_solution_available = False
self.best_solution_obj_val = None
# sometimes we know a best bound
self.best_bound_obj_val = None
if options.get("toc", True):
global_toc("Initializing SPBase")
if self.n_proc > len(self.all_scenario_names):
raise RuntimeError(f"More ranks ({self.n_proc}) than scenarios"
f" ({len(self.all_scenario_names)})")
self._calculate_scenario_ranks()
if "bundles_per_rank" in self.options and self.options["bundles_per_rank"] > 0:
raise RuntimeError(
"Loose bundling (bundles_per_rank > 0) was removed in 2026.\n"
"Use 'proper bundles' instead (--scenarios-per-bundle).\n"
"See doc/src/properbundles.rst and mpisppy/generic_cylinders.py."
)
self.bundling = False
self._create_scenarios(scenario_creator_kwargs)
self._look_and_leap()
self._compute_unconditional_node_probabilities()
self._attach_nlens()
self._attach_nonant_indices()
self._attach_varid_to_nonant_index()
self._create_communicators()
self._verify_nonant_lengths()
if not self.options.get("turn_off_names_check", False):
self._verify_nonant_names()
self._set_sense()
self._use_variable_probability_setter()
self._set_solution_cache()
## SPCommunicator object
self._spcomm = None
def _set_sense(self, comm=None):
""" Check to confirm that all the models constructed by scenario_crator
have the same sense (min v. max), and set self.is_minimizing
accordingly.
"""
is_min, clear = sputils._models_have_same_sense(self.local_scenarios)
if not clear:
raise RuntimeError(
"All scenario models must have the same "
"model sense (minimize or maximize)"
)
self.is_minimizing = is_min
if self.n_proc <= 1:
return
# Check that all the ranks agree
global_senses = self.mpicomm.gather(is_min, root=0)
if self.cylinder_rank != 0:
return
sense = global_senses[0]
clear = all(val == sense for val in global_senses)
if not clear:
raise RuntimeError(
"All scenario models must have the same "
"model sense (minimize or maximize)"
)
def _verify_nonant_lengths(self):
local_node_nonant_lengths = {} # keys are tree node names
# we need to accumulate all local contributions before the reduce
for k,s in self.local_scenarios.items():
nlens = s._mpisppy_data.nlens
for node in s._mpisppy_node_list:
ndn = node.name
mylen = nlens[ndn]
if ndn not in local_node_nonant_lengths:
local_node_nonant_lengths[ndn] = mylen
elif local_node_nonant_lengths[ndn] != mylen:
raise RuntimeError(f"Tree node {ndn} has scenarios with different numbers of non-anticipative "
f"variables: {mylen} vs. {local_node_nonant_lengths[ndn]}")
# compute node values(reduction)
for ndn, val in local_node_nonant_lengths.items():
local_val = np.array([val], 'i')
max_val = np.zeros(1, 'i')
self.comms[ndn].Allreduce([local_val, MPI.INT],
[max_val, MPI.INT],
op=MPI.MAX)
if val != int(max_val[0]):
raise RuntimeError(f"Tree node {ndn} has scenarios with different numbers of non-anticipative "
f"variables: {val} vs. max {max_val[0]}")
def _verify_nonant_names(self):
"""Verify that nonant variable names match across scenarios.
For each tree node, all scenarios must list the same nonant
variables in the same order. A local (same-rank) check compares
names directly; a cross-rank check uses a deterministic hash
with MPI allreduce (MAX vs MIN) to detect mismatches.
"""
# --- local check (within this rank) ---
local_node_names = {} # ndn -> list of var names (from first seen scenario)
for k, s in self.local_scenarios.items():
for node in s._mpisppy_node_list:
ndn = node.name
names = [var.name for var in node.nonant_vardata_list]
if ndn not in local_node_names:
local_node_names[ndn] = (names, k)
else:
ref_names, ref_scen = local_node_names[ndn]
if names != ref_names:
# Find the first mismatch for a helpful message
for i, (a, b) in enumerate(zip(ref_names, names)):
if a != b:
raise RuntimeError(
f"Tree node {ndn}: nonant variable name "
f"mismatch at position {i}. "
f"Scenario {ref_scen} has '{a}', "
f"scenario {k} has '{b}'.")
# Length mismatch (should be caught by _verify_nonant_lengths)
raise RuntimeError(
f"Tree node {ndn}: nonant variable list length "
f"mismatch between scenarios {ref_scen} and {k}.")
# --- cross-rank check (via deterministic hash) ---
for ndn, (names, _) in local_node_names.items():
name_str = ",".join(names)
h = hashlib.sha256(name_str.encode()).digest()[:4]
local_hash = np.frombuffer(h, dtype=np.int32).copy()
max_hash = np.zeros(1, dtype=np.int32)
min_hash = np.zeros(1, dtype=np.int32)
self.comms[ndn].Allreduce([local_hash, MPI.INT],
[max_hash, MPI.INT],
op=MPI.MAX)
self.comms[ndn].Allreduce([local_hash, MPI.INT],
[min_hash, MPI.INT],
op=MPI.MIN)
if max_hash[0] != min_hash[0]:
raise RuntimeError(
f"Tree node {ndn}: nonant variable names do not "
f"match across ranks. Check that all scenarios "
f"list nonant variables in the same order.")
def _check_nodenames(self):
for ndn in self.all_nodenames:
if ndn != 'ROOT' and sputils.parent_ndn(ndn) not in self.all_nodenames:
raise RuntimeError(f"all_nodenames is inconsistent:"
f"The node {sputils.parent_ndn(ndn)}, parent of {ndn}, is missing.")
def _calculate_scenario_ranks(self):
""" Populate the following attributes
1. self.scenario_names_to_rank (dict of dict):
keys are comms (i.e., tree nodes); values are dicts with keys
that are scenario names and values that are ranks within that comm
2. self._rank_slices (list of lists)
indices correspond to ranks in self.mpicomm and the values are a list
of scenario indices
rank -> list of scenario indices for that rank
3. self._scenario_slices (list)
indices are scenario indices and values are the rank of that scenario
within self.mpicomm
scenario index -> rank
4. self._scenario_tree (instance of sputils._ScenTree)
5. self.local_scenario_names (list)
List of index names owned by the local rank
"""
tree = sputils._ScenTree(self.all_nodenames, self.all_scenario_names)
self.scenario_names_to_rank, self._rank_slices, self._scenario_slices =\
tree.scen_names_to_ranks(self.n_proc)
self._scenario_tree = tree
self.nonleaves = {node.name : node for node in tree.nonleaves}
# list of scenario names owned locally
self.local_scenario_names = [
self.all_scenario_names[i] for i in self._rank_slices[self.cylinder_rank]
]
def _create_scenarios(self, scenario_creator_kwargs):
""" Call the scenario_creator for every local scenario, and store the
results in self.local_scenarios (dict indexed by scenario names).
Notes:
If a scenario probability is not specified as an attribute
_mpisppy_probability of the ConcreteModel returned by ScenarioCreator,
this function automatically assumes uniform probabilities.
"""
if self.scenarios_constructed:
raise RuntimeError("Scenarios already constructed.")
if scenario_creator_kwargs is None:
scenario_creator_kwargs = dict()
local_ict = list() # Local instance creation times for time tracking
for sname in self.local_scenario_names:
instance_creation_start_time = time.time()
s = self.scenario_creator(sname, **scenario_creator_kwargs)
self.local_scenarios[sname] = s
if self.multistage:
#Checking that the scenario can have an associated leaf node in all_nodenames
stmax = np.argmax([nd.stage for nd in s._mpisppy_node_list])
if(s._mpisppy_node_list[stmax].name)+'_0' not in self.all_nodenames:
raise RuntimeError("The leaf node associated with this scenario is not on all_nodenames"
f"Its last non-leaf node {s._mpisppy_node_list[stmax].name} has no first child {s._mpisppy_node_list[stmax].name+'_0'}")
local_ict.append(time.time() - instance_creation_start_time)
if self.options.get("display_timing", False):
all_instance_creation_times = self.mpicomm.gather(
local_ict, root=0
)
if self.cylinder_rank == 0:
aict = [ict for l_ict in all_instance_creation_times for ict in l_ict]
print("Scenario instance creation times:")
print(f"\tmin={np.min(aict):4.2f} mean={np.mean(aict):4.2f} max={np.max(aict):4.2f}")
self.scenarios_constructed = True
def _attach_nonant_indices(self):
for (sname, scenario) in self.local_scenarios.items():
_nonant_indices = dict()
_all_surrogate_nonants = ComponentSet()
nlens = scenario._mpisppy_data.nlens
for node in scenario._mpisppy_node_list:
ndn = node.name
for i in range(nlens[ndn]):
_nonant_indices[ndn,i] = node.nonant_vardata_list[i]
_all_surrogate_nonants.update(node.surrogate_vardatas)
scenario._mpisppy_data.nonant_indices = _nonant_indices
scenario._mpisppy_data.all_surrogate_nonants = _all_surrogate_nonants
self.nonant_length = len(_nonant_indices)
# sanity check the nonant length
for s in self.local_scenarios.values():
assert self.nonant_length == len(s._mpisppy_data.nonant_indices)
def _attach_nlens(self):
for (sname, scenario) in self.local_scenarios.items():
# Things need to be by node so we can bind to the
# indices of the vardata lists for the nodes.
scenario._mpisppy_data.nlens = {
node.name: len(node.nonant_vardata_list)
for node in scenario._mpisppy_node_list
}
# NOTE: This only is used by extensions.xhatbase.XhatBase._try_one.
# If that is re-factored, we can remove it here.
scenario._mpisppy_data.cistart = dict()
sofar = 0
for ndn, ndn_len in scenario._mpisppy_data.nlens.items():
scenario._mpisppy_data.cistart[ndn] = sofar
sofar += ndn_len
def _attach_varid_to_nonant_index(self):
""" Create a map from the id of nonant variables to their Pyomo index.
"""
for (sname, scenario) in self.local_scenarios.items():
# In order to support rho setting, create a map
# from the id of vardata object back its _nonant_index.
scenario._mpisppy_data.varid_to_nonant_index =\
{id(var): ndn_i for ndn_i, var in scenario._mpisppy_data.nonant_indices.items()}
def _create_communicators(self):
# Create communicator objects, one for each node
nonleafnodes = dict()
for (sname, scenario) in self.local_scenarios.items():
for node in scenario._mpisppy_node_list:
nonleafnodes[node.name] = node # might be assigned&reassigned
# check the node names given by the scenarios
for nodename in nonleafnodes:
if nodename not in self.all_nodenames:
raise RuntimeError(f"Tree node '{nodename}' not in all_nodenames list {self.all_nodenames} for {self.global_rank=}")
# loop over all nodes and make the comms (split requires all ranks)
# make sure we loop in the same order, so every rank iterate over
# the nodelist
for nodename in self.all_nodenames:
if nodename == "ROOT":
self.comms["ROOT"] = self.mpicomm
elif nodename in nonleafnodes:
#The position in all_nodenames is an integer unique id.
nodenumber = self.all_nodenames.index(nodename)
# IMPORTANT: See note in sputils._ScenTree.scen_names_to_ranks. Need to keep
# this split aligned with self.scenario_names_to_rank
self.comms[nodename] = self.mpicomm.Split(color=nodenumber, key=self.cylinder_rank)
else: # this rank is not included in the communicator
self.mpicomm.Split(color=MPI.UNDEFINED, key=self.n_proc)
## ensure we've set things up correctly for all comms
for nodename, comm in self.comms.items():
scenario_names_to_comm_rank = self.scenario_names_to_rank[nodename]
for sname, rank in scenario_names_to_comm_rank.items():
if sname in self.local_scenarios:
if rank != comm.Get_rank():
raise RuntimeError(f"For the node {nodename}, the scenario {sname} has the rank {rank} from scenario_names_to_rank and {comm.Get_rank()} from its comm.")
## ensure we've set things up correctly for all local scenarios
for sname in self.local_scenarios:
for nodename, comm in self.comms.items():
scenario_names_to_comm_rank = self.scenario_names_to_rank[nodename]
if sname in scenario_names_to_comm_rank:
if comm.Get_rank() != scenario_names_to_comm_rank[sname]:
raise RuntimeError(f"For the node {nodename}, the scenario {sname} has the rank {rank} from scenario_names_to_rank and {comm.Get_rank()} from its comm.")
def _compute_unconditional_node_probabilities(self):
""" calculates unconditional node probabilities and prob_coeff
and prob0_mask is set to a scalar 1 (used by variable_probability)"""
for k,s in self.local_scenarios.items():
root = s._mpisppy_node_list[0]
root.uncond_prob = 1.0
for parent,child in zip(s._mpisppy_node_list[:-1],s._mpisppy_node_list[1:]):
child.uncond_prob = parent.uncond_prob * child.cond_prob
if not hasattr(s._mpisppy_data, 'prob_coeff'):
s._mpisppy_data.prob_coeff = dict()
s._mpisppy_data.prob0_mask = dict()
for node in s._mpisppy_node_list:
s._mpisppy_data.prob_coeff[node.name] = (s._mpisppy_probability / node.uncond_prob)
s._mpisppy_data.prob0_mask[node.name] = 1.0 # needs to be a float
def _use_variable_probability_setter(self, verbose=False):
""" set variable probability unconditional values using a function self.variable_probability
that gives us a list of (id(vardata), probability)]
ALSO set prob0_mask, which is a mask for W calculations (mask out zero probs)
Note: We estimate that less than 0.01 of mpi-sppy runs will call this.
"""
if self.variable_probability is None:
for s in self.local_scenarios.values():
s._mpisppy_data.has_variable_probability = False
return
didit = 0
skipped = 0
variable_probability_kwargs = self.options['variable_probability_kwargs'] \
if 'variable_probability_kwargs' in self.options \
else dict()
sum_probs = {} # indexed by (ndn,i) - maps to sum of probs for that variable
for sname, s in self.local_scenarios.items():
variable_probability = self.variable_probability(s, **variable_probability_kwargs)
s._mpisppy_data.has_variable_probability = True
for (vid, prob) in variable_probability:
ndn, i = s._mpisppy_data.varid_to_nonant_index[vid]
# If you are going to do any variables at a node, you have to do all.
if type(s._mpisppy_data.prob_coeff[ndn]) is float: # not yet a vector
defprob = s._mpisppy_data.prob_coeff[ndn]
s._mpisppy_data.prob_coeff[ndn] = np.full(s._mpisppy_data.nlens[ndn], defprob, dtype='d')
s._mpisppy_data.prob0_mask[ndn] = np.ones(s._mpisppy_data.nlens[ndn], dtype='d')
s._mpisppy_data.prob_coeff[ndn][i] = prob
if prob == 0: # there's probably a way to do this in numpy...
s._mpisppy_data.prob0_mask[ndn][i] = 0
sum_probs[(ndn,i)] = sum_probs.get((ndn,i),0.0) + prob
didit += len(variable_probability)
skipped += len(s._mpisppy_data.varid_to_nonant_index) - didit
""" this needs to be MPIized; but check below should do the trick
for (ndn,i),prob in sum_probs.items():
if not math.isclose(prob, 1.0, abs_tol=self.E1_tolerance):
raise RuntimeError(f"Probability sum for variable with nonant index={i} at node={ndn} is not unity - computed sum={prob}")
"""
if verbose and self.cylinder_rank == 0:
print ("variable_probability set",didit,"and skipped",skipped)
if not self.options.get('do_not_check_variable_probabilities', False):
self._check_variable_probabilities_sum(verbose)
[docs]
def is_zero_prob( self, scenario_model, var ):
"""
Args:
scenario_model : a value in SPBase.local_scenarios
var : a Pyomo Var on the scenario_model
Returns:
True if the variable has 0 probability, False otherwise
"""
if self.variable_probability is None:
return False
_mpisppy_data = scenario_model._mpisppy_data
ndn, i = _mpisppy_data.varid_to_nonant_index[id(var)]
if isinstance(_mpisppy_data.prob_coeff[ndn], np.ndarray):
return float(_mpisppy_data.prob_coeff[ndn][i]) == 0.
else:
return False
def _check_variable_probabilities_sum(self, verbose):
nodenames = [] # to transmit to comms
local_concats = {} # keys are tree node names
global_concats = {} # values sums of node conditional probabilities
# we need to accumulate all local contributions before the reduce
for k,s in self.local_scenarios.items():
nlens = s._mpisppy_data.nlens
for node in s._mpisppy_node_list:
if node.name not in nodenames:
ndn = node.name
nodenames.append(ndn)
local_concats[ndn] = np.zeros(nlens[ndn], dtype='d')
global_concats[ndn] = np.zeros(nlens[ndn], dtype='d')
# sum local conditional probabilities
for k,s in self.local_scenarios.items():
for node in s._mpisppy_node_list:
ndn = node.name
local_concats[ndn] += s._mpisppy_data.prob_coeff[ndn]
# compute sum node conditional probabilities (reduction)
for ndn in nodenames:
self.comms[ndn].Allreduce(
[local_concats[ndn], MPI.DOUBLE],
[global_concats[ndn], MPI.DOUBLE],
op=MPI.SUM)
tol = self.E1_tolerance
checked_nodes = list()
# check sum node conditional probabilites are close to 1
for k,s in self.local_scenarios.items():
nlens = s._mpisppy_data.nlens
for node in s._mpisppy_node_list:
ndn = node.name
if ndn not in checked_nodes:
if not np.allclose(global_concats[ndn], 1., atol=tol):
notclose = ~np.isclose(global_concats[ndn], 1., atol=tol)
indices = np.nonzero(notclose)[0]
bad_vars = [ s._mpisppy_data.nonant_indices[ndn,idx].name for idx in indices ]
badprobs = [ global_concats[ndn][idx] for idx in indices]
raise RuntimeError(f"Node {ndn}, variables {bad_vars} have respective"
f" conditional probability sum {badprobs}"
" which are not 1")
checked_nodes.append(ndn)
def _look_and_leap(self):
for (sname, scenario) in self.local_scenarios.items():
if not hasattr(scenario, "_mpisppy_data"):
scenario._mpisppy_data = pyo.Block(name="For non-Pyomo mpi-sppy data")
if not hasattr(scenario, "_mpisppy_model"):
scenario._mpisppy_model = pyo.Block(name="For mpi-sppy Pyomo additions to the scenario model")
if hasattr(scenario, "PySP_prob"):
raise RuntimeError("PySP_prob is deprecated; use _mpisppy_probability")
pspec = scenario._mpisppy_probability if hasattr(scenario, "_mpisppy_probability") else None
if pspec is None or pspec == "uniform":
prob = 1./len(self.all_scenario_names)
if self.cylinder_rank == 0 and pspec is None:
print(f"Did not find _mpisppy_probability, assuming uniform probability {prob} (avoid this message by assigning a probability or the string 'uniform' to _mpisppy_probability on the scenario model object)")
scenario._mpisppy_probability = prob
if not hasattr(scenario, "_mpisppy_node_list"):
raise RuntimeError(f"_mpisppy_node_list not found on scenario {sname}")
def _options_check(self, required_options, given_options):
""" Confirm that the specified list of options contains the specified
list of required options. Raises a ValueError if anything is
missing.
"""
missing = [option for option in required_options if given_options.get(option) is None]
if missing:
raise ValueError(
"Missing the following required options:"
f" {nice_join(missing, conjunction='and', warp_in_single_quote=True)}."
)
def _set_solution_cache(self):
# set up best solution cache
for k,s in self.local_scenarios.items():
s._mpisppy_data.best_solution_cache = None
s._mpisppy_data.latest_nonant_solution_cache = np.full(len(s._mpisppy_data.nonant_indices), np.nan)
[docs]
def update_best_solution_if_improving(self, obj_val):
""" Call if the variable values have a nonanticipative solution
with associated obj_val. Will update the best_solution_cache
if the solution is better than the existing cached solution
"""
if obj_val is None:
return False
if self.best_solution_obj_val is None:
update = True
elif self.is_minimizing:
update = (obj_val < self.best_solution_obj_val)
else:
update = (self.best_solution_obj_val < obj_val)
if obj_val is not None:
self._cache_latest_solution_nonants()
if update:
self.best_solution_obj_val = obj_val
self._cache_best_solution()
return True
return False
def _cache_best_solution(self):
for k,s in self.local_scenarios.items():
scenario_cache = pyo.ComponentMap()
_put_var_vals_in_component_map_dict(
scenario_cache._dict,
s.component_data_objects(pyo.Var)
)
s._mpisppy_data.best_solution_cache = scenario_cache
def _cache_latest_solution_nonants(self):
for k,s in self.local_scenarios.items():
for idx, v in enumerate(s._mpisppy_data.nonant_indices.values()):
s._mpisppy_data.latest_nonant_solution_cache[idx] = v.value
def _get_cylinder_name(self):
if self.spcomm:
return self.spcomm.__class__.__name__
else:
return self.__class__.__name__
[docs]
def load_best_solution(self):
for k,s in self.local_scenarios.items():
if s._mpisppy_data.best_solution_cache is None:
return False
for var, value in s._mpisppy_data.best_solution_cache.items():
var.set_value(value, skip_validation=True)
self.first_stage_solution_available = True
self.tree_solution_available = True
return True
@property
def spcomm(self):
if self._spcomm is None:
return None
return self._spcomm()
@spcomm.setter
def spcomm(self, value):
if self._spcomm is None:
self._spcomm = weakref.ref(value)
else:
raise RuntimeError("SPBase.spcomm should only be set once")
[docs]
def allreduce_or(self, val):
local_val = np.array([val], dtype='int8')
global_val = np.zeros(1, dtype='int8')
self.mpicomm.Allreduce(local_val, global_val, op=MPI.LOR)
if global_val[0] > 0:
return True
else:
return False
[docs]
def gather_var_values_to_rank0(self, get_zero_prob_values=False, fixed_vars=True):
""" Gather the values of the nonanticipative variables to the root of
the `mpicomm` for the cylinder
Returns:
dict or None:
On the root (rank0), returns a dictionary mapping
(scenario_name, variable_name) pairs to their values. On other
ranks, returns None.
"""
var_values = dict()
for (sname, model) in self.local_scenarios.items():
for node in model._mpisppy_node_list:
for var in node.nonant_vardata_list:
if not fixed_vars and var.fixed:
continue
var_name = var.name
if self.bundling:
dot_index = var_name.find('.')
assert dot_index >= 0
var_name = var_name[(dot_index+1):]
if (self.is_zero_prob(model, var)) and (not get_zero_prob_values):
var_values[sname, var_name] = None
else:
var_values[sname, var_name] = pyo.value(var)
if self.n_proc == 1:
return var_values
result = self.mpicomm.gather(var_values, root=0)
if (self.cylinder_rank == 0):
result = {key: value
for dic in result
for (key, value) in dic.items()
}
return result
[docs]
def report_var_values_at_rank0(self, header="", print_zero_prob_values=False, fixed_vars=True):
""" Pretty-print the values and associated statistics for
non-anticipative variables across all scenarios. """
var_values = self.gather_var_values_to_rank0(get_zero_prob_values=print_zero_prob_values, fixed_vars=fixed_vars)
if self.cylinder_rank == 0:
if len(header) != 0:
print(header)
if len(var_values) == 0:
print("No variables to report (perhaps all are fixed?)")
return
scenario_names = sorted(set(x for (x,y) in var_values))
max_scenario_name_len = max(len(s) for s in scenario_names)
variable_names = sorted(set(y for (x,y) in var_values))
max_variable_name_len = max(len(v) for v in variable_names)
# the "10" below is a reasonable minimum for floating-point output
value_field_len = max(10, max_scenario_name_len)
print("{0: <{width}s} | ".format("", width=max_variable_name_len), end='')
for this_scenario in scenario_names:
print("{0: ^{width}s} ".format(this_scenario, width=value_field_len), end='')
print("")
for this_var in variable_names:
print("{0: <{width}} | ".format(this_var, width=max_variable_name_len), end='')
for this_scenario in scenario_names:
if (this_scenario, this_var) not in var_values:
print("{0: ^{width}s}".format("-", width=value_field_len), end='')
else:
this_var_value = var_values[this_scenario, this_var]
if (this_var_value is None) and (not print_zero_prob_values):
print("{0: ^{width}s}".format("-", width=value_field_len), end='')
else:
print("{0: {width}.4f}".format(this_var_value, width=value_field_len), end='')
print(" ", end='')
print("")
[docs]
def write_first_stage_solution(self, file_name,
first_stage_solution_writer=sputils.first_stage_nonant_writer):
""" Writes the first-stage solution, if this object reports one available.
Args:
file_name: path of file to write first stage solution to
first_stage_solution_writer (optional): custom first stage solution writer function
"""
if not self.first_stage_solution_available:
# try loading a solution
if not self.load_best_solution():
raise RuntimeError("No first stage solution available")
if self.cylinder_rank == 0:
dirname = os.path.dirname(file_name)
if dirname != '':
os.makedirs(os.path.dirname(file_name), exist_ok=True)
representative_scenario = self.local_scenarios[self.local_scenario_names[0]]
first_stage_solution_writer(file_name, representative_scenario, self.bundling)
[docs]
def write_tree_solution(self, directory_name,
scenario_tree_solution_writer=sputils.scenario_tree_solution_writer):
""" Writes the tree solution, if this object reports one available.
Raises a RuntimeError if it is not.
Args:
directory_name: directory to write tree solution to
scenario_tree_solution_writer (optional): custom scenario solution writer function
"""
if not self.tree_solution_available:
# try loading a solution
if not self.load_best_solution():
raise RuntimeError("No tree solution available")
if self.cylinder_rank == 0:
os.makedirs(directory_name, exist_ok=True)
self.mpicomm.Barrier()
for scenario_name, scenario in self.local_scenarios.items():
scenario_tree_solution_writer(directory_name, scenario_name, scenario, self.bundling)
def _put_var_vals_in_component_map_dict(sn_cache_dict, var_iter):
for var in var_iter:
sn_cache_dict[id(var)] = (var, var.value)