Source code for stoch_distr

###############################################################################
# mpi-sppy: MPI-based Stochastic Programming in PYthon
#
# Copyright (c) 2024, Lawrence Livermore National Security, LLC, Alliance for
# Sustainable Energy, LLC, The Regents of the University of California, et al.
# All rights reserved. Please see the files COPYRIGHT.md and LICENSE.md for
# full copyright and license information.
###############################################################################
# Network Flow - various formulations
import pyomo.environ as pyo
import mpisppy.utils.sputils as sputils
import mpisppy.tests.examples.distr.distr_data as distr_data
import numpy as np

# In this file, we create a (linear) inter-region minimal cost distribution problem.
# Our data, gives the constraints inside each in region in region_dict_creator
# and amongst the different regions in inter_region_dict_creator
# The data slightly differs depending on the number of regions (num_scens) which is created for 2, 3 or 4 regions

# Note: regions in our model will be represented in mpi-sppy by scenarios and to ensure the inter-region constraints
#       we will impose the inter-region arcs to be consensus-variables. They will be represented as non anticipative variables in mpi-sppy


[docs] def inter_arcs_adder(region_dict, inter_region_dict): """This function adds to the region_dict the inter-region arcs Args: region_dict (dict): dictionary for the current scenario \n inter_region_dict (dict): dictionary of the inter-region relations Returns: local_dict (dict): This dictionary copies region_dict, completes the already existing fields of region_dict to represent the inter-region arcs and their capcities and costs """ ### Note: The cost of the arc is here chosen to be split equally between the source region and target region # region_dict is renamed as local_dict because it no longer contains only information about the region # but also contains local information that can be linked to the other scenarios (with inter-region arcs) local_dict = region_dict for inter_arc in inter_region_dict["arcs"]: source,target = inter_arc region_source,node_source = source region_target,node_target = target if region_source == region_dict["name"] or region_target == region_dict["name"]: arc = node_source, node_target local_dict["arcs"].append(arc) local_dict["flow capacities"][arc] = inter_region_dict["capacities"][inter_arc] local_dict["flow costs"][arc] = inter_region_dict["costs"][inter_arc]/2 #print(f"scenario name = {region_dict['name']} \n {local_dict=} ") return local_dict
###Creates the model when local_dict is given, local_dict depends on the subproblem
[docs] def min_cost_distr_problem(local_dict, cfg, stoch_scenario_name, sense=pyo.minimize, max_revenue=None): """ Create an arcs formulation of network flow for the region and stochastic scenario considered. Args: local_dict (dict): dictionary representing a region including the inter region arcs \n cfg (): config argument used here for the random parameters \n stoch_scenario_name(str): name of the stochastic scenario. In each region, the model is scenario dependant \n sense (=pyo.minimize): we aim to minimize the cost, this should always be minimize \n max_revenue (float, opt): higher than all the possible revenues, this value allows to add a penalty slack making sure that the optimal value of the new model is obtained with a 0 slack Returns: model (Pyomo ConcreteModel) : the instantiated model """ scennum = sputils.extract_num(stoch_scenario_name) # Assert sense == pyo.minimize, "sense should be equal to pyo.minimize" # First, make the special In, Out arc lists for each node arcsout = {n: list() for n in local_dict["nodes"]} arcsin = {n: list() for n in local_dict["nodes"]} for a in local_dict["arcs"]: if a[0] in local_dict["nodes"]: arcsout[a[0]].append(a) if a[1] in local_dict["nodes"]: arcsin[a[1]].append(a) model = pyo.ConcreteModel(name='MinCostFlowArcs') def flowBounds_rule(model, i,j): return (0, local_dict["flow capacities"][(i,j)]) model.flow = pyo.Var(local_dict["arcs"], bounds=flowBounds_rule) # x def slackBounds_rule(model, n): if n in local_dict["factory nodes"]: return (0, local_dict["supply"][n]) elif n in local_dict["buyer nodes"]: return (local_dict["supply"][n], 0) elif n in local_dict["distribution center nodes"]: if cfg.ensure_xhat_feas: # Should be (0,0) but to avoid infeasibility we add a negative slack variable return (None, 0) else: return (0,0) else: raise ValueError(f"unknown node type for node {n}") model.y = pyo.Var(local_dict["nodes"], bounds=slackBounds_rule) def inventoryBounds_rule(model, n): return (0,local_dict["supply"][n]) ### We can store everything to make sure it is feasible, but it will never be interesting model.inventory = pyo.Var(local_dict["factory nodes"], bounds=inventoryBounds_rule) model.FirstStageCost = pyo.Expression(expr=\ sum(local_dict["production costs"][n]*(local_dict["supply"][n]-model.y[n]) for n in local_dict["factory nodes"])) if cfg.ensure_xhat_feas: # too big penalty to allow the stack to be non-zero model.SecondStageCost = pyo.Expression(expr=\ sum(local_dict["flow costs"][a]*model.flow[a] for a in local_dict["arcs"]) \ + sum(local_dict["revenues"][n]*(local_dict["supply"][n]-model.y[n]) for n in local_dict["buyer nodes"]) \ + sum(2*max_revenue*(-model.y[n]) for n in local_dict["distribution center nodes"]) \ - sum(local_dict["production costs"][n]/2*model.inventory[n] for n in local_dict["factory nodes"])) else: model.SecondStageCost = pyo.Expression(expr=\ sum(local_dict["flow costs"][a]*model.flow[a] for a in local_dict["arcs"]) \ + sum(local_dict["revenues"][n]*(local_dict["supply"][n]-model.y[n]) for n in local_dict["buyer nodes"]) \ - sum(local_dict["production costs"][n]/2*model.inventory[n] for n in local_dict["factory nodes"]) ) model.MinCost = pyo.Objective(expr=model.FirstStageCost + model.SecondStageCost, sense=sense) def FlowBalance_rule(m, n): if n in local_dict["factory nodes"]: # We generate pseudo randomly the loss on each factory node node_type, region_num, count = distr_data.parse_node_name(n) node_num = distr_data._node_num(cfg.mnpr, node_type, region_num, count) np.random.seed(node_num+cfg.initial_seed+(scennum+1)*2**20) #2**20 avoids the correlation with the scalable example data return sum(m.flow[a] for a in arcsout[n])\ - sum(m.flow[a] for a in arcsin[n])\ + m.inventory[n] \ == (local_dict["supply"][n] - m.y[n]) * min(1,max(0,1-np.random.normal(cfg.spm,cfg.cv)/100)) # We add the loss else: return sum(m.flow[a] for a in arcsout[n])\ - sum(m.flow[a] for a in arcsin[n])\ + m.y[n] == local_dict["supply"][n] model.FlowBalance= pyo.Constraint(local_dict["nodes"], rule=FlowBalance_rule) return model
###Functions required in other files, which constructions are specific to the problem ###Creates the scenario
[docs] def scenario_creator(admm_stoch_subproblem_scenario_name, inter_region_dict=None, cfg=None, data_params=None, all_nodes_dict=None): """Creates the model, which should include the consensus variables. \n However, this function shouldn't attach the consensus variables for the admm subproblems as it is done in stoch_admmWrapper. Therefore, only the stochastic tree as it would be represented without the decomposition needs to be created. Args: admm_stoch_subproblem_scenario_name (str): the name given to the admm problem for the stochastic scenario. \n inter_region_dict (dict): contains all the links between the regions num_scens (int): number of scenarios (regions). Useful to create the corresponding inter-region dictionary \n other parameters are key word arguments for scenario_creator Returns: Pyomo ConcreteModel: the instantiated model """ assert (inter_region_dict is not None) assert (cfg is not None) admm_subproblem_name, stoch_scenario_name = split_admm_stoch_subproblem_scenario_name(admm_stoch_subproblem_scenario_name) if cfg.scalable: assert (data_params is not None) assert (all_nodes_dict is not None) region_dict = distr_data.scalable_region_dict_creator(admm_subproblem_name, all_nodes_dict=all_nodes_dict, cfg=cfg, data_params=data_params) else: region_dict = distr_data.region_dict_creator(admm_subproblem_name) # Adding inter region arcs nodes and associated features local_dict = inter_arcs_adder(region_dict, inter_region_dict) # Generating the model model = min_cost_distr_problem(local_dict, cfg, stoch_scenario_name, max_revenue=data_params["max revenue"]) # Stash the original first-stage variable list so the module-level # first_stage_varlist hook can retrieve it. Stoch_AdmmWrapper will # call sputils.attach_root_node on our behalf using the hooks. model._first_stage_vars = [model.y[n] for n in local_dict["factory nodes"]] return model
def first_stage_cost(scenario): """Return the original problem's first-stage cost expression. Used by Stoch_AdmmWrapper to attach the root node automatically; the user no longer needs to call sputils.attach_root_node in scenario_creator. See doc/src/generic_admm.rst. """ return scenario.FirstStageCost def first_stage_varlist(scenario): """Return the original problem's first-stage variables (factory production decisions for stoch_distr). Companion to first_stage_cost. """ return scenario._first_stage_vars def scenario_denouement(rank, admm_stoch_subproblem_scenario_name, scenario, eps=10**(-6)): """For each admm stochastic scenario subproblem prints its name and the final variable values Args: rank (int): rank in which the scenario is placed admm_stoch_subproblem_scenario_name (str): name of the admm stochastic scenario subproblem scenario (Pyomo ConcreteModel): the instantiated model eps (float, opt): ensures that the dummy slack variables introduced have small values """ #print(f"slack values for the distribution centers for {admm_stoch_subproblem_scenario_name=} at {rank=}") for var in scenario.y: if 'DC' in var: if (abs(scenario.y[var].value) > eps): print(f"The penalty slack {scenario.y[var].name} is too big, its absolute value is {abs(scenario.y[var].value)}") #assert (abs(scenario.y[var].value) < eps), "the penalty slack is too big" #scenario.y[var].pprint() if 'F' in var: if (scenario.inventory[var].value > 10*eps): print(f"In {rank=} for {admm_stoch_subproblem_scenario_name}, the inventory {scenario.inventory[var].name} is big, its value is {scenario.inventory[var].value}. Although it is rare after convergence, it makes sense if in a scenario it is really interesting to produce.") scenario.inventory[var].pprint() #assert (abs(scenario.inventory[var].value) < eps), "the inventory is too big" #scenario.inventory[var].pprint() return print(f"flow values for {admm_stoch_subproblem_scenario_name=} at {rank=}") scenario.flow.pprint() print(f"slack values for {admm_stoch_subproblem_scenario_name=} at {rank=}") scenario.y.pprint()
[docs] def consensus_vars_creator(admm_subproblem_names, stoch_scenario_name, inter_region_dict=None, cfg=None, data_params=None, all_nodes_dict=None): """The following function creates the consensus variables dictionary thanks to the inter-region dictionary. \n This dictionary has redundant information, but is useful for admmWrapper. Args: admm_subproblem_names (list of str): name of the admm subproblems (regions) \n stoch_scenario_name (str): name of any stochastic_scenario, it is only used in this example to access the non anticipative variables (which are common to every stochastic scenario) and their stage. \n kwargs (opt): necessary in this example to call the scenario creator Returns: dict: dictionary which keys are the subproblems and values are the list of pairs (consensus_variable_name (str), stage (int)). """ consensus_vars = {} for inter_arc in inter_region_dict["arcs"]: source,target = inter_arc region_source,node_source = source region_target,node_target = target arc = node_source, node_target vstr = f"flow[{arc}]" #variable name as string, y is the slack #adds inter_region_arcs in the source region if region_source not in consensus_vars: #initiates consensus_vars[region_source] consensus_vars[region_source] = list() consensus_vars[region_source].append((vstr,2)) #adds inter_region_arcs in the target region if region_target not in consensus_vars: #initiates consensus_vars[region_target] consensus_vars[region_target] = list() consensus_vars[region_target].append((vstr,2)) # With the scalable example some regions may have no consensus_vars # This is not realistic because it would imply that the region is not linked for admm_subproblem_name in admm_subproblem_names: if admm_subproblem_name not in consensus_vars: print(f"WARNING: {admm_subproblem_name} has no consensus_vars") consensus_vars[admm_subproblem_name] = list() # Each ADMM subproblem's first-stage Vars are added to its # consensus list by the wrapper (Stoch_AdmmWrapper / AdmmBundler) # at construction time, driven by the first_stage_varlist hook. return consensus_vars
def stoch_scenario_names_creator(cfg): """Creates the name of every stochastic scenario. Args: cfg: config object Returns: list (str): the list of stochastic scenario names """ return [f"StochasticScenario{i+1}" for i in range(cfg.num_stoch_scens)] def admm_subproblem_names_creator(cfg): """Creates the name of every admm subproblem. Args: cfg: config object Returns: list (str): the list of admm subproblem names """ return [f"Region{i+1}" for i in range(cfg.num_admm_subproblems)] def combining_names(admm_subproblem_name,stoch_scenario_name): # Used to create the admm_stoch_subproblem_scenario_name return f"ADMM_STOCH_{admm_subproblem_name}_{stoch_scenario_name}" def admm_stoch_subproblem_scenario_names_creator(admm_subproblem_names,stoch_scenario_names): """ Creates the list of the admm stochastic subproblem scenarios, which are the admm subproblems given a scenario Args: admm_subproblem_names (list of str): names of the admm subproblem stoch_scenario_names (list of str): names of the stochastic scenario Returns: list of str: name of the admm stochastic subproblem scenarios """ ### The order is important, we may want all the subproblems to appear consecutively in a scenario return [combining_names(admm_subproblem_name,stoch_scenario_name) \ for stoch_scenario_name in stoch_scenario_names \ for admm_subproblem_name in admm_subproblem_names ] def split_admm_stoch_subproblem_scenario_name(admm_stoch_subproblem_scenario_name): """ Returns the admm_subproblem_name and the stoch_scenario_name given an admm_stoch_subproblem_scenario_name. This function, specific to the problem, is the reciprocal function of ``combining_names`` which creates the admm_stoch_subproblem_scenario_name given the admm_subproblem_name and the stoch_scenario_name. Args: admm_stoch_subproblem_scenario_name (str) Returns: (str,str): admm_subproblem_name, stoch_scenario_name """ # Method specific to our example and because the admm_subproblem_name and stoch_scenario_name don't include "_" splitted = admm_stoch_subproblem_scenario_name.split('_') assert (len(splitted) == 4), "no underscore should be attached to admm_subproblem_name nor stoch_scenario_name" admm_subproblem_name = splitted[2] stoch_scenario_name = splitted[3] return admm_subproblem_name, stoch_scenario_name
[docs] def kw_creator(cfg): """ Args: cfg (config): specifications for the problem given on the command line Returns: dict (str): the kwargs that are used in stoch_distr.scenario_creator """ if cfg.scalable: import json json_file_path = cfg.get("json_file_path", ifmissing="../distr/data_params.json") with open(json_file_path, 'r') as file: data_params = json.load(file) # In distr_data num_admm_subproblems is called num_scens if cfg.get("num_scens") is None: cfg.add_to_config("num_scens", description="num admm subproblems", domain=int, default=cfg.num_admm_subproblems) all_nodes_dict = distr_data.all_nodes_dict_creator(cfg, data_params) all_DC_nodes = [DC_node for region in all_nodes_dict for DC_node in all_nodes_dict[region]["distribution center nodes"]] inter_region_dict = distr_data.scalable_inter_region_dict_creator(all_DC_nodes, cfg, data_params) else: inter_region_dict = distr_data.inter_region_dict_creator(num_scens=cfg.num_admm_subproblems) all_nodes_dict = None data_params = {"max revenue": 1200} kwargs = { "all_nodes_dict" : all_nodes_dict, "inter_region_dict" : inter_region_dict, "cfg" : cfg, "data_params" : data_params, } return kwargs
[docs] def inparser_adder(cfg): """Adding to the config argument, specific elements to our problems. In this case the numbers of stochastic scenarios and admm subproblems which are required + elements used for random number generation + possibility to scale Args: cfg (config): specifications for the problem given on the command line """ cfg.add_to_config( "num_stoch_scens", description="Number of stochastic scenarios (default None)", domain=int, default=None, argparse_args = {"required": True} ) cfg.add_to_config( "num_admm_subproblems", description="Number of admm subproblems (regions)", domain=int, default=None, argparse_args = {"required": True} ) ### For the pseudo-random scenarios cfg.add_to_config("spm", description="mean percentage of scrap loss at the production", domain=float, default=5) cfg.add_to_config("cv", description="coefficient of variation of the loss at the production", domain=float, default=20) cfg.add_to_config("initial_seed", description="initial seed for generating the loss", domain=int, default=0) ### For the scalable example cfg.add_to_config("scalable", description="generate pseudo-random data parameterized by --mnpr instead of using the hardwired 2/3/4-region datasets", domain=bool, default=False) cfg.add_to_config("mnpr", description="max number of nodes per region and per type", domain=int, default=4) cfg.add_to_config("ensure_xhat_feas", description="adds slacks with high costs to ensure the feasibility of xhat yet maintaining the optimal", domain=bool, default=False)