from dataclasses import dataclass
from typing import Any, Iterable
import psutil
from pymoo.core.problem import ElementwiseProblem
from pymoo.core.termination import Termination
from pymoo.termination.ftol import MultiObjectiveSpaceTermination
from pymoo.termination.max_gen import MaximumGenerationTermination
from pymoo.termination.robust import RobustTermination
import carbatpy as cb
def default_n_processes() -> int:
    """
    Suggest how many worker processes to use for parallel runs.

    Two cores are held back from the physical core count, and the result
    never drops below one. When ``psutil`` cannot report the physical core
    count, the logical core count is used instead; if that is unavailable
    too, a single core is assumed.

    Returns:
        int: Recommended number of worker processes (always >= 1).
    """
    core_count = psutil.cpu_count(logical=False)
    if core_count is None:
        # Physical count unknown: use the logical count, or 1 if that is missing.
        logical_count = psutil.cpu_count(logical=True)
        core_count = logical_count if logical_count else 1

    reserved_cores = 2
    workers = core_count - reserved_cores
    return workers if workers > 1 else 1
@dataclass
[docs]
class BoundaryEntry:
"""
Represents a single optimization variable with its location and bounds.
:param path: Keys describing the variable's location in the nested
configuration dictionary, e.g. ``['hp', 'evaporator', 'dt_min']``.
:param lower: Lower bound of the variable.
:param upper: Upper bound of the variable.
Example::
>>> entry = BoundaryEntry(path=['hp', 'evaporator', 'dt_min'], lower=1.0, upper=10.0)
>>> entry.path
['hp', 'evaporator', 'dt_min']
>>> entry.lower
1.0
"""
[docs]
lower: float | int
[docs]
upper: float | int
def traverse_dict(
    d: dict[str, Any],
    current_path: list[str] | None = None,
    entries: list[BoundaryEntry] | None = None,
) -> list[BoundaryEntry]:
    """
    Flatten a nested bounds dictionary into a list of boundary entries.

    Nested dictionaries are walked recursively. A value stored under the key
    ``"fractions"`` is treated as a sequence of ``(lower, upper)`` pairs and
    yields one entry per pair, with the pair's position (as a string)
    appended to the path. Every other non-dict value is unpacked as a single
    ``(lower, upper)`` pair.

    Args:
        d (dict[str, Any]): Dictionary to traverse.
        current_path (list[str] | None): Key path leading to ``d``.
            Defaults to ``None`` (top level).
        entries (list[BoundaryEntry] | None): List that collected entries are
            appended to. Defaults to ``None`` (a new list is created).

    Returns:
        list[BoundaryEntry]: The list of collected :class:`BoundaryEntry`
        objects (the same object as ``entries`` when one was passed in).
    """
    path_prefix = [] if current_path is None else current_path
    collected = [] if entries is None else entries

    for name, value in d.items():
        location = path_prefix + [name]

        if isinstance(value, dict):
            # Sub-dictionary: descend, sharing the same accumulator list.
            traverse_dict(value, location, collected)
            continue

        if name == "fractions":
            # One entry per (lower, upper) pair, addressed by its position.
            collected.extend(
                BoundaryEntry(location + [str(position)], lower, upper)
                for position, (lower, upper) in enumerate(value)
            )
            continue

        lower, upper = value
        collected.append(BoundaryEntry(location, lower, upper))

    return collected
