Source code for mpisppy.utils.sputils

###############################################################################
# mpi-sppy: MPI-based Stochastic Programming in PYthon
#
# Copyright (c) 2024, Lawrence Livermore National Security, LLC, Alliance for
# Sustainable Energy, LLC, The Regents of the University of California, et al.
# All rights reserved. Please see the files COPYRIGHT.md and LICENSE.md for
# full copyright and license information.
###############################################################################
# Base and utility functions for mpisppy
# Note to developers: things called spcomm are way more than just a comm; SPCommunicator

import pyomo.environ as pyo
import sys
import os
import re
import numpy as np
import inspect
import importlib
import mpisppy.scenario_tree as scenario_tree

from enum import IntEnum
from pyomo.core import Objective
from pyomo.repn import generate_standard_repn

from mpisppy import MPI, haveMPI
from pyomo.core.expr.numeric_expr import LinearExpression
from pyomo.opt import SolutionStatus, TerminationCondition
from pyomo.core.base.indexed_component_slice import IndexedComponent_slice

from mpisppy import tt_timer

global_rank = MPI.COMM_WORLD.Get_rank()


class WarmstartStatus(IntEnum):
    FALSE = 0  # Falsy
    TRUE = 1   # Truthy
    USER_SOLUTION = 2  # Truthy
    PRIOR_SOLUTION = 3 # Truthy


def build_vardatalist(model, varlist=None):
    """
    Convert a list of pyomo variables to a list of SimpleVar and _GeneralVarData. If varlist is none, builds a
    list of all variables in the model. Written by CD Laird

    Parameters
    ----------
    model: ConcreteModel
    varlist: None or list of pyo.Var
    """
    vardatalist = None

    # if the varlist is None, then assume we want all the active variables
    if varlist is None:
        raise RuntimeError("varlist is None in scenario_tree.build_vardatalist")
        vardatalist = [v for v in model.component_data_objects(pyo.Var, active=True, sort=True)]
    elif isinstance(varlist, (pyo.Var, IndexedComponent_slice)):
        # user provided a variable, not a list of variables. Let's work with it anyway
        varlist = [varlist]

    if vardatalist is None:
        # expand any indexed components in the list to their
        # component data objects
        vardatalist = list()
        for v in varlist:
            if isinstance(v, IndexedComponent_slice):
                vardatalist.extend(v.__iter__())
            elif v.is_indexed():
                vardatalist.extend((v[i] for i in sorted(v.keys())))
            else:
                vardatalist.append(v)
    return vardatalist
    

def not_good_enough_results(results):
    return (results is None) or (len(results.solution) == 0) or \
        (results.solution(0).status == SolutionStatus.infeasible) or \
        (results.solution(0).status == SolutionStatus.unknown) or \
        (results.solver.termination_condition == TerminationCondition.infeasible) or \
        (results.solver.termination_condition == TerminationCondition.infeasibleOrUnbounded) or \
        (results.solver.termination_condition == TerminationCondition.unbounded)


_spin_the_wheel_move_msg = \
        "spin_the_wheel should now be used as the class "\
        "mpisppy.spin_the_wheel.WheelSpinner using the method `spin()`. Output "\
        "writers are now methods of the class WheelSpinner."

def spin_the_wheel(hub_dict, list_of_spoke_dict, comm_world=None):
    raise RuntimeError(
            _spin_the_wheel_move_msg + \
            " See the example code below for a fix:\n"
    '''
    from mpisppy.spin_the_wheel import WheelSpinner
    ws = WheelSpinner(hub_dict, list_of_spoke_dict)
    ws.spin(comm_world=comm_world)
    '''
    )

def first_stage_nonant_npy_serializer(file_name, scenario, bundling):
    # write just the nonants for ROOT in an npy file (e.g. for Conf Int)
    root = scenario._mpisppy_node_list[0]
    assert root.name == "ROOT"
    root_nonants = np.fromiter((pyo.value(var) for var in root.nonant_vardata_list), float)
    np.save(file_name, root_nonants)

def first_stage_nonant_writer( file_name, scenario, bundling ):
    with open(file_name, 'w') as f:
        root = scenario._mpisppy_node_list[0]
        assert root.name == "ROOT"
        for var in root.nonant_vardata_list:
            var_name = var.name
            if bundling:
                dot_index = var_name.find('.')
                assert dot_index >= 0
                var_name = var_name[(dot_index+1):]
            f.write(f"{var_name},{pyo.value(var)}\n")

def scenario_tree_solution_writer( directory_name, scenario_name, scenario, bundling ):
    with open(os.path.join(directory_name, scenario_name+'.csv'), 'w') as f:
        for var in scenario.component_data_objects(
                ctype=(pyo.Var, pyo.Expression),
                descend_into=True,
                active=True,
                sort=True):
            var_name = var.name
            if bundling:  # loose bundling
                dot_index = var_name.find('.')
                assert dot_index >= 0
                var_name = var_name[(dot_index+1):]
            f.write(f"{var_name},{pyo.value(var)}\n")
        
def write_spin_the_wheel_first_stage_solution(spcomm, opt_dict, solution_file_name,
        first_stage_solution_writer=first_stage_nonant_writer):
    raise RuntimeError(_spin_the_wheel_move_msg)

def write_spin_the_wheel_tree_solution(spcomm, opt_dict, solution_directory_name,
        scenario_tree_solution_writer=scenario_tree_solution_writer):
    raise RuntimeError(_spin_the_wheel_move_msg)

def local_nonant_cache(spcomm):
    raise RuntimeError(_spin_the_wheel_move_msg)

### a few Pyomo-related utilities ###


def get_objs(scenario_instance, allow_none=False):
    """ return the list of objective functions for scenario_instance"""
    scenario_objs = scenario_instance.component_data_objects(pyo.Objective,
                    active=True, descend_into=True)
    scenario_objs = list(scenario_objs)
    if (len(scenario_objs) == 0) and not allow_none:
        raise RuntimeError(f"Scenario {scenario_instance.name} has no active "
                           "objective functions.")
    if (len(scenario_objs) > 1):
        print(f"WARNING: Scenario {scenario_instance.name} has multiple active "
              "objectives, returning a list.")

    return scenario_objs


def stash_ref_objs(scenario_instance):
    """Stash a reference to active objs so
        Reactivate_obj can use the reference to reactivate them/it later.
    """
    scenario_instance._mpisppy_data.obj_list = get_objs(scenario_instance)


def deact_objs(scenario_instance):
    """ Deactivate objs 
    Args:
        scenario_instance (Pyomo ConcreteModel): the scenario
    Returns:
        obj_list (list of Pyomo Objectives): the deactivated objs
    Note: If none are active, just do nothing
    """
    obj_list = get_objs(scenario_instance, allow_none=True)
    for obj in obj_list:
        obj.deactivate()
    return obj_list


