Source code for carbatpy.models.components.surrogates

# -*- coding: utf-8 -*-
"""
Created on Tue Jul 30 16:15:25 2024
Structure to create surrogate models.
01.08.2024: implemented for multilayer perceptrons (MLPs), which have proven themselves.
Training needs a labeled DataFrame with the training data.

@author: welp
"""
import carbatpy as cb
import pandas as pd
import joblib
import os
import pickle
import numpy as np
import yaml

from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from sklearn.neural_network import MLPRegressor
from sklearn.model_selection import RandomizedSearchCV

from datetime import datetime

class Surrogate:
    """Multilayer-perceptron (MLP) surrogate model built from a labeled DataFrame.

    Attributes set by ``train_surrogate`` and/or ``load``:

    * ``features_list`` / ``targets_list`` -- column names used as inputs/outputs
    * ``scaler_x`` / ``scaler_y`` -- fitted scalers for inputs/outputs
    * ``model`` -- the fitted regressor used by ``predict``
    * ``x_range`` -- ``[min, max]`` of the training inputs (per feature)
    * ``random_state`` -- seed used for the train/test split

    ``RS``, ``parameters``, ``DF_x`` and ``DF_y`` are only set by
    ``train_surrogate``; ``save`` needs them, so it cannot be called on an
    instance that was only populated through ``load``.
    """

    def __init__(self, title):
        """Store a descriptive title for this surrogate."""
        self.title = title

    def _make_xy(self, DF):
        """Create the feature/target DataFrames and arrays from ``DF``.

        Uses ``self.features_list`` and ``self.targets_list`` to select the
        columns; the arrays are independent copies of the DataFrame data.
        """
        self.DF_x = DF[self.features_list]
        self.DF_y = DF[self.targets_list]
        self.x_data_arr = self.DF_x.to_numpy(copy=True)
        self.y_data_arr = self.DF_y.to_numpy(copy=True)

    @staticmethod
    def _expand_hidden_layer_grid(parameters):
        """Return a copy of ``parameters`` with a ``hidden_layer_sizes`` grid.

        If both ``hidden_layers`` (numbers of layers) and ``neurons`` (neurons
        per layer) are present, they are replaced by ``hidden_layer_sizes``
        containing every combination as a list ``[n] * n_layers``. Otherwise
        the parameters are returned unchanged (as a shallow copy).
        """
        expanded = dict(parameters)
        if "hidden_layers" in expanded and "neurons" in expanded:
            depths = expanded.pop("hidden_layers")
            widths = expanded.pop("neurons")
            expanded["hidden_layer_sizes"] = [
                [width] * depth for depth in depths for width in widths
            ]
        return expanded

    def train_surrogate(
            self,
            DF,
            features_list,
            targets_list,
            split=0.2,
            random_state=42,
            hypo="def_hyperparameter.yaml",
            verbose=False):
        """
        Train an MLP surrogate from a DataFrame with chosen features and targets.

        Inputs and outputs are scaled with ``MinMaxScaler``. Hyperparameters
        are chosen by ``RandomizedSearchCV`` (100 candidates, 3-fold CV)
        scored with 'r2' and 'neg_root_mean_squared_error'; the latter is used
        for refitting the best estimator.

        Parameters
        ----------
        DF : pandas.DataFrame
            Training data containing all feature and target columns.
        features_list : list
            Column names used as model inputs.
        targets_list : list
            Column names used as model outputs.
        split : float, optional
            Passed as ``test_size`` to ``train_test_split``. The default is 0.2.
        random_state : int, optional
            Seed for the train/test split. The default is 42.
        hypo : str, optional
            Path of the YAML file with the hyperparameter search space. If it
            contains the keys ``hidden_layers`` and ``neurons`` they are
            expanded into ``hidden_layer_sizes``.
            The default is "def_hyperparameter.yaml".
        verbose : bool, optional
            If True, print the best parameters, estimator and score.
            The default is False.

        Returns
        -------
        Y_test : pandas.DataFrame
            Test targets (in original units).
        Y_pred : pandas.DataFrame
            Predicted targets for the test set (in original units).

        """
        self.features_list = features_list
        self.targets_list = targets_list
        self.split = split
        self.random_state = random_state
        self.scaler_x = MinMaxScaler()
        self.scaler_y = MinMaxScaler()

        # create train and test set
        self._make_xy(DF)
        X_train, X_test, y_train, y_test = train_test_split(
            self.x_data_arr,
            self.y_data_arr,
            test_size=self.split,
            random_state=self.random_state)

        # remembered so that predict() can refuse to extrapolate
        self.x_range = [np.min(X_train, axis=0), np.max(X_train, axis=0)]

        # scalers are fitted on the training data only
        X_train = self.scaler_x.fit_transform(X_train)
        X_test = self.scaler_x.transform(X_test)
        y_train = self.scaler_y.fit_transform(y_train)
        y_test = self.scaler_y.transform(y_test)

        with open(hypo, 'r') as file:
            loaded_parameters = yaml.safe_load(file)

        # Decide by file content, not by file name, whether the layer grid has
        # to be built: a copy of the default file stored elsewhere must work too.
        self.parameters = self._expand_hidden_layer_grid(loaded_parameters)

        # Create MLP for all targets and start hyperparameter optimization.
        # NOTE: random_state only seeds the split above; the search and the
        # network initialisation are not seeded here.
        model = MLPRegressor(max_iter=2000, n_iter_no_change=30)
        RS = RandomizedSearchCV(
            model,
            self.parameters,
            scoring=('r2', 'neg_root_mean_squared_error'),
            refit='neg_root_mean_squared_error',
            n_iter=100,
            n_jobs=-1,
            cv=3)
        RS.fit(X_train, y_train)
        self.RS = RS
        self.model = RS.best_estimator_

        if verbose:
            print('best parameters: ', RS.best_params_)
            print('best estimator: ', RS.best_estimator_)
            print('best score: ', RS.best_score_)

        # With a single target the regressor returns a 1-D array, which the
        # scaler cannot inverse-transform; force (n_samples, n_targets).
        y_pred = np.asarray(RS.predict(X_test)).reshape(-1, len(targets_list))
        y_pred = self.scaler_y.inverse_transform(y_pred)
        y_test = self.scaler_y.inverse_transform(y_test)

        Y_pred = pd.DataFrame(y_pred, columns=targets_list)
        Y_test = pd.DataFrame(y_test, columns=targets_list)
        return Y_test, Y_pred

    def save(self, path="default"):
        """
        Save the trained model, scalers, data and search results.

        Everything is written to ``<base>/<features joined by '-'>/<timestamp>/``.
        Requires a preceding call to ``train_surrogate``.

        Parameters
        ----------
        path : str, optional
            Base directory (with or without trailing separator). The default
            "default" uses ``<RES_DIR>/surrogates/MLP-R`` with ``RES_DIR``
            taken from ``cb.CB_DEFAULTS["General"]``.

        Returns
        -------
        file_out : str
            Directory the files were written to (can be passed to ``load``).

        """
        elements = '-'.join(self.features_list)
        timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

        if path == "default":
            base_dir = os.path.join(
                cb.CB_DEFAULTS["General"]["RES_DIR"], 'surrogates', 'MLP-R')
        else:
            base_dir = path
        file_out = os.path.join(base_dir, elements, timestamp)

        # An existing directory is fine; real errors (e.g. permissions) surface.
        os.makedirs(file_out, exist_ok=True)

        # Save results
        joblib.dump(self.RS, os.path.join(file_out, 'MLP_RS.pkl'))
        joblib.dump(self.RS, os.path.join(file_out, 'MLP_RS.sav'))
        joblib.dump(self.scaler_x, os.path.join(file_out, 'MLP_scaler_x.pkl'))
        joblib.dump(self.scaler_y, os.path.join(file_out, 'MLP_scaler_y.pkl'))
        joblib.dump(self.model, os.path.join(file_out, 'mlp.pkl'))

        self.DF_x.to_csv(os.path.join(file_out, 'X.txt'), index=False)
        self.DF_y.to_csv(os.path.join(file_out, 'y.txt'), index=False)

        # Search space that was handed to the randomized search
        filename = "MLP_Randomized_search_parameter.txt"
        with open(os.path.join(file_out, filename), "w") as file:
            file.writelines(
                f"{key}: {value}\n" for key, value in self.parameters.items())

        # Metadata needed by load()/predict()
        pickled_attributes = {
            'features_list.pkl': self.features_list,
            'targets_list.pkl': self.targets_list,
            'random_state.pkl': self.random_state,
            'x_range.pkl': self.x_range,
        }
        for pkl_name, value in pickled_attributes.items():
            with open(os.path.join(file_out, pkl_name), 'wb') as file:
                pickle.dump(value, file)

        # Write the search results to a text file
        filename = 'MLP_randomized_search_results.txt'
        with open(os.path.join(file_out, filename), "w") as file:
            file.write("Best parameter: {}\n".format(self.RS.best_params_))
            file.write("Best score: {}\n".format(self.RS.best_score_))

        return file_out

    def load(self, path):
        """
        Load a surrogate that was written by ``save``.

        Restores ``model``, ``scaler_x``, ``scaler_y``, ``features_list``,
        ``targets_list``, ``random_state`` and ``x_range``.

        Parameters
        ----------
        path : str
            Directory containing the saved files (with or without trailing
            separator).

        Returns
        -------
        None.

        """
        # SECURITY: joblib/pickle execute code while loading; only load
        # surrogates from trusted sources.
        self.model = joblib.load(os.path.join(path, 'mlp.pkl'))
        self.scaler_x = joblib.load(os.path.join(path, 'MLP_scaler_x.pkl'))
        self.scaler_y = joblib.load(os.path.join(path, 'MLP_scaler_y.pkl'))

        # Loading features, targets and training metadata
        pickled_attributes = {
            'features_list': 'features_list.pkl',
            'targets_list': 'targets_list.pkl',
            'random_state': 'random_state.pkl',
            'x_range': 'x_range.pkl',
        }
        for attribute, pkl_name in pickled_attributes.items():
            with open(os.path.join(path, pkl_name), 'rb') as file:
                setattr(self, attribute, pickle.load(file))

    def predict(self, DF_new_x):
        """
        Use the existing surrogate to predict new data.

        Parameters
        ----------
        DF_new_x : pandas.DataFrame
            Must contain all columns named in ``self.features_list``;
            additional columns are ignored.

        Raises
        ------
        ValueError
            If a model feature is missing in ``DF_new_x``, or if any value
            lies outside the range of the training data (the surrogate does
            not extrapolate).

        Returns
        -------
        y : numpy.ndarray
            Target results, shape (n_samples, n_targets).
        DF_y : pandas.DataFrame
            Target results with ``self.targets_list`` as column names.

        """
        try:
            DF_x = DF_new_x[self.features_list]
        except (KeyError, IndexError, TypeError) as err:
            raise ValueError(
                "DF_new_x does not provide all features of the model: "
                f"{self.features_list}") from err

        X = DF_x.to_numpy(copy=True)

        within_range = (X >= self.x_range[0]) & (X <= self.x_range[1])
        if not np.all(within_range):
            raise ValueError(
                "x_predict is outside of data range used for training\n"
                "range of training data:\n"
                f"{self.features_list}\n"
                f"min: {self.x_range[0]}\n"
                f"max: {self.x_range[1]}")

        X_scaled = self.scaler_x.transform(X)
        # A single-target regressor returns a 1-D array; the scaler needs
        # (n_samples, n_targets).
        y_scaled = np.asarray(self.model.predict(X_scaled)).reshape(
            -1, len(self.targets_list))
        y = self.scaler_y.inverse_transform(y_scaled)

        DF_y = pd.DataFrame(y, columns=self.targets_list)
        return y, DF_y