[docs]
def create_config(
values: Iterable[float] | None, paths: list[list[str]]
) -> dict[str, Any]:
"""
Build a nested configuration dictionary from flattened optimizer variables and paths.
Fractions are handled specially: the last fraction is calculated as
``1 - sum(others)``.
Args:
values (Iterable[float] | None): Variable values from the optimizer.
paths (list[list[str]]): Corresponding paths in the configuration dictionary.
Raises:
ValueError: If ``values`` is ``None``.
Returns:
dict[str, Any]: Nested dictionary usable by carbatpy.
"""
if values is None:
raise ValueError("Values must not be None")
config = {}
normal_paths = []
fraction_paths = []
# Separate fractions from other parameters
for path, value in zip(paths, values):
if "fractions" in path:
fraction_paths.append((path, float(value)))
else:
normal_paths.append((path, float(value)))
# Create config with other parameters
for path, value in normal_paths:
current = config
for key in path[:-1]:
if key not in current:
current[key] = {}
current = current[key]
current[path[-1]] = value
# Create list with fractions (last one is 1-sum(fractions))
fractions_by_base_path = {}
for path, value in fraction_paths:
base_path = path[:-1]
index = path[-1]
base_key = tuple(base_path)
if base_key not in fractions_by_base_path:
fractions_by_base_path[base_key] = {}
fractions_by_base_path[base_key][index] = value
for base_key, fractions_dict in fractions_by_base_path.items():
fractions_list = [val for _, val in fractions_dict.items()]
sum_existing = sum(fractions_list)
missing_value = 1 - sum_existing
fractions_list.append(missing_value)
# Add fractions to config
current = config
for key in base_key[:-1]:
if key not in current:
current[key] = {}
current = current[key]
current[base_key[-1]] = fractions_list
return config
[docs]
def warning_score(res: dict[str, Any], system_key: None | str = None) -> float:
"""
Compute a total warning score from carbatpy simulation results.
Args:
res (dict[str, Any]): Simulation results from carbatpy.
system_key (str | None): Specific system (``'hp'`` or ``'orc'``) to compute
warnings for. Defaults to ``None`` for all systems.
Returns:
float: Absolute sum of warnings (``0.0`` if none).
"""
w = res["warnings"] if system_key is None else res[system_key]["warnings"]
return abs(sum(item.value for item in w.values()))
[docs]
def apply_dt_min_start(
conf_act: dict[str, Any],
opti_fun: str,
paths: list[list[str]],
) -> dict[str, Any]:
"""
Set starting delta-T (``dT_min``) values in the config based on heat exchanger values.
Args:
conf_act (dict[str, Any]): Current configuration dictionary.
opti_fun (str): Cycle type. Must be one of ``'hp'``, ``'orc'``, or ``'cb'``.
paths (list[list[str]]): Paths to check whether ``dt_min`` is being optimized.
Returns:
dict[str, Any]: Updated configuration with starting ``dT_min`` values.
"""
if not any("dt_min" in p for p in paths):
return conf_act
if opti_fun == "cb":
hp = conf_act.setdefault("hp", {})
orc = conf_act.setdefault("orc", {})
hp_start = hp.setdefault("start", {})
orc_start = orc.setdefault("start", {})
hp_start["dt_min"] = hp["evaporator"]["dt_min"] + 0.001
orc_start["dt_min"] = orc["condenser"]["dt_min"]
elif opti_fun == "hp":
start = conf_act.setdefault("start", {})
start["dt_min"] = conf_act["evaporator"]["dt_min"]
elif opti_fun == "orc":
start = conf_act.setdefault("start", {})
start["dt_min"] = conf_act["condenser"]["dt_min"]
return conf_act
def calc_costs(
    results: dict[str, Any],
    system: str,
    verbose: bool = False,
) -> tuple[float, dict[str, Any], dict[str, Any]]:
    """
    Return the total system cost and the HP/ORC cost dictionaries.

    For ``'hp'`` and ``'orc'`` the total is read directly from
    ``results["costs"]["total_costs"]`` and both dictionaries are empty.
    For ``'cb'`` the total is the sum of all HP cost-distribution entries
    plus the ORC entries for expander, pump, condenser and evaporator only;
    other ORC entries are left out of the total.

    Args:
        results (dict[str, Any]): Simulation results containing cost data.
        system (str): System type. Must be one of ``'hp'``, ``'orc'``, or ``'cb'``.
        verbose (bool): In ``'cb'`` mode, print which ORC components were
            counted. Defaults to ``False``.

    Raises:
        ValueError: If ``system`` is not ``'hp'``, ``'orc'``, or ``'cb'``.

    Returns:
        tuple[float, dict[str, Any], dict[str, Any]]:
            - Total system cost.
            - HP cost dictionary (``total_costs`` and ``cost_distribution``).
            - ORC cost dictionary (``total_costs`` and ``cost_distribution``).
    """
    if system == "hp" or system == "orc":
        return results["costs"]["total_costs"], {}, {}
    if system != "cb":
        raise ValueError(system)

    wanted_keys = ("total_costs", "cost_distribution")
    hp_costs = {key: results["hp"]["costs"][key] for key in wanted_keys}
    orc_costs = {key: results["orc"]["costs"][key] for key in wanted_keys}

    hp_sum = sum(hp_costs["cost_distribution"].values())

    # Only these ORC components contribute to the combined total.
    counted_orc_components = {"expander", "pump", "condenser", "evaporator"}
    orc_sum = 0
    for component, cost in orc_costs["cost_distribution"].items():
        if component in counted_orc_components:
            orc_sum += cost

    if verbose:
        counted_names = sorted(counted_orc_components)
        print(f"CB cost assumption: ORC unique components counted: {counted_names}")

    return hp_sum + orc_sum, hp_costs, orc_costs
