# Copyright 2020 by B. Knueven, D. Mildebrath, C. Muir, J-P Watson, and D.L. Woodruff
# This software is distributed under the 3-clause BSD License.
# base class for hub and for spoke strata

import os
import time
import logging
import weakref
import math
import numpy as np
import re
import pyomo.environ as pyo
import mpisppy.utils.sputils as sputils
from mpisppy import global_toc

from mpisppy import MPI

logger = logging.getLogger("SPBase")

[docs] class SPBase: """ Defines an interface to all strata (hubs and spokes) Args: options (dict): options all_scenario_names (list): all scenario names scenario_creator (fct): returns a concrete model with special things scenario_denouement (fct): for post processing and reporting all_nodenames (list): all node names (including leaves); can be None for 2 Stage mpicomm (MPI comm): if not given, use the global fullcomm scenario_creator_kwargs (dict): kwargs passed directly to scenario_creator. variable_probability (fct): returns a list of tuples of (id(var), prob) to set variable-specific probability (similar to PHBase.rho_setter). Attributes: local_scenarios (dict of scenario objects): concrete models with extra data, key is name comms (dict): keys are node names values are comm objects. local_scenario_names (list): names of locals """ def __init__( self, options, all_scenario_names, scenario_creator, scenario_denouement=None, all_nodenames=None, mpicomm=None, scenario_creator_kwargs=None, variable_probability=None, E1_tolerance=1e-5, ): # TODO add missing and private attributes (JP) # TODO add a class attribute called ROOTNODENAME = "ROOT" # TODO? add decorators to the class attributes self.start_time = time.perf_counter() self.options = options self.all_scenario_names = all_scenario_names self.scenario_creator = scenario_creator self.scenario_denouement = scenario_denouement self.comms = dict() self.local_scenarios = dict() self.local_scenario_names = list() self.E1_tolerance = E1_tolerance # probs must sum to almost 1 self.names_in_bundles = None self.scenarios_constructed = False if all_nodenames is None: self.all_nodenames = ["ROOT"] elif "ROOT" in all_nodenames: self.all_nodenames = all_nodenames self._check_nodenames() else: raise RuntimeError("'ROOT' must be in the list of node names") self.variable_probability = variable_probability self.multistage = (len(self.all_nodenames) > 1) # Set up MPI communicator and rank if mpicomm is not None: self.mpicomm = mpicomm else: self.mpicomm = MPI.COMM_WORLD self.cylinder_rank = self.mpicomm.Get_rank() self.n_proc = self.mpicomm.Get_size() self.global_rank = MPI.COMM_WORLD.Get_rank() if options.get("toc", True): global_toc("Initializing SPBase") if self.n_proc > len(self.all_scenario_names): raise RuntimeError("More ranks than scenarios") self._calculate_scenario_ranks() if "bundles_per_rank" in self.options and self.options["bundles_per_rank"] > 0: self._assign_bundles() self.bundling = True else: self.bundling = False self._create_scenarios(scenario_creator_kwargs) self._look_and_leap() self._compute_unconditional_node_probabilities() self._attach_nlens() self._attach_nonant_indices() self._attach_varid_to_nonant_index() self._create_communicators() self._verify_nonant_lengths() self._set_sense() self._use_variable_probability_setter() ## SPCommunicator object self._spcomm = None # for writers, if the appropriate # solution is loaded into the subproblems self.tree_solution_available = False self.first_stage_solution_available = False def _set_sense(self, comm=None): """ Check to confirm that all the models constructed by scenario_crator have the same sense (min v. max), and set self.is_minimizing accordingly. """ is_min, clear = sputils._models_have_same_sense(self.local_scenarios) if not clear: raise RuntimeError( "All scenario models must have the same " "model sense (minimize or maximize)" ) self.is_minimizing = is_min if self.n_proc <= 1: return # Check that all the ranks agree global_senses = self.mpicomm.gather(is_min, root=0) if self.cylinder_rank != 0: return sense = global_senses[0] clear = all(val == sense for val in global_senses) if not clear: raise RuntimeError( "All scenario models must have the same " "model sense (minimize or maximize)" ) def _verify_nonant_lengths(self): local_node_nonant_lengths = {} # keys are tree node names # we need to accumulate all local contributions before the reduce for k,s in self.local_scenarios.items(): nlens = s._mpisppy_data.nlens for node in s._mpisppy_node_list: ndn = mylen = nlens[ndn] if ndn not in local_node_nonant_lengths: local_node_nonant_lengths[ndn] = mylen elif local_node_nonant_lengths[ndn] != mylen: raise RuntimeError(f"Tree node {ndn} has scenarios with different numbers of non-anticipative " f"variables: {mylen} vs. {local_node_nonant_lengths[ndn]}") # compute node values(reduction) for ndn, val in local_node_nonant_lengths.items(): local_val = np.array([val], 'i') max_val = np.zeros(1, 'i') self.comms[ndn].Allreduce([local_val, MPI.INT], [max_val, MPI.INT], op=MPI.MAX) if val != int(max_val[0]): raise RuntimeError(f"Tree node {ndn} has scenarios with different numbers of non-anticipative " f"variables: {val} vs. max {max_val[0]}") def _check_nodenames(self): for ndn in self.all_nodenames: if ndn != 'ROOT' and sputils.parent_ndn(ndn) not in self.all_nodenames: raise RuntimeError(f"all_nodenames is inconsistent:" f"The node {sputils.parent_ndn(ndn)}, parent of {ndn}, is missing.") def _calculate_scenario_ranks(self): """ Populate the following attributes 1. self.scenario_names_to_rank (dict of dict): keys are comms (i.e., tree nodes); values are dicts with keys that are scenario names and values that are ranks within that comm 2. self._rank_slices (list of lists) indices correspond to ranks in self.mpicomm and the values are a list of scenario indices rank -> list of scenario indices for that rank 3. self._scenario_slices (list) indices are scenario indices and values are the rank of that scenario within self.mpicomm scenario index -> rank 4. self._scenario_tree (instance of sputils._ScenTree) 5. self.local_scenario_names (list) List of index names owned by the local rank """ tree = sputils._ScenTree(self.all_nodenames, self.all_scenario_names) self.scenario_names_to_rank, self._rank_slices, self._scenario_slices =\ tree.scen_names_to_ranks(self.n_proc) self._scenario_tree = tree self.nonleaves = { : node for node in tree.nonleaves} # list of scenario names owned locally self.local_scenario_names = [ self.all_scenario_names[i] for i in self._rank_slices[self.cylinder_rank] ] def _assign_bundles(self): """ Create self.names_in_bundles, a dict of dicts self.names_in_bundles[rank number][bundle number] = list of scenarios in that bundle """ scen_count = len(self.all_scenario_names) if self.options["verbose"] and self.cylinder_rank == 0: print("(rank0)", self.options["bundles_per_rank"], "bundles per rank") if self.n_proc * self.options["bundles_per_rank"] > scen_count: raise RuntimeError( "Not enough scenarios to satisfy the bundles_per_rank requirement" ) # dict: rank number --> list of scenario names owned by rank names_at_rank = { curr_rank: [self.all_scenario_names[i] for i in slc] for (curr_rank, slc) in enumerate(self._rank_slices) } self.names_in_bundles = dict() num_bundles = self.options["bundles_per_rank"] for curr_rank in range(self.n_proc): scen_count = len(names_at_rank[curr_rank]) avg = scen_count / num_bundles slices = [ range(int(i * avg), int((i + 1) * avg)) for i in range(num_bundles) ] self.names_in_bundles[curr_rank] = { curr_bundle: [names_at_rank[curr_rank][i] for i in slc] for (curr_bundle, slc) in enumerate(slices) } def _create_scenarios(self, scenario_creator_kwargs): """ Call the scenario_creator for every local scenario, and store the results in self.local_scenarios (dict indexed by scenario names). Notes: If a scenario probability is not specified as an attribute _mpisppy_probability of the ConcreteModel returned by ScenarioCreator, this function automatically assumes uniform probabilities. """ if self.scenarios_constructed: raise RuntimeError("Scenarios already constructed.") if scenario_creator_kwargs is None: scenario_creator_kwargs = dict() local_ict = list() # Local instance creation times for time tracking for sname in self.local_scenario_names: instance_creation_start_time = time.time() s = self.scenario_creator(sname, **scenario_creator_kwargs) self.local_scenarios[sname] = s if self.multistage: #Checking that the scenario can have an associated leaf node in all_nodenames stmax = np.argmax([nd.stage for nd in s._mpisppy_node_list]) if(s._mpisppy_node_list[stmax].name)+'_0' not in self.all_nodenames: raise RuntimeError("The leaf node associated with this scenario is not on all_nodenames" f"Its last non-leaf node {s._mpisppy_node_list[stmax].name} has no first child {s._mpisppy_node_list[stmax].name+'_0'}") local_ict.append(time.time() - instance_creation_start_time) if self.options.get("display_timing", False): all_instance_creation_times = self.mpicomm.gather( local_ict, root=0 ) if self.cylinder_rank == 0: aict = [ict for l_ict in all_instance_creation_times for ict in l_ict] print("Scenario instance creation times:") print(f"\tmin={np.min(aict):4.2f} mean={np.mean(aict):4.2f} max={np.max(aict):4.2f}") self.scenarios_constructed = True def _attach_nonant_indices(self): for (sname, scenario) in self.local_scenarios.items(): _nonant_indices = dict() nlens = scenario._mpisppy_data.nlens for node in scenario._mpisppy_node_list: ndn = for i in range(nlens[ndn]): _nonant_indices[ndn,i] = node.nonant_vardata_list[i] scenario._mpisppy_data.nonant_indices = _nonant_indices self.nonant_length = len(_nonant_indices) def _attach_nlens(self): for (sname, scenario) in self.local_scenarios.items(): # Things need to be by node so we can bind to the # indices of the vardata lists for the nodes. scenario._mpisppy_data.nlens = { len(node.nonant_vardata_list) for node in scenario._mpisppy_node_list } # NOTE: This only is used by extensions.xhatbase.XhatBase._try_one. # If that is re-factored, we can remove it here. scenario._mpisppy_data.cistart = dict() sofar = 0 for ndn, ndn_len in scenario._mpisppy_data.nlens.items(): scenario._mpisppy_data.cistart[ndn] = sofar sofar += ndn_len def _attach_varid_to_nonant_index(self): """ Create a map from the id of nonant variables to their Pyomo index. """ for (sname, scenario) in self.local_scenarios.items(): # In order to support rho setting, create a map # from the id of vardata object back its _nonant_index. scenario._mpisppy_data.varid_to_nonant_index =\ {id(var): ndn_i for ndn_i, var in scenario._mpisppy_data.nonant_indices.items()} def _create_communicators(self): # Create communicator objects, one for each node nonleafnodes = dict() for (sname, scenario) in self.local_scenarios.items(): for node in scenario._mpisppy_node_list: nonleafnodes[] = node # might be assigned&reassigned # check the node names given by the scenarios for nodename in nonleafnodes: if nodename not in self.all_nodenames: raise RuntimeError(f"Tree node '{nodename}' not in all_nodenames list {self.all_nodenames}") # loop over all nodes and make the comms (split requires all ranks) # make sure we loop in the same order, so every rank iterate over # the nodelist for nodename in self.all_nodenames: if nodename == "ROOT": self.comms["ROOT"] = self.mpicomm elif nodename in nonleafnodes: #The position in all_nodenames is an integer unique id. nodenumber = self.all_nodenames.index(nodename) # IMPORTANT: See note in sputils._ScenTree.scen_names_to_ranks. Need to keep # this split aligned with self.scenario_names_to_rank self.comms[nodename] = self.mpicomm.Split(color=nodenumber, key=self.cylinder_rank) else: # this rank is not included in the communicator self.mpicomm.Split(color=MPI.UNDEFINED, key=self.n_proc) ## ensure we've set things up correctly for all comms for nodename, comm in self.comms.items(): scenario_names_to_comm_rank = self.scenario_names_to_rank[nodename] for sname, rank in scenario_names_to_comm_rank.items(): if sname in self.local_scenarios: if rank != comm.Get_rank(): raise RuntimeError(f"For the node {nodename}, the scenario {sname} has the rank {rank} from scenario_names_to_rank and {comm.Get_rank()} from its comm.") ## ensure we've set things up correctly for all local scenarios for sname in self.local_scenarios: for nodename, comm in self.comms.items(): scenario_names_to_comm_rank = self.scenario_names_to_rank[nodename] if sname in scenario_names_to_comm_rank: if comm.Get_rank() != scenario_names_to_comm_rank[sname]: raise RuntimeError(f"For the node {nodename}, the scenario {sname} has the rank {rank} from scenario_names_to_rank and {comm.Get_rank()} from its comm.") def _compute_unconditional_node_probabilities(self): """ calculates unconditional node probabilities and prob_coeff and prob0_mask is set to a scalar 1 (used by variable_probability)""" for k,s in self.local_scenarios.items(): root = s._mpisppy_node_list[0] root.uncond_prob = 1.0 for parent,child in zip(s._mpisppy_node_list[:-1],s._mpisppy_node_list[1:]): child.uncond_prob = parent.uncond_prob * child.cond_prob if not hasattr(s._mpisppy_data, 'prob_coeff'): s._mpisppy_data.prob_coeff = dict() s._mpisppy_data.prob0_mask = dict() for node in s._mpisppy_node_list: s._mpisppy_data.prob_coeff[] = (s._mpisppy_probability / node.uncond_prob) s._mpisppy_data.prob0_mask[] = 1.0 # needs to be a float def _use_variable_probability_setter(self, verbose=False): """ set variable probability unconditional values using a function self.variable_probability that gives us a list of (id(vardata), probability)] ALSO set prob0_mask, which is a mask for W calculations (mask out zero probs) Note: We estimate that less than 0.01 of mpi-sppy runs will call this. """ if self.variable_probability is None: for s in self.local_scenarios.values(): s._mpisppy_data.has_variable_probability = False return didit = 0 skipped = 0 variable_probability_kwargs = self.options['variable_probability_kwargs'] \ if 'variable_probability_kwargs' in self.options \ else dict() sum_probs = {} # indexed by (ndn,i) - maps to sum of probs for that variable for sname, s in self.local_scenarios.items(): variable_probability = self.variable_probability(s, **variable_probability_kwargs) s._mpisppy_data.has_variable_probability = True for (vid, prob) in variable_probability: ndn, i = s._mpisppy_data.varid_to_nonant_index[vid] # If you are going to do any variables at a node, you have to do all. if type(s._mpisppy_data.prob_coeff[ndn]) is float: # not yet a vector defprob = s._mpisppy_data.prob_coeff[ndn] s._mpisppy_data.prob_coeff[ndn] = np.full(s._mpisppy_data.nlens[ndn], defprob, dtype='d') s._mpisppy_data.prob0_mask[ndn] = np.ones(s._mpisppy_data.nlens[ndn], dtype='d') s._mpisppy_data.prob_coeff[ndn][i] = prob if prob == 0: # there's probably a way to do this in numpy... s._mpisppy_data.prob0_mask[ndn][i] = 0 sum_probs[(ndn,i)] = sum_probs.get((ndn,i),0.0) + prob didit += len(variable_probability) skipped += len(s._mpisppy_data.varid_to_nonant_index) - didit """ this needs to be MPIized; but check below should do the trick for (ndn,i),prob in sum_probs.items(): if not math.isclose(prob, 1.0, abs_tol=self.E1_tolerance): raise RuntimeError(f"Probability sum for variable with nonant index={i} at node={ndn} is not unity - computed sum={prob}") """ if verbose and self.cylinder_rank == 0: print ("variable_probability set",didit,"and skipped",skipped) if not self.options.get('do_not_check_variable_probabilities', False): self._check_variable_probabilities_sum(verbose)
[docs] def is_zero_prob( self, scenario_model, var ): """ Args: scenario_model : a value in SPBase.local_scenarios var : a Pyomo Var on the scenario_model Returns: True if the variable has 0 probability, False otherwise """ if self.variable_probability is None: return False _mpisppy_data = scenario_model._mpisppy_data ndn, i = _mpisppy_data.varid_to_nonant_index[id(var)] if isinstance(_mpisppy_data.prob_coeff[ndn], np.ndarray): return float(_mpisppy_data.prob_coeff[ndn][i]) == 0. else: return False
def _check_variable_probabilities_sum(self, verbose): nodenames = [] # to transmit to comms local_concats = {} # keys are tree node names global_concats = {} # values sums of node conditional probabilities # we need to accumulate all local contributions before the reduce for k,s in self.local_scenarios.items(): nlens = s._mpisppy_data.nlens for node in s._mpisppy_node_list: if not in nodenames: ndn = nodenames.append(ndn) local_concats[ndn] = np.zeros(nlens[ndn], dtype='d') global_concats[ndn] = np.zeros(nlens[ndn], dtype='d') # sum local conditional probabilities for k,s in self.local_scenarios.items(): for node in s._mpisppy_node_list: ndn = local_concats[ndn] += s._mpisppy_data.prob_coeff[ndn] # compute sum node conditional probabilities (reduction) for ndn in nodenames: self.comms[ndn].Allreduce( [local_concats[ndn], MPI.DOUBLE], [global_concats[ndn], MPI.DOUBLE], op=MPI.SUM) tol = self.E1_tolerance checked_nodes = list() # check sum node conditional probabilites are close to 1 for k,s in self.local_scenarios.items(): nlens = s._mpisppy_data.nlens for node in s._mpisppy_node_list: ndn = if ndn not in checked_nodes: if not np.allclose(global_concats[ndn], 1., atol=tol): notclose = ~np.isclose(global_concats[ndn], 1., atol=tol) indices = np.nonzero(notclose)[0] bad_vars = [ s._mpisppy_data.nonant_indices[ndn,idx].name for idx in indices ] badprobs = [ global_concats[ndn][idx] for idx in indices] raise RuntimeError(f"Node {ndn}, variables {bad_vars} have respective" f" conditional probability sum {badprobs}" " which are not 1") checked_nodes.append(ndn) def _look_and_leap(self): for (sname, scenario) in self.local_scenarios.items(): if not hasattr(scenario, "_mpisppy_data"): scenario._mpisppy_data = pyo.Block(name="For non-Pyomo mpi-sppy data") if not hasattr(scenario, "_mpisppy_model"): scenario._mpisppy_model = pyo.Block(name="For mpi-sppy Pyomo additions to the scenario model") if hasattr(scenario, "PySP_prob"): raise RuntimeError(f"PySP_prob is deprecated; use _mpisppy_probability") pspec = scenario._mpisppy_probability if hasattr(scenario, "_mpisppy_probability") else None if pspec is None or pspec == "uniform": prob = 1./len(self.all_scenario_names) if self.cylinder_rank == 0 and pspec is None: print(f"Did not find _mpisppy_probability, assuming uniform probability {prob} (avoid this message by assigning a probability or the string 'uniform' to _mpisppy_probability on the scenario model object)") scenario._mpisppy_probability = prob if not hasattr(scenario, "_mpisppy_node_list"): raise RuntimeError(f"_mpisppy_node_list not found on scenario {sname}") def _options_check(self, required_options, given_options): """ Confirm that the specified list of options contains the specified list of required options. Raises a ValueError if anything is missing. """ missing = [option for option in required_options if given_options.get(option) is None] if missing: raise ValueError(f"Missing the following required options: {', '.join(missing)}") @property def spcomm(self): if self._spcomm is None: return None return self._spcomm() @spcomm.setter def spcomm(self, value): if self._spcomm is None: self._spcomm = weakref.ref(value) else: raise RuntimeError("SPBase.spcomm should only be set once")
[docs] def allreduce_or(self, val): local_val = np.array([val], dtype='int8') global_val = np.zeros(1, dtype='int8') self.mpicomm.Allreduce(local_val, global_val, op=MPI.LOR) if global_val[0] > 0: return True else: return False
[docs] def gather_var_values_to_rank0(self, get_zero_prob_values=False): """ Gather the values of the nonanticipative variables to the root of the `mpicomm` for the cylinder Returns: dict or None: On the root (rank0), returns a dictionary mapping (scenario_name, variable_name) pairs to their values. On other ranks, returns None. """ var_values = dict() for (sname, model) in self.local_scenarios.items(): for node in model._mpisppy_node_list: for var in node.nonant_vardata_list: var_name = if self.bundling: dot_index = var_name.find('.') assert dot_index >= 0 var_name = var_name[(dot_index+1):] if (self.is_zero_prob(model, var)) and (not get_zero_prob_values): var_values[sname, var_name] = None else: var_values[sname, var_name] = pyo.value(var) if self.n_proc == 1: return var_values result = self.mpicomm.gather(var_values, root=0) if (self.cylinder_rank == 0): result = {key: value for dic in result for (key, value) in dic.items() } return result
[docs] def report_var_values_at_rank0(self, header="", print_zero_prob_values=False): """ Pretty-print the values and associated statistics for non-anticipative variables across all scenarios. """ var_values = self.gather_var_values_to_rank0(get_zero_prob_values=print_zero_prob_values) if self.cylinder_rank == 0: if len(header) != 0: print(header) scenario_names = sorted(set(x for (x,y) in var_values)) max_scenario_name_len = max(len(s) for s in scenario_names) variable_names = sorted(set(y for (x,y) in var_values)) max_variable_name_len = max(len(v) for v in variable_names) # the "10" below is a reasonable minimum for floating-point output value_field_len = max(10, max_scenario_name_len) print("{0: <{width}s} | ".format("", width=max_variable_name_len), end='') for this_scenario in scenario_names: print("{0: ^{width}s} ".format(this_scenario, width=value_field_len), end='') print("") for this_var in variable_names: print("{0: <{width}} | ".format(this_var, width=max_variable_name_len), end='') for this_scenario in scenario_names: this_var_value = var_values[this_scenario, this_var] if (this_var_value == None) and (not print_zero_prob_values): print("{0: ^{width}s}".format("-", width=value_field_len), end='') else: print("{0: {width}.4f}".format(this_var_value, width=value_field_len), end='') print(" ", end='') print("")
[docs] def write_first_stage_solution(self, file_name, first_stage_solution_writer=sputils.first_stage_nonant_writer): """ Writes the first-stage solution, if this object reports one available. Args: file_name: path of file to write first stage solution to first_stage_solution_writer (optional): custom first stage solution writer function """ if not self.first_stage_solution_available: raise RuntimeError("No first stage solution available") if self.cylinder_rank == 0: dirname = os.path.dirname(file_name) if dirname != '': os.makedirs(os.path.dirname(file_name), exist_ok=True) representative_scenario = self.local_scenarios[self.local_scenario_names[0]] first_stage_solution_writer(file_name, representative_scenario, self.bundling)
[docs] def write_tree_solution(self, directory_name, scenario_tree_solution_writer=sputils.scenario_tree_solution_writer): """ Writes the tree solution, if this object reports one available. Raises a RuntimeError if it is not. Args: directory_name: directory to write tree solution to scenario_tree_solution_writer (optional): custom scenario solution writer function """ if not self.tree_solution_available: raise RuntimeError("No tree solution available") if self.cylinder_rank == 0: os.makedirs(directory_name, exist_ok=True) self.mpicomm.Barrier() for scenario_name, scenario in self.local_scenarios.items(): scenario_tree_solution_writer(directory_name, scenario_name, scenario, self.bundling)