if __name__ == "__main__":
    # Small demo: either train a compressor surrogate from the example data
    # or load a previously saved one and predict two operating points.
    mode = 1  # 1: train new, 2: load existing

    if mode == 1:
        # first test case
        compressor = Surrogate("piston compressor")
        DF = pd.read_csv(os.path.join(
            cb.CB_DEFAULTS['General']['CB_DATA'],
            'Example_compressor_data.csv'))
        DF = DF[DF.counter < 100]  # filter failed results, iteration stops after 100 trials
        DF = DF[DF.is_eff != 1]  # eliminates 1 (invalid fluid properties)

        # choose features and targets
        features_list = ['p_ve', 'T_e', 'v_e']
        targets_list = ['is_eff', 'degree_delivery', 'T_aus']
        compressor.train_surrogate(DF, features_list, targets_list)
        # compressor.save()  # not activated (would fill up the disk space)

    elif mode == 2:
        my_new_compressor = Surrogate("my new compressor")

        # load() needs the directory of a saved surrogate, not the
        # hyperparameter file. Use the most recent run in the default
        # location that save() writes to; the timestamp folder names sort
        # chronologically.
        runs_dir = os.path.join(
            cb.CB_DEFAULTS['General']['RES_DIR'],
            'surrogates', 'MLP-R', 'p_ve-T_e-v_e')
        saved_runs = []
        if os.path.isdir(runs_dir):
            saved_runs = sorted(
                entry for entry in os.listdir(runs_dir)
                if os.path.isdir(os.path.join(runs_dir, entry)))
        if not saved_runs:
            raise FileNotFoundError(
                f"no saved surrogate found in {runs_dir}; "
                "train one (mode 1) and call save() first")
        # the trailing separator keeps the path usable as a plain prefix
        my_new_compressor.load(os.path.join(runs_dir, saved_runs[-1], ''))

        x_predict = pd.DataFrame(
            np.array([[7, 290, 0.2], [3, 305, 0.3]]),
            columns=['p_ve', 'T_e', 'v_e'])
        y, y_DF = my_new_compressor.predict(x_predict)