Source code for carbatpy.optimizations.helpers_optimization

from dataclasses import dataclass
from typing import Any, Iterable

import psutil
from pymoo.core.problem import ElementwiseProblem
from pymoo.core.termination import Termination
from pymoo.termination.ftol import MultiObjectiveSpaceTermination
from pymoo.termination.max_gen import MaximumGenerationTermination
from pymoo.termination.robust import RobustTermination

import carbatpy as cb


[docs] def default_n_processes() -> int: """ Determine the default number of processes to use for parallelization. By default, this is the number of physical CPU cores minus 2, with a minimum of 1. If the number of physical cores cannot be determined, it falls back to the total logical cores, or 1 as a last resort. Returns: int: Recommended number of worker processes. """ physical: int | None = psutil.cpu_count(logical=False) if physical is None: physical = psutil.cpu_count(logical=True) or 1 return max(1, physical - 2)
@dataclass
[docs] class BoundaryEntry: """ Represents a single optimization variable with its location and bounds. :param path: Keys describing the variable's location in the nested configuration dictionary, e.g. ``['hp', 'evaporator', 'dt_min']``. :param lower: Lower bound of the variable. :param upper: Upper bound of the variable. Example:: >>> entry = BoundaryEntry(path=['hp', 'evaporator', 'dt_min'], lower=1.0, upper=10.0) >>> entry.path ['hp', 'evaporator', 'dt_min'] >>> entry.lower 1.0 """
[docs] path: list[str]
[docs] lower: float | int
[docs] upper: float | int
[docs] def extract_boundary_with_path( bounds: dict[str, Any], ) -> tuple[list[list[str]], tuple[float, ...], tuple[float, ...]]: """ Extract variable paths and their lower/upper boundaries from a nested dictionary. Args: bounds (dict[str, Any]): Nested dictionary with variable boundaries, e.g. ``{'param': [lower, upper]}``. Returns: tuple[list[list[str]], tuple[float, ...], tuple[float, ...]]: - List of variable paths (each path is a list of keys). - Tuple of lower bounds. - Tuple of upper bounds. """ entries = traverse_dict(bounds) path = [e.path for e in entries] lower = tuple(e.lower for e in entries) upper = tuple(e.upper for e in entries) return path, lower, upper
[docs] def traverse_dict( d: dict[str, Any], current_path: list[str] | None = None, entries: list[BoundaryEntry] | None = None, ) -> list[BoundaryEntry]: """ Recursively traverse a nested dictionary to extract variable boundaries. Args: d (dict[str, Any]): Dictionary to traverse. current_path (list[str] | None): Current key path. Defaults to ``None``. entries (list[BoundaryEntry] | None): Collected entries. Defaults to ``None``. Returns: list[BoundaryEntry]: List of :class:`BoundaryEntry` objects with path, lower, and upper bounds. """ if current_path is None: current_path = [] if entries is None: entries = [] for key, val in d.items(): new_path = current_path + [key] if isinstance(val, dict): traverse_dict(val, new_path, entries) elif key == "fractions": for i, (l, u) in enumerate(val): entries.append(BoundaryEntry(new_path + [str(i)], l, u)) else: l, u = val entries.append(BoundaryEntry(new_path, l, u)) return entries
[docs] def create_config( values: Iterable[float] | None, paths: list[list[str]] ) -> dict[str, Any]: """ Build a nested configuration dictionary from flattened optimizer variables and paths. Fractions are handled specially: the last fraction is calculated as ``1 - sum(others)``. Args: values (Iterable[float] | None): Variable values from the optimizer. paths (list[list[str]]): Corresponding paths in the configuration dictionary. Raises: ValueError: If ``values`` is ``None``. Returns: dict[str, Any]: Nested dictionary usable by carbatpy. """ if values is None: raise ValueError("Values must not be None") config = {} normal_paths = [] fraction_paths = [] # Separate fractions from other parameters for path, value in zip(paths, values): if "fractions" in path: fraction_paths.append((path, float(value))) else: normal_paths.append((path, float(value))) # Create config with other parameters for path, value in normal_paths: current = config for key in path[:-1]: if key not in current: current[key] = {} current = current[key] current[path[-1]] = value # Create list with fractions (last one is 1-sum(fractions)) fractions_by_base_path = {} for path, value in fraction_paths: base_path = path[:-1] index = path[-1] base_key = tuple(base_path) if base_key not in fractions_by_base_path: fractions_by_base_path[base_key] = {} fractions_by_base_path[base_key][index] = value for base_key, fractions_dict in fractions_by_base_path.items(): fractions_list = [val for _, val in fractions_dict.items()] sum_existing = sum(fractions_list) missing_value = 1 - sum_existing fractions_list.append(missing_value) # Add fractions to config current = config for key in base_key[:-1]: if key not in current: current[key] = {} current = current[key] current[base_key[-1]] = fractions_list return config
[docs] def warning_score(res: dict[str, Any], system_key: None | str = None) -> float: """ Compute a total warning score from carbatpy simulation results. Args: res (dict[str, Any]): Simulation results from carbatpy. system_key (str | None): Specific system (``'hp'`` or ``'orc'``) to compute warnings for. Defaults to ``None`` for all systems. Returns: float: Absolute sum of warnings (``0.0`` if none). """ w = res["warnings"] if system_key is None else res[system_key]["warnings"] return abs(sum(item.value for item in w.values()))
[docs] def apply_dt_min_start( conf_act: dict[str, Any], opti_fun: str, paths: list[list[str]], ) -> dict[str, Any]: """ Set starting delta-T (``dT_min``) values in the config based on heat exchanger values. Args: conf_act (dict[str, Any]): Current configuration dictionary. opti_fun (str): Cycle type. Must be one of ``'hp'``, ``'orc'``, or ``'cb'``. paths (list[list[str]]): Paths to check whether ``dt_min`` is being optimized. Returns: dict[str, Any]: Updated configuration with starting ``dT_min`` values. """ if not any("dt_min" in p for p in paths): return conf_act if opti_fun == "cb": hp = conf_act.setdefault("hp", {}) orc = conf_act.setdefault("orc", {}) hp_start = hp.setdefault("start", {}) orc_start = orc.setdefault("start", {}) hp_start["dt_min"] = hp["evaporator"]["dt_min"] + 0.001 orc_start["dt_min"] = orc["condenser"]["dt_min"] elif opti_fun == "hp": start = conf_act.setdefault("start", {}) start["dt_min"] = conf_act["evaporator"]["dt_min"] elif opti_fun == "orc": start = conf_act.setdefault("start", {}) start["dt_min"] = conf_act["condenser"]["dt_min"] return conf_act
[docs] def calc_costs( results: dict[str, Any], system: str, verbose: bool = False, ) -> tuple[float, dict[str, Any], dict[str, Any]]: """ Calculate total system costs and cost distribution for HP, ORC, or CB. For ``'cb'``, distinguishes between HP and ORC components to avoid double counting shared components. Args: results (dict[str, Any]): Simulation results containing cost data. system (str): System type. Must be one of ``'hp'``, ``'orc'``, or ``'cb'``. verbose (bool): Print details about ORC cost components. Defaults to ``False``. Raises: ValueError: If ``system`` is not ``'hp'``, ``'orc'``, or ``'cb'``. Returns: tuple[float, dict[str, Any], dict[str, Any]]: - Total system cost. - HP cost dictionary. - ORC cost dictionary. """ select = ("total_costs", "cost_distribution") if system in ("hp", "orc"): return results["costs"]["total_costs"], {}, {} if system != "cb": raise ValueError(system) costs_hp = {k: results["hp"]["costs"][k] for k in select} costs_orc = {k: results["orc"]["costs"][k] for k in select} hp_total = sum(costs_hp["cost_distribution"].values()) new_orc_components = { "expander", "pump", "condenser", "evaporator", } orc_unique = sum( c for comp, c in costs_orc["cost_distribution"].items() if comp in new_orc_components ) if verbose: print( f"CB cost assumption: ORC unique components counted: {sorted(new_orc_components)}" ) return hp_total + orc_unique, costs_hp, costs_orc
@dataclass(frozen=True)
[docs] class EvalResult: """ Container for the results of a carbatpy optimization evaluation. :param performance: Performance metric of the system (e.g., COP or thermal efficiency). :param costs: Total system cost calculated for the optimization case. :param p_low: Minimum pressure value in the system, used for constraints. :param warn: Warning score representing thermodynamic or design violations. """
[docs] performance: float
[docs] costs: float
[docs] p_low: float
[docs] warn: float
[docs] def opti_func( x: Iterable[float], dir_config: Any, paths: list[list[str]], opti_fun: str, same_fluid: bool = True, heat_losses: float = 0.0, COP: float | None = None, q_dot: float | None = None, ) -> EvalResult: """ Run a carbatpy simulation and return evaluation metrics for optimization. Constructs a complete configuration from optimizer variables ``x`` and the corresponding ``paths``, simulates the chosen cycle, and assesses any thermodynamic warnings. Infeasible or violating results are returned with heavily penalized values to guide the optimizer away from invalid solutions. Args: x (Iterable[float]): Decision variable values generated by the optimizer. dir_config (Any): General carbatpy configuration. paths (list[list[str]]): Paths indicating where each value in ``x`` should be placed in the configuration dictionary. opti_fun (str): Cycle to optimize. Must be one of ``'hp'``, ``'orc'``, or ``'cb'``. same_fluid (bool): Whether to reuse the HP working fluid in the ORC. Only used in ``'cb'`` mode. Defaults to ``True``. heat_losses (float): Thermal losses in storage components. Only used in ``'cb'`` mode. Defaults to ``0.0``. COP (float | None): COP of the HP. Required if ``opti_fun='orc'``. q_dot (float | None): Heat flow of the HP. Required if ``opti_fun='orc'``. Raises: ValueError: If an unknown ``opti_fun`` is specified or required parameters are missing. Returns: EvalResult: Evaluation result containing: - ``performance``: Cycle performance (e.g., COP or thermal efficiency). - ``costs``: Total system costs. - ``p_low``: Low-side pressure, used for constraints. - ``warn``: Warning score (``0.0`` = no violations; high values indicate infeasible solutions). Note: In the presence of warnings, costs are penalized with ``1e12`` to discourage infeasible solutions during optimization. """ try: conf_act = create_config(x, paths) conf_act = apply_dt_min_start(conf_act, opti_fun, paths) if opti_fun == "cb": performance, res = cb.cb_comp.cb_calc( dir_config, same_fluid=same_fluid, heat_losses=heat_losses, config=conf_act, plotting=False, verbose=False, ) p_low = res["hp"]["output"]["start"]["p_low"] warn_hp = warning_score(res, system_key="hp") if warn_hp > 0: return EvalResult(0.0, 1e12, p_low, 100.0 + float(warn_hp)) warn_orc = warning_score(res, system_key="orc") if warn_orc > 0: return EvalResult(0.0, 1e12, p_low, float(warn_orc)) costs, _, _ = calc_costs(res, opti_fun) return EvalResult(float(performance), float(costs), p_low, 0.0) elif opti_fun == "hp": res = cb.hp_comp.heat_pump( dir_config, config=conf_act, verbose=False, plotting=False ) p_low = res["output"]["start"]["p_low"] warn = float(warning_score(res)) if warn > 0: return EvalResult(0.0, 1e12, p_low, warn) performance = res["COP"] costs, _, _ = calc_costs(res, opti_fun) return EvalResult(performance, float(costs), p_low, 0.0) elif opti_fun == "orc": res = cb.orc_comp.orc( dir_config, COP, q_dot, config=conf_act, verbose=False, plotting=False ) p_low = res["output"]["start"]["p_low"] warn = float(warning_score(res)) if warn > 0: return EvalResult(0.0, 1e12, 1e4, warn) performance = res["eta_th"] costs, _, _ = calc_costs(res, opti_fun) return EvalResult(performance, float(costs), p_low, 0.0) raise ValueError(f"Unknown opti_fun={opti_fun}") except: return EvalResult(0.0, 1e12, 1e4, 1e6)
# Pymoo initialization
[docs] class CombinedTermination(Termination): """ Termination criteria combining multiple conditions for Pymoo optimizations. Combines: - Maximum number of generations (:class:`MaximumGenerationTermination`) - Multi-objective function tolerance with robust checking (:class:`RobustTermination`) :param max_gen_termination: Termination based on maximum number of generations. :type max_gen_termination: MaximumGenerationTermination :param ftol_termination: Termination based on objective function tolerance over a rolling period. :type ftol_termination: RobustTermination :param criteria: List of active termination criteria. :type criteria: list[Termination] """ def __init__( self, max_gen: int, ftol: float, period: int, n_skip: int = 5, ) -> None: """ Initialize the combined termination. Args: max_gen (int): Maximum number of generations. ftol (float): Tolerance for objective function changes. period (int): Number of generations to check tolerance over. n_skip (int): Number of initial generations to skip for robust checking. Defaults to ``5``. """ super().__init__()
[docs] self.max_gen_termination = MaximumGenerationTermination(max_gen)
[docs] self.ftol_termination = RobustTermination( MultiObjectiveSpaceTermination(ftol, only_feas=True, n_skip=n_skip), period=period, )
# Combine all criteria
[docs] self.criteria = [ self.ftol_termination, self.max_gen_termination, ]
def _update(self, algorithm): """ Update all termination criteria based on the current algorithm state. Args: algorithm: Pymoo algorithm instance. Returns: float: Maximum value across all criteria (``1.0`` if any criterion is met, else ``0.0``). """ p = [criterion.update(algorithm) for criterion in self.criteria] criteria_names = [ "Objective values tolerance", "Max number of generations", ] for i in range(len(p)): if p[i] == 1: print(f"{criteria_names[i]} reached!") return max(p)
# Define class for pymoo
[docs] class OptiProblem(ElementwiseProblem): """ Pymoo :class:`ElementwiseProblem` wrapper for carbatpy optimizations. Handles different optimization modes (``'hp'``, ``'orc'``, ``'cb'``) and sets up variables, bounds, constraints, and the evaluation function to interface with Pymoo. :param dir_config: General configuration passed to carbatpy. :param paths_var: Paths to decision variables in the config. :type paths_var: list[list[str]] :param opti_fun: Optimized cycle (``'hp'``, ``'orc'``, or ``'cb'``). :type opti_fun: str :param same_fluid: Whether fluids are reused across cycles. :type same_fluid: bool :param heat_losses: Heat loss fraction in ``'cb'`` mode. :type heat_losses: float :param COP: COP used for ``'orc'`` optimizations. :type COP: float | None :param q_dot: Heat flow used for ``'orc'`` optimizations. :type q_dot: float | None """ def __init__(self, **kwargs):
[docs] self.dir_config: Any = kwargs.get("dir_config", None)
[docs] self.paths_var: list[list[str]] = kwargs.get("paths", [])
[docs] self.opti_fun: str = kwargs.get("opti_fun", "")
[docs] self.same_fluid: bool = kwargs.get("same_fluid", True)
[docs] self.heat_losses: float | int = kwargs.get("heat_losses", 0.0)
# Needed for ORC optimization
[docs] self.COP = kwargs.get("COP", None)
[docs] self.q_dot = kwargs.get("q_dot", None)
super().__init__(**kwargs) def _evaluate(self, x, out) -> None: """ Evaluate a single solution. Args: x: Array of decision variable values for one candidate solution. out (dict): Output dictionary to store results. Sets: out["F"]: Objective value(s) — negated performance for minimization, optionally with scaled costs as second objective. out["G"]: Constraint violations: - ``g1``: Minimum performance constraint. - ``g2``: Minimum low-side pressure constraint. - ``g3``: Warning score (thermodynamic violations). """ res = opti_func( x, dir_config=self.dir_config, paths=self.paths_var, opti_fun=self.opti_fun, same_fluid=self.same_fluid, heat_losses=self.heat_losses, COP=self.COP, q_dot=self.q_dot, ) PERF_MIN = 0.05 HP_P_LOW_MIN = 1e5 g1 = PERF_MIN - res.performance g2 = (HP_P_LOW_MIN - res.p_low) / HP_P_LOW_MIN g3 = res.warn out["G"] = [g1, g2, g3] f1 = -res.performance if self.n_obj == 1: out["F"] = f1 elif self.n_obj == 2: f2 = res.costs / 1e6 # Scale to 1e6 for better handling out["F"] = [f1, f2]