import multiprocessing
from pymoo.algorithms.moo.nsga2 import NSGA2
from pymoo.operators.sampling.lhs import LHS
from pymoo.operators.survival.rank_and_crowding import RankAndCrowding
from pymoo.optimize import minimize
from pymoo.parallelization.starmap import StarmapParallelization
from carbatpy.optimizations.helpers_optimization import (
CombinedTermination,
OptiProblem,
default_n_processes,
extract_boundary_with_path,
)
def _build_case(
mode: str,
boundaries: dict,
same_fluid: bool,
heat_losses: float,
) -> tuple[str, dict, dict]:
"""
Prepare the optimization case for NSGA2.
Currently, only the ``'cb'`` (combined) mode is supported for NSGA2.
Other modes will raise a :exc:`ValueError`.
Args:
mode (str): Optimization mode. Must be ``'cb'``.
boundaries (dict): Boundaries for all optimization variables.
same_fluid (bool): Whether the working fluid is the same across all cycles.
heat_losses (float): Fraction of heat losses in the system.
Raises:
ValueError: If ``mode`` is not ``'cb'``, ``'hp'``, ``'orc'`` .
Returns:
tuple[str, dict, dict]:
- Function identifier used in :class:`OptiProblem`.
- Boundaries dictionary.
- Additional keyword arguments required by the problem.
"""
if mode == "hp":
raise ValueError("Mode HP not yet implemented for NSGA2")
if mode == "orc":
raise ValueError("Mode ORC not yet implemented for NSGA2")
if mode == "cb":
return "cb", boundaries, {"same_fluid": same_fluid, "heat_losses": heat_losses}
raise ValueError("mode must be one of: 'hp' | 'orc' | 'cb'")
# [docs]  (leftover documentation-link marker from the HTML source listing; not part of the code)
def optimize(
    mode: str,
    config: dict | str,
    boundaries: dict,
    same_fluid: bool = True,
    heat_losses: float = 0.0,
    verbose: bool = True,
    n_processes: int | None = None,
    maxtasksperchild: int = 20,
    n_ieq_constr: int = 3,
    pop_size: int = 50,
    n_offspring: int | None = None,
    sampling_iterations: int = 50,
    crowding_func: str = "pcd",
    eliminate_duplicates: bool = True,
    n_gen: int = 100,
    period: int = 20,
    ftol: float = 1e-4,
    save_history: bool = False,
    return_least_infeasible: bool = False,
):
    """
    Run a two-objective NSGA2 optimization with pymoo on a process pool.

    The function resolves the optimization case, opens a
    :class:`multiprocessing.Pool`, builds the :class:`OptiProblem`, the NSGA2
    algorithm and the termination criterion, and calls
    :func:`pymoo.optimize.minimize`. The pool is closed and joined afterwards,
    whether or not the run succeeded.

    Args:
        mode (str): Optimization mode. Only ``'cb'`` is currently accepted.
        config (dict | str): Configuration dictionary or path to a
            configuration file; passed to :class:`OptiProblem` as
            ``dir_config``.
        boundaries (dict): Bounds of the optimization variables; decomposed
            by :func:`extract_boundary_with_path` into variable paths and
            lower/upper limits.
        same_fluid (bool): Forwarded to the problem as a keyword argument
            (``'cb'`` mode). Defaults to ``True``.
        heat_losses (float): Forwarded to the problem as a keyword argument
            (``'cb'`` mode). Defaults to ``0.0``.
        verbose (bool): Passed to ``minimize`` as ``verbose``.
            Defaults to ``True``.
        n_processes (int | None): Size of the worker pool. When ``None``,
            :func:`default_n_processes` supplies the value.
        maxtasksperchild (int): ``maxtasksperchild`` setting of the worker
            pool. Defaults to ``20``.
        n_ieq_constr (int): Number of inequality constraints declared on the
            problem. Defaults to ``3``.
        pop_size (int): NSGA2 population size. Defaults to ``50``.
        n_offspring (int | None): Passed to NSGA2 as ``n_offsprings``.
            Defaults to ``None``.
        sampling_iterations (int): ``iterations`` argument of the Latin
            Hypercube sampling operator. Defaults to ``50``.
        crowding_func (str): ``crowding_func`` argument of the
            rank-and-crowding survival operator. Defaults to ``'pcd'``.
        eliminate_duplicates (bool): Passed to NSGA2 as
            ``eliminate_duplicates``. Defaults to ``True``.
        n_gen (int): Passed to :class:`CombinedTermination` as ``max_gen``.
            Defaults to ``100``.
        period (int): Passed to :class:`CombinedTermination` as ``period``.
            Defaults to ``20``.
        ftol (float): Passed to :class:`CombinedTermination` as ``ftol``.
            Defaults to ``1e-4``.
        save_history (bool): Passed to ``minimize`` as ``save_history``.
            Defaults to ``False``.
        return_least_infeasible (bool): Passed to ``minimize`` as
            ``return_least_infeasible``. Defaults to ``False``.

    Raises:
        ValueError: If ``boundaries`` is ``None`` or ``mode`` is not supported.

    Returns:
        The object returned by :func:`pymoo.optimize.minimize`
        (a pymoo result object).
    """
    multiprocessing.freeze_support()

    if boundaries is None:
        raise ValueError("boundaries must be provided.")

    worker_count = default_n_processes() if n_processes is None else n_processes

    # Resolve the case first so that an unsupported mode fails before any
    # worker process is started.
    opti_fun_id, case_bounds, extra_problem_kwargs = _build_case(
        mode, boundaries, same_fluid, heat_losses
    )
    var_paths, lower_bounds, upper_bounds = extract_boundary_with_path(case_bounds)

    worker_pool = multiprocessing.Pool(worker_count, maxtasksperchild=maxtasksperchild)
    parallel_runner = StarmapParallelization(worker_pool.starmap)

    try:
        opti_problem = OptiProblem(
            dir_config=config,
            paths=var_paths,
            opti_fun=opti_fun_id,
            n_var=len(var_paths),
            n_obj=2,  # the problem is always set up with two objectives
            n_ieq_constr=n_ieq_constr,
            xl=lower_bounds,
            xu=upper_bounds,
            elementwise_runner=parallel_runner,
            **extra_problem_kwargs,
        )
        nsga2 = NSGA2(
            pop_size=pop_size,
            n_offsprings=n_offspring,
            sampling=LHS(iterations=sampling_iterations),  # type: ignore
            survival=RankAndCrowding(crowding_func=crowding_func),
            eliminate_duplicates=eliminate_duplicates,
        )
        stop_criterion = CombinedTermination(max_gen=n_gen, period=period, ftol=ftol)

        return minimize(
            opti_problem,
            nsga2,
            termination=stop_criterion,
            save_history=save_history,
            return_least_infeasible=return_least_infeasible,
            verbose=verbose,
        )
    finally:
        # Always shut the pool down, on success and on error alike.
        worker_pool.close()
        worker_pool.join()