Source code for nnodely.nnodely

# Extern packages
import random, torch, copy, os
import numpy as np
import pandas as pd

# nnodely packages
from nnodely.visualizer import TextVisualizer, Visualizer
from nnodely.loss import CustomLoss
from nnodely.model import Model
from nnodely.optimizer import Optimizer, SGD, Adam
from nnodely.exporter import Exporter, StandardExporter
from nnodely.modeldef import ModelDef

from nnodely.utils import check, argmax_dict, argmin_dict, tensor_to_list, TORCH_DTYPE, NP_DTYPE

from nnodely.logger import logging, nnLogger
log = nnLogger(__name__, logging.INFO)


[docs] class Modely: """ Create the main object, the nnodely object, that will be used to create the network, train and export it. Parameters ---------- visualizer : str, Visualizer, optional The visualizer to be used. Default is the 'Standard' visualizer. exporter : str, Exporter, optional The exporter to be used. Default is the 'Standard' exporter. seed : int, optional Set the seed for all the random modules inside the nnodely framework. Default is None. workspace : str The path of the workspace where all the exported files will be saved. log_internal : bool Whether or not save the logs. Default is False. save_history : bool Whether or not save the history. Default is False. Example ------- >>> model = Modely() """ def __init__(self, visualizer:str|Visualizer|None = 'Standard', exporter:str|Exporter|None = 'Standard', seed:int|None = None, workspace:str|None = None, log_internal:bool = False, save_history:bool = False): # Visualizer if visualizer == 'Standard': self.visualizer = TextVisualizer(1) elif visualizer != None: self.visualizer = visualizer else: self.visualizer = Visualizer() self.visualizer.set_n4m(self) # Exporter if exporter == 'Standard': self.exporter = StandardExporter(workspace, self.visualizer, save_history) elif exporter != None: self.exporter = exporter else: self.exporter = Exporter() ## Set the random seed for reproducibility if seed is not None: self.resetSeed(seed) # Save internal self.log_internal = log_internal if self.log_internal == True: self.internals = {} # Models definition self.model_def = ModelDef() self.input_n_samples = {} self.max_n_samples = 0 self.neuralized = False self.traced = False self.model = None self.states = {} # Dataaset Parameters self.data_loaded = False self.file_count = 0 self.num_of_samples = {} self.data = {} self.n_datasets = 0 self.datasets_loaded = set() self.multifile = {} # Training Parameters self.standard_train_parameters = { 'models' : None, 'train_dataset' : None, 'validation_dataset' : None, 'test_dataset' : None, 'splits' : [70, 20, 10], 'closed_loop' : {}, 'connect' : {}, 'step' : 0, 'prediction_samples' : 0, 'shuffle_data' : True, 'early_stopping' : None, 'early_stopping_params' : {}, 'select_model' : 'last', 'select_model_params' : {}, 'minimize_gain' : {}, 'num_of_epochs': 100, 'train_batch_size' : 128, 'val_batch_size' : None, 'test_batch_size' : None, 'optimizer' : 'Adam', 'lr' : 0.001, 'lr_param' : {}, 'optimizer_params' : [], 'add_optimizer_params' : [], 'optimizer_defaults' : {}, 'add_optimizer_defaults' : {} } # Optimizer self.optimizer = None # Training Losses self.loss_functions = {} # Validation Parameters self.training = {} self.performance = {} self.prediction = {}
[docs] def resetSeed(self, seed): """ Resets the random seed for reproducibility. This method sets the seed for various random number generators used in the project to ensure reproducibility of results. :param seed: The seed value to be used for the random number generators. :type seed: int Example: >>> model = nnodely() >>> model.resetSeed(42) """ torch.manual_seed(seed) ## set the pytorch seed torch.cuda.manual_seed_all(seed) random.seed(seed) ## set the random module seed np.random.seed(seed) ## set the numpy seed
def __call__(self, inputs={}, sampled=False, closed_loop={}, connect={}, prediction_samples='auto', num_of_samples=None): ##, align_input=False): """ Performs inference on the model. Parameters ---------- inputs : dict, optional A dictionary of input data. The keys are input names and the values are the corresponding data. Default is an empty dictionary. sampled : bool, optional A boolean indicating whether the inputs are already sampled. Default is False. closed_loop : dict, optional A dictionary specifying closed loop connections. The keys are input names and the values are output names. Default is an empty dictionary. connect : dict, optional A dictionary specifying connections. The keys are input names and the values are output names. Default is an empty dictionary. prediction_samples : str or int, optional The number of prediction samples. Can be 'auto', None or an integer. Default is 'auto'. num_of_samples : str or int, optional The number of samples. Can be 'auto', None or an integer. Default is 'auto'. Returns ------- dict A dictionary containing the model's prediction outputs. Raises ------ RuntimeError If the network is not neuralized. ValueError If an input variable is not in the model definition or if an output variable is not in the model definition. Example ------- Example usage: >>> model = Modely() >>> x = Input('x') >>> out = Output('out', Fir(x.last())) >>> model.addModel('example_model', [out]) >>> model.neuralizeModel() >>> predictions = model(inputs={'x': [1, 2, 3]}) """ ## Copy dict for avoid python bug inputs = copy.deepcopy(inputs) closed_loop = copy.deepcopy(closed_loop) connect = copy.deepcopy(connect) ## Check neuralize check(self.neuralized, RuntimeError, "The network is not neuralized.") ## Check closed loop integrity for close_in, close_out in closed_loop.items(): check(close_in in self.model_def['Inputs'], ValueError, f'the tag {close_in} is not an input variable.') check(close_out in self.model_def['Outputs'], ValueError, f'the tag {close_out} is not an output of the network') ## List of keys model_inputs = list(self.model_def['Inputs'].keys()) model_states = list(self.model_def['States'].keys()) state_closed_loop = [key for key, value in self.model_def['States'].items() if 'closedLoop' in value.keys()] + list(closed_loop.keys()) state_connect = [key for key, value in self.model_def['States'].items() if 'connect' in value.keys()] + list(connect.keys()) extra_inputs = list(set(list(inputs.keys())) - set(model_inputs) - set(model_states)) non_mandatory_inputs = state_closed_loop + state_connect mandatory_inputs = list(set(model_inputs) - set(non_mandatory_inputs)) ## Remove extra inputs for key in extra_inputs: log.warning(f'The provided input {key} is not used inside the network. the inference will continue without using it') del inputs[key] ## Get the number of data windows for each input/state num_of_windows = {key: len(value) for key, value in inputs.items()} if sampled else {key: len(value) - self.input_n_samples[key] + 1 for key, value in inputs.items()} ## Get the maximum inference window if num_of_samples: window_dim = num_of_samples for key in inputs.keys(): input_dim = self.model_def['Inputs'][key]['dim'] if key in model_inputs else self.model_def['States'][key]['dim'] if input_dim > 1: inputs[key] += [[0 for _ in range(input_dim)] for _ in range(num_of_samples - (len(inputs[key]) - self.input_n_samples[key] + 1))] else: inputs[key] += [0 for _ in range(num_of_samples - (len(inputs[key]) - self.input_n_samples[key] + 1))] elif inputs: windows = [] for key in inputs.keys(): if key in mandatory_inputs: n_samples = len(inputs[key]) if sampled else len(inputs[key]) - self.model_def['Inputs'][key]['ntot'] + 1 windows.append(n_samples) if not windows: for key in inputs.keys(): if key in non_mandatory_inputs: if key in model_inputs: n_samples = len(inputs[key]) if sampled else len(inputs[key]) - self.model_def['Inputs'][key]['ntot'] + 1 else: n_samples = len(inputs[key]) if sampled else len(inputs[key]) - self.model_def['States'][key]['ntot'] + 1 windows.append(n_samples) window_dim = min(windows) if windows else 0 else: ## No inputs window_dim = 1 if non_mandatory_inputs else 0 check(window_dim > 0, StopIteration, f'Missing samples in the input window') if len(set(num_of_windows.values())) > 1: max_ind_key, max_dim = argmax_dict(num_of_windows) min_ind_key, min_dim = argmin_dict(num_of_windows) log.warning(f'Different number of samples between inputs [MAX {num_of_windows[max_ind_key]} = {max_dim}; MIN {num_of_windows[min_ind_key]} = {min_dim}]') ## Autofill the missing inputs provided_inputs = list(inputs.keys()) missing_inputs = list(set(mandatory_inputs) - set(provided_inputs)) if missing_inputs: log.warning(f'Inputs not provided: {missing_inputs}. Autofilling with zeros..') for key in missing_inputs: inputs[key] = np.zeros(shape=(self.input_n_samples[key] + window_dim - 1, self.model_def['Inputs'][key]['dim']),dtype=NP_DTYPE).tolist() ## Transform inputs in 3D Tensors for key, val in inputs.items(): input_dim = self.model_def['Inputs'][key]['dim'] if key in model_inputs else self.model_def['States'][key]['dim'] inputs[key] = torch.from_numpy(np.array(inputs[key])).to(TORCH_DTYPE) if input_dim > 1: correct_dim = 3 if sampled else 2 check(len(inputs[key].shape) == correct_dim, ValueError,f'The input {key} must have {correct_dim} dimensions') check(inputs[key].shape[correct_dim - 1] == input_dim, ValueError,f'The second dimension of the input "{key}" must be equal to {input_dim}') if input_dim == 1 and inputs[key].shape[-1] != 1: ## add the input dimension inputs[key] = inputs[key].unsqueeze(-1) if inputs[key].ndim <= 1: ## add the batch dimension inputs[key] = inputs[key].unsqueeze(0) if inputs[key].ndim <= 2: ## add the time dimension inputs[key] = inputs[key].unsqueeze(0) ## initialize the resulting dictionary result_dict = {} for key in self.model_def['Outputs'].keys(): result_dict[key] = [] ## Inference with torch.inference_mode(): self.model.eval() ## Update with virtual states if prediction_samples is not None: self.model.update(closed_loop=closed_loop, connect=connect) else: prediction_samples = 0 X = {} count = 0 first = True for idx in range(window_dim): ## Get mandatory data inputs for key in mandatory_inputs: X[key] = inputs[key][idx:idx+1] if sampled else inputs[key][:, idx:idx + self.input_n_samples[key]] ## reset states if count == 0 or prediction_samples=='auto': count = prediction_samples for key in non_mandatory_inputs: ## Get non mandatory data (from inputs, from states, or with zeros) ## if prediction_samples is 'auto' and i have enough samples ## if prediction_samples is NOT 'auto' but i have enough extended window (with zeros) if (key in inputs.keys() and prediction_samples == 'auto' and idx < num_of_windows[key]) or (key in inputs.keys() and prediction_samples != 'auto' and idx < inputs[key].shape[1]): X[key] = inputs[key][idx:idx+1].clone() if sampled else inputs[key][:, idx:idx + self.input_n_samples[key]].clone() ## if im in the first reset ## if i have a state in memory ## if i have prediction_samples = 'auto' and not enough samples elif (key in self.states.keys() and (first or prediction_samples == 'auto')) and (prediction_samples == 'auto' or prediction_samples == None): X[key] = self.states[key] else: ## if i have no samples and no states window_size = self.input_n_samples[key] dim = self.model_def['Inputs'][key]['dim'] if key in model_inputs else self.model_def['States'][key]['dim'] X[key] = torch.zeros(size=(1, window_size, dim), dtype=TORCH_DTYPE, requires_grad=False) self.states[key] = X[key] first = False else: count -= 1 ## Forward pass result, _, out_closed_loop, out_connect = self.model(X) ## Append the prediction of the current sample to the result dictionary for key in self.model_def['Outputs'].keys(): if result[key].shape[-1] == 1: result[key] = result[key].squeeze(-1) if result[key].shape[-1] == 1: result[key] = result[key].squeeze(-1) result_dict[key].append(result[key].detach().squeeze(dim=0).tolist()) ## Update closed_loop and connect if prediction_samples: for key, val in out_closed_loop.items(): shift = val.shape[1] ## take the output time dimension X[key] = torch.roll(X[key], shifts=-1, dims=1) ## Roll the time window X[key][:, -shift:, :] = val ## substitute with the predicted value self.states[key] = X[key] for key, val in out_connect.items(): X[key] = val self.states[key] = X[key] ## Remove virtual states for key in (connect.keys() | closed_loop.keys()): if key in self.states.keys(): del self.states[key] return result_dict
[docs] def getSamples(self, dataset, index = None, window=1): """ Retrieves a window of samples from a given dataset. Parameters ---------- dataset : str The name of the dataset to retrieve samples from. index : int, optional The starting index of the samples. If None, a random index is chosen. Default is None. window : int, optional The number of consecutive samples to retrieve. Default is 1. Returns ------- dict A dictionary containing the retrieved samples. The keys are input and state names, and the values are lists of samples. Raises ------ ValueError If the dataset is not loaded. Example ------- Example usage: >>> model = Modely() >>> model.loadData('dataset_name') >>> samples = model.getSamples('dataset_name', index=10, window=5) """ if index is None: index = random.randint(0, self.num_of_samples[dataset] - window) check(self.data_loaded, ValueError, 'The Dataset must first be loaded using <loadData> function!') if self.data_loaded: result_dict = {} for key in (self.model_def['Inputs'].keys() | self.model_def['States'].keys()): result_dict[key] = [] for idx in range(window): for key ,samples in self.data[dataset].items(): if key in (self.model_def['Inputs'].keys() | self.model_def['States'].keys()): result_dict[key].append(samples[index+idx]) return result_dict
[docs] def addConnect(self, stream_out, state_list_in): """ Adds a connection from a relation stream to an input state. Parameters ---------- stream_out : Stream The relation stream to connect from. state_list_in : list of State The list of input states to connect to. Example ------- Example usage: >>> model = Modely() >>> x = Input('x') >>> y = State('y') >>> relation = Fir(x.last()) >>> model.addConnect(relation, y) """ self.model_def.addConnect(stream_out, state_list_in)
[docs] def addClosedLoop(self, stream_out, state_list_in): """ Adds a closed loop connection from a relation stream to an input state. Parameters ---------- stream_out : Stream The relation stream to connect from. state_list_in : list of State The list of input states to connect to. Example ------- Example usage: >>> model = Modely() >>> x = Input('x') >>> y = State('y') >>> relation = Fir(x.last()) >>> model.addClosedLoop(relation, y) """ self.model_def.addClosedLoop(stream_out, state_list_in)
[docs] def addModel(self, name, stream_list): """ Adds a new model with the given name along with a list of Outputs. Parameters ---------- name : str The name of the model. stream_list : list of Stream The list of Outputs stream in the model. Example ------- Example usage: >>> model = Modely() >>> x = Input('x') >>> out = Output('out', Fir(x.last())) >>> model.addModel('example_model', [out]) """ self.model_def.addModel(name, stream_list)
[docs] def removeModel(self, name_list): """ Removes models with the given list of names. Parameters ---------- name_list : list of str The list of model names to remove. Example ------- Example usage: >>> model.removeModel(['sub_model1', 'sub_model2']) """ self.model_def.removeModel(name_list)
[docs] def addMinimize(self, name, streamA, streamB, loss_function='mse'): """ Adds a minimize loss function to the model. Parameters ---------- name : str The name of the cost function. streamA : Stream The first relation stream for the minimize operation. streamB : Stream The second relation stream for the minimize operation. loss_function : str, optional The loss function to use from the ones provided. Default is 'mse'. Example ------- Example usage: >>> model.addMinimize('minimize_op', streamA, streamB, loss_function='mse') """ self.model_def.addMinimize(name, streamA, streamB, loss_function) self.visualizer.showaddMinimize(name)
[docs] def removeMinimize(self, name_list): """ Removes minimize loss functions using the given list of names. Parameters ---------- name_list : list of str The list of minimize operation names to remove. Example ------- Example usage: >>> model.removeMinimize(['minimize_op1', 'minimize_op2']) """ self.model_def.removeMinimize(name_list)
[docs] def resetStates(self, states=[], batch=1): if states: ## reset only specific states for key in states: window_size = self.input_n_samples[key] dim = self.model_def['States'][key]['dim'] self.states[key] = torch.zeros(size=(batch, window_size, dim), dtype=TORCH_DTYPE, requires_grad=False) else: ## reset all states self.states = {} for key, state in self.model_def['States'].items(): window_size = self.input_n_samples[key] dim = state['dim'] self.states[key] = torch.zeros(size=(batch, window_size, dim), dtype=TORCH_DTYPE, requires_grad=False)
[docs] def neuralizeModel(self, sample_time = None, clear_model = False, model_def = None): """ Neuralizes the model, preparing it for inference and training. This method creates a neural network model starting from the model definition. It will also create all the time windows for the inputs and states. Parameters ---------- sample_time : float or None, optional The sample time for the model. Default is None. clear_model : bool, optional Whether to clear the existing model definition. Default is False. model_def : dict or None, optional A dictionary defining the model. If provided, it overrides the existing model definition. Default is None. Raises ------ ValueError If sample_time is not None and model_def is provided. If clear_model is True and model_def is provided. Example ------- Example usage: >>> model = Modely(name='example_model') >>> model.neuralizeModel(sample_time=0.1, clear_model=True) """ if model_def is not None: check(sample_time == None, ValueError, 'The sample_time must be None if a model_def is provided') check(clear_model == False, ValueError, 'The clear_model must be False if a model_def is provided') self.model_def = ModelDef(model_def) else: if clear_model: self.model_def.update() else: self.model_def.updateParameters(self.model) self.model_def.setBuildWindow(sample_time) self.model = Model(self.model_def.json) input_ns_backward = {key:value['ns'][0] for key, value in (self.model_def['Inputs']|self.model_def['States']).items()} input_ns_forward = {key:value['ns'][1] for key, value in (self.model_def['Inputs']|self.model_def['States']).items()} self.input_n_samples = {} for key, value in (self.model_def['Inputs'] | self.model_def['States']).items(): self.input_n_samples[key] = input_ns_backward[key] + input_ns_forward[key] self.max_n_samples = max(input_ns_backward.values()) + max(input_ns_forward.values()) ## Initialize States self.resetStates() self.neuralized = True self.traced = False self.visualizer.showModel(self.model_def.json) self.visualizer.showModelInputWindow() self.visualizer.showBuiltModel()
[docs] def loadData(self, name, source, format=None, skiplines=0, delimiter=',', header=None): """ Loads data into the model. The data can be loaded from a directory path containing the csv files or from a crafted dataset. Parameters ---------- name : str The name of the dataset. source : str or list The source of the data. Can be a directory path containing the csv files or a list of custom data. format : list or None, optional The format of the data. When loading multiple csv files the format parameter will define how to read each column of the file. Default is None. skiplines : int, optional The number of lines to skip at the beginning of the file. Default is 0. delimiter : str, optional The delimiter used in the data files. Default is ','. header : list or None, optional The header of the data files. Default is None. Raises ------ ValueError If the network is not neuralized. If the delimiter is not valid. Example ------- Example - load data from files: >>> x = Input('x') >>> y = Input('y') >>> out = Output('out',Fir(x.tw(0.05))) >>> test = Modely(visualizer=None) >>> test.addModel('example_model', out) >>> test.neuralizeModel(0.01) >>> data_struct = ['x', '', 'y'] >>> test.loadData(name='example_dataset', source='path/to/data', format=data_struct) Example - load data from a crafted dataset: >>> x = Input('x') >>> y = Input('y') >>> out = Output('out',Fir(x.tw(0.05))) >>> test = Modely(visualizer=None) >>> test.addModel('example_model', out) >>> test.neuralizeModel(0.01) >>> data_x = np.array(range(10)) >>> dataset = {'x': data_x, 'y': (2*data_x)} >>> test.loadData(name='example_dataset',source=dataset) """ check(self.neuralized, ValueError, "The network is not neuralized.") check(delimiter in ['\t', '\n', ';', ',', ' '], ValueError, 'delimiter not valid!') json_inputs = self.model_def['Inputs'] | self.model_def['States'] model_inputs = list(json_inputs.keys()) ## Initialize the dictionary containing the data if name in list(self.data.keys()): log.warning(f'Dataset named {name} already loaded! overriding the existing one..') self.data[name] = {} input_ns_backward = {key:value['ns'][0] for key, value in json_inputs.items()} input_ns_forward = {key:value['ns'][1] for key, value in json_inputs.items()} max_samples_backward = max(input_ns_backward.values()) max_samples_forward = max(input_ns_forward.values()) max_n_samples = max_samples_backward + max_samples_forward num_of_samples = {} if type(source) is str: ## we have a directory path containing the files ## collect column indexes format_idx = {} idx = 0 for item in format: if isinstance(item, tuple): for key in item: if key not in model_inputs: idx += 1 break n_cols = json_inputs[key]['dim'] format_idx[key] = (idx, idx+n_cols) idx += n_cols else: if item not in model_inputs: idx += 1 continue n_cols = json_inputs[item]['dim'] format_idx[item] = (idx, idx+n_cols) idx += n_cols ## Initialize each input key for key in format_idx.keys(): self.data[name][key] = [] ## obtain the file names try: _,_,files = next(os.walk(source)) files.sort() except StopIteration as e: check(False,StopIteration, f'ERROR: The path "{source}" does not exist!') return self.file_count = len(files) if self.file_count > 1: ## Multifile self.multifile[name] = [] ## Cycle through all the files for file in files: try: ## read the csv df = pd.read_csv(os.path.join(source,file), skiprows=skiplines, delimiter=delimiter, header=header) except: log.warning(f'Cannot read file {os.path.join(source,file)}') continue if self.file_count > 1: self.multifile[name].append((self.multifile[name][-1] + (len(df) - max_n_samples + 1)) if self.multifile[name] else len(df) - max_n_samples + 1) ## Cycle through all the windows for key, idxs in format_idx.items(): back, forw = input_ns_backward[key], input_ns_forward[key] ## Save as numpy array the data data = df.iloc[:, idxs[0]:idxs[1]].to_numpy() self.data[name][key] += [data[i-back:i+forw] for i in range(max_samples_backward, len(df)-max_samples_forward+1)] ## Stack the files for key in format_idx.keys(): self.data[name][key] = np.stack(self.data[name][key]) num_of_samples[key] = self.data[name][key].shape[0] elif type(source) is dict: ## we have a crafted dataset self.file_count = 1 ## Check if the inputs are correct #assert set(model_inputs).issubset(source.keys()), f'The dataset is missing some inputs. Inputs needed for the model: {model_inputs}' # Merge a list of for key in model_inputs: if key not in source.keys(): continue self.data[name][key] = [] ## Initialize the dataset back, forw = input_ns_backward[key], input_ns_forward[key] for idx in range(len(source[key]) - max_n_samples+1): self.data[name][key].append(source[key][idx + (max_samples_backward - back):idx + (max_samples_backward + forw)]) ## Stack the files for key in model_inputs: if key not in source.keys(): continue self.data[name][key] = np.stack(self.data[name][key]) if self.data[name][key].ndim == 2: ## Add the sample dimension self.data[name][key] = np.expand_dims(self.data[name][key], axis=-1) if self.data[name][key].ndim > 3: self.data[name][key] = np.squeeze(self.data[name][key], axis=1) num_of_samples[key] = self.data[name][key].shape[0] # Check dim of the samples check(len(set(num_of_samples.values())) == 1, ValueError, f"The number of the sample of the dataset {name} are not the same for all input in the dataset: {num_of_samples}") self.num_of_samples[name] = num_of_samples[list(num_of_samples.keys())[0]] ## Set the Loaded flag to True self.data_loaded = True ## Update the number of datasets loaded self.n_datasets = len(self.data.keys()) self.datasets_loaded.add(name) ## Show the dataset self.visualizer.showDataset(name=name)
[docs] def filterData(self, filter_function, dataset_name = None): """ Filters the data in the dataset using the provided filter function. Parameters ---------- filter_function : Callable A function that takes a sample as input and returns True if the sample should be kept, and False if it should be removed. dataset_name : str or None, optional The name of the dataset to filter. If None, all datasets are filtered. Default is None. Example ------- Example usage: >>> model = Modely() >>> model.loadData('dataset_name', 'path/to/data') >>> def filter_fn(sample): >>> return sample['input1'] > 0 >>> model.filterData(filter_fn, 'dataset_name') """ idx_to_remove = [] if dataset_name is None: for name in self.data.keys(): dataset = self.data[name] n_samples = len(dataset[list(dataset.keys())[0]]) data_for_filter = [] for i in range(n_samples): new_sample = {key: val[i] for key, val in dataset.items()} data_for_filter.append(new_sample) for idx, sample in enumerate(data_for_filter): if not filter_function(sample): idx_to_remove.append(idx) for key in self.data[name].keys(): self.data[name][key] = np.delete(self.data[name][key], idx_to_remove, axis=0) self.num_of_samples[name] = self.data[name][key].shape[0] self.visualizer.showDataset(name=name) else: dataset = self.data[dataset_name] n_samples = len(dataset[list(dataset.keys())[0]]) data_for_filter = [] for i in range(n_samples): new_sample = {key: val[i] for key, val in dataset.items()} data_for_filter.append(new_sample) for idx, sample in enumerate(data_for_filter): if not filter_function(sample): idx_to_remove.append(idx) for key in self.data[dataset_name].keys(): self.data[dataset_name][key] = np.delete(self.data[dataset_name][key], idx_to_remove, axis=0) self.num_of_samples[dataset_name] = self.data[dataset_name][key].shape[0] self.visualizer.showDataset(name=dataset_name)
def __save_internal(self, key, value): self.internals[key] = tensor_to_list(value) def __get_train_parameters(self, training_params): run_train_parameters = copy.deepcopy(self.standard_train_parameters) if training_params is None: return run_train_parameters for key, value in training_params.items(): check(key in run_train_parameters, KeyError, f"The param {key} is not exist as standard parameters") run_train_parameters[key] = value return run_train_parameters def __get_parameter(self, **parameter): assert len(parameter) == 1 name = list(parameter.keys())[0] self.run_training_params[name] = parameter[name] if parameter[name] is not None else self.run_training_params[name] return self.run_training_params[name] def __get_batch_sizes(self, train_batch_size, val_batch_size, test_batch_size): ## Check if the batch_size can be used for the current dataset, otherwise set the batch_size to the maximum value self.__get_parameter(train_batch_size = train_batch_size) self.__get_parameter(val_batch_size = val_batch_size) self.__get_parameter(test_batch_size = test_batch_size) if self.run_training_params['recurrent_train']: if self.run_training_params['train_batch_size'] > self.run_training_params['n_samples_train']: self.run_training_params['train_batch_size'] = self.run_training_params['n_samples_train'] - self.run_training_params['prediction_samples'] if self.run_training_params['val_batch_size'] is None or self.run_training_params['val_batch_size'] > self.run_training_params['n_samples_val']: self.run_training_params['val_batch_size'] = max(0,self.run_training_params['n_samples_val'] - self.run_training_params['prediction_samples']) if self.run_training_params['test_batch_size'] is None or self.run_training_params['test_batch_size'] > self.run_training_params['n_samples_test']: self.run_training_params['test_batch_size'] = max(0,self.run_training_params['n_samples_test'] - self.run_training_params['prediction_samples']) else: if self.run_training_params['train_batch_size'] > self.run_training_params['n_samples_train']: self.run_training_params['train_batch_size'] = self.run_training_params['n_samples_train'] if self.run_training_params['val_batch_size'] is None or self.run_training_params['val_batch_size'] > self.run_training_params['n_samples_val']: self.run_training_params['val_batch_size'] = self.run_training_params['n_samples_val'] if self.run_training_params['test_batch_size'] is None or self.run_training_params['test_batch_size'] > self.run_training_params['n_samples_test']: self.run_training_params['test_batch_size'] = self.run_training_params['n_samples_test'] check(self.run_training_params['train_batch_size'] > 0, ValueError, f'The auto train_batch_size ({self.run_training_params["train_batch_size"] }) = n_samples_train ({self.run_training_params["n_samples_train"]}) - prediction_samples ({self.run_training_params["prediction_samples"]}), must be greater than 0.') return self.run_training_params['train_batch_size'], self.run_training_params['val_batch_size'], self.run_training_params['test_batch_size'] def __inizilize_optimizer(self, optimizer, optimizer_params, optimizer_defaults, add_optimizer_params, add_optimizer_defaults, models, lr, lr_param): # Get optimizer and initialization parameters optimizer = copy.deepcopy(self.__get_parameter(optimizer=optimizer)) optimizer_params = copy.deepcopy(self.__get_parameter(optimizer_params=optimizer_params)) optimizer_defaults = copy.deepcopy(self.__get_parameter(optimizer_defaults=optimizer_defaults)) add_optimizer_params = copy.deepcopy(self.__get_parameter(add_optimizer_params=add_optimizer_params)) add_optimizer_defaults = copy.deepcopy(self.__get_parameter(add_optimizer_defaults=add_optimizer_defaults)) ## Get parameter to be trained json_models = [] models = self.__get_parameter(models=models) if 'Models' in self.model_def: json_models = list(self.model_def['Models'].keys()) if type(self.model_def['Models']) is dict else [self.model_def['Models']] if models is None: models = json_models self.run_training_params['models'] = models params_to_train = set() if isinstance(models, str): models = [models] for model in models: check(model in json_models, ValueError, f'The model {model} is not in the model definition') if type(self.model_def['Models']) is dict: params_to_train |= set(self.model_def['Models'][model]['Parameters']) else: params_to_train |= set(self.model_def['Parameters'].keys()) # Get the optimizer if type(optimizer) is str: if optimizer == 'SGD': optimizer = SGD({},[]) elif optimizer == 'Adam': optimizer = Adam({},[]) else: check(issubclass(type(optimizer), Optimizer), TypeError, "The optimizer must be an Optimizer or str") optimizer.set_params_to_train(self.model.all_parameters, params_to_train) optimizer.add_defaults('lr', self.run_training_params['lr']) optimizer.add_option_to_params('lr', self.run_training_params['lr_param']) if optimizer_defaults != {}: optimizer.set_defaults(optimizer_defaults) if optimizer_params != []: optimizer.set_params(optimizer_params) for key, value in add_optimizer_defaults.items(): optimizer.add_defaults(key, value) add_optimizer_params = optimizer.unfold(add_optimizer_params) for param in add_optimizer_params: par = param['params'] del param['params'] for key, value in param.items(): optimizer.add_option_to_params(key, {par:value}) # Modify the parameter optimizer.add_defaults('lr', lr) optimizer.add_option_to_params('lr', lr_param) return optimizer
[docs] def trainModel(self, models=None, train_dataset = None, validation_dataset = None, test_dataset = None, splits = None, closed_loop = None, connect = None, step = None, prediction_samples = None, shuffle_data = None, early_stopping = None, early_stopping_params = None, select_model = None, select_model_params = None, minimize_gain = None, num_of_epochs = None, train_batch_size = None, val_batch_size = None, test_batch_size = None, optimizer = None, lr = None, lr_param = None, optimizer_params = None, optimizer_defaults = None, training_params = None, add_optimizer_params = None, add_optimizer_defaults = None ): """ Trains the model using the provided datasets and parameters. Parameters ---------- models : list or None, optional A list of models to train. Default is None. train_dataset : str or None, optional The name of the training dataset. Default is None. validation_dataset : str or None, optional The name of the validation dataset. Default is None. test_dataset : str or None, optional The name of the test dataset. Default is None. splits : list or None, optional A list of 3 elements specifying the percentage of splits for training, validation, and testing. The three elements must sum up to 100! The parameter splits is only used when there is only 1 dataset loaded. Default is None. closed_loop : dict or None, optional A dictionary specifying closed loop connections. The keys are input names and the values are output names. Default is None. connect : dict or None, optional A dictionary specifying connections. The keys are input names and the values are output names. Default is None. step : int or None, optional The step size for training. A big value will result in less data used for each epochs and a faster train. Default is None. prediction_samples : int or None, optional The size of the prediction horizon. Number of samples at each recurrent window Default is None. shuffle_data : bool or None, optional Whether to shuffle the data during training. Default is None. early_stopping : Callable or None, optional A callable for early stopping. Default is None. early_stopping_params : dict or None, optional A dictionary of parameters for early stopping. Default is None. select_model : Callable or None, optional A callable for selecting the best model. Default is None. select_model_params : dict or None, optional A dictionary of parameters for selecting the best model. Default is None. minimize_gain : dict or None, optional A dictionary specifying the gain for each minimization loss function. Default is None. num_of_epochs : int or None, optional The number of epochs to train the model. Default is None. train_batch_size : int or None, optional The batch size for training. Default is None. val_batch_size : int or None, optional The batch size for validation. Default is None. test_batch_size : int or None, optional The batch size for testing. Default is None. optimizer : Optimizer or None, optional The optimizer to use for training. Default is None. lr : float or None, optional The learning rate. Default is None. lr_param : dict or None, optional A dictionary of learning rate parameters. Default is None. optimizer_params : dict or None, optional A dictionary of optimizer parameters. Default is None. optimizer_defaults : dict or None, optional A dictionary of default optimizer settings. Default is None. training_params : dict or None, optional A dictionary of training parameters. Default is None. add_optimizer_params : dict or None, optional Additional optimizer parameters. Default is None. add_optimizer_defaults : dict or None, optional Additional default optimizer settings. Default is None. Raises ------ RuntimeError If no data is loaded or if there are no modules with learnable parameters. KeyError If the sample horizon is not positive. ValueError If an input or output variable is not in the model definition. Example ------- Example - basic feed-forward training: >>> x = Input('x') >>> F = Input('F') >>> xk1 = Output('x[k+1]', Fir()(x.tw(0.2))+Fir()(F.last())) >>> mass_spring_damper = Modely(seed=0) >>> mass_spring_damper.addModel('xk1',xk1) >>> mass_spring_damper.neuralizeModel(sample_time = 0.05) >>> data_struct = ['time','x','dx','F'] >>> data_folder = os.path.join(os.path.dirname(os.path.realpath(__file__)),'dataset','data') >>> mass_spring_damper.loadData(name='mass_spring_dataset', source=data_folder, format=data_struct, delimiter=';') >>> params = {'num_of_epochs': 100,'train_batch_size': 128,'lr':0.001} >>> mass_spring_damper.trainModel(splits=[70,20,10], training_params = params) Example - recurrent training: >>> x = Input('x') >>> F = Input('F') >>> xk1 = Output('x[k+1]', Fir()(x.tw(0.2))+Fir()(F.last())) >>> mass_spring_damper = Modely(seed=0) >>> mass_spring_damper.addModel('xk1',xk1) >>> mass_spring_damper.addClosedLoop(xk1, x) >>> mass_spring_damper.neuralizeModel(sample_time = 0.05) >>> data_struct = ['time','x','dx','F'] >>> data_folder = os.path.join(os.path.dirname(os.path.realpath(__file__)),'dataset','data') >>> mass_spring_damper.loadData(name='mass_spring_dataset', source=data_folder, format=data_struct, delimiter=';') >>> params = {'num_of_epochs': 100,'train_batch_size': 128,'lr':0.001} >>> mass_spring_damper.trainModel(splits=[70,20,10], prediction_samples=10, training_params = params) """ check(self.data_loaded, RuntimeError, 'There is no data loaded! The Training will stop.') check(list(self.model.parameters()), RuntimeError, 'There are no modules with learnable parameters! The Training will stop.') ## Get running parameter from dict self.run_training_params = copy.deepcopy(self.__get_train_parameters(training_params)) ## Get connect and closed_loop prediction_samples = self.__get_parameter(prediction_samples = prediction_samples) check(prediction_samples >= 0, KeyError, 'The sample horizon must be positive!') ## Check close loop and connect step = self.__get_parameter(step = step) closed_loop = self.__get_parameter(closed_loop = closed_loop) connect = self.__get_parameter(connect = connect) recurrent_train = True if closed_loop: for input, output in closed_loop.items(): check(input in self.model_def['Inputs'], ValueError, f'the tag {input} is not an input variable.') check(output in self.model_def['Outputs'], ValueError, f'the tag {output} is not an output of the network') log.warning(f'Recurrent train: closing the loop between the the input ports {input} and the output ports {output} for {prediction_samples} samples') elif connect: for connect_in, connect_out in connect.items(): check(connect_in in self.model_def['Inputs'], ValueError, f'the tag {connect_in} is not an input variable.') check(connect_out in self.model_def['Outputs'], ValueError, f'the tag {connect_out} is not an output of the network') log.warning(f'Recurrent train: connecting the input ports {connect_in} with output ports {connect_out} for {prediction_samples} samples') elif self.model_def['States']: ## if we have state variables we have to do the recurrent train log.warning(f"Recurrent train: update States variables {list(self.model_def['States'].keys())} for {prediction_samples} samples") else: if prediction_samples != 0: log.warning( f"The value of the prediction_samples={prediction_samples} is not used in not recursive network.") recurrent_train = False self.run_training_params['recurrent_train'] = recurrent_train ## Get early stopping early_stopping = self.__get_parameter(early_stopping = early_stopping) if early_stopping: self.run_training_params['early_stopping'] = early_stopping.__name__ early_stopping_params = self.__get_parameter(early_stopping_params = early_stopping_params) ## Get dataset for training shuffle_data = self.__get_parameter(shuffle_data = shuffle_data) ## Get the dataset name train_dataset = self.__get_parameter(train_dataset = train_dataset) #TODO manage multiple datasets if train_dataset is None: ## If we use all datasets with the splits splits = self.__get_parameter(splits = splits) check(len(splits)==3, ValueError, '3 elements must be inserted for the dataset split in training, validation and test') check(sum(splits)==100, ValueError, 'Training, Validation and Test splits must sum up to 100.') check(splits[0] > 0, ValueError, 'The training split cannot be zero.') ## Get the dataset name dataset = list(self.data.keys())[0] ## take the dataset name train_dataset_name = val_dataset_name = test_dataset_name = dataset ## Collect the split sizes train_size = splits[0] / 100.0 val_size = splits[1] / 100.0 test_size = 1 - (train_size + val_size) num_of_samples = self.num_of_samples[dataset] n_samples_train = round(num_of_samples*train_size) if splits[1] == 0: n_samples_test = num_of_samples-n_samples_train n_samples_val = 0 else: n_samples_test = round(num_of_samples*test_size) n_samples_val = num_of_samples-n_samples_train-n_samples_test ## Split into train, validation and test XY_train, XY_val, XY_test = {}, {}, {} for key, samples in self.data[dataset].items(): if val_size == 0.0 and test_size == 0.0: ## we have only training set XY_train[key] = torch.from_numpy(samples).to(TORCH_DTYPE) elif val_size == 0.0 and test_size != 0.0: ## we have only training and test set XY_train[key] = torch.from_numpy(samples[:n_samples_train]).to(TORCH_DTYPE) XY_test[key] = torch.from_numpy(samples[n_samples_train:]).to(TORCH_DTYPE) elif val_size != 0.0 and test_size == 0.0: ## we have only training and validation set XY_train[key] = torch.from_numpy(samples[:n_samples_train]).to(TORCH_DTYPE) XY_val[key] = torch.from_numpy(samples[n_samples_train:]).to(TORCH_DTYPE) else: ## we have training, validation and test set XY_train[key] = torch.from_numpy(samples[:n_samples_train]).to(TORCH_DTYPE) XY_val[key] = torch.from_numpy(samples[n_samples_train:-n_samples_test]).to(TORCH_DTYPE) XY_test[key] = torch.from_numpy(samples[n_samples_train+n_samples_val:]).to(TORCH_DTYPE) ## Set name for resultsAnalysis train_dataset = self.__get_parameter(train_dataset = f"train_{dataset}_{train_size:0.2f}") validation_dataset = self.__get_parameter(validation_dataset =f"validation_{dataset}_{val_size:0.2f}") test_dataset = self.__get_parameter(test_dataset = f"test_{dataset}_{test_size:0.2f}") else: ## Multi-Dataset ## Get the names of the datasets datasets = list(self.data.keys()) validation_dataset = self.__get_parameter(validation_dataset=validation_dataset) test_dataset = self.__get_parameter(test_dataset=test_dataset) train_dataset_name, val_dataset_name, test_dataset_name = train_dataset, validation_dataset, test_dataset ## Collect the number of samples for each dataset n_samples_train, n_samples_val, n_samples_test = 0, 0, 0 check(train_dataset in datasets, KeyError, f'{train_dataset} Not Loaded!') if validation_dataset is not None and validation_dataset not in datasets: log.warning(f'Validation Dataset [{validation_dataset}] Not Loaded. The training will continue without validation') if test_dataset is not None and test_dataset not in datasets: log.warning(f'Test Dataset [{test_dataset}] Not Loaded. The training will continue without test') ## Split into train, validation and test XY_train, XY_val, XY_test = {}, {}, {} n_samples_train = self.num_of_samples[train_dataset] XY_train = {key: torch.from_numpy(val).to(TORCH_DTYPE) for key, val in self.data[train_dataset].items()} if validation_dataset in datasets: n_samples_val = self.num_of_samples[validation_dataset] XY_val = {key: torch.from_numpy(val).to(TORCH_DTYPE) for key, val in self.data[validation_dataset].items()} if test_dataset in datasets: n_samples_test = self.num_of_samples[test_dataset] XY_test = {key: torch.from_numpy(val).to(TORCH_DTYPE) for key, val in self.data[test_dataset].items()} for key in XY_train.keys(): assert n_samples_train == XY_train[key].shape[0], f'The number of train samples {n_samples_train}!={XY_train[key].shape[0]} not compliant.' if key in XY_val: assert n_samples_val == XY_val[key].shape[0], f'The number of val samples {n_samples_val}!={XY_val[key].shape[0]} not compliant.' if key in XY_test: assert n_samples_test == XY_test[key].shape[0], f'The number of test samples {n_samples_test}!={XY_test[key].shape[0]} not compliant.' assert n_samples_train > 0, f'There are {n_samples_train} samples for training.' self.run_training_params['n_samples_train'] = n_samples_train self.run_training_params['n_samples_val'] = n_samples_val self.run_training_params['n_samples_test'] = n_samples_test train_batch_size, val_batch_size, test_batch_size = self.__get_batch_sizes(train_batch_size, val_batch_size, test_batch_size) ## Define the optimizer optimizer = self.__inizilize_optimizer(optimizer, optimizer_params, optimizer_defaults, add_optimizer_params, add_optimizer_defaults, models, lr, lr_param) self.run_training_params['optimizer'] = optimizer.name self.run_training_params['optimizer_params'] = optimizer.optimizer_params self.run_training_params['optimizer_defaults'] = optimizer.optimizer_defaults self.optimizer = optimizer.get_torch_optimizer() ## Get num_of_epochs num_of_epochs = self.__get_parameter(num_of_epochs = num_of_epochs) ## Define the loss functions minimize_gain = self.__get_parameter(minimize_gain = minimize_gain) self.run_training_params['minimizers'] = {} for name, values in self.model_def['Minimizers'].items(): self.loss_functions[name] = CustomLoss(values['loss']) self.run_training_params['minimizers'][name] = {} self.run_training_params['minimizers'][name]['A'] = values['A'] self.run_training_params['minimizers'][name]['B'] = values['B'] self.run_training_params['minimizers'][name]['loss'] = values['loss'] if name in minimize_gain: self.run_training_params['minimizers'][name]['gain'] = minimize_gain[name] ## Clean the dict of the training parameter del self.run_training_params['minimize_gain'] del self.run_training_params['lr'] del self.run_training_params['lr_param'] if not recurrent_train: del self.run_training_params['connect'] del self.run_training_params['closed_loop'] del self.run_training_params['step'] del self.run_training_params['prediction_samples'] if early_stopping is None: del self.run_training_params['early_stopping'] del self.run_training_params['early_stopping_params'] ## Create the train, validation and test loss dictionaries train_losses, val_losses, test_losses = {}, {}, {} for key in self.model_def['Minimizers'].keys(): train_losses[key] = [] if n_samples_val > 0: val_losses[key] = [] ## Check the needed keys are in the datasets keys = set(self.model_def['Inputs'].keys()) keys |= {value['A'] for value in self.model_def['Minimizers'].values()}|{value['B'] for value in self.model_def['Minimizers'].values()} keys -= set(self.model_def['Relations'].keys()) keys -= set(self.model_def['States'].keys()) keys -= set(self.model_def['Outputs'].keys()) if 'connect' in self.run_training_params: keys -= set(self.run_training_params['connect'].keys()) if 'closed_loop' in self.run_training_params: keys -= set(self.run_training_params['closed_loop'].keys()) check(set(keys).issubset(set(XY_train.keys())), KeyError, f"Not all the mandatory keys {keys} are present in the training dataset {set(XY_train.keys())}.") # Evaluate the number of update for epochs and the unsued samples if recurrent_train: list_of_batch_indexes = range(0, (n_samples_train - train_batch_size - prediction_samples + 1), (train_batch_size + step)) check(n_samples_train - train_batch_size - prediction_samples + 1 > 0, ValueError, f"The number of available sample are (n_samples_train ({n_samples_train}) - train_batch_size ({train_batch_size}) - prediction_samples ({prediction_samples}) + 1) = {n_samples_train - train_batch_size - prediction_samples + 1}.") update_per_epochs = (n_samples_train - train_batch_size - prediction_samples + 1)//(train_batch_size + step) + 1 unused_samples = n_samples_train - list_of_batch_indexes[-1] - train_batch_size - prediction_samples else: update_per_epochs = (n_samples_train - train_batch_size)/train_batch_size + 1 unused_samples = n_samples_train - update_per_epochs * train_batch_size self.run_training_params['update_per_epochs'] = update_per_epochs self.run_training_params['unused_samples'] = unused_samples ## Select the model select_model = self.__get_parameter(select_model = select_model) select_model_params = self.__get_parameter(select_model_params = select_model_params) selected_model_def = ModelDef(self.model_def.json) ## Show the training parameters self.visualizer.showTrainParams() import time ## start the train timer start = time.time() self.visualizer.showStartTraining() for epoch in range(num_of_epochs): ## TRAIN self.model.train() if recurrent_train: losses = self.__recurrentTrain(XY_train, n_samples_train, train_dataset_name, train_batch_size, minimize_gain, closed_loop, connect, prediction_samples, step, shuffle=shuffle_data, train=True) else: losses = self.__Train(XY_train, n_samples_train, train_batch_size, minimize_gain, shuffle=shuffle_data, train=True) ## save the losses for ind, key in enumerate(self.model_def['Minimizers'].keys()): train_losses[key].append(torch.mean(losses[ind]).tolist()) if n_samples_val > 0: ## VALIDATION self.model.eval() if recurrent_train: losses = self.__recurrentTrain(XY_val, n_samples_val, val_dataset_name, val_batch_size, minimize_gain, closed_loop, connect, prediction_samples, step, shuffle=False, train=False) else: losses = self.__Train(XY_val, n_samples_val, val_batch_size, minimize_gain, shuffle=False, train=False) ## save the losses for ind, key in enumerate(self.model_def['Minimizers'].keys()): val_losses[key].append(torch.mean(losses[ind]).tolist()) ## Early-stopping if callable(early_stopping): if early_stopping(train_losses, val_losses, early_stopping_params): log.info(f'Stopping the training at epoch {epoch} due to early stopping.') break if callable(select_model): if select_model(train_losses, val_losses, select_model_params): best_model_epoch = epoch selected_model_def.updateParameters(self.model) ## Visualize the training... self.visualizer.showTraining(epoch, train_losses, val_losses) self.visualizer.showWeightsInTrain(epoch = epoch) ## Save the training time end = time.time() ## Visualize the training time for key in self.model_def['Minimizers'].keys(): self.training[key] = {'train': train_losses[key]} if n_samples_val > 0: self.training[key]['val'] = val_losses[key] self.visualizer.showEndTraining(num_of_epochs-1, train_losses, val_losses) self.visualizer.showTrainingTime(end-start) ## Select the model if callable(select_model): log.info(f'Selected the model at the epoch {best_model_epoch+1}.') self.model = Model(selected_model_def) else: log.info('The selected model is the LAST model of the training.') self.resultAnalysis(train_dataset, XY_train, minimize_gain, closed_loop, connect, prediction_samples, step, train_batch_size) if self.run_training_params['n_samples_val'] > 0: self.resultAnalysis(validation_dataset, XY_val, minimize_gain, closed_loop, connect, prediction_samples, step, val_batch_size) if self.run_training_params['n_samples_test'] > 0: self.resultAnalysis(test_dataset, XY_test, minimize_gain, closed_loop, connect, prediction_samples, step, test_batch_size) self.visualizer.showResults() ## Get trained model from torch and set the model_def self.model_def.updateParameters(self.model)
def __recurrentTrain(self, data, n_samples, dataset_name, batch_size, loss_gains, closed_loop, connect, prediction_samples, step, shuffle=False, train=True): model_inputs = list(self.model_def['Inputs'].keys()) state_closed_loop = [key for key, value in self.model_def['States'].items() if 'closedLoop' in value.keys()] + list(closed_loop.keys()) state_connect = [key for key, value in self.model_def['States'].items() if 'connect' in value.keys()] + list(connect.keys()) non_mandatory_inputs = state_closed_loop + state_connect mandatory_inputs = list(set(model_inputs) - set(non_mandatory_inputs)) n_available_samples = n_samples - prediction_samples list_of_batch_indexes = list(range(n_available_samples)) ## Remove forbidden indexes in case of a multi-file dataset if dataset_name in self.multifile.keys(): ## Multi-file Dataset if n_samples == self.run_training_params['n_samples_train']: ## Training start_idx, end_idx = 0, n_samples elif n_samples == self.run_training_params['n_samples_val']: ## Validation start_idx, end_idx = self.run_training_params['n_samples_train'], self.run_training_params['n_samples_train'] + n_samples else: ## Test start_idx, end_idx = self.run_training_params['n_samples_train'] + self.run_training_params['n_samples_val'], self.run_training_params['n_samples_train'] + self.run_training_params['n_samples_val'] + n_samples forbidden_idxs = [] for i in self.multifile[dataset_name]: if i < end_idx and i > start_idx: forbidden_idxs.extend(range(i-prediction_samples, i, 1)) list_of_batch_indexes = [idx for idx in list_of_batch_indexes if idx not in forbidden_idxs] ## Loss vector check((batch_size+step)>0, ValueError, f"The batch_size+step must be greater than 0.") aux_losses = torch.zeros([len(self.model_def['Minimizers']), round(len(list_of_batch_indexes)/(batch_size+step))]) ## Update with virtual states self.model.update(closed_loop=closed_loop, connect=connect) X = {} batch_val = 0 while len(list_of_batch_indexes) >= batch_size: idxs = random.sample(list_of_batch_indexes, batch_size) if shuffle else list_of_batch_indexes[:batch_size] for num in idxs: list_of_batch_indexes.remove(num) if step > 0: if len(list_of_batch_indexes) >= step: step_idxs = random.sample(list_of_batch_indexes, step) if shuffle else list_of_batch_indexes[:step] for num in step_idxs: list_of_batch_indexes.remove(num) if train: self.optimizer.zero_grad() ## Reset the gradient ## Reset horizon_losses = {ind: [] for ind in range(len(self.model_def['Minimizers']))} for key in non_mandatory_inputs: if key in data.keys(): ## with data X[key] = data[key][idxs] else: ## with zeros window_size = self.input_n_samples[key] dim = self.model_def['Inputs'][key]['dim'] if key in model_inputs else self.model_def['States'][key]['dim'] X[key] = torch.zeros(size=(batch_size, window_size, dim), dtype=TORCH_DTYPE, requires_grad=False) self.states[key] = X[key] for horizon_idx in range(prediction_samples + 1): ## Get data for key in mandatory_inputs: X[key] = data[key][[idx+horizon_idx for idx in idxs]] ## Forward pass out, minimize_out, out_closed_loop, out_connect = self.model(X) if self.log_internal and train: internals_dict = {'XY':tensor_to_list(X),'out':out,'param':self.model.all_parameters,'closedLoop':self.model.closed_loop_update,'connect':self.model.connect_update} ## Loss Calculation for ind, (key, value) in enumerate(self.model_def['Minimizers'].items()): loss = self.loss_functions[key](minimize_out[value['A']], minimize_out[value['B']]) loss = (loss * loss_gains[key]) if key in loss_gains.keys() else loss ## Multiply by the gain if necessary horizon_losses[ind].append(loss) ## Update for key, val in out_closed_loop.items(): shift = val.shape[1] ## take the output time dimension X[key] = torch.roll(X[key], shifts=-1, dims=1) ## Roll the time window X[key][:, -shift:, :] = val ## substitute with the predicted value self.states[key] = X[key].clone() for key, value in out_connect.items(): X[key] = value self.states[key] = X[key].clone() if self.log_internal and train: internals_dict['state'] = self.states self.__save_internal('inout_'+str(batch_val)+'_'+str(horizon_idx),internals_dict) ## Calculate the total loss total_loss = 0 for ind in range(len(self.model_def['Minimizers'])): loss = sum(horizon_losses[ind])/(prediction_samples+1) aux_losses[ind][batch_val] = loss.item() total_loss += loss ## Gradient Step if train: total_loss.backward() ## Backpropagate the error self.optimizer.step() self.visualizer.showWeightsInTrain(batch = batch_val) batch_val += 1 ## Remove virtual states for key in (connect.keys() | closed_loop.keys()): if key in self.states.keys(): del self.states[key] ## return the losses return aux_losses def __Train(self, data, n_samples, batch_size, loss_gains, shuffle=True, train=True): check((n_samples - batch_size + 1) > 0, ValueError, f"The number of available sample are (n_samples_train - train_batch_size + 1) = {n_samples - batch_size + 1}.") if shuffle: randomize = torch.randperm(n_samples) data = {key: val[randomize] for key, val in data.items()} ## Initialize the train losses vector aux_losses = torch.zeros([len(self.model_def['Minimizers']),n_samples//batch_size]) for idx in range(0, (n_samples - batch_size + 1), batch_size): ## Build the input tensor XY = {key: val[idx:idx+batch_size] for key, val in data.items()} ## Reset gradient if train: self.optimizer.zero_grad() ## Model Forward _, minimize_out, _, _ = self.model(XY) ## Forward pass ## Loss Calculation total_loss = 0 for ind, (key, value) in enumerate(self.model_def['Minimizers'].items()): loss = self.loss_functions[key](minimize_out[value['A']], minimize_out[value['B']]) loss = (loss * loss_gains[key]) if key in loss_gains.keys() else loss ## Multiply by the gain if necessary aux_losses[ind][idx//batch_size] = loss.item() total_loss += loss ## Gradient step if train: total_loss.backward() self.optimizer.step() self.visualizer.showWeightsInTrain(batch = idx//batch_size) ## return the losses return aux_losses def resultAnalysis(self, dataset, data = None, minimize_gain = {}, closed_loop = {}, connect = {}, prediction_samples = None, step = 0, batch_size = None): import warnings with torch.inference_mode(): ## Init model for retults analysis self.model.eval() self.performance[dataset] = {} self.prediction[dataset] = {} A = {} B = {} total_losses = {} # Create the losses losses = {} for name, values in self.model_def['Minimizers'].items(): losses[name] = CustomLoss(values['loss']) recurrent = False if (closed_loop or connect or self.model_def['States']) and prediction_samples is not None: recurrent = True if data is None: check(dataset in self.data.keys(), ValueError, f'The dataset {dataset} is not loaded!') data = {key: torch.from_numpy(val).to(TORCH_DTYPE) for key, val in self.data[dataset].items()} n_samples = len(data[list(data.keys())[0]]) if recurrent: batch_size = batch_size if batch_size is not None else n_samples - prediction_samples model_inputs = list(self.model_def['Inputs'].keys()) state_closed_loop = [key for key, value in self.model_def['States'].items() if 'closedLoop' in value.keys()] + list(closed_loop.keys()) state_connect = [key for key, value in self.model_def['States'].items() if 'connect' in value.keys()] + list(connect.keys()) non_mandatory_inputs = state_closed_loop + state_connect mandatory_inputs = list(set(model_inputs) - set(non_mandatory_inputs)) for key, value in self.model_def['Minimizers'].items(): total_losses[key], A[key], B[key] = [], [], [] for horizon_idx in range(prediction_samples + 1): A[key].append([]) B[key].append([]) list_of_batch_indexes = list(range(n_samples - prediction_samples)) ## Remove forbidden indexes in case of a multi-file dataset if dataset in self.multifile.keys(): ## Multi-file Dataset if n_samples == self.run_training_params['n_samples_train']: ## Training start_idx, end_idx = 0, n_samples elif n_samples == self.run_training_params['n_samples_val']: ## Validation start_idx, end_idx = self.run_training_params['n_samples_train'], self.run_training_params['n_samples_train'] + n_samples else: ## Test start_idx, end_idx = self.run_training_params['n_samples_train'] + self.run_training_params['n_samples_val'], self.run_training_params['n_samples_train'] + self.run_training_params['n_samples_val'] + n_samples forbidden_idxs = [] for i in self.multifile[dataset]: if i < end_idx and i > start_idx: forbidden_idxs.extend(range(i-prediction_samples, i, 1)) list_of_batch_indexes = [idx for idx in list_of_batch_indexes if idx not in forbidden_idxs] X = {} ## Update with virtual states self.model.update(closed_loop=closed_loop, connect=connect) while len(list_of_batch_indexes) >= batch_size: idxs = list_of_batch_indexes[:batch_size] for num in idxs: list_of_batch_indexes.remove(num) if step > 0: if len(list_of_batch_indexes) >= step: step_idxs = list_of_batch_indexes[:step] for num in step_idxs: list_of_batch_indexes.remove(num) ## Reset horizon_losses = {key: [] for key in self.model_def['Minimizers'].keys()} for key in non_mandatory_inputs: if key in data.keys(): # and len(data[key]) >= (idx + self.input_n_samples[key]): ## with data X[key] = data[key][idxs] else: ## with zeros window_size = self.input_n_samples[key] dim = self.model_def['Inputs'][key]['dim'] if key in model_inputs else self.model_def['States'][key]['dim'] X[key] = torch.zeros(size=(batch_size, window_size, dim), dtype=TORCH_DTYPE, requires_grad=False) self.states[key] = X[key] for horizon_idx in range(prediction_samples + 1): ## Get data for key in mandatory_inputs: X[key] = data[key][[idx+horizon_idx for idx in idxs]] ## Forward pass out, minimize_out, out_closed_loop, out_connect = self.model(X) ## Loss Calculation for key, value in self.model_def['Minimizers'].items(): A[key][horizon_idx].append(minimize_out[value['A']]) B[key][horizon_idx].append(minimize_out[value['B']]) loss = losses[key](minimize_out[value['A']], minimize_out[value['B']]) loss = (loss * minimize_gain[key]) if key in minimize_gain.keys() else loss ## Multiply by the gain if necessary horizon_losses[key].append(loss) ## Update for key, val in out_closed_loop.items(): shift = val.shape[1] ## take the output time dimension X[key] = torch.roll(X[key], shifts=-1, dims=1) ## Roll the time window X[key][:, -shift:, :] = val ## substitute with the predicted value self.states[key] = X[key].clone() for key, value in out_connect.items(): X[key] = value self.states[key] = X[key].clone() ## Calculate the total loss for key in self.model_def['Minimizers'].keys(): loss = sum(horizon_losses[key]) / (prediction_samples + 1) total_losses[key].append(loss.detach().numpy()) for key, value in self.model_def['Minimizers'].items(): for horizon_idx in range(prediction_samples + 1): A[key][horizon_idx] = np.concatenate(A[key][horizon_idx]) B[key][horizon_idx] = np.concatenate(B[key][horizon_idx]) total_losses[key] = np.mean(total_losses[key]) else: if batch_size is None: batch_size = n_samples for key, value in self.model_def['Minimizers'].items(): total_losses[key], A[key], B[key] = [], [], [] for idx in range(0, (n_samples - batch_size + 1), batch_size): ## Build the input tensor XY = {key: val[idx:idx + batch_size] for key, val in data.items()} ## Model Forward _, minimize_out, _, _ = self.model(XY) ## Forward pass ## Loss Calculation for key, value in self.model_def['Minimizers'].items(): A[key].append(minimize_out[value['A']].numpy()) B[key].append(minimize_out[value['B']].numpy()) loss = losses[key](minimize_out[value['A']], minimize_out[value['B']]) loss = (loss * minimize_gain[key]) if key in minimize_gain.keys() else loss total_losses[key].append(loss.detach().numpy()) for key, value in self.model_def['Minimizers'].items(): A[key] = np.concatenate(A[key]) B[key] = np.concatenate(B[key]) total_losses[key] = np.mean(total_losses[key]) for ind, (key, value) in enumerate(self.model_def['Minimizers'].items()): A_np = np.array(A[key]) B_np = np.array(B[key]) self.performance[dataset][key] = {} self.performance[dataset][key][value['loss']] = np.mean(total_losses[key]).item() self.performance[dataset][key]['fvu'] = {} # Compute FVU residual = A_np - B_np error_var = np.var(residual) error_mean = np.mean(residual) #error_var_manual = np.sum((residual-error_mean) ** 2) / (len(self.prediction['B'][ind]) - 0) #print(f"{key} var np:{new_error_var} and var manual:{error_var_manual}") with warnings.catch_warnings(record=True) as w: self.performance[dataset][key]['fvu']['A'] = (error_var / np.var(A_np)).item() self.performance[dataset][key]['fvu']['B'] = (error_var / np.var(B_np)).item() if w and np.var(A_np) == 0.0 and np.var(B_np) == 0.0: self.performance[dataset][key]['fvu']['A'] = np.nan self.performance[dataset][key]['fvu']['B'] = np.nan self.performance[dataset][key]['fvu']['total'] = np.mean([self.performance[dataset][key]['fvu']['A'],self.performance[dataset][key]['fvu']['B']]).item() # Compute AIC #normal_dist = norm(0, error_var ** 0.5) #probability_of_residual = normal_dist.pdf(residual) #log_likelihood_first = sum(np.log(probability_of_residual)) p1 = -len(residual)/2.0*np.log(2*np.pi) with warnings.catch_warnings(record=True) as w: p2 = -len(residual)/2.0*np.log(error_var) p3 = -1 / (2.0 * error_var) * np.sum(residual ** 2) if w and p2 == np.float32(np.inf) and p3 == np.float32(-np.inf): p2 = p3 = 0.0 log_likelihood = p1+p2+p3 #print(f"{key} log likelihood second mode:{log_likelihood} = {p1}+{p2}+{p3} first mode: {log_likelihood_first}") total_params = sum(p.numel() for p in self.model.parameters() if p.requires_grad) #TODO to be check the number is doubled #print(f"{key} total_params:{total_params}") aic = - 2 * log_likelihood + 2 * total_params #print(f"{key} aic:{aic}") self.performance[dataset][key]['aic'] = {'value':aic,'total_params':total_params,'log_likelihood':log_likelihood} # Prediction and target self.prediction[dataset][key] = {} self.prediction[dataset][key]['A'] = A_np.tolist() self.prediction[dataset][key]['B'] = B_np.tolist() ## Remove virtual states for key in (connect.keys() | closed_loop.keys()): if key in self.states.keys(): del self.states[key] self.performance[dataset]['total'] = {} self.performance[dataset]['total']['mean_error'] = np.mean([value for key,value in total_losses.items()]) self.performance[dataset]['total']['fvu'] = np.mean([self.performance[dataset][key]['fvu']['total'] for key in self.model_def['Minimizers'].keys()]) self.performance[dataset]['total']['aic'] = np.mean([self.performance[dataset][key]['aic']['value']for key in self.model_def['Minimizers'].keys()]) self.visualizer.showResult(dataset) def getWorkspace(self): return self.exporter.getWorkspace()
[docs] def saveTorchModel(self, name = 'net', model_folder = None, models = None): """ Saves the neural network model in PyTorch format. Parameters ---------- name : str, optional The name of the saved model file. Default is 'net'. model_folder : str or None, optional The folder to save the model file in. Default is None. models : list or None, optional A list of model names to save. If None, the entire model is saved. Default is None. Raises ------ RuntimeError If the model is not neuralized. Example ------- Example usage: >>> model = Modely() >>> model.neuralizeModel() >>> model.saveTorchModel(name='example_model', model_folder='path/to/save') """ check(self.neuralized == True, RuntimeError, 'The model is not neuralized yet!') if models is not None: if name == 'net': name += '_' + '_'.join(models) model_def = ModelDef() model_def.update(model_dict = {key: self.model_dict[key] for key in models if key in self.model_dict}) model_def.setBuildWindow(self.model_def['Info']['SampleTime']) model_def.updateParameters(self.model) model = Model(model_def.json) else: model = self.model self.exporter.saveTorchModel(model, name, model_folder)
[docs] def loadTorchModel(self, name = 'net', model_folder = None): """ Loads a neural network model from a PyTorch format file. Parameters ---------- name : str, optional The name of the model file to load. Default is 'net'. model_folder : str or None, optional The folder to load the model file from. Default is None. Raises ------ RuntimeError If the model is not neuralized. Example ------- Example usage: >>> model = Modely() >>> model.neuralizeModel() >>> model.loadTorchModel(name='example_model', model_folder='path/to/load') """ check(self.neuralized == True, RuntimeError, 'The model is not neuralized yet.') self.exporter.loadTorchModel(self.model, name, model_folder)
[docs] def saveModel(self, name = 'net', model_path = None, models = None): """ Saves the neural network model definition in a json file. Parameters ---------- name : str, optional The name of the saved model file. Default is 'net'. model_path : str or None, optional The path to save the model file. Default is None. models : list or None, optional A list of model names to save. If None, the entire model is saved. Default is None. Raises ------ RuntimeError If the network has not been defined. Example ------- Example usage: >>> model = Modely() >>> model.neuralizeModel() >>> model.saveModel(name='example_model', model_path='path/to/save') """ if models is not None: if name == 'net': name += '_' + '_'.join(models) model_def = ModelDef() model_def.update(model_dict = {key: self.model_dict[key] for key in models if key in self.model_dict}) model_def.setBuildWindow(self.model_def['Info']['SampleTime']) model_def.updateParameters(self.model) else: model_def = self.model_def check(model_def.isDefined(), RuntimeError, "The network has not been defined.") self.exporter.saveModel(model_def.json, name, model_path)
[docs] def loadModel(self, name = None, model_folder = None): """ Loads a neural network model from a json file containing the model definition. Parameters ---------- name : str or None, optional The name of the model file to load. Default is 'net'. model_folder : str or None, optional The folder to load the model file from. Default is None. Raises ------ RuntimeError If there is an error loading the network. Example ------- Example usage: >>> model = Modely() >>> model.loadModel(name='example_model', model_folder='path/to/load') """ if name is None: name = 'net' model_def = self.exporter.loadModel(name, model_folder) check(model_def, RuntimeError, "Error to load the network.") self.model_def = ModelDef(model_def) self.model = None self.neuralized = False self.traced = False
[docs] def exportPythonModel(self, name = 'net', model_path = None, models = None): """ Exports the neural network model as a standalone PyTorch Module class. Parameters ---------- name : str, optional The name of the exported model file. Default is 'net'. model_path : str or None, optional The path to save the exported model file. Default is None. models : list or None, optional A list of model names to export. If None, the entire model is exported. Default is None. Raises ------ RuntimeError If the network has not been defined. If the model is traced and cannot be exported to Python. If the model is not neuralized. Example ------- Example usage: >>> model = Modely(name='example_model') >>> model.neuralizeModel() >>> model.exportPythonModel(name='example_model', model_path='path/to/export') """ if models is not None: if name == 'net': name += '_' + '_'.join(models) model_def = ModelDef() model_def.update(model_dict = {key: self.model_dict[key] for key in models if key in self.model_dict}) model_def.setBuildWindow(self.model_def['Info']['SampleTime']) model_def.updateParameters(self.model) model = Model(model_def.json) else: model_def = self.model_def model = self.model #check(model_def['States'] == {}, TypeError, "The network has state variables. The export to python is not possible.") check(model_def.isDefined(), RuntimeError, "The network has not been defined.") check(self.traced == False, RuntimeError, 'The model is traced and cannot be exported to Python.\n Run neuralizeModel() to recreate a standard model.') check(self.neuralized == True, RuntimeError, 'The model is not neuralized yet.') self.exporter.saveModel(model_def.json, name, model_path) self.exporter.exportPythonModel(model_def, model, name, model_path)
[docs] def importPythonModel(self, name = None, model_folder = None): """ Imports a neural network model from a standalone PyTorch Module class. Parameters ---------- name : str or None, optional The name of the model file to import. Default is 'net'. model_folder : str or None, optional The folder to import the model file from. Default is None. Raises ------ RuntimeError If there is an error loading the network. Example ------- Example usage: >>> model = Modely() >>> model.importPythonModel(name='example_model', model_folder='path/to/import') """ if name is None: name = 'net' model_def = self.exporter.loadModel(name, model_folder) check(model_def is not None, RuntimeError, "Error to load the network.") self.neuralizeModel(model_def=model_def) self.model = self.exporter.importPythonModel(name, model_folder) self.traced = True self.model_def.updateParameters(self.model)
[docs] def exportONNX(self, inputs_order, outputs_order, models = None, name = 'net', model_folder = None): """ Exports the neural network model to an ONNX file. ----- .. note:: The input_order must contain all the inputs and states of the model in the order that you want to export them. Parameters ---------- inputs_order : list The order of the input and state variables. outputs_order : list The order of the output variables. models : list or None, optional A list of model names to export. If None, the entire model is exported. Default is None. name : str, optional The name of the exported ONNX file. Default is 'net'. model_folder : str or None, optional The folder to save the exported ONNX file. Default is None. Raises ------ RuntimeError If the network has not been defined. If the model is traced and cannot be exported to ONNX. If the model is not neuralized. If the model is loaded and not created. Example ------- Example usage: >>> input1 = Input('input1').last() >>> input2 = Input('input2').last() >>> out = Output('output1', input1+input2) >>> model = Modely() >>> model.neuralizeModel() >>> model.exportONNX(inputs_order=['input1', 'input2'], outputs_order=['output1'], name='example_model', model_folder='path/to/export') """ check(self.model_def.isDefined(), RuntimeError, "The network has not been defined.") check(self.traced == False, RuntimeError, 'The model is traced and cannot be exported to ONNX.\n Run neuralizeModel() to recreate a standard model.') check(self.neuralized == True, RuntimeError, 'The model is not neuralized yet.') check(self.model_def.model_dict != {}, RuntimeError, 'The model is loaded and not created.') model_def = ModelDef() if models is not None: if name == 'net': name += '_' + '_'.join(models) model_def.update(model_dict = {key: self.model_def.model_dict[key] for key in models if key in self.model_def.model_dict}) else: model_def.update(model_dict = self.model_def.model_dict) model_def.setBuildWindow(self.model_def['Info']['SampleTime']) model_def.updateParameters(self.model) model = Model(model_def.json) model.update() self.exporter.exportONNX(model_def, model, inputs_order, outputs_order, name, model_folder)
[docs] def onnxInference(self, inputs:dict, path:str): """ Run an inference session using an onnx model previously exported using the nnodely framework. ----- .. note:: Feed-Forward ONNX model For feed-forward models, the onnx model expect all the inputs and states to have 3 dimensions. The first dimension is the batch size, the second is the time window and the third is the feature dimension. .. note:: Recurrent ONNX model For recurrent models, the onnx model expect all the inputs to have 4 dimensions. The first dimension is the prediction horizon, the second is the batch size, the third is the time window and the fourth is the feature dimension. For recurrent models, the onnx model expect all the States to have 3 dimensions. The first dimension is the batch size, the second is the time window, the third is the feature dimension Parameters ---------- inputs : dict A dictionary containing the input and state variables to be used to make the inference. State variables are mandatory and are used to initialize the states of the model. path : str The path to the ONNX file to use. Raises ------ RuntimeError If the shape of the inputs are not equals to the ones defined in the onnx model. If the batch size is not equal for all the inputs and states. Example ------- feed-forward Example: >>> x = Input('x') >>> onnx_model_path = path/to/net.onnx >>> dummy_input = {'x':np.ones(shape=(3, 1, 1)).astype(np.float32)} >>> predictions = Modely().onnxInference(dummy_input, onnx_model_path) Recurrent Example: >>> x = Input('x') >>> y = State('y') >>> onnx_model_path = path/to/net.onnx >>> dummy_input = {'x':np.ones(shape=(3, 1, 1, 1)).astype(np.float32) 'y':np.ones(shape=(1, 1, 1)).astype(np.float32)} >>> predictions = Modely().onnxInference(dummy_input, onnx_model_path) """ return self.exporter.onnxInference(inputs, path)
[docs] def exportReport(self, name = 'net', model_folder = None): """ Generates a PDF report with plots containing the results of the training and validation of the neural network. Parameters ---------- name : str, optional The name of the exported report file. Default is 'net'. model_folder : str or None, optional The folder to save the exported report file. Default is None. Example ------- Example usage: >>> model = Modely() >>> model.neuralizeModel() >>> model.trainModel(train_dataset='train_dataset', validation_dataset='val_dataset', num_of_epochs=10) >>> model.exportReport(name='example_model', model_folder='path/to/export') """ self.exporter.exportReport(self, name, model_folder)
nnodely = Modely