@dataclass(frozen=True)
class EvalResult:
    """
    Container for the results of a carbatpy optimization evaluation.

    :param performance: Performance metric of the system
        (e.g., COP or thermal efficiency).
    :param costs: Total system cost calculated for the optimization case.
    :param p_low: Minimum pressure value in the system,
        used for constraints.
    :param warn: Warning score representing thermodynamic or design
        violations.
    """

    # Field order matters: instances are created positionally as
    # ``EvalResult(performance, costs, p_low, warn)``.
    performance: float
    costs: float
    p_low: float
    warn: float
def opti_func(
    x: Iterable[float],
    dir_config: Any,
    paths: list[list[str]],
    opti_fun: str,
    same_fluid: bool = True,
    heat_losses: float = 0.0,
    COP: float | None = None,
    q_dot: float | None = None,
) -> EvalResult:
    """
    Run a carbatpy simulation and return evaluation metrics for optimization.

    Constructs a complete configuration from optimizer variables ``x`` and
    the corresponding ``paths``, simulates the chosen cycle, and assesses
    any thermodynamic warnings. Results with warnings, and evaluations in
    which building the configuration or running the simulation raises an
    exception, are returned with heavily penalized values to guide the
    optimizer away from invalid solutions.

    Args:
        x (Iterable[float]): Decision variable values generated by the optimizer.
        dir_config (Any): General carbatpy configuration.
        paths (list[list[str]]): Paths indicating where each value in ``x``
            should be placed in the configuration dictionary.
        opti_fun (str): Cycle to optimize. Must be one of ``'hp'``,
            ``'orc'``, or ``'cb'``.
        same_fluid (bool): Whether to reuse the HP working fluid in the ORC.
            Only used in ``'cb'`` mode. Defaults to ``True``.
        heat_losses (float): Thermal losses in storage components.
            Only used in ``'cb'`` mode. Defaults to ``0.0``.
        COP (float | None): COP of the HP. Only used if ``opti_fun='orc'``.
        q_dot (float | None): Heat flow of the HP. Only used if ``opti_fun='orc'``.

    Raises:
        ValueError: If an unknown ``opti_fun`` is specified. This is checked
            before the simulation so that a misconfiguration is reported
            instead of being turned into a penalty result.

    Returns:
        EvalResult: Evaluation result containing:
            - ``performance``: Cycle performance (e.g., COP or thermal efficiency).
            - ``costs``: Total system costs.
            - ``p_low``: Low-side pressure, used for constraints.
            - ``warn``: Warning score (``0.0`` = no violations;
              high values indicate infeasible solutions).

    Note:
        In the presence of warnings, costs are penalized with ``1e12``
        to discourage infeasible solutions during optimization. If any
        ``Exception`` is raised during evaluation, the fixed penalty result
        ``EvalResult(0.0, 1e12, 1e4, 1e6)`` is returned.
    """
    if opti_fun not in ("cb", "hp", "orc"):
        raise ValueError(f"Unknown opti_fun={opti_fun}")

    try:
        conf_act = create_config(x, paths)
        conf_act = apply_dt_min_start(conf_act, opti_fun, paths)

        if opti_fun == "cb":
            performance, res = cb.cb_comp.cb_calc(
                dir_config,
                same_fluid=same_fluid,
                heat_losses=heat_losses,
                config=conf_act,
                plotting=False,
                verbose=False,
            )
            p_low = res["hp"]["output"]["start"]["p_low"]

            # HP warnings are checked first; the added 100.0 keeps their
            # score above an ORC-only score of the same size.
            warn_hp = warning_score(res, system_key="hp")
            if warn_hp > 0:
                return EvalResult(0.0, 1e12, p_low, 100.0 + float(warn_hp))

            warn_orc = warning_score(res, system_key="orc")
            if warn_orc > 0:
                return EvalResult(0.0, 1e12, p_low, float(warn_orc))

            costs, _, _ = calc_costs(res, opti_fun)
            return EvalResult(float(performance), float(costs), p_low, 0.0)

        if opti_fun == "hp":
            res = cb.hp_comp.heat_pump(
                dir_config, config=conf_act, verbose=False, plotting=False
            )
            p_low = res["output"]["start"]["p_low"]

            warn = float(warning_score(res))
            if warn > 0:
                return EvalResult(0.0, 1e12, p_low, warn)

            performance = res["COP"]
            costs, _, _ = calc_costs(res, opti_fun)
            return EvalResult(performance, float(costs), p_low, 0.0)

        # Remaining case: opti_fun == "orc" (validated above).
        res = cb.orc_comp.orc(
            dir_config, COP, q_dot, config=conf_act, verbose=False, plotting=False
        )
        p_low = res["output"]["start"]["p_low"]

        warn = float(warning_score(res))
        if warn > 0:
            # NOTE(review): unlike the 'hp' branch, this returns the fixed
            # value 1e4 instead of the simulated p_low - confirm intent.
            return EvalResult(0.0, 1e12, 1e4, warn)

        performance = res["eta_th"]
        costs, _, _ = calc_costs(res, opti_fun)
        return EvalResult(performance, float(costs), p_low, 0.0)
    except Exception:
        # Any failure while building the config or simulating is treated as
        # an infeasible candidate. KeyboardInterrupt and SystemExit are not
        # caught, so a running optimization can still be aborted.
        return EvalResult(0.0, 1e12, 1e4, 1e6)
