# ___________________________________________________________________________ # # Pyomo: Python Optimization Modeling Objects # Copyright 2017 National Technology and Engineering Solutions of Sandia, LLC # Under the terms of Contract DE-NA0003525 with National Technology and # Engineering Solutions of Sandia, LLC, the U.S. Government retains certain # rights in this software. # This software is distributed under the 3-clause BSD License. # ___________________________________________________________________________ import sys import re import os import tempfile from os.path import join, dirname, abspath import difflib import filecmp import shutil import pyutilib.misc import pyutilib.th as unittest from six import StringIO thisdir = dirname(abspath(__file__)) baselinedir = os.path.join(thisdir, "smps_embedded_baselines") pysp_examples_dir = \ join(dirname(dirname(dirname(dirname(thisdir)))), "examples", "pysp") import pyomo.environ import pyomo.environ as aml from pyomo.pysp.embeddedsp import (EmbeddedSP, StochasticDataAnnotation, TableDistribution, UniformDistribution, StageCostAnnotation, VariableStageAnnotation) baa99_basemodel = None piecewise_model_embedded = None def setUpModule(): global baa99_basemodel global piecewise_model_embedded if "baa99_basemodel" in sys.modules: del sys.modules["baa99_basemodel"] fname = os.path.join(pysp_examples_dir, "baa99", "baa99_basemodel.py") if os.path.exists(fname+"c"): os.remove(fname+"c") baa99_basemodel = pyutilib.misc.import_file(fname) if "piecewise_model_embedded" in sys.modules: del sys.modules["piecewise_model_embedded"] fname = os.path.join(thisdir, "piecewise_model_embedded.py") if os.path.exists(fname+"c"): os.remove(fname+"c") piecewise_model_embedded = pyutilib.misc.import_file(fname) def tearDownModule(): global baa99_basemodel global piecewise_model_embedded if "baa99_basemodel" in sys.modules: del sys.modules["baa99_basemodel"] baa99_basemodel = None if "piecewise_model_embedded" in sys.modules: del sys.modules["piecewise_model_embedded"] piecewise_model_embedded = None @unittest.category('nightly') class TestSMPSEmbeddedBad(unittest.TestCase): @classmethod def setUpClass(cls): cls.tmpdir = tempfile.mkdtemp(dir=thisdir) @classmethod def tearDownClass(cls): shutil.rmtree(cls.tmpdir, ignore_errors=True) cls.tmpdir = None def _get_base_model(self): model = aml.ConcreteModel() model.x = aml.Var() model.y = aml.Var() model.d1 = aml.Param(mutable=True, initialize=0.0) model.d2 = aml.Param(mutable=True, initialize=0.0) model.d3 = aml.Param(mutable=True, initialize=0.0) model.cost = aml.Expression([1,2]) model.cost[1].expr = model.x model.cost[2].expr = model.d1*model.y model.o = aml.Objective(expr= model.cost[1]+model.cost[2]) model.c1 = aml.Constraint(expr= model.x >= 0) model.c2 = aml.Constraint(expr= model.y*model.d2 >= model.d3) model.varstage = VariableStageAnnotation() model.varstage.declare(model.x, 1) model.varstage.declare(model.y, 2) model.stagecost = StageCostAnnotation() model.stagecost.declare(model.cost[1], 1) model.stagecost.declare(model.cost[2], 2) model.stochdata = StochasticDataAnnotation() model.stochdata.declare( model.d1, distribution=TableDistribution([0.0,1.0])) model.stochdata.declare( model.d2, distribution=TableDistribution([0.0,1.0])) model.stochdata.declare( model.d3, distribution=TableDistribution([0.0,1.0])) return model def test_makes_directory(self): tmpdir = tempfile.mkdtemp(dir=thisdir) self.assertTrue(os.path.exists(tmpdir)) shutil.rmtree(tmpdir, ignore_errors=True) self.assertFalse(os.path.exists(tmpdir)) sp = EmbeddedSP(self._get_base_model()) pyomo.pysp.convert.smps.convert_embedded(tmpdir, 'test', sp) self.assertTrue(os.path.exists(tmpdir)) shutil.rmtree(tmpdir, ignore_errors=True) def test_too_many_stages(self): sp = EmbeddedSP(self._get_base_model()) sp.time_stages = [1,2,3] with self.assertRaises(ValueError) as cm: pyomo.pysp.convert.smps.convert_embedded(self.tmpdir, 'test', sp) self.assertEqual(str(cm.exception), ("SMPS conversion does not yet handle more " "than 2 time-stages")) def test_stoch_variable_bounds(self): # Note there is no way to test this starting from a real model # because the Pyomo AML does not return what is assigned # to the bounds without converting it to a raw value (e.g., # we can't detect if a mutable Param was used in a bound) class _Junk(object): pass sp = _Junk() sp.time_stages = [1,2] sp.has_stochastic_variable_bounds = True self.assertEqual(sp.has_stochastic_variable_bounds, True) with self.assertRaises(ValueError) as cm: pyomo.pysp.convert.smps.convert_embedded(self.tmpdir, 'test', sp) self.assertEqual(str(cm.exception), ("Problems with stochastic variables bounds " "can not be converted into an embedded " "SMPS representation")) def test_nonlinear_stoch_objective(self): model = self._get_base_model() model.cost[2].expr = model.y**2 + model.d1 sp = EmbeddedSP(model) with self.assertRaises(ValueError) as cm: pyomo.pysp.convert.smps.convert_embedded(self.tmpdir, 'test', sp) self.assertTrue(str(cm.exception).startswith( "Cannot output embedded SP representation for component " "'o'. The embedded SMPS writer does not yet handle " "stochastic nonlinear expressions. Invalid expression: ")) def test_stoch_data_too_many_uses_objective(self): model = self._get_base_model() model.cost[2].expr = model.d1*model.y + model.d1 sp = EmbeddedSP(model) with self.assertRaises(ValueError) as cm: pyomo.pysp.convert.smps.convert_embedded(self.tmpdir, 'test', sp) self.assertEqual( str(cm.exception), ("Cannot output embedded SP representation for component " "'o'. The embedded SMPS writer does not yet handle the " "case where a stochastic data component appears in " "multiple expressions or locations within a single " "expression (e.g., multiple constraints, or multiple " "variable coefficients within a constraint). The " "parameter 'd1' appearing in component 'o' was " "previously encountered in another location in " "component 'o'.")) def test_stoch_data_nontrivial_expression_objective1(self): model = self._get_base_model() model.cost[2].expr = -model.d1*model.y sp = EmbeddedSP(model) with self.assertRaises(ValueError) as cm: pyomo.pysp.convert.smps.convert_embedded(self.tmpdir, 'test', sp) self.assertTrue(str(cm.exception).startswith( "Cannot output embedded SP representation for component " "'o'. The embedded SMPS writer does not yet handle the " "case where a stochastic data component appears " "in an expression that defines a single variable's " "coefficient. The coefficient for variable 'y' must be " "exactly set to parameter 'd1' in the expression. Invalid " "expression: ")) def test_stoch_data_nontrivial_expression_objective2(self): model = self._get_base_model() model.q = aml.Param(mutable=True, initialize=0.0) model.stochdata.declare( model.q, distribution=TableDistribution([0.0,1.0])) model.cost[2].expr = (model.d1+model.q)*model.y sp = EmbeddedSP(model) with self.assertRaises(ValueError) as cm: pyomo.pysp.convert.smps.convert_embedded(self.tmpdir, 'test', sp) self.assertEqual( str(cm.exception), ("Cannot output embedded SP representation for component " "'o'. The embedded SMPS writer does not yet handle the " "case where multiple stochastic data components appear " "in an expression that defines a single variable's " "coefficient. The coefficient for variable 'y' involves " "stochastic parameters: ['d1', 'q']")) def test_bad_distribution_objective(self): model = self._get_base_model() del model.stochdata model.stochdata = StochasticDataAnnotation() model.stochdata.declare( model.d1, distribution=UniformDistribution(0.0,1.0)) sp = EmbeddedSP(model) with self.assertRaises(ValueError) as cm: pyomo.pysp.convert.smps.convert_embedded(self.tmpdir, 'test', sp) self.assertEqual( str(cm.exception), ("Invalid distribution type 'UniformDistribution' for stochastic " "parameter 'd1'. The embedded SMPS writer currently " "only supports discrete table distributions of type " "pyomo.pysp.embeddedsp.TableDistribution.")) def test_nonlinear_stoch_constraint(self): model = self._get_base_model() model.c2._body = model.d2*model.y**2 sp = EmbeddedSP(model) with self.assertRaises(ValueError) as cm: pyomo.pysp.convert.smps.convert_embedded(self.tmpdir, 'test', sp) self.assertTrue(str(cm.exception).startswith( "Cannot output embedded SP representation for component " "'c2'. The embedded SMPS writer does not yet handle " "stochastic nonlinear expressions. Invalid expression: ")) def test_stoch_constraint_body_constant(self): model = self._get_base_model() model.q = aml.Param(mutable=True, initialize=0.0) model.stochdata.declare( model.q, distribution=TableDistribution([0.0,1.0])) model.c2._body = model.d2*model.y + model.q sp = EmbeddedSP(model) with self.assertRaises(ValueError) as cm: pyomo.pysp.convert.smps.convert_embedded(self.tmpdir, 'test', sp) self.assertEqual( str(cm.exception), ("Cannot output embedded SP representation for component " "'c2'. The embedded SMPS writer does not yet handle the " "case where a stochastic data appears in the body of a " "constraint expression that must be moved to the bounds. " "The constraint must be written so that the stochastic " "element 'q' is a simple bound or a simple variable " "coefficient.")) def test_stoch_range_constraint(self): model = self._get_base_model() model.q = aml.Param(mutable=True, initialize=0.0) model.stochdata.declare( model.q, distribution=TableDistribution([0.0,1.0])) model.c3 = aml.Constraint(expr=aml.inequality(model.q, model.y, 0)) sp = EmbeddedSP(model) with self.assertRaises(ValueError) as cm: pyomo.pysp.convert.smps.convert_embedded(self.tmpdir, 'test', sp) self.assertEqual( str(cm.exception), ("Cannot output embedded SP representation for component " "'c3'. The embedded SMPS writer does not yet handle range " "constraints that have stochastic data.")) def test_stoch_data_too_many_uses_constraint(self): model = self._get_base_model() model.c2._lower = model.d2 sp = EmbeddedSP(model) with self.assertRaises(ValueError) as cm: pyomo.pysp.convert.smps.convert_embedded(self.tmpdir, 'test', sp) self.assertEqual( str(cm.exception), ("Cannot output embedded SP representation for component " "'c2'. The embedded SMPS writer does not yet handle the " "case where a stochastic data component appears in " "multiple expressions or locations within a single " "expression (e.g., multiple constraints, or multiple " "variable coefficients within a constraint). The " "parameter 'd2' appearing in component 'c2' was " "previously encountered in another location in " "component 'c2'.")) def test_stoch_data_nontrivial_expression_constraint1(self): model = self._get_base_model() model.c2._body = -model.d2*model.y sp = EmbeddedSP(model) with self.assertRaises(ValueError) as cm: pyomo.pysp.convert.smps.convert_embedded(self.tmpdir, 'test', sp) self.assertTrue(str(cm.exception).startswith( "Cannot output embedded SP representation for component " "'c2'. The embedded SMPS writer does not yet handle the " "case where a stochastic data component appears " "in an expression that defines a single variable's " "coefficient. The coefficient for variable 'y' must be " "exactly set to parameter 'd2' in the expression. Invalid " "expression: ")) def test_stoch_data_nontrivial_expression_constraint2(self): model = self._get_base_model() model.q = aml.Param(mutable=True, initialize=0.0) model.stochdata.declare( model.q, distribution=TableDistribution([0.0,1.0])) model.c2._body = (model.d2+model.q)*model.y sp = EmbeddedSP(model) with self.assertRaises(ValueError) as cm: pyomo.pysp.convert.smps.convert_embedded(self.tmpdir, 'test', sp) self.assertEqual( str(cm.exception), ("Cannot output embedded SP representation for component " "'c2'. The embedded SMPS writer does not yet handle the " "case where multiple stochastic data components appear " "in an expression that defines a single variable's " "coefficient. The coefficient for variable 'y' involves " "stochastic parameters: ['d2', 'q']")) def test_bad_distribution_constraint(self): model = self._get_base_model() del model.stochdata model.stochdata = StochasticDataAnnotation() model.stochdata.declare( model.d2, distribution=UniformDistribution(0.0,1.0)) sp = EmbeddedSP(model) with self.assertRaises(ValueError) as cm: pyomo.pysp.convert.smps.convert_embedded(self.tmpdir, 'test', sp) self.assertEqual( str(cm.exception), ("Invalid distribution type 'UniformDistribution' for stochastic " "parameter 'd2'. The embedded SMPS writer currently " "only supports discrete table distributions of type " "pyomo.pysp.embeddedsp.TableDistribution.")) @unittest.category('nightly') class TestSMPSEmbedded(unittest.TestCase): def _diff(self, baselinedir, outputdir, dc=None): if dc is None: dc = filecmp.dircmp(baselinedir, outputdir, ['.svn']) if dc.left_only: self.fail("Files or subdirectories missing from output: " +str(dc.left_only)) if dc.right_only: self.fail("Files or subdirectories missing from baseline: " +str(dc.right_only)) for name in dc.diff_files: fromfile = join(dc.left, name) tofile = join(dc.right, name) with open(fromfile, 'r') as f_from: fromlines = f_from.readlines() with open(tofile, 'r') as f_to: tolines = f_to.readlines() diff = difflib.context_diff(fromlines, tolines, fromfile+" (baseline)", tofile+" (output)") diff = list(diff) # The filecmp.dircmp function does a weaker # comparison that can sometimes lead to false # positives. Make sure the true diff is not empty # before we call this a failure. if len(diff) > 0: out = StringIO() out.write("Output file does not match baseline:\n") for line in diff: out.write(line) self.fail(out.getvalue()) for subdir in dc.subdirs: self._diff(join(baselinedir, subdir), join(outputdir, subdir), dc=dc.subdirs[subdir]) shutil.rmtree(outputdir, ignore_errors=True) def _run(self, sp, basename, **kwds): class_name, test_name = self.id().split('.')[-2:] output_directory = join(thisdir, class_name+"."+test_name) shutil.rmtree(output_directory, ignore_errors=True) os.makedirs(output_directory) pyomo.pysp.convert.smps.convert_embedded(output_directory, basename, sp, **kwds) return output_directory def _get_baa99_sp(self): model = baa99_basemodel.model.clone() model.varstage = VariableStageAnnotation() model.varstage.declare(model.x1, 1) model.varstage.declare(model.x2, 1) model.stagecost = StageCostAnnotation() model.stagecost.declare(model.FirstStageCost, 1) model.stagecost.declare(model.SecondStageCost, 2) model.stochdata = StochasticDataAnnotation() model.stochdata.declare( model.d1_rhs, distribution=TableDistribution(model.d1_rhs_table)) model.stochdata.declare( model.d2_rhs, distribution=TableDistribution(model.d2_rhs_table)) return EmbeddedSP(model) def test_baa99_embedded_LP_symbolic_labels(self): baseline_directory = os.path.join( baselinedir, 'baa99_embedded_LP_symbolic_names_baseline') output_directory = self._run( self._get_baa99_sp(), 'baa99', core_format='lp', io_options={'symbolic_solver_labels':True}) self._diff(baseline_directory, output_directory) def test_baa99_embedded_MPS_symbolic_labels(self): baseline_directory = os.path.join( baselinedir, 'baa99_embedded_MPS_symbolic_names_baseline') output_directory = self._run( self._get_baa99_sp(), 'baa99', core_format='mps', io_options={'symbolic_solver_labels':True}) self._diff(baseline_directory, output_directory) def test_piecewise_embedded_LP_symbolic_labels(self): baseline_directory = os.path.join( baselinedir, 'piecewise_embedded_LP_symbolic_names_baseline') output_directory = self._run( piecewise_model_embedded.create_embedded(), 'piecewise', core_format='lp', io_options={'symbolic_solver_labels':True}) self._diff(baseline_directory, output_directory) def test_piecewise_embedded_MPS_symbolic_labels(self): baseline_directory = os.path.join( baselinedir, 'piecewise_embedded_MPS_symbolic_names_baseline') output_directory = self._run( piecewise_model_embedded.create_embedded(), 'piecewise', core_format='mps', io_options={'symbolic_solver_labels':True}) self._diff(baseline_directory, output_directory) if __name__ == "__main__": unittest.main()