Source code for carbatpy.models.components.comp

import numpy as np
import scipy.optimize as opti
import pandas as pd
import matplotlib.pyplot as plt
from abc import ABC, abstractmethod
from datetime import datetime
import os
from pathlib import Path
import copy
import json
import logging
from carbatpy import CB_DEFAULTS
from carbatpy.utils.io_utils import read_config, read_component, _set_temp
from carbatpy.utils.cost_calculation import get_cost_inst
from types import SimpleNamespace


[docs] warn = SimpleNamespace(value=0, message="All O.K.")
# Logging-Konfiguration logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
[docs] logger = logging.getLogger(__name__)
[docs] class Component(ABC): """Components of thermodynamic cycles. All components have the following attributes: - ``config``: basic configuration incl. calculation method - ``inputs``: inputs for calculations - ``output``: main results - ``all_data``: detailed data points for plotting and post-processing - ``warning``: a value and a message (0, "All o.k.") All components implement the following methods: - ``initialize``: set values that do not depend on a specific calculation - ``calculate``: main calculation routine - ``plot``: plot changes in a joint diagram - ``save_results``: save results and inputs to a directory """ def __init__(self, name, config, **kwargs): """ Set the basic configuration of a component. Parameters ---------- name : string name of the component to be configured, must be the same as in the config dictionary or file. config : string or dictionary if it is a string it must be the filename with a yaml or json file with the dictionary information. All needed basic parameters or default parameters and calculation type are set here. Check the io_utils.py file for typical values. **kwargs : dictionary any further parameters which may be needed. Returns ------- None. """
[docs] self.name = name
[docs] self.config = read_config(config)
[docs] self.working_fluid = self.config['working_fluid']['species']
[docs] self.inputs = read_component(config, name )
[docs] self.fl_name = list(self.inputs['states'].keys())[0]
[docs] self.calc_type = self.config[name].get('calc_type')
[docs] self.verbose = kwargs.get('verbose', False)
[docs] self.logger = logging.getLogger(f"{__name__}.{self.name}")
# Standard values of variables
[docs] self.output = {}
[docs] self.all_data = None
[docs] self.warning = copy.copy(warn)
self._points = CB_DEFAULTS["Components"]["n_points"]
[docs] self.cost = 0
# Initializing self.initialize(**kwargs) @abstractmethod
[docs] def initialize(self, **kwargs): pass
@abstractmethod
[docs] def calculate(self, in_states=None, out_states=None, run_param=None, **kwargs): pass
@abstractmethod
[docs] def estimate_costs(self, parameters=None, **kwargs): return 0
@abstractmethod
[docs] def plot(self, plot_info, **kwargs): pass
[docs] def entropy_production(self, temp_amb=None, **kwargs): s_prod = 0 for key, val in self.output["m_dot"].items(): s_prod += (self.output['state_out'][key] - self.output['state_in'][key])[4] * val if temp_amb == None: temp_amb = CB_DEFAULTS['General']['T_SUR'] exergy_destruction = temp_amb * s_prod self.output.update({"entropy_prod_rate": s_prod, "exergy_destruction_rate": exergy_destruction})
def _plot_shift(self, first, second, first_direction): h_in = self.output['state_in']['working_fluid'][2] h_out = self.output['state_out']['working_fluid'][2] first = np.array(first) second = np.array(second) shift = first.max() if first_direction == -1: shift = first.min() direction = +1 if h_out < h_in: shift = shift - (second.max() - second.min()) direction = -1 return shift, direction
[docs] def save_results(self, base_dir): output_path = os.path.join(base_dir, f"{self.name}_output.json") with open(output_path, 'w') as f: json.dump(self.output, f, indent=4) if isinstance(self.all_data, np.ndarray): data_path = os.path.join(base_dir, f"{self.name}_all_data.npy") np.save(data_path, self.all_data) elif isinstance(self.all_data, pd.DataFrame): data_path = os.path.join(base_dir, f"{self.name}_all_data.csv") self.all_data.to_csv(data_path, index=False) self.logger.info(f"Results saved for {self.name}")
def _storage_name(self, fluidnames): for name in fluidnames: if name != "working_fluid": return name
[docs] class FlowDevice(Component):
[docs] def initialize(self, **kwargs): pass
[docs] def estimate_costs(self, parameters=None, **kwargs): cost_name = self.inputs['parameters']['name_cost'] year = self.config["process"]["year"] attribute = self.output['power'] if self.name == 'pump': v_in = self.output["state_in"]['working_fluid'][3] m_dot = self.output["m_dot"]['working_fluid'] attribute = v_in * m_dot if self.name == 'throttle': p_ratio = self.output['state_in']["working_fluid"][1] / \ self.output['state_out']["working_fluid"][1] attribute = p_ratio self.cost = get_cost_inst().Towler_Method({"Category": self.name, "Component Name": cost_name, "Component Attribute": np.abs(attribute), }, Desired_year=year ) return self.cost
[docs] def plot(self, plot_info=None): fl_name = "working_fluid" if not plot_info: fig, ax = plt.subplots(1) plot_info = CB_DEFAULTS["Components"]["Plot"] plot_info.update({"ax": ax, "fig": fig}) plot_info["label"][0] = self.name if plot_info["what"][0] == 2: data = np.array([self.output["state_in"][fl_name][plot_info["what"][0]], self.output["state_out"][fl_name][plot_info["what"][0]]]) * \ self.output["m_dot"][fl_name] data = data - data.min() shift = self._plot_shift(plot_info["x-shift"], [data.min(), data.max()], plot_info["direction"]) data = data - data.min() + shift[0] plot_info["ax"].plot(data, [self.output["state_in"][fl_name][plot_info["what"][1]], self.output["state_out"][fl_name][plot_info["what"][1]]], plot_info["col"][0], label=plot_info["label"][0]) plot_info["ax"].legend(ncol=4, loc="upper center", bbox_to_anchor=(0.5, -0.15), frameon=False) plot_info["ax"].grid(True) return [data.min(), data.max()], shift[1] else: self.warning.__dict__.update( value=130, message=f"Pump: plotting only implemented fot T-H_dot [2,0]. You requested{plot_info['what']}") print(self.warning)
[docs] class Throttle(FlowDevice): "Simple isenthalpic throttle."
[docs] def initialize(self, **kwargs): super().initialize(**kwargs)
[docs] def calculate(self, in_states, out_states, run_param=None, **kwargs): if self.verbose: self.logger.info(f"Calculating {self.name}") # ... Berechnungslogik hier ... fl_name = list(self.inputs['states'].keys())[0] # if not within optimization, is the low pressure within 10 Pa of the # wanted value from the configuration dictionary? if run_param != None: m_dot = run_param["m_dot"][fl_name] else: m_dot = 1. self.warning.value=22, self.warning.message=f""""Throttle mass flow rate at default value {m_dot:.2f} kg/s""" fluid_act = self.inputs["act_fluids"][self.fl_name] self.p_out = out_states[self.fl_name][1] in_state_wf = in_states[self.fl_name] state_in = fluid_act.set_state([in_state_wf[1], in_state_wf[2]], "PH") if isinstance(state_in, str): print(state_in, "function/value needed") if fluid_act.properties.pressure < self.p_out: self.warning.value = (-fluid_act.properties.pressure + self.p_out) / self.p_out * 10 self.warning.message = f"Throttle pressure must drop! Output-p: {self.p_out} is higher than input p: {fluid_act.properties.pressure }" if self.calc_type == "const_h": state_out = fluid_act.set_state( [fluid_act.properties.enthalpy, self.p_out], "HP") else: self.warning.value = 101 self.warning.message = f""""The option{self.calc_type} is not yet implemented for throttles""" raise Exception( self.warning_message) self.inputs["act_fluids"][self.fl_name] = fluid_act # self.output.update({"state_in": {self.fl_name: state_in}, "state_out": {self.fl_name: state_out}, "work_specific": 0.0, "power": 0.0, "m_dot": {self.fl_name: m_dot}, "warning": self.warning}) self.entropy_production() self.all_data = {self.fl_name: np.array([state_in, state_out]) }
[docs] class HeatExchanger(Component):
[docs] def initialize(self, **kwargs): super().initialize(**kwargs) self._dt_min = self.config.get('dt_min') self._overall_u = self.config.get('overall_u') self._fixed = self.inputs['parameters']['fixed'] self._q_dot = self.config.get('q_dot')
@abstractmethod
[docs] def calculate_volume(self, parameters=None, **kwargs): pass
[docs] def estimate_costs(self, parameters=None, **kwargs): cost_name = kwargs.get( 'cost_name', self.inputs['parameters']['name_cost']) year = kwargs.get('year', self.config["process"]["year"]) self.cost = get_cost_inst().Towler_Method({"Category": self.name, "Component Name": cost_name, "Component Attribute": self.area, }, Desired_year=year ) return self.cost
[docs] class StaticHeatExchanger(HeatExchanger):
[docs] def initialize(self, **kwargs): super().initialize(**kwargs) self.warning.__dict__.update(value=0, message="All o.k.") self.output = {} self._set_defaults()
def _set_defaults(self): """ Set the default states as read from the configuration file or dictionary.""" inputs = self.inputs["states"] self._out_def = {} self._in_def = {} for flname in inputs.keys(): self._out_def[flname] = inputs[flname]['out'] self._in_def[flname] = inputs[flname]['in'] if "q_dot" in self.inputs["parameters"].keys(): self._q_dot = self.inputs["parameters"]["q_dot"]
[docs] def set_in_out(self, new_state, instate=True): """ Replace either an old input state or output state by a new one. Needed for the calculation of the heat exchanger. Parameters ---------- new_state : dict Dictionary mapping fluid names to state arrays. Format: ``{"fluid-name": numpy_array_with_state}`` instate : bool, default=True If True, replaces input state; if False, replaces output state. Returns ------- in_state : dict Input state dictionary to pass to calculate(). out_state : dict Output state dictionary to pass to calculate(). """ inputs = self.inputs["states"] out_state = {} in_state = {} for flname, value in inputs.items(): out_state[flname] = inputs[flname]['out'] in_state[flname] = inputs[flname]['in'] for key, val in new_state.items(): if instate: in_state[key] = val self.inputs['states']['working_fluid']['in'] = val else: out_state[key] = val self.inputs['states']['working_fluid']['out'] = val # if there is a string for the output, set the output state: if isinstance(self.inputs['states']['working_fluid']['out'], str): self._set_w_out() out_state = self._out_def return in_state, out_state
[docs] def calculate(self, in_states=None, out_states=None, run_param=None, **kwargs): self.warning.__dict__.update(value=0, message="All o.k.") if self.verbose: self.logger.info(f"Calculating {self.name}") self._run_param = run_param if "q_dot" in run_param: self._q_dot = run_param["q_dot"] if isinstance(self._q_dot, str): self._q_dot = run_param['q_dot'] if not in_states: in_states = self._in_def self.warning .__dict__.update( value=2, message="Default values used for I/O") if not out_states: out_states = self._out_def self.warning .__dict__.update( value=2, message="Default values used for I/O") fluid_names = list(self.inputs["act_fluids"].keys()) stn = self._storage_name(fluid_names) self.output.update({"m_dot": {}, "state_in": {}, "state_out": {}, }) self.all_data = {} w_out = {} self._heating = -1 mdw_calc = True match self._fixed: # noqa case 'q_dot': pass case 'm_dot_w' | 'q_m_dot_w': try: # {"m_dot": {"working_fluid": m_dot_w}} self.output['m_dot']['working_fluid'] = run_param["m_dot"]['working_fluid'] self.output['m_dot'][stn] = None mdw_calc = False except: raise ValueError( f"{self.name}: m_dot not set in run_param!") # case 'q_m_dot_w': """#BA 2024-08-15-continue here This is too early . must come later self.output Out [2]: {'m_dot': {}, 'state_in': {}, 'state_out': {}}""" # self.output['m_dot'][stn] = np.abs(self._q_dot / # (self.output['state_in'][stn][2] # - self.output['state_out'][stn][2])) # dh_w = self._q_dot/self.output['m_dot']['working_fluid'] # self.output['state_out'][fln] = w_in.set_state([out_states['working_fluid'][1], # in_states['working_fluid'][2] + dh_w], # "PH") case _: raise NotImplementedError(f''''StaticHeatExchanger: {self.inputs['parameters']['fixed']} not implemented!''') if self._fixed == 'q_m_dot_w': # BA 2024-08-15-continue here This is too early . must come later self.output Out [2]: {'m_dot': {}, 'state_in': {}, 'state_out': {}}""" dh_w = self._q_dot/self.output['m_dot']['working_fluid'] w_in_w = copy.copy(self.inputs['act_fluids']['working_fluid']) h_out_w = self.inputs['states']["working_fluid"]['in'][2] - dh_w out_states['working_fluid'] = w_in_w.set_state([w_in_w.val_dict["Pressure"], h_out_w], "PH") for fln in ["working_fluid", stn]: w_in = copy.copy(self.inputs['act_fluids'][fln]) if isinstance(in_states[fln], str) and in_states[fln] == 'ambient': #BA this does not seem to be true ever, delete?2025-12-04 t_amb = _set_temp(in_states[fln]) if fln != stn: t_amb += self.heating*self.inputs['parameters']["dt_min"] p_act = self.config[fln][self.config[self.name] ["species"][fln]["p_in"]] self.output['state_in'][fln] = w_in.set_state([t_amb, p_act], "TP") self.output['state_in'][fln] = w_in.set_state([in_states[fln][1], in_states[fln][2]], "PH") self.output['state_out'][fln] = w_in.set_state([out_states[fln][1], out_states[fln][2]], "PH") w_out[fln] = copy.copy(w_in) for fln in ["working_fluid", stn]: if not mdw_calc: if fln == "working_fluid": # if mdot for working fluid is fixed self._q_dot = (self.output['state_in'][fln][2] - self.output['state_out'][fln][2]) * self.output['m_dot'][fln] self.output['m_dot'][stn] = np.abs(self._q_dot / (self.output['state_in'][stn][2] - self.output['state_out'][stn][2])) elif self._fixed == "q_dot": # q_dot is fixed self.output['m_dot'][fln] = np.abs(self._q_dot / (self.output['state_in'][fln][2] - self.output['state_out'][fln][2])) else: raise NotImplementedError( f"HeatExch, this fixed value is not implemented {self._fixed}") if fln == "working_fluid": temp_w_in = self.output['state_in'][fln][0] self.all_data[fln] = self._calculate_state_array(w_out[fln], [self.output['state_in'][fln], self.output['state_out'][fln]]) else: temp_s_in = self.output['state_in'][fln][0] self.all_data[fln] = self._calculate_state_array(w_out[fln], [self.output['state_out'][fln], self.output['state_in'][fln] ]) if temp_w_in > temp_s_in: self._heating = 1 d_tempall = self.all_data["working_fluid"][:, 0] - self.all_data[stn][:, 0] self.dt_mean, self.dt_min, self.dt_max = d_tempall.mean(), np.abs( d_tempall).min(), np.abs(d_tempall).max() self._check_temperature_consistency(d_tempall) self.area = np.abs(self._q_dot / self.dt_mean / self.inputs['parameters']['overall_u']) self.output.update({"dt_mean": self.dt_mean, "dt_min": self.dt_min, "dt_max": self.dt_max, "q_dot": -self._q_dot, "area": self.area, }) self.entropy_production()
def _calculate_state_array(self, fluid, h_range): h_array = np.linspace(h_range[0][2], h_range[1][2], self._points) values = np.zeros((self._points, 2)) values[:, 0] = h_array values[:, 1] = h_range[0][1] return fluid.set_state_v(values, "HP") def _check_temperature_consistency(self, d_tempall): self.warning.__dict__.update(value=0, message="All o.k.") eps_min = -1e-3 dt_min_sep = self.inputs["parameters"]["dt_min"] positive = np.any(d_tempall > 0) negative = np.any(d_tempall < 0) below = True if self._heating < 0: below = False crossing = (positive > 0 and negative > 0) wrong_side = (positive > 0 and not below) or (negative > 0 and below) abs_dt_min = np.abs(self.dt_min) difference = abs_dt_min - dt_min_sep if difference < eps_min: self.warning .__dict__.update( value=np.abs(difference), message="Below minimum approach temperature!") elif crossing or wrong_side: val_ = np.abs((d_tempall.max() - d_tempall.min()) / (d_tempall.max() + d_tempall.min())) self.dt_mean = 1e6 self.warning.__dict__.update( value=val_, message="Temperatures crossing or wrong side!") else: pass def _set_w_out(self): stn = self._storage_name(self.inputs['act_fluids'].keys()) fl_name = "working_fluid" s_state = self.inputs['act_fluids'][stn].val_dict w_state = self.inputs['states']['working_fluid']['in'] sign = 1 if s_state["Temperature"] > w_state[0]: sign = -1 t_w_out = s_state["Temperature"] + sign * \ self.inputs['parameters']['dt_min'] w_out = self.inputs['act_fluids']['working_fluid'].set_state( [w_state[1], t_w_out], 'PT') self.inputs['states'][fl_name]['out'] = w_out self._out_def[fl_name] = w_out return w_out
[docs] def plot(self, plot_info, **kwargs): """ Plots the T-H_dot diagram (shifted enthalpy flow rate). This method visualizes the temperature profile against the enthalpy flow rate, allowing for the analysis of temperature differences (pinch points) within the component. Parameters ---------- plot_info : dict Configuration for the matplotlib plot. Required keys: * "fig": Matplotlib figure object. * "ax": Matplotlib axes object. * "what": List of indices [x, y], e.g., [2, 0] for (h, T). * "col": List of color/style strings, e.g., ["r:", "ko"]. * "label": List of labels for the legend. * "x-shift": List of initial shifts for the x-axis. Example: ``{"fig": fig, "ax": ax, "what": [2, 0], "col": ["r:", "k-"]}`` **kwargs Additional plotting parameters passed to ax.plot(). Returns ------- list A list containing the min and max values of the plotted data: [min, max]. float The actual shift applied to the enthalpy flow rate (H_dot). """ if plot_info["what"][0] == 2: for ii, fln in enumerate(self.all_data.keys()): data = (self.all_data[fln][:, plot_info["what"][0]] - self.all_data[fln][:, plot_info["what"][0]].min()) * self.output["m_dot"][fln] shift = self._plot_shift( plot_info["x-shift"], [data.min(), data.max()], plot_info["direction"]) data = data - data.min() + shift[0] plot_info["ax"].plot(data, self.all_data[fln][:, plot_info["what"][1]], plot_info["col"][ii], label=plot_info["label"][0]) else: print( f"H-Ex: plotting only implemented for T-H_dot [2,0]. You requested {plot_info['what']}") self._plot_shift = self.all_data[fln][:, plot_info["what"][0]].min() * \ self.output["m_dot"][fln] plot_info["ax"].legend(ncol=4, loc="upper center", bbox_to_anchor=(0.5, -0.15), frameon=False) plot_info["ax"].grid(True) return [data.min(), data.max()], shift[1]
def _change_h_w_out(self, h_out_new): inputs = self.inputs["states"] out_state = {} in_state = {} for flname, value in inputs.items(): out_state[flname] = inputs[flname]['out'] in_state[flname] = inputs[flname]['in'] out_state["working_fluid"][2] = h_out_new[0] return in_state, out_state def _opti_h_func(self, h_act_out, m_dot_w, inp_o=None, out_o=None, verbose=False): run_p_hex = {"m_dot": {"working_fluid": m_dot_w}} instates, outstates = self._change_h_w_out(h_act_out) self.calculate(instates, outstates, run_param=run_p_hex) if verbose: print(self.output["dt_mean"], self.warning) if self.warning.value > 2: return +100 return np.abs(self.output["dt_mean"])
[docs] def hex_opti_work_out(self, inp_act=None, out_act=None, run_p_par=None, verbose=False): """ Optimize the mean temperature difference by varying the working Fluid output state. Only enthalpy is variied within +/- 5%, pressure remains unchanged. Parameters ---------- run_p_cond : dictionary A value for {"m_dot": 0.01} must be provided, it is kept constant. verbose : boolean, optional If you want some printing. The default is False. Returns ------- None. """ self.calculate(in_states=inp_act, out_states=out_act, run_param=run_p_par) if verbose: print( f"opti_work_out: T-mean before optimization= {condenser.output['dt_mean']} K") # condenser._change_h_w_out =_change_h_w_out tolerance = 1e-2 max_iter = 240 h_act = self.output["state_out"]["working_fluid"][2] * 1.005 dh = h_act * .051 bound_act = opti.Bounds(lb=h_act-dh, ub=h_act+dh) result = opti.minimize(self._opti_h_func, h_act, args=(run_p_par['m_dot']["working_fluid"], inp_act, out_act,), method='Nelder-Mead', tol=tolerance, bounds=bound_act, options={"maxiter": max_iter, # can take long! "disp": True}) if verbose: print(f"T-mean after optimization: {result.fun} K\n{result}")
[docs] def calculate_volume(self, parameters=None, **kwargs): """ Calculates the volume and mass of the thermal storage (two tanks). It is assumed that the secondary fluid entering the heat exchanger comes from one storage and the fluid leaving it is stored in the second storage. Both volumes and the total mass are calculated for an amount of thermal energy to be stored. The costs are also estimated for cone roof tanks. Parameters ---------- parameters : dict, optional A dictionary containing the following keys: Energy_stored : float Energy in J. The default is 1 MWh (3.6e9 J). time : float Time to be stored in s. If both 'time' and 'Energy_stored' are given, 'time' is used. The default value is taken from the configuration file at ['process']['time']. **kwargs Additional keyword arguments (none currently implemented). Returns ------- dict A dictionary (vol_results) containing: storage_volumes : dict The volumes for the entering and exiting states in m3. Example: {"state_in": vol1, "state_out": vol2}. energy_density : float Energy density in J/kg. mass_storage : float The total mass of the storage in kg. Energy_stored : float The actual energy stored in J. Storage_costs : dict Estimated costs for the entering and exiting state for cone roof tanks in currency units. """ cost_name = kwargs.get('cost_name', 'fixed roof storage tank Morandin') if parameters is None: time = self.config["process"]["time"] energy = np.abs(self.output["q_dot"] * time) else: if "time" in parameters.keys(): time = parameters["time"] energy = np.abs(self.output["q_dot"] * time) else: energy = parameters["Energy_stored"] time = energy / self.output["q_dot"] parameters = {"time": time, "Energy_stored": energy, 'unit': 'J', } for key in self.output["m_dot"].keys(): if key != "working_fluid": storage = key e_density = np.abs( self.output["q_dot"] / self.output["m_dot"][storage]) v_density_in = self.output["state_in"][storage][3] / e_density v_density_out = self.output["state_out"][storage][3] / e_density volumes = energy * \ np.array([v_density_in, v_density_out]) mass_storage = energy / e_density costs = [] for vol in volumes: costs.append(get_cost_inst().Towler_Method({"Category": 'tanks', "Component Name": cost_name, "Component Attribute": vol, }, Desired_year=self.config["process"]["year"] )) vol_results = {"storage_volumes": dict(zip(["state_in", "state_out"], volumes)), "energy_density": e_density, "mass_storage": mass_storage, "Energy_stored": energy, "time": time, "Storage_costs": dict(zip(["state_in", "state_out"], costs)), } self.output.update(vol_results) return vol_results
[docs] class Start(FlowDevice):
[docs] def initialize(self, **kwargs): """ Returns the starting state of the working_fluid only. Returns ------- None. """ super().initialize(**kwargs) self.warning.__dict__.update(value=0, message="All o.k.") m_dot = kwargs.get("m_dot", 1.) self.calculate() self._check_start() fl_name = "working_fluid" if isinstance(m_dot, float): m_dot ={fl_name:m_dot} self.output = {"state_out": {fl_name: self.inputs['states'][fl_name]['out']}, "state_in": {fl_name: self.inputs['states'][fl_name]['in']}, "m_dot": m_dot, # Dummy "p_low": self.p_low, "warning": self.warning} self.entropy_production() self.all_data = {fl_name: np.array([self.inputs['states'][fl_name]['in'], self.inputs['states'][fl_name]['out']]) }
[docs] def calculate(self, in_states=None, out_states=None, run_param=None, **kwargs): if self.verbose: self.logger.info(f"Calculating {self.name}") pass
def _check_start(self): """ Validate and adjust the low-pressure inlet state of the working fluid. The inlet state at the pump/compressor is checked against: (i) required superheat/subcooling and (ii) a minimum temperature difference to the secondary fluid. If constraints are violated, ``self.p_low`` is adjusted by shifting the saturation temperature. Notes ----- Heat pump (evaporator / compressor inlet): * Superheat constraint: ``T - T_sat >= dt_superheat`` * Min. approach to source: ``T_source - T >= dt_min`` ORC (condenser / pump inlet): * Subcool constraint: ``T_sat - T >= dt_subcool`` * Min. approach to sink: ``T - T_sink >= dt_min`` The correction uses: * ``sign = -1`` for heat pump (evaporator) * ``sign = +1`` for ORC (condenser) * ``T_sat_new = T_secondary + sign * (dt_approach + dt_min)`` * ``T_new = T_sat_new - sign * dt_approach`` Side Effects ------------ * Updates ``self.p_low`` if adjustment is needed. * Updates ``self.warning.value`` and ``self.warning.message``. * Updates ``self.inputs['states']['working_fluid']['in']`` with a corrected inlet state. """ # ======================================================================== # 1. INITIALIZATION & SETUP # ======================================================================== # Get secondary fluid (storage) and working fluid names stn = self._storage_name(self.inputs['act_fluids'].keys()) fl_name = "working_fluid" # Extract current states s_state = self.inputs['act_fluids'][stn].val_dict # Secondary fluid state w_state = self.inputs['act_fluids'][fl_name].val_dict # Working fluid state # Store initial low pressure self.p_low = w_state['Pressure'] # Determine process type is_hp = self.config["process"]["name"] == "heat_pump" # ======================================================================== # 2. PROCESS-SPECIFIC PARAMETERS # ======================================================================== # Sign convention: # - HP (evaporator): -1 → T_source > T_working_fluid, heat absorbed # - ORC (condenser): +1 → T_working_fluid > T_sink, heat rejected sign = -1 if is_hp else 1 # Thermal approach constraint (superheat for HP, subcool for ORC) dt_approach = (self.inputs["parameters"]['dt_superheat'] if is_hp else self.inputs["parameters"]['dt_subcool']) # Minimum temperature difference to secondary fluid dt_min = self.inputs["parameters"]['dt_min'] # Numerical tolerance for constraint checks eps_diff = 1e-3 # Quality for saturation state: vapor (HP) or liquid (ORC) q_sat = 1 if is_hp else 0 # Flags and messages needs_adjust = False warning_counter = 0 warning_messages = [] # ======================================================================== # 3. CHECK THERMAL APPROACH (SUPERHEAT / SUBCOOL) # ======================================================================== # Calculate saturation state at current pressure sat_state = self.inputs['act_fluids'][fl_name].set_state( [w_state['Pressure'], q_sat], "PQ" ) t_sat_current = sat_state[0] # Saturation temperature [K or °C] # Calculate actual thermal approach if is_hp: # Superheating: T_actual - T_sat actual_approach = w_state["Temperature"] - t_sat_current approach_name = "Superheat" else: # Subcooling: T_sat - T_actual actual_approach = t_sat_current - w_state["Temperature"] approach_name = "Subcooling" # Check if approach meets requirement if actual_approach < dt_approach - eps_diff: needs_adjust = True warning_counter += 1 warning_messages.append( f"{approach_name} too low: {actual_approach:.3f} K < {dt_approach:.3f} K (target)" ) # ======================================================================== # 4. CHECK MINIMUM TEMPERATURE DIFFERENCE (dt_min) # ======================================================================== # Calculate temperature difference in correct direction if is_hp: # Heat Pump: T_source - T_working_fluid ≥ dt_min # (Source must be warmer to transfer heat to working fluid) delta_T = s_state["Temperature"] - w_state["Temperature"] delta_T_name = "T_source - T_wf" else: # ORC: T_working_fluid - T_sink ≥ dt_min # (Working fluid must be warmer to reject heat to sink) delta_T = w_state["Temperature"] - s_state["Temperature"] delta_T_name = "T_wf - T_sink" # Check if minimum temperature difference is satisfied if delta_T < dt_min - eps_diff: needs_adjust = True warning_counter += 1 warning_messages.append( f"Minimum temperature difference too low: {delta_T_name} = {delta_T:.3f} K < {dt_min:.3f} K (target)" ) # ======================================================================== # 5. PRESSURE ADJUSTMENT (IF NEEDED) # ======================================================================== if needs_adjust: # Calculate new saturation temperature to satisfy both constraints # Formula: T_sat_new = T_secondary + sign * (dt_approach + dt_min) # This ensures: # - Thermal approach = dt_approach # - Temperature difference = dt_min t_sat_new = s_state["Temperature"] + sign * (dt_approach + dt_min) # Get pressure corresponding to new saturation temperature w_sat_state = self.inputs['act_fluids'][fl_name].set_state( [t_sat_new, q_sat], "TQ" ) p_new = w_sat_state[1] # New pressure [Pa or bar] # Calculate new working fluid temperature # Formula: T_new = T_sat_new - sign * dt_approach t_new = t_sat_new - sign * dt_approach # Update low pressure self.p_low = p_new # Calculate new working fluid state at adjusted conditions w_fluid = self.inputs['act_fluids'][fl_name] w_new_state = w_fluid.set_state([p_new, t_new], "PT") # Update inlet state in storage self.inputs['states'][fl_name]['in'] = w_new_state # Add adjustment info to warning message warning_messages.append( f"Pressure adjusted from {w_state['Pressure']:.2f} to {p_new:.2f} bar" ) warning_messages.append( f"Temperature adjusted from {w_state['Temperature']:.2f} to {t_new:.2f} K/°C" ) # ======================================================================== # 6. UPDATE WARNING STATUS # ======================================================================== if warning_counter > 0: self.warning.value = warning_counter self.warning.message = " | ".join(warning_messages) else: # No violations detected self.warning.value = 0 self.warning.message = ""
# Optional: Verbose logging (if self.verbose is implemented) # if getattr(self, 'verbose', False): # print(f"[_check_start] Process: {self.config['process']['name']}") # print(f" Secondary T: {s_state['Temperature']:.2f}, WF T: {w_state['Temperature']:.2f}") # print(f" Actual approach: {actual_approach:.3f} K, Target: {dt_approach:.3f} K") # print(f" Delta T: {delta_T:.3f} K, dt_min: {dt_min:.3f} K") # if needs_adjust: # print(f" → Adjusted to p={self.p_low:.2f}, T={t_new:.2f}") # print(f" Warning: {self.warning.message}")
[docs] def create_component(name, config): from carbatpy.models.components.flowmachine_comp import FlowMachine component_type = config[name].get('model') if component_type == 'FlowDevice': if name == 'compressor': return FlowMachine(name, config) elif name == 'throttle': return Throttle(name, config) elif component_type == 'StaticHeatExchanger': return StaticHeatExchanger(name, config) elif component_type in ('Start', 'start'): return Start(name, config) else: raise ValueError(f"Unknown component type: {component_type, name}")
[docs] class Cycle: def __init__(self, config):
[docs] self.config = config
[docs] self.components = {}
[docs] self.fluids = {}
[docs] self.process = config['process']
[docs] self.fixed = config['process']['fixed']
[docs] self.logger = logging.getLogger(f"{__name__}.Cycle")
self.initialize()
[docs] def initialize(self): # Nur die im Prozess/Zyklus aufgeführten Komponenten initialisieren for component_name in self.process['cycle']: if component_name in self.config: self.components[component_name] = create_component( component_name, self.config) else: self.logger.warning( f"Component {component_name} mentioned in cycle but not found in configuration") # Fluide initialisieren if 'fluids_all' in self.config: for fluid_name, fluid_type in self.config['fluids_all'].items(): if fluid_name in self.config: self.fluids[fluid_name] = self.config[fluid_name] else: self.logger.warning( f"Fluid {fluid_name} mentioned in fluids_all but not found in configuration") self.logger.info("Cycle initialized")
[docs] def calculate(self, new_config=None, **kwargs): self.logger.info("Starting cycle calculation") warnings = {} outputs = {} def add_w_o(what): warnings[what.name] = what.warning outputs[what.name] = what.output m_dot_w = 1.0 outputs = {} if new_config is not None: for key in new_config: if key in self.config and isinstance(self.config[key], dict) and isinstance(new_config[key], dict): self.config[key].update(new_config[key]) else: self.config[key] = new_config[key] p_high = self.config['working_fluid']['p_high'] p_low = self.config['working_fluid']['p_low'] for pre_calc in [True, False]: for step in self.process['cycle']: comp = self.components.get(step) if comp is None: self.logger.warning(f"Step {step} not found in components") continue cfg = self.config.get(step, {}) # 1. Start-Komponente initialisieren if step.lower() == "start": comp.calculate(m_dot=m_dot_w) outputs[step] = comp.output prev_state_out = comp.output["state_out"] continue # 2. Falls Komponente "fixed" hat → Massenstrom bestimmen if pre_calc and comp.name in self.fixed: fixed_type = cfg["fixed"] if fixed_type == "power" and step == "compressor": # Beispiel: Kompressorleistung fixiert → m_dot berechnen comp.calculate( prev_state_out, {'working_fluid': [600, p_high, 5e5]}, run_param=self.fixed[comp.name]) m_dot_w = comp.output["m_dot"]['working_fluid'] m_dot = {"m_dot": {'working_fluid': m_dot_w, }} prev_state_out = comp.output["state_out"] add_w_o((comp)) self.logger.info( f"Mass flow set by {step}: {m_dot_w:.4f} kg/s") if pre_calc: print("Yes\n") break elif not pre_calc and comp.name in self.fixed: # Massenstrom schon gesetzt → skip calculation pass # 3. Normale Berechnung mit bekanntem m_dot if cfg['model'] in ('FlowDevice',): #BA 2025-08-20 'start hier für den 2. Durchgang? p_act = p_low if comp.name in ("pump", "compressor"): p_act = p_high out_state = {'working_fluid': [600, p_act, 5e5]} comp.calculate(prev_state_out, out_state, run_param=m_dot) prev_state_out = comp.output["state_out"] else: inp, outp = comp.set_in_out(prev_state_out) comp.calculate(inp, run_param=m_dot) # this is probably wrong for an evaporator of a heat pump prev_state_out = comp.output["state_out"] add_w_o((comp)) comp.estimate_costs() outputs[step] = comp.output self.logger.info("Cycle calculation completed") self.outputs = outputs
[docs] def plot(self): fig, axes = plt.subplots(len(self.components), 1, figsize=(10, 5*len(self.components))) axes = [axes] if len(self.components) == 1 else axes for i, (name, component) in enumerate(self.components.items()): component.plot(fig, axes[i]) plt.tight_layout() plt.show() self.logger.info("Cycle plots generated")
[docs] def save_results(self): base_dir = os.path.join( "results", datetime.now().strftime("%Y-%m-%d-%H-%M")) os.makedirs(base_dir, exist_ok=True) for component in self.components.values(): component.save_results(base_dir) self.logger.info(f"All results saved in {base_dir}")
# Die create_component Funktion bleibt unverändert # Verwendung
[docs] def main(dir_name=CB_DEFAULTS["General"]["CB_DATA"], new_config=None): dir_name = dir_name logger.info("Starting thermodynamic cycle calculation") cycle_config = read_config(dir_name) cycle = Cycle(cycle_config) cycle.calculate(new_config=new_config) cycle.plot() cycle.save_results() logger.info("Thermodynamic cycle calculation completed")
if __name__ == "__main__": from carbatpy.models.components.flowmachine_comp import FlowMachine
[docs] POWER_IN = 1.e6
verbose = True # ------- input data ----------- dir_name = Path(__file__).parent.parent.parent dir_name = dir_name / "data" / "io-hp-data.yaml" dir_name = str(dir_name) # ================ CALCULATIONS ============================== # ------ Start/initial condition ---- # but the mass flow rate is yet unknown, plotting must be delayed start = Start("start", dir_name, m_dot=10e-3) # ----- compressor -------------- # prescribed power, working_fluid mass flow rate is calculated here run_p_comp = {"power": POWER_IN} compressor = FlowMachine("compressor", dir_name, verbose=verbose) p_high = compressor.config['working_fluid']['p_high'] compressor.calculate(start.output["state_out"], {'working_fluid': [600, p_high, 5e5]}, run_param=run_p_comp) # , m_dot=10e-3) # for the output only p_high is used! Now m_dot is known for the working fluid. cost_compress = compressor.estimate_costs() m_dot_w = compressor.output["m_dot"]['working_fluid'] m_dot = {"m_dot": {"working_fluid": m_dot_w}} start = Start("start", dir_name, m_dot=m_dot_w) # ----- condenser -------------- run_p_cond = {"m_dot": {"working_fluid": m_dot_w}} condenser = StaticHeatExchanger("condenser", dir_name, verbose=verbose) inp_cond, _ = condenser.set_in_out({"working_fluid": compressor.output["state_out"]["working_fluid"]}) condenser.calculate(in_states=inp_cond, run_param={"m_dot": {"working_fluid": m_dot_w}}) condenser.hex_opti_work_out(inp_act=inp_cond, run_p_par=run_p_cond) volumes_c = condenser.calculate_volume( parameters={"time": 3.6e3, "Energy_stored": 3.6e6*10}, cost_name='fixed roof storage tank Morandin') print(f'Storage Volumes condenser: {volumes_c}\n') cost_cond = condenser.estimate_costs() # condenser.output["state_out"]["working_fluid"] # ----- Throttle -------------- throttle = Throttle("throttle", dir_name, verbose=verbose) throttle.calculate(condenser.output["state_out"], compressor.output["state_in"], run_param=m_dot) cost_t = throttle.estimate_costs() # ----- Evaporator -------------- evaporator = StaticHeatExchanger("evaporator", dir_name, verbose=verbose) inp1, outp1 = evaporator.set_in_out( {'working_fluid': throttle.output['state_out']["working_fluid"]}) inp2, outp2 = evaporator.set_in_out( start.output['state_in'], False) evaporator.calculate(inp1, outp2, run_param=run_p_cond) volumes_e = evaporator.calculate_volume( parameters={"time": 3.6e3, "Energy_stored": 3.6e6*10}, cost_name='fixed roof storage tank Morandin') cost_ev = evaporator.estimate_costs() total_costs = [cost_compress, cost_t, cost_cond, cost_ev, *evaporator.output['Storage_costs'].values(), *condenser.output['Storage_costs'].values()] names = ["compressor", "throttle", "condenser", "evaporator", "cold storage in", "cold storage out", "hot storage in", "hot storage out", ] cost_dict = dict(zip(names, total_costs)) sum_cost = np.sum(np.array([*cost_dict.values()])) objs = { "compressor": compressor, "throttle": throttle, "condenser": condenser, "evaporator": evaporator, } for key, val in cost_dict.items(): print(f"Costs: {key} {val:.3e} €, ratio: {val/sum_cost*100:.2f} %") obj = objs.get(key) if obj is None: continue warn = getattr(obj, "warning", None) # falls warning eine Methode ist, aufrufen if callable(warn): warn = warn() if warn is None: continue # falls warn ein Namespace mit 'message'/'value' ist, schön ausgeben msg = getattr(warn, "message", None) val = getattr(warn, "value", None) if msg is not None: print(f"{key} warning: {msg}") if val is not None: print(f"{key} deviation: {val}") else: print(f"{key} warning: {warn}") print( f"\ncosts total: {sum_cost:.3e} €,compressor power (in): {POWER_IN/1000:.3f} kW\n") print(f'Storage Volumes evaporator: {volumes_e}') print(f"COP: {np.abs(condenser.output['q_dot']/run_p_comp['power']):.4f}") # =========== Calculations finished ==================== # --------- plot preparation ------------ fig, ax = plt.subplots(1) plot_info = CB_DEFAULTS["Components"]["Plot"] plot_info.update({"ax": ax, "fig": fig}) pl_inf = plot_info.copy() # for the starting point (dot) pl_inf.update({"label": ["start", ""], "col": ["ok", "bv"], "direction": 1, }) # # Plotting starts shift, direct = start.plot(pl_inf) plot_info.update({"x-shift": shift, "direction": direct, "label": [compressor.name, ""], "col": ["-r", "bv-"]}) shift, direct = compressor.plot(plot_info) plot_info.update({"x-shift": shift, "direction": direct, "label": [condenser.name, ""], "col": [":r", "rv-"]}) shift, direct = condenser.plot(plot_info) plot_info.update({"x-shift": shift, "direction": direct, "label": [throttle.name, ""], "col": ["-b", "bv-"]}) shift, direct = throttle.plot(plot_info) plot_info.update({"x-shift": shift, "direction": direct, "label": [evaporator.name, ""], "col": [":b", "bv-"]}) shift4 = evaporator.plot(plot_info) for proc in [compressor, condenser, throttle, evaporator]: print( f'{proc.name} --Exergy destr.rate: {proc.output["exergy_destruction_rate"]:.2e} W, costs: {proc.cost:.2e}') if 'Storage_costs' in proc.output.keys(): print(f"\tStorage costs: {proc.output['Storage_costs']}")