# Pymoo initialization
class CombinedTermination(Termination):
    """
    Termination criterion for Pymoo runs that combines two conditions.

    Combines:

    - Maximum number of generations (:class:`MaximumGenerationTermination`)
    - Multi-objective function tolerance with robust checking
      (:class:`RobustTermination`)

    :param max_gen_termination: Termination based on maximum number
        of generations.
    :type max_gen_termination: MaximumGenerationTermination
    :param ftol_termination: Termination based on objective function
        tolerance over a rolling period.
    :type ftol_termination: RobustTermination
    :param criteria: List of active termination criteria, tolerance first,
        generation limit second.
    :type criteria: list[Termination]
    """

    def __init__(
        self,
        max_gen: int,
        ftol: float,
        period: int,
        n_skip: int = 5,
    ) -> None:
        """
        Set up both sub-criteria.

        Args:
            max_gen (int): Maximum number of generations.
            ftol (float): Tolerance for objective function changes.
            period (int): Number of generations to check tolerance over.
            n_skip (int): Passed as ``n_skip`` to
                :class:`MultiObjectiveSpaceTermination`. Defaults to ``5``.
        """
        super().__init__()

        self.max_gen_termination = MaximumGenerationTermination(max_gen)

        space_tolerance = MultiObjectiveSpaceTermination(
            ftol, only_feas=True, n_skip=n_skip
        )
        self.ftol_termination = RobustTermination(space_tolerance, period=period)

        # The order here must match the labels used in ``_update``.
        self.criteria = [self.ftol_termination, self.max_gen_termination]

    def _update(self, algorithm):
        """
        Update every sub-criterion and report those that have been reached.

        Args:
            algorithm: Pymoo algorithm instance.

        Returns:
            The largest of the values returned by the sub-criteria's
            ``update`` calls. A message is printed for each value equal to 1.
            (Presumably these are progress values in ``[0, 1]`` as defined by
            pymoo's ``Termination`` - confirm against the pymoo version used.)
        """
        labels = ("Objective values tolerance", "Max number of generations")

        progress = []
        for criterion in self.criteria:
            progress.append(criterion.update(algorithm))

        for position, value in enumerate(progress):
            if value == 1:
                print(f"{labels[position]} reached!")

        return max(progress)
