Source code for nnodely.layers.fuzzify

import inspect, copy, textwrap, torch

import numpy as np
import torch.nn as nn

from collections.abc import Callable

from nnodely.basic.relation import NeuObj, Stream
from nnodely.basic.model import Model
from nnodely.support.utils import check, enforce_types
from nnodely.support.jsonutils import merge

fuzzify_relation_name = 'Fuzzify'

[docs] class Fuzzify(NeuObj): """ Represents a Fuzzify relation in the neural network model. Parameters ---------- output_dimension : int, optional The output dimension of the Fuzzify relation. If provided, `range` must also be provided and `centers` must be None. range : list, optional A list containing the start and end values for the range. Required if `output_dimension` is provided. centers : list, optional A list of center values for the fuzzy functions. Required if `output_dimension` is None. The `output_dimension` will be inferred from the number of centers provided. functions : str, list, or Callable, optional The fuzzy functions to use. Can be a string specifying a predefined function type, a custom function, or a list of callable functions. Default is 'Triangular'. Notes ----- .. note:: The predefined function types are 'Triangular' and 'Rectangular'. It is also possible to pass a list of custom functions. In this case, each center will be associated with the respective function in the list. Attributes ---------- relation_name : str The name of the relation. output_dimension : dict The output dimension of the Fuzzify relation. json : dict A dictionary containing the configuration of the Fuzzify relation. Examples -------- .. include:: /examples_basics/layer_module_ex/fuzzy.rst """ @enforce_types def __init__(self, output_dimension: int | None = None, range: list | None = None, *, centers: list | None = None, functions: str | list | Callable = 'Triangular'): self.relation_name = fuzzify_relation_name super().__init__('F' + fuzzify_relation_name + str(NeuObj.count)) self.json['Functions'][self.name] = {} if output_dimension is not None: check(range is not None, ValueError, 'if "output_dimension" is not None, "range" must be not setted') check(centers is None, ValueError, 'if "output_dimension" and "range" are not None, then "centers" must be None') self.output_dimension = {'dim': output_dimension} interval = ((range[1] - range[0]) / (output_dimension - 1)) self.json['Functions'][self.name]['centers'] = np.arange(range[0], range[1] + interval, interval).tolist() else: check(centers is not None, ValueError, 'if "output_dimension" is None and "centers" must be setted') self.output_dimension = {'dim': len(centers)} self.json['Functions'][self.name]['centers'] = np.array(centers).tolist() self.json['Functions'][self.name]['dim_out'] = copy.deepcopy(self.output_dimension) if type(functions) is str: self.json['Functions'][self.name]['functions'] = functions self.json['Functions'][self.name]['names'] = functions elif type(functions) is list: self.json['Functions'][self.name]['functions'] = [] self.json['Functions'][self.name]['names'] = [] for func in functions: code = textwrap.dedent(inspect.getsource(func)).replace('\"', '\'') self.json['Functions'][self.name]['functions'].append(code) self.json['Functions'][self.name]['names'].append(func.__name__) else: code = textwrap.dedent(inspect.getsource(functions)).replace('\"', '\'') self.json['Functions'][self.name]['functions'] = code self.json['Functions'][self.name]['names'] = functions.__name__ @enforce_types def __call__(self, obj: Stream) -> Stream: stream_name = fuzzify_relation_name + str(Stream.count) check(type(obj) is Stream, TypeError, f"The type of {obj} is {type(obj)} and is not supported for Fuzzify operation.") check('dim' in obj.dim and obj.dim['dim'] == 1, ValueError, 'Input dimension must be scalar') output_dimension = copy.deepcopy(obj.dim) output_dimension.update(self.output_dimension) stream_json = merge(self.json, obj.json) stream_json['Relations'][stream_name] = [fuzzify_relation_name, [obj.name], self.name] return Stream(stream_name, stream_json, output_dimension)
def return_fuzzify(json, xlim=None, num_points=1000): if xlim is not None: x = torch.from_numpy(np.linspace(xlim[0], xlim[1], num=num_points)) else: x = torch.from_numpy(np.linspace(json['centers'][0] - 2, json['centers'][-1] + 2, num=num_points)) chan_centers = np.array(json['centers']) activ_fun = {} if isinstance(json['names'], list): n_func = len(json['names']) else: n_func = 1 for i in range(len(chan_centers)): if json['functions'] == 'Triangular': activ_fun[i] = triangular(x, i, chan_centers).tolist() elif json['functions'] == 'Rectangular': activ_fun[i] = rectangular(x, i, chan_centers).tolist() else: if isinstance(json['names'], list): if i >= n_func: func_idx = i - round(n_func * (i // n_func)) else: func_idx = i exec(json['functions'][func_idx], globals()) function_to_call = globals()[json['names'][func_idx]] else: exec(json['functions'], globals()) function_to_call = globals()[json['names']] activ_fun[i] = custom_function(function_to_call, x, i, chan_centers).tolist() return x.tolist(), activ_fun def triangular(x, idx_channel, chan_centers): # Compute the number of channels num_channels = len(chan_centers) # First dimension of activation if idx_channel == 0: if num_channels != 1: ampl = chan_centers[1] - chan_centers[0] act_fcn = torch.minimum(torch.maximum(-(x - chan_centers[0]) / ampl + 1, torch.tensor(0.0)), torch.tensor(1.0)) else: # In case the user only wants one channel act_fcn = 1 elif idx_channel != 0 and idx_channel == (num_channels - 1): ampl = chan_centers[-1] - chan_centers[-2] act_fcn = torch.minimum(torch.maximum((x - chan_centers[-2]) / ampl, torch.tensor(0.0)), torch.tensor(1.0)) else: ampl_1 = chan_centers[idx_channel] - chan_centers[idx_channel - 1] ampl_2 = chan_centers[idx_channel + 1] - chan_centers[idx_channel] act_fcn = torch.minimum(torch.maximum((x - chan_centers[idx_channel - 1]) / ampl_1, torch.tensor(0.0)), torch.maximum(-(x - chan_centers[idx_channel]) / ampl_2 + 1, torch.tensor(0.0))) return act_fcn def rectangular(x, idx_channel, chan_centers): ## compute number of channels num_channels = len(chan_centers) ## First dimension of activation if idx_channel == 0: if num_channels != 1: width = abs(chan_centers[idx_channel + 1] - chan_centers[idx_channel]) / 2 act_fcn = torch.where(x < (chan_centers[idx_channel] + width), 1.0, 0.0) else: # In case the user only wants one channel act_fcn = 1.0 elif idx_channel != 0 and idx_channel == (num_channels - 1): width = abs(chan_centers[idx_channel] - chan_centers[idx_channel - 1]) / 2 act_fcn = torch.where(x >= chan_centers[idx_channel] - width, 1.0, 0.0) else: width_forward = abs(chan_centers[idx_channel + 1] - chan_centers[idx_channel]) / 2 width_backward = abs(chan_centers[idx_channel] - chan_centers[idx_channel - 1]) / 2 act_fcn = torch.where((x >= chan_centers[idx_channel] - width_backward) & (x < chan_centers[idx_channel] + width_forward), 1.0, 0.0) return act_fcn def custom_function(func, x, idx_channel, chan_centers): act_fcn = func(x - chan_centers[idx_channel]) return act_fcn class Fuzzify_Layer(nn.Module): def __init__(self, params): super().__init__() self.centers = params['centers'] self.function = params['functions'] self.dimension = params['dim_out']['dim'] self.name = params['names'] if type(self.name) is list: self.n_func = len(self.name) for func, name in zip(self.function, self.name): ## Add the function to the globals try: code = 'import torch\n@torch.fx.wrap\n' + func exec(code, globals()) except Exception as e: check(False, RuntimeError, f"An error occurred when running the function '{name}':\n {e}") else: self.n_func = 1 if self.name not in ['Triangular', 'Rectangular']: ## custom function ## Add the function to the globals try: code = 'import torch\n@torch.fx.wrap\n' + self.function exec(code, globals()) except Exception as e: check(False, RuntimeError, f"An error occurred when running the function '{self.name}':\n {e}") def forward(self, x): res = torch.zeros_like(x).repeat(1, 1, self.dimension) if self.function == 'Triangular': for i in range(len(self.centers)): slicing(res, torch.tensor(i), triangular(x, i, self.centers)) elif self.function == 'Rectangular': for i in range(len(self.centers)): slicing(res, torch.tensor(i), rectangular(x, i, self.centers)) else: ## Custom_function if self.n_func == 1: # Retrieve the function object from the globals dictionary function_to_call = globals()[self.name] for i in range(len(self.centers)): slicing(res, torch.tensor(i), custom_function(function_to_call, x, i, self.centers)) else: ## we have multiple functions for i in range(len(self.centers)): if i >= self.n_func: func_idx = i - round(self.n_func * (i // self.n_func)) else: func_idx = i function_to_call = globals()[self.name[func_idx]] slicing(res, torch.tensor(i), custom_function(function_to_call, x, i, self.centers)) return res @torch.fx.wrap def slicing(res, i, x): res[:, :, i:i + 1] = x def createFuzzify(self, *params): return Fuzzify_Layer(params[0]) setattr(Model, fuzzify_relation_name, createFuzzify)