def reactivate_objs(scenario_instance):
    """ Reactivate ojbs stashed by stash_ref_objs """
    if not hasattr(scenario_instance._mpisppy_data, "obj_list"):
        raise RuntimeError("reactivate_objs called with prior call to stash_ref_objs")
    for obj in scenario_instance._mpisppy_data.obj_list:
        obj.activate()

    
[docs] def create_EF(scenario_names, scenario_creator, scenario_creator_kwargs=None, EF_name=None, suppress_warnings=False, nonant_for_fixed_vars=True, total_number_of_scenarios=None): """ Create a ConcreteModel of the extensive form. Args: scenario_names (list of str): Names for each scenario to be passed to the scenario_creator function. scenario_creator (callable): Function which takes a scenario name as its first argument and returns a concrete model corresponding to that scenario. scenario_creator_kwargs (dict, optional): Options to pass to `scenario_creator`. EF_name (str, optional): Name of the ConcreteModel of the EF. suppress_warnings (boolean, optional): If true, do not display warnings. Default False. nonant_for_fixed_vars (bool--optional): If True, enforces non-anticipativity constraints for all variables, including those which have been fixed. Default is True. total_number_of_scenarios (int, optional): For calculating uniform probabilities, the total number of scenarios. Returns: EF_instance (ConcreteModel): ConcreteModel of extensive form with explicit non-anticipativity constraints. Note: If any of the scenarios produced by scenario_creator do not have a ._mpisppy_probability attribute, this function displays a warning, and assumes that all scenarios are equally likely. """ if scenario_creator_kwargs is None: scenario_creator_kwargs = dict() scen_dict = { name: scenario_creator(name, **scenario_creator_kwargs) for name in scenario_names } if (len(scen_dict) == 0): raise RuntimeError("create_EF() received empty scenario list") elif (len(scen_dict) == 1): scenario_instance = list(scen_dict.values())[0] scenario_instance._ef_scenario_names = list(scen_dict.keys()) if not suppress_warnings: print("WARNING: passed single scenario to create_EF()") # special code to patch in ref_vars scenario_instance.ref_vars = dict() scenario_instance.ref_suppl_vars = dict() scenario_instance.ref_surrogate_vars = dict() scenario_instance._nlens = {node.name: len(node.nonant_vardata_list) for node in scenario_instance._mpisppy_node_list} for node in scenario_instance._mpisppy_node_list: ndn = node.name for i in range(scenario_instance._nlens[ndn]): v = node.nonant_vardata_list[i] if (ndn, i) not in scenario_instance.ref_vars: scenario_instance.ref_vars[(ndn, i)] = v if v in node.surrogate_vardatas: scenario_instance.ref_surrogate_vars[(ndn, i)] = v for i, v in enumerate(node.nonant_ef_suppl_vardata_list): if (ndn, i) not in scenario_instance.ref_suppl_vars: scenario_instance.ref_suppl_vars[(ndn, i)] = v # patch in EF_Obj scenario_objs = deact_objs(scenario_instance) obj = scenario_objs[0] sense = pyo.minimize if obj.is_minimizing() else pyo.maximize scenario_instance.EF_Obj = pyo.Objective(expr=obj.expr, sense=sense) # Densified bundle position -> per-sub-scenario nonant Vars at that # position. Trivial here (one sub-scenario => singleton groups), but # the attribute is provided unconditionally so downstream consumers # don't need to special-case the single-scenario EF. See issue #673 # and _bundle_consensus_groups in mpisppy/utils/nonant_sensitivities.py. scenario_instance.consensus_groups = _build_consensus_groups( scenario_instance.ref_vars, scenario_instance.ref_surrogate_vars, {(node.name, i): [node.nonant_vardata_list[i]] for node in scenario_instance._mpisppy_node_list for i in range(scenario_instance._nlens[node.name])}, ) return scenario_instance #### special return for single scenario # Check if every scenario has a specified probability probs_specified = \ all(hasattr(scen, '_mpisppy_probability') for scen in scen_dict.values()) uniform_specified = \ probs_specified and all(scen._mpisppy_probability == "uniform" for scen in scen_dict.values()) if total_number_of_scenarios is None: total_number_of_scenarios = len(scen_dict) if not probs_specified or uniform_specified: for scen in scen_dict.values(): scen._mpisppy_probability = 1 / total_number_of_scenarios if not suppress_warnings and not uniform_specified: print('WARNING: At least one scenario is missing _mpisppy_probability attribute.', 'Assuming equally-likely scenarios...') EF_instance = _create_EF_from_scen_dict(scen_dict, EF_name=EF_name, nonant_for_fixed_vars=nonant_for_fixed_vars) return EF_instance
def _create_EF_from_scen_dict(scen_dict, EF_name=None, nonant_for_fixed_vars=True): """ Create a ConcreteModel of the extensive form from a scenario dictionary. Args: scen_dict (dict): Dictionary whose keys are scenario names and values are ConcreteModel objects corresponding to each scenario. EF_name (str--optional): Name of the resulting EF model. nonant_for_fixed_vars (bool--optional): If True, enforces non-anticipativity constraints for all variables, including those which have been fixed. Default is True. Returns: EF_instance (ConcreteModel): ConcreteModel of extensive form with explicity non-anticipativity constraints. Notes: The non-anticipativity constraints are enforced by creating "reference variables" at each node in the scenario tree (excluding leaves) and enforcing that all the variables for each scenario at that node are equal to the reference variables. This function is called directly when creating bundles for PH. Does NOT assume that each scenario is equally likely. Raises an AttributeError if a scenario object is encountered which does not have a ._mpisppy_probability attribute. Added the flag nonant_for_fixed_vars because original code only enforced non-anticipativity for non-fixed vars, which is not always desirable in the context of bundling. This allows for more fine-grained control. """ is_min, clear = _models_have_same_sense(scen_dict) if (not clear): raise RuntimeError('Cannot build the extensive form out of models ' 'with different objective senses') sense = pyo.minimize if is_min else pyo.maximize EF_instance = pyo.ConcreteModel(name=EF_name) EF_instance.EF_Obj = pyo.Objective(expr=0.0, sense=sense) # we don't strictly need these here, but it allows for eliding # of single scenarios and bundles when convenient EF_instance._mpisppy_data = pyo.Block(name="For non-Pyomo mpi-sppy data") EF_instance._mpisppy_model = pyo.Block(name="For mpi-sppy Pyomo additions to the scenario model") EF_instance._mpisppy_data.solution_available = None EF_instance._ef_scenario_names = [] EF_instance._mpisppy_probability = 0 for (sname, scenario_instance) in scen_dict.items(): EF_instance.add_component(sname, scenario_instance) EF_instance._ef_scenario_names.append(sname) # Now deactivate the scenario instance Objective scenario_objs = deact_objs(scenario_instance) obj_func = scenario_objs[0] # Select the first objective try: EF_instance.EF_Obj.expr += scenario_instance._mpisppy_probability * obj_func.expr EF_instance._mpisppy_probability += scenario_instance._mpisppy_probability except AttributeError as e: raise AttributeError("Scenario " + sname + " has no specified " "probability. Specify a value for the attribute " " _mpisppy_probability and try again.") from e # Normalization does nothing when solving the full EF, but is required for # appropriate scaling of EFs used as bundles. EF_instance.EF_Obj.expr /= EF_instance._mpisppy_probability # For each node in the scenario tree, we need to collect the # nonanticipative vars and create the constraints for them, # which we do using a reference variable. ref_vars = dict() # keys are _nonant_indices (i.e. a node name and a # variable number) ref_suppl_vars = dict() ref_surrogate_vars = dict() EF_instance._nlens = dict() nonant_constr = pyo.Constraint(pyo.Any, name='_C_EF_') EF_instance.add_component('_C_EF_', nonant_constr) nonant_constr_suppl = pyo.Constraint(pyo.Any, name='_C_EF_suppl') EF_instance.add_component('_C_EF_suppl', nonant_constr_suppl) # (ndn, i) -> [per-sub-scenario nonant Vars at that position], collected # in scenario-iteration order during the loop below. Densified into # EF_instance.consensus_groups after the loop completes (the densification # depends on the *final* state of ref_surrogate_vars, which can shrink # mid-loop via the Stoch_AdmmWrapper surrogate-upgrade path). per_pos_vars = dict() for (sname, s) in scen_dict.items(): nlens = {node.name: len(node.nonant_vardata_list) for node in s._mpisppy_node_list} for (node_name, num_nonant_vars) in nlens.items(): # copy nlens to EF if (node_name in EF_instance._nlens.keys() and num_nonant_vars != EF_instance._nlens[node_name]): raise RuntimeError("Number of non-anticipative variables is " "not consistent at node " + node_name + " in scenario " + sname) EF_instance._nlens[node_name] = num_nonant_vars nlens_ef_suppl = {node.name: len(node.nonant_ef_suppl_vardata_list) for node in s._mpisppy_node_list} for node in s._mpisppy_node_list: ndn = node.name for i in range(nlens[ndn]): v = node.nonant_vardata_list[i] per_pos_vars.setdefault((ndn, i), []).append(v) if (ndn, i) not in ref_vars: # grab the reference variable if (nonant_for_fixed_vars) or (not v.is_fixed()): ref_vars[(ndn, i)] = v if v in node.surrogate_vardatas: ref_surrogate_vars[(ndn, i)] = v # Add a non-anticipativity constraint, except in the case when # the variable is fixed and nonant_for_fixed_vars=False. # or we're in the surrogate nonants elif (nonant_for_fixed_vars) or (not v.is_fixed()): if v in node.surrogate_vardatas: continue if (ndn, i) in ref_surrogate_vars: # An earlier scenario installed a surrogate as the # ref for this (node, index). This happens with # Stoch_AdmmWrapper partial consensus: subproblems # that have a zero-probability consensus var tag that var # as a surrogate vardata (which will be fixed to 0). # Upgrade the ref to a non-zero-probability v (i.e., non-dummy) # and remove the surrogate tag. # Subsequent scenarios with this variable as # non-zero-probability (i.e., *not* as "surrogate") # are set equal to *this* non-zero-probability vardata. ref_vars[(ndn, i)] = v del ref_surrogate_vars[(ndn, i)] continue expr = LinearExpression(linear_coefs=[1,-1], linear_vars=[v,ref_vars[(ndn,i)]], constant=0.) nonant_constr[(ndn,i,sname)] = (expr, 0.0) for i in range(nlens_ef_suppl[ndn]): v = node.nonant_ef_suppl_vardata_list[i] if (ndn, i) not in ref_suppl_vars: # create the reference variable as a singleton with long name # xxxx maybe index by _nonant_index ???? rather than singleton VAR ??? ref_suppl_vars[(ndn, i)] = v # Add a non-anticipativity constraint, expect in the case when # the variable is fixed and nonant_for_fixed_vars=False. elif (nonant_for_fixed_vars) or (not v.is_fixed()): expr = LinearExpression(linear_coefs=[1,-1], linear_vars=[v,ref_suppl_vars[(ndn,i)]], constant=0.) nonant_constr_suppl[(ndn,i,sname)] = (expr, 0.0) EF_instance.ref_vars = ref_vars EF_instance.ref_suppl_vars = ref_suppl_vars EF_instance.ref_surrogate_vars = ref_surrogate_vars # Densified bundle position -> per-sub-scenario nonant Vars; stashed here # at construction time so downstream rho setters / KKT-sensitivity probes # don't re-walk the bundle to recover what create_EF already knew. See # issue #673 and _bundle_consensus_groups in nonant_sensitivities.py. EF_instance.consensus_groups = _build_consensus_groups( ref_vars, ref_surrogate_vars, per_pos_vars, ) return EF_instance def _build_consensus_groups(ref_vars, ref_surrogate_vars, per_pos_vars): """Densify {(ndn, per_scen_i): [Vars]} into {(ndn, bundle_k): [Vars]} by skipping surrogate positions and renumbering per node. Iteration order follows ``ref_vars.keys()`` (insertion order during create_EF), so a bundle position ``k`` here matches what ``s._mpisppy_data.nonant_indices`` keys against downstream. """ groups = dict() counters = dict() for (ndn, per_scen_i) in ref_vars.keys(): if (ndn, per_scen_i) in ref_surrogate_vars: continue k = counters.get(ndn, 0) counters[ndn] = k + 1 groups[(ndn, k)] = per_pos_vars.get((ndn, per_scen_i), []) return groups def _models_have_same_sense(models): ''' Check if every model in the provided dict has the same objective sense. Input: models (dict) -- Keys are scenario names, values are Pyomo ConcreteModel objects. Returns: is_minimizing (bool) -- True if and only if minimizing. None if the check fails. check (bool) -- True only if all the models have the same sense (or no models were provided) Raises: ValueError -- If any of the models has either none or multiple active objectives. ''' if (len(models) == 0): return True, True senses = [find_active_objective(scenario).is_minimizing() for scenario in models.values()] sense = senses[0] check = all(val == sense for val in senses) if (check): return (sense == pyo.minimize), check return None, check def is_persistent(solver): return isinstance(solver, pyo.pyomo.solvers.plugins.solvers.persistent_solver.PersistentSolver) def ef_scenarios(ef): """ An iterator to give the scenario sub-models in an ef Args: ef (ConcreteModel): the full extensive form model Yields: scenario name, scenario instance (str, ConcreteModel) """ for sname in ef._ef_scenario_names: yield (sname, getattr(ef, sname)) def ef_nonants(ef): """ An iterator to give representative Vars subject to non-anticipitivity Args: ef (ConcreteModel): the full extensive form model Yields: tree node name, full EF Var name, Var value Note: not on an EF object because not all ef's are part of an EF object """ for (ndn,i), var in ef.ref_vars.items(): yield (ndn, var, pyo.value(var)) def ef_nonants_csv(ef, filename): """ Dump the nonant vars from an ef to a csv file; truly a dump... Args: ef (ConcreteModel): the full extensive form model filename (str): the full name of the csv output file """ with open(filename, "w") as outfile: outfile.write("Node, EF_VarName, Value\n") for (ndname, varname, varval) in ef_nonants(ef): outfile.write("{}, {}, {}\n".format(ndname, varname, varval)) def nonant_cache_from_ef(ef,verbose=False): """ Populate a nonant_cache from an ef. Also works with multi-stage Args: ef (mpi-sppy ef): a solved ef Returns: nonant_cache (dict of numpy arrays): a special structure for nonant values """ nonant_cache = dict() nodenames = set([ndn for (ndn,i) in ef.ref_vars]) for ndn in sorted(nodenames): nonant_cache[ndn]=[] i = 0 while ((ndn,i) in ef.ref_vars): xvar = pyo.value(ef.ref_vars[(ndn,i)]) nonant_cache[ndn].append(xvar) if verbose: print("barfoo", i, xvar) i+=1 return nonant_cache def ef_ROOT_nonants_npy_serializer(ef, filename): """ write the root node nonants to be ready by a numpy load Args: ef (ConcreteModel): the full extensive form model filename (str): the full name of the .npy output file """ root_nonants = np.fromiter((v for ndn,var,v in ef_nonants(ef) if ndn == "ROOT"), float) np.save(filename, root_nonants) def write_ef_first_stage_solution(ef, solution_file_name, first_stage_solution_writer=first_stage_nonant_writer): """ Write a solution file, if a solution is available, to the solution_file_name provided Args: ef : A Concrete Model of the Extensive Form (output of create_EF). We assume it has already been solved. solution_file_name : filename to write the solution to first_stage_solution_writer (optional) : custom first stage solution writer function NOTE: This utility is replicating WheelSpinner.write_first_stage_solution for EF """ if global_rank == 0: representative_scenario = getattr(ef,ef._ef_scenario_names[0]) first_stage_solution_writer(solution_file_name, representative_scenario, bundling=True) def write_ef_tree_solution(ef, solution_directory_name, scenario_tree_solution_writer=scenario_tree_solution_writer): """ Write a tree solution directory, if available, to the solution_directory_name provided Args: ef : A Concrete Model of the Extensive Form (output of create_EF). We assume it has already been solved. solution_file_name : filename to write the solution to scenario_tree_solution_writer (optional) : custom scenario solution writer function NOTE: This utility is replicating WheelSpinner.write_tree_solution for EF """ if global_rank==0: os.makedirs(solution_directory_name, exist_ok=True) for scenario_name, scenario in ef_scenarios(ef): scenario_tree_solution_writer(solution_directory_name, scenario_name, scenario, bundling=True) def extract_num(string): ''' Given a string, extract the longest contiguous integer from the right-hand side of the string. Example: scenario324 -> 324 TODO: Add Exception Handling ''' return int(re.compile(r'(\d+)$').search(string).group(1)) def node_idx(node_path,branching_factors): ''' Computes a unique id for a given node in a scenario tree. It follows the path to the node, computing the unique id for each ascendant. Parameters ---------- node_path : list of int A list of integer, specifying the path of the node. branching_factors : list of int branching_factors of the scenario tree. Returns ------- node_idx Node unique id. NOTE: Does not work with unbalanced trees. ''' if node_path == []: #ROOT node return 0 else: stage_id = 0 #node unique id among stage t+1 nodes. for t in range(len(node_path)): stage_id = node_path[t]+branching_factors[t]*stage_id node_idx = _nodenum_before_stage(len(node_path),branching_factors)+stage_id return node_idx def _extract_node_idx(nodename,branching_factors): """ Parameters ---------- nodename : str The name of a node, e.g. 'ROOT_2_0_4'. branching_factors : list Branching factor of a scenario tree, e.g. [3,2,8,4,3]. Returns ------- node_idx : int A unique integer that can be used as a key to designate this scenario. """ if nodename =='ROOT': return 0 else: to_list = [int(x) for x in re.findall(r'\d+',nodename)] return node_idx(to_list,branching_factors) def parent_ndn(nodename): if nodename == 'ROOT': return None else: return re.search(r'(.+)_(\d+)',nodename).group(1) def option_string_to_dict(ostr): """ Convert a string to the standard dict for solver options. Intended for use in the calling program; not internal use here. Jan 27, 2026: if ostr happens to be a dict already, just return it. Args: ostr (string): space seperated options with = for arguments (if it happens to be a dict already, just return it) Returns: solver_options (dict): solver options """ def convert_value_string_to_number(s): try: return int(s) except ValueError: try: return float(s) except ValueError: return s if isinstance(ostr, dict): return ostr solver_options = dict() if ostr is None or ostr == "": return solver_options for this_option_string in ostr.split(): this_option_pieces = this_option_string.strip().split("=") if len(this_option_pieces) == 2: option_key = this_option_pieces[0] option_value = convert_value_string_to_number(this_option_pieces[1]) solver_options[option_key] = option_value elif len(this_option_pieces) == 1: option_key = this_option_pieces[0] solver_options[option_key] = None else: raise RuntimeError("Illegally formed subsolve directive"\ + " option=%s detected" % this_option_string) return solver_options def option_dict_to_string(odict): """ Convert a standard dict for solver options to a string. Args: odict (dict): options dict for Pyomo Returns: ostring (str): options string as in mpi-sppy Note: None begets None, and empty begets empty """ if odict is None: return None ostr = "" for i, v in odict.items(): if v is None: ostr += "{i} " else: ostr += f"{i}={v} " return ostr # Solver-options layered representation. See # doc/designs/solver_options_redesign.md §5.2. # # A layer is a plain dict {"when": <predicate>, "options": <dict>}. # Predicates: # "default" — applies at every iteration # "iter0" — only at iteration 0 # "iterk" — iterations k >= 1 # ("starting_at_iter", N) — iterations k >= N (N >= 1) # # N = 0 is rejected explicitly. A "matches every iteration" layer # should use the `default` predicate; using # ("starting_at_iter", 0) instead would silently rank the layer # above `iter0` / `iterk` in axis-1 specificity, which is rarely # what callers intend. def _validate_when(when): """Raise ValueError if *when* is not a recognized layer predicate.""" if when in ("default", "iter0", "iterk"): return if isinstance(when, tuple) and len(when) == 2 and when[0] == "starting_at_iter": N = when[1] # bool is a subclass of int; reject it explicitly if isinstance(N, bool) or not isinstance(N, int) or N < 1: raise ValueError( f"starting_at_iter predicate requires an int >= 1; got " f"{N!r}. Use the 'default' predicate for options that " "apply at every iteration." ) return raise ValueError(f"Unknown solver-options layer predicate: {when!r}") def solver_options_layer(when, options): """Build a single solver-options layer. Args: when: predicate, one of "default", "iter0", "iterk", or ("starting_at_iter", N) with N an int >= 1. (N = 0 is rejected; use ``"default"`` instead.) options (dict): the options to fold in when the predicate matches. Returns: dict with keys "when" and "options". """ _validate_when(when) return {"when": when, "options": dict(options)} def _layer_matches(when, k): _validate_when(when) if when == "default": return True if when == "iter0": return k == 0 if when == "iterk": return k >= 1 # only ("starting_at_iter", N) remains, validated above return k >= when[1] def fold_solver_options_layers(layers, k): """Fold a list of solver-options layers into one dict for iteration k. Walks layers in list order, picks layers whose predicate matches k, and flat-dict-unions their options into a running dict (last write wins per key). Args: layers (list): list of layers as built by solver_options_layer. k (int): iteration number (0 for iter0). Returns: dict: the merged options for iteration k. """ folded = {} for layer in layers: if _layer_matches(layer["when"], k): folded.update(layer["options"]) return folded # Solver-name-aware translation for the two canonical option keys # mpi-sppy stores internally. Keys not in this table are passed # through unchanged by translate_solver_options. # # Entries map (canonical_key, solver_name) → solver-native key when # the solver names the option differently. Solver names not listed # under a canonical key use the canonical name itself (no rename). # Persistent variants (e.g. gurobi_persistent) are normalized to the # base name before lookup. _SOLVER_OPTION_TRANSLATIONS = { "mipgap": { # HiGHS uses its native option name. "highs": "mip_rel_gap", "appsi_highs": "mip_rel_gap", }, "threads": { # Gurobi parameter is conventionally capitalized. "gurobi": "Threads", "appsi_gurobi": "Threads", }, } def translate_solver_options(opts, solver_name): """Return a copy of *opts* with canonical option keys renamed to the solver's native key, where they differ. Currently translates only ``mipgap`` and ``threads``; all other keys pass through unchanged. If the user already supplied the solver-native key alongside the canonical key, the solver-native key wins and the canonical key is dropped (so the solver does not receive both forms). Args: opts (dict | None): solver options. ``None`` returns ``None``; other inputs return a new dict (the input is not mutated). solver_name (str | None): name of the target Pyomo solver plugin (e.g. ``'gurobi'``, ``'gurobi_persistent'``, ``'appsi_highs'``). ``None`` or empty returns a copy of *opts* unchanged. Returns: dict | None: translated copy of *opts*, or ``None`` if *opts* was ``None``. """ if opts is None: return None out = dict(opts) if not solver_name: return out # gurobi_persistent → gurobi; appsi_highs stays as appsi_highs; # cplex_persistent → cplex; xpress_persistent → xpress. base_name = solver_name if base_name.endswith("_persistent"): base_name = base_name[:-len("_persistent")] for canonical, mapping in _SOLVER_OPTION_TRANSLATIONS.items(): if canonical not in out: continue target = mapping.get(solver_name) or mapping.get(base_name) if target is None or target == canonical: continue if target in out: # User explicitly supplied the solver-native key; respect # it and drop the canonical to avoid sending duplicates. del out[canonical] else: out[target] = out.pop(canonical) return out # Recognized keys in a solver-options-file's per-section sub-block # (the top-level dict and each per-spoke sub-block share this shape). _OPTIONS_FILE_PREDICATES = ("default", "iter0", "iterk", "starting_at_iter") _OPTIONS_FILE_TOP_KEYS = _OPTIONS_FILE_PREDICATES + ("spokes",) _OPTIONS_FILE_SPOKE_KEYS = _OPTIONS_FILE_PREDICATES def _parse_options_file_section(section, source, *, allow_spokes): """Validate one block of a solver-options file and normalize it. Used both for the top-level (allow_spokes=True) and each per-spoke sub-block (allow_spokes=False). Returns a dict with keys "default", "iter0", "iterk" (each a dict, possibly empty), "starting_at_iter" (dict[int, dict], possibly empty), and "spokes" (dict[str, dict] when allowed; absent in spoke sub-blocks). """ if not isinstance(section, dict): raise ValueError( f"{source}: expected a JSON object, got {type(section).__name__}") allowed = _OPTIONS_FILE_TOP_KEYS if allow_spokes else _OPTIONS_FILE_SPOKE_KEYS extra = set(section) - set(allowed) if extra: raise ValueError( f"{source}: unknown key(s) {sorted(extra)}; allowed: " f"{sorted(allowed)}") out = {p: dict(section.get(p) or {}) for p in ("default", "iter0", "iterk")} raw_after = section.get("starting_at_iter") or {} if not isinstance(raw_after, dict): raise ValueError( f"{source}.starting_at_iter: expected an object mapping iteration " f"numbers to options dicts, got {type(raw_after).__name__}") coerced_after = {} for k, v in raw_after.items(): try: N = int(k) except (TypeError, ValueError): raise ValueError( f"{source}.starting_at_iter: key {k!r} is not an integer") from None if N < 1: if N == 0: raise ValueError( f"{source}.starting_at_iter: iteration index 0 is " "not allowed. Move these options into the 'default' " "sub-block if they should apply at every iteration." ) raise ValueError( f"{source}.starting_at_iter: iteration index must be >= 1; " f"got {N}") if not isinstance(v, dict): raise ValueError( f"{source}.starting_at_iter[{k}]: expected an options object, " f"got {type(v).__name__}") coerced_after[N] = dict(v) out["starting_at_iter"] = coerced_after if allow_spokes: raw_spokes = section.get("spokes") or {} if not isinstance(raw_spokes, dict): raise ValueError( f"{source}.spokes: expected an object mapping spoke " f"names to sub-blocks, got {type(raw_spokes).__name__}") out["spokes"] = { spoke_name: _parse_options_file_section( spoke_block, f"{source}.spokes.{spoke_name}", allow_spokes=False, ) for spoke_name, spoke_block in raw_spokes.items() } return out def load_solver_options_file(path): """Read a JSON solver-options file and return its parsed structure. The file's shape (see doc/designs/solver_options_redesign.md §5.3): { "default": {...}, "iter0": {...}, "iterk": {...}, "starting_at_iter": {"5": {...}, "10": {...}}, "spokes": { "lagrangian": {"default": {...}, "iter0": {...}, ...}, "reduced_costs": {...} } } All sub-blocks are optional; absent ones default to empty dicts so callers can iterate uniformly. ``starting_at_iter`` keys are JSON strings (JSON object keys must be strings) and are coerced to ints here. Args: path (str): path to a JSON file. Returns: dict: parsed structure with keys "default", "iter0", "iterk", "starting_at_iter" (dict[int, dict]), and "spokes" (dict[str, sub-block]). Raises: ValueError: on unknown keys, malformed sub-blocks, or non-integer ``starting_at_iter`` keys. The exception message names the offending source path inside the file. """ import json with open(path) as fin: try: raw = json.load(fin) except json.JSONDecodeError as e: raise ValueError(f"{path}: invalid JSON ({e})") from None return _parse_options_file_section(raw, path, allow_spokes=True) def options_file_section_to_layers(section): """Build the layer list for one parsed sub-block in axis-1 order. The returned list folds (via fold_solver_options_layers) in the order most-general → most-specific so that more-specific predicates naturally win for any iteration that matches both. Args: section (dict): output of ``_parse_options_file_section`` (or one of the per-spoke sub-blocks). Returns: list[dict]: layers in order [default, iter0, iterk, starting_at_iter:N₁, starting_at_iter:N₂, ...] (starting_at_iter sorted by ascending N). Sub-blocks that are empty contribute nothing. """ layers = [] if section["default"]: layers.append(solver_options_layer("default", section["default"])) if section["iter0"]: layers.append(solver_options_layer("iter0", section["iter0"])) if section["iterk"]: layers.append(solver_options_layer("iterk", section["iterk"])) for N in sorted(section["starting_at_iter"]): layers.append( solver_options_layer(("starting_at_iter", N), section["starting_at_iter"][N])) return layers ################################################################################ # Various utilities related to scenario rank maps (some may not be in use) def scens_to_ranks(scen_count, n_proc, rank, branching_factors = None): """ Determine the rank assignments that are made in spbase. NOTE: Callers to this should call _scentree.scen_names_to_ranks Args: scen_count (int): number of scenarios n_proc (int): the number of intra ranks (within the cylinder) rank (int): my rank (i.e., intra; i.e., within the cylinder) Returns: slices (list of ranges): the indices into all all_scenario_names to assign to rank (the list entries are ranges that correspond to ranks) scenario_name_to_rank (dict of dict): only for multi-stage keys are comms (i.e., tree nodes); values are dicts with keys that are scenario names and values that are ranks """ if not haveMPI: raise RuntimeError("scens_to_ranks called, but cannot import mpi4py") if scen_count < n_proc: raise RuntimeError( "More MPI ranks (%d) supplied than needed given the number of scenarios (%d) " % (n_proc, scen_count) ) # for now, we are treating two-stage as a special case if (branching_factors is None): avg = scen_count / n_proc slices = [list(range(int(i * avg), int((i + 1) * avg))) for i in range(n_proc)] return slices, None else: # OCT 2020: this block is never triggered and would fail. # indecision as of May 2020 (delete this comment DLW) # just make sure things are consistent with what xhat will do... # TBD: streamline all_scenario_names = ["ID"+str(i) for i in range(scen_count)] tree = _ScenTree(branching_factors, all_scenario_names) scenario_names_to_ranks, slices, = tree.scen_name_to_rank(n_proc, rank) return slices, scenario_names_to_ranks def _nodenum_before_stage(t,branching_factors): #How many nodes in a tree of stage 1,2,...,t ? #Only works with branching factors return int(sum(np.prod(branching_factors[0:i]) for i in range(t))) def find_leaves(all_nodenames): #Take a list of all nodenames from a tree, and find the leaves of it. #WARNING: We do NOT check that the tree is well constructed if all_nodenames is None or all_nodenames == ['ROOT']: return {'ROOT':False} # 2 stage problem: no leaf nodes in all_nodenames #A leaf is simply a root with no child n°0 is_leaf = dict() for ndn in all_nodenames: if ndn+"_0" in all_nodenames: is_leaf[ndn] = False else: is_leaf[ndn] = True return is_leaf class _TreeNode(): #Create the subtree generated by a node, with associated scenarios # stages are 1-based, everything else is 0-based # scenario lists are stored as (first, last) indices in all_scenarios #This is also checking that the nodes from all_nodenames are well-named. def __init__(self, Parent, scenfirst, scenlast, desc_leaf_dict, name): #desc_leaf_dict is the output of find_leaves self.scenfirst = scenfirst #id of the first scenario with this node self.scenlast = scenlast #id of the last scenario with this node self.name = name numscens = scenlast - scenfirst + 1 #number of scenarios with this node self.is_leaf = False if Parent is None: assert(self.name == "ROOT") self.stage = 1 else: self.stage = Parent.stage + 1 if len(desc_leaf_dict)==1 and list(desc_leaf_dict.keys()) == ['ROOT']: #2-stage problem, we don't create leaf nodes self.kids = [] elif name+"_0" not in desc_leaf_dict: self.is_leaf = True self.kids = [] else: if len(desc_leaf_dict) < numscens: raise RuntimeError(f"There are more scenarios ({numscens}) than remaining leaves, for the node {name}") # make children first = scenfirst self.kids = list() child_regex = re.compile(name+r'_\d*\Z') child_list = [x for x in desc_leaf_dict if child_regex.match(x) ] for i in range(len(desc_leaf_dict)): childname = name+f"_{i}" if childname not in desc_leaf_dict: if len(child_list) != i: raise RuntimeError("The all_nodenames argument is giving an inconsistent tree." f"The node {name} has {len(child_list)} children, but {childname} is not one of them.") break childdesc_regex = re.compile(childname+r'(_\d*)*\Z') child_leaf_dict = {ndn:desc_leaf_dict[ndn] for ndn in desc_leaf_dict \ if childdesc_regex.match(ndn)} #We determine the number of children of this node child_scens_num = sum(child_leaf_dict.values()) last = first+child_scens_num - 1 self.kids.append(_TreeNode(self, first, last, child_leaf_dict, childname)) first += child_scens_num if last != scenlast: print("numscens, last, scenlast", numscens, last, scenlast) raise RuntimeError(f"Tree node did not initialize correctly for node {name}") def stage_max(self): #Return the number of stages of a subtree. #Also check that all the subtrees have the same number of stages #i.e. that the leaves are always on the same stage. if self.is_leaf: return 1 else: leaves = [child.stage_max() for child in self.kids] if leaves.count(leaves[0]) != len(leaves): maxstage = max(leaves)+ self.stage minstage = min(leaves)+ self.stage raise RuntimeError("The all_nodenames argument is giving an inconsistent tree. " f"The node {self.name} has descendant leaves with stages going from {minstage} to {maxstage}") return 1+leaves[0] class _ScenTree(): def __init__(self, all_nodenames, ScenNames): if all_nodenames is None: all_nodenames = ['ROOT'] #2 stage problem: no leaf nodes self.ScenNames = ScenNames self.NumScens = len(ScenNames) first = 0 last = self.NumScens - 1 desc_leaf_dict = find_leaves(all_nodenames) self.rootnode = _TreeNode(None, first, last, desc_leaf_dict, "ROOT") def _nonleaves(nd): if nd.is_leaf: return [] else: retval = [nd] for child in nd.kids: retval+=_nonleaves(child) return retval self.nonleaves = _nonleaves(self.rootnode) self.NumStages = \ 2 if all_nodenames == ['ROOT'] else self.rootnode.stage_max() self.NonLeafTerminals = \ [nd for nd in self.nonleaves if nd.stage == self.NumStages-1] self.NumLeaves = len(desc_leaf_dict) - len(self.nonleaves) if self.NumStages>2 and self.NumLeaves != self.NumScens: raise RuntimeError("The all_nodenames argument is giving an inconsistent tree." f"There are {self.NumLeaves} leaves for this tree, but {self.NumScens} scenarios are given.") def scen_names_to_ranks(self, n_proc): """ Args: n_proc: number of ranks in the cylinder (i.e., intra) Returns: scenario_names_to_rank (dict of dict): keys are comms (i.e., tree nodes); values are dicts with keys that are scenario names and values that are ranks within that comm slices (list of lists) indices correspond to ranks in self.mpicomm and the values are a list of scenario indices rank -> list of scenario indices for that rank list_of_ranks_per_scenario_idx (list) indices are scenario indices and values are the rank of that scenario within self.mpicomm scenario index -> rank NOTE: comm names are the same as the corresponding scenario tree node name """ scenario_names_to_rank = dict() # scenario_name_to_rank dict of dicts # one processor for the cylinder is a special case if n_proc == 1: for nd in self.nonleaves: scenario_names_to_rank[nd.name] = {s: 0 for s in self.ScenNames} return scenario_names_to_rank, [list(range(self.NumScens))], [0]*self.NumScens scen_count = len(self.ScenNames) avg = scen_count / n_proc # rank -> list of scenario indices for that rank slices = [list(range(int(i * avg), int((i + 1) * avg))) for i in range(n_proc)] # scenario index -> rank list_of_ranks_per_scenario_idx = [ rank for rank, scen_idxs in enumerate(slices) for _ in scen_idxs ] scenario_names_to_rank["ROOT"] = { s: rank for s,rank in zip(self.ScenNames, list_of_ranks_per_scenario_idx) } def _recurse_do_node(node): for child in node.kids: first_scen_idx = child.scenfirst last_scen_idx = child.scenlast ranks_in_node = list_of_ranks_per_scenario_idx[first_scen_idx:last_scen_idx+1] minimum_rank_in_node = ranks_in_node[0] # IMPORTANT: # this accords with the way SPBase.create_communicators assigns the "key" when # creating its comm for this node. E.g., the key is the existing rank, which # will then be offset by the minimum rank. As the ranks within each node are # contiguous, this is enough to infer the rank each scenario will have in this # node's comm within_comm_ranks_in_node = [(rank-minimum_rank_in_node) for rank in ranks_in_node] scenarios_in_nodes = self.ScenNames[first_scen_idx:last_scen_idx+1] scenario_names_to_rank[child.name] = { s : rank for s,rank in zip(scenarios_in_nodes, within_comm_ranks_in_node) } if child not in self.NonLeafTerminals: _recurse_do_node(child) _recurse_do_node(self.rootnode) return scenario_names_to_rank, slices, list_of_ranks_per_scenario_idx ######## Utility to attach the one and only node to a two-stage scenario ####### def attach_root_node(model, firstobj, varlist, nonant_ef_suppl_list=None, surrogate_nonant_list=None, do_uniform=True): """ Create a root node as a list to attach to a scenario model Args: model (ConcreteModel): model to which this will be attached firstobj (Pyomo Expression): First stage cost (e.g. model.FC) varlist (list): Pyomo Vars in first stage (e.g. [model.A, model.B]) nonant_ef_suppl_list (list of pyo Var, Vardata or slices): Vars for which nonanticipativity constraints will only be added to the extensive form (important for bundling), but for which mpi-sppy will not enforce them as nonanticipative elsewhere. NOTE: These types of variables are often indicator variables that are already present in the deterministic model. surrogate_nonant_list (list of pyo Var, VarData or slices): Vars for which nonanticipativity constraints are enforced implicitly by the vars in varlist, but which may speed PH convergence and/or aid in cut generation when considered explicitly. These vars will be ignored for fixers, incumbent finders which fix nonants to calculate solutions, and the EF creator. NOTE: These types of variables are typically artificially added to the model to capture hierarchical model features. do_uniform (boolean): controls a side-effect to deal with missing probs Note: attaches a list consisting of one scenario node to the model """ model._mpisppy_node_list = [ scenario_tree.ScenarioNode("ROOT", 1.0, 1, firstobj, varlist, model, nonant_ef_suppl_list = nonant_ef_suppl_list, surrogate_nonant_list = surrogate_nonant_list, ) ] if do_uniform: # Avoid a warning per scenario if not hasattr(model, "_mpisppy_probability"): model._mpisppy_probability = "uniform" ### utilities to check the slices and the map ### def check4losses(numscens, branching_factors, scenario_names_to_rank,slices,list_of_ranks_per_scenario_idx): """ Check the data structures; gag and die if it looks bad. Args: numscens (int): number of scenarios branching_factors (list of int): branching factors scenario_names_to_rank (dict of dict): keys are comms (i.e., tree nodes); values are dicts with keys that are scenario names and values that are ranks within that comm slices (list of lists) indices correspond to ranks in self.mpicomm and the values are a list of scenario indices rank -> list of scenario indices for that rank list_of_ranks_per_scenario_idx (list) indices are scenario indices and values are the rank of that scenario within self.mpicomm scenario index -> rank """ present = [False for _ in range(numscens)] for rank, scenlist in enumerate(slices): for scen in scenlist: present[scen] = True missingsome = False for scen, there in enumerate(present): if not there: print(f"Scenario {scen} is not in slices") missingsome = True if missingsome: raise RuntimeError("Internal error: slices is not correct") # not stage presence... stagepresents = {stage: [False for _ in range(numscens)] for stage in range(len(branching_factors))} # loop over the entire structure, marking those found as present for nodename, scenlist in scenario_names_to_rank.items(): stagenum = nodename.count('_') for s in scenlist: snum = int(s[8:]) stagepresents[stagenum][snum] = True missingsome = False for stage in stagepresents: for scen, there in enumerate(stagepresents[stage]): if not there: print(f"Scenario number {scen} missing from stage {stage}.") missingsome = True if missingsome: raise RuntimeError("Internal error: scenario_name_to_rank") print("check4losses: OK") def disable_tictoc_output(): f = open(os.devnull,"w") tt_timer._ostream = f def reenable_tictoc_output(): # Primarily to re-enable after a disable tt_timer._ostream.close() tt_timer._ostream = sys.stdout def find_active_objective(pyomomodel): return find_objective(pyomomodel, active=True) def find_objective(pyomomodel, active=False): # return the only active objective or raise and error obj = list(pyomomodel.component_data_objects( Objective, active=True, descend_into=True)) if len(obj) == 1: return obj[0] if active or len(obj) > 1: raise RuntimeError("Could not identify exactly one active " "Objective for model '%s' (found %d objectives)" % (pyomomodel.name, len(obj))) # search again for a single inactive objective obj = list(pyomomodel.component_data_objects( Objective, descend_into=True)) if len(obj) == 1: return obj[0] raise RuntimeError("Could not identify exactly one objective for model " f"{pyomomodel.name} (found {len(obj)} objectives)") class NonLinearProblemFound(RuntimeError): pass def nonant_cost_coeffs(s): """ return a dictionary from s._mpisppy_data.nonant_indices.keys() to the objective cost coefficient """ objective = find_objective(s) # deal with proper bundles if hasattr(s, "_ef_scenario_names"): # The bundle's nonant_indices are NOT keyed by per-scenario position: # proper_bundler builds the bundle's nonantlist by iterating # bundle.ref_vars in insertion order and skipping ref_surrogate_vars, # and create_EF skips per-scenario positions whose var is fixed # (nonant_for_fixed_vars=False). So bundle nonant index k is the # k-th surviving (ndn, per_scen_i) entry. Map per-scenario var id -> # bundle (ndn, k) using that ordering. # Counters are kept per-ndn for generality, but proper_bundler # currently only attaches a ROOT node to bundles (see # proper_bundler.attach_root_node), so in practice only ROOT keys # appear here. The per-ndn form keeps this correct if that ever # changes. per_scen_to_bundle = {} counters = {} for (ndn, per_scen_i) in s.ref_vars.keys(): if (ndn, per_scen_i) in s.ref_surrogate_vars: continue counters.setdefault(ndn, 0) per_scen_to_bundle[(ndn, per_scen_i)] = (ndn, counters[ndn]) counters[ndn] += 1 nonant_varids = {} for scenario_name in s._ef_scenario_names: scenario = s.component(scenario_name) for node in scenario._mpisppy_node_list: ndn = node.name for per_scen_i, v in enumerate(node.nonant_vardata_list): bundle_key = per_scen_to_bundle.get((ndn, per_scen_i)) if bundle_key is not None: nonant_varids[id(v)] = bundle_key else: nonant_varids = s._mpisppy_data.varid_to_nonant_index # initialize to 0 cost_coefs = {ndn_i: 0 for ndn_i in s._mpisppy_data.nonant_indices} repn = generate_standard_repn(objective.expr, quadratic=False) for coef, var in zip(repn.linear_coefs, repn.linear_vars): if id(var) in nonant_varids: cost_coefs[nonant_varids[id(var)]] += coef for var in repn.nonlinear_vars: if id(var) in s._mpisppy_data.varid_to_nonant_index: raise RuntimeError( "A call to nonant_cost_coefficient found nonlinear variables in the objective function. " f"Variable {var} has nonlinear interactions in the objective funtion. " "Consider using gradient-based rho." ) return cost_coefs def create_nodenames_from_branching_factors(BFS): """ This function creates the node names of a tree without creating the whole tree. Parameters ---------- BFS : list of integers Branching factors. Returns ------- nodenames : list of str a list of the node names induced by branching_factors, including leaf nodes. """ stage_nodes = ["ROOT"] nodenames = ['ROOT'] if len(BFS)==1 : #2stage return(nodenames) for bf in BFS[:(len(BFS))]: old_stage_nodes = stage_nodes stage_nodes = [] for k in range(len(old_stage_nodes)): stage_nodes += ['%s_%i'%(old_stage_nodes[k],b) for b in range(bf)] nodenames += stage_nodes return nodenames def get_branching_factors_from_nodenames(all_nodenames): #WARNING: Do not work with unbalanced trees staget_node = "ROOT" branching_factors = [] while staget_node+"_0" in all_nodenames: child_regex = re.compile(staget_node+r'_\d*\Z') child_list = [x for x in all_nodenames if child_regex.match(x) ] branching_factors.append(len(child_list)) staget_node += "_0" if len(branching_factors)==1: #2stage return None else: return branching_factors def number_of_nodes(branching_factors): #How many nodes does a tree with a given branching_factors have ? last_node_stage_num = [i-1 for i in branching_factors] return node_idx(last_node_stage_num, branching_factors) def module_name_to_module(module_name): if inspect.ismodule(module_name): module = module_name else: module = importlib.import_module(module_name) return module if __name__ == "__main__": branching_factors = [2,2,2,3] numscens = np.prod(branching_factors) scennames = ["Scenario"+str(i) for i in range(numscens)] testtree = _ScenTree(branching_factors, scennames) print("nonleaves:") for nd in testtree.nonleaves: print(" ", nd.name) print("NonLeafTerminals:") for nd in testtree.NonLeafTerminals: print(" ", nd.name) n_proc = 8 sntr, slices, ranks_per_scenario = testtree.scen_names_to_ranks(n_proc) print("map:") for ndn,v in sntr.items(): print(ndn, v) print(f"slices: {slices}") check4losses(numscens, branching_factors, sntr, slices, ranks_per_scenario)