# Define class for pymoo
class OptiProblem(ElementwiseProblem):
    """
    Pymoo :class:`ElementwiseProblem` wrapper for carbatpy optimizations.

    Handles different optimization modes (``'hp'``, ``'orc'``, ``'cb'``)
    and forwards each candidate solution to :func:`opti_func`. All keyword
    arguments, including the ones listed below, are also passed on to the
    pymoo base class constructor.

    :param dir_config: General configuration passed to carbatpy.
    :param paths: Paths to decision variables in the config
        (stored as ``paths_var``).
    :type paths: list[list[str]]
    :param opti_fun: Optimized cycle (``'hp'``, ``'orc'``, or ``'cb'``).
    :type opti_fun: str
    :param same_fluid: Whether fluids are reused across cycles.
    :type same_fluid: bool
    :param heat_losses: Heat loss fraction in ``'cb'`` mode.
    :type heat_losses: float
    :param COP: COP used for ``'orc'`` optimizations.
    :type COP: float | None
    :param q_dot: Heat flow used for ``'orc'`` optimizations.
    :type q_dot: float | None
    """

    def __init__(self, **kwargs):
        self.dir_config: Any = kwargs.get("dir_config", None)
        self.paths_var: list[list[str]] = kwargs.get("paths", [])
        self.opti_fun: str = kwargs.get("opti_fun", "")
        self.same_fluid: bool = kwargs.get("same_fluid", True)
        self.heat_losses: float | int = kwargs.get("heat_losses", 0.0)

        # Needed for ORC optimization
        self.COP = kwargs.get("COP", None)
        self.q_dot = kwargs.get("q_dot", None)

        super().__init__(**kwargs)

    def _evaluate(self, x, out, *args, **kwargs) -> None:
        """
        Evaluate a single solution.

        Args:
            x: Array of decision variable values for one candidate solution.
            out (dict): Output dictionary to store results.
            *args: Extra positional arguments forwarded by pymoo (unused).
            **kwargs: Extra keyword arguments forwarded by pymoo (unused).
                Accepting them follows pymoo's documented ``_evaluate``
                signature, so forwarded evaluation arguments do not cause
                a ``TypeError``.

        Sets:
            out["F"]: Objective value(s) - negated performance for minimization,
                with scaled costs as second objective when ``n_obj == 2``.
                For any other ``n_obj`` no objective is written.
            out["G"]: Constraint violations:
                - ``g1``: Minimum performance constraint.
                - ``g2``: Minimum low-side pressure constraint.
                - ``g3``: Warning score (thermodynamic violations).
        """
        res = opti_func(
            x,
            dir_config=self.dir_config,
            paths=self.paths_var,
            opti_fun=self.opti_fun,
            same_fluid=self.same_fluid,
            heat_losses=self.heat_losses,
            COP=self.COP,
            q_dot=self.q_dot,
        )

        PERF_MIN = 0.05
        HP_P_LOW_MIN = 1e5

        # Each g value is <= 0 exactly when its limit is respected.
        g1 = PERF_MIN - res.performance
        g2 = (HP_P_LOW_MIN - res.p_low) / HP_P_LOW_MIN
        g3 = res.warn
        out["G"] = [g1, g2, g3]

        f1 = -res.performance
        if self.n_obj == 1:
            out["F"] = f1
        elif self.n_obj == 2:
            f2 = res.costs / 1e6  # Scale to 1e6 for better handling
            out["F"] = [f1, f2]