Source code for fuzz.engine

""" The machine to run the fuzzy logic """
# pylint: disable=invalid-name, fixme
from typing import Any, Dict, List, Tuple
from warnings import warn
import numpy as np

from .operators import DefuzzEnum, OperatorEnum, RuleAggregationEnum
from .kernel import Kernel
from .rules import RuleBase


[docs]class Engine: """ The Fuzzy Engine wraps the input kernels, rules and inference system to provide the fuzzyfy, defuzzyfy and generate surface methods. Usage steps: 1: create engine \n 2: add input kernels (see Kernel for further reference) \n 3: add inference system (Kernel or Takagi-Sugeno based) \n 4: add rules to map the input kernels to the inference system \n 5: call engine.fuzzyfy() to run the system \n 6: call engine.defuzzyfy() to reduce the fuzzy result to a single float number \n 7: call engine.gen_surface() to build a iterable cache-like map to greatly reduce time compute """
[docs] def __init__( self, operands: OperatorEnum = OperatorEnum.DEFAULT, rule_agg: RuleAggregationEnum = RuleAggregationEnum.MAX, defuzz_method: DefuzzEnum = DefuzzEnum.LINGUISTIC, ) -> None: """Initializes a new engine object Args: operands (OperandEnum, optional): Operation definitions for AND, OR and NOT methods. \ Defaults to OperandEnum.DEFAULT. """ # initialization self.operands = operands self.rule_agreggation = rule_agg self.defuzz_method = defuzz_method # builder self.input_kernel_set: Dict[str, Kernel] = {} self.inference_kernel: Kernel = None self.ruleset: Dict[str, List[RuleBase]] = {} # results self.actuation_signal: Dict[str, float] = {} self.membership_degree: Dict[str, float] = {} self.defuzzy_res: np.ndarray = None
def __repr__(self) -> str: """String representation Returns: str: the engine object """ return str(self.__dict__)
[docs] def add_kernel(self, name: str, kernel: Kernel): """Adds a Kernel object, to map a particular variable of interest to its membership functions Args: name (str): The name of the kernel. Various methods will call the kernel using this \ name as the key to a dict of type {name: kernel} kernel (Kernel): Kernel object, mapping the variable to its many membership functions Raises: TypeError: if the internal dictionary self.input_kernel_set gets corrupted by direct \ user manipulation Returns: Engine: self """ _typecheck(name, kernel) # if not kernel.check_normalized(): # warn(f"Kernel for {name} is not normalized") if not self.input_kernel_set: self.input_kernel_set = dict({name: kernel}) elif isinstance(self.input_kernel_set, dict): self.input_kernel_set[name] = kernel else: raise TypeError( f"Expected self.rules to be None or dict. Found {type(self.input_kernel_set)}" ) return self
[docs] def del_kernel(self, name: str) -> None: """Deletes a registered input kernel Args: name (str): the name of the registered kernel Raises: KeyError: if name is not found in self.input_kernel_set.keys() """ try: del self.input_kernel_set[name] except KeyError as error: raise KeyError(f"{name} not found in rules dict") from error
[docs] def add_inference_kernel(self, kernel: Kernel): """Adds a Kernel to map the inference system to its membership functions fuzzy output Args: kernel (Kernel): the kernel object mapping the inference system to membership functions Raises: TypeError: if object type != type(Kernel) Returns: Engine: self """ if not isinstance(kernel, Kernel): raise TypeError(f"Expected type Kernel for 'kernel'. Got {type(kernel)}") self.inference_kernel = kernel return self
[docs] def del_inference_kernel(self): """Deletes the registered inference kernel, if there is one""" self.inference_kernel = None
[docs] def add_rule(self, name: str, rule: RuleBase): """Add a declarative rule, mapping each input kernel membership values to the inference system membership functions. Args: name (str): must match the name of a KernelFuncMember registered at the Inference \ Kernel System. rule (Union[RuleBase, Dict[str, str]]): a rule object (AND, OR, NOT), or a dictionary \ to get the direct value of a specific membership function. Examples: - (i) If FOOD is GOOD then tip is High (direct access): \ fm.add_rule('High', {'food': 'good'}) - (ii) if SERVICE is BAD AND FOOD is RANCID then tip is Low (rule base access): \ fm.add_rule('Low', AND({'service': 'bad'}, {'food': 'rancid'})) See More: - RuleBase documentation Returns: Engine: self """ if not isinstance(rule, RuleBase): raise TypeError(f"Expected type RuleBase for 'rule'. Got {type(rule)}") if isinstance(rule, RuleBase): self._inject_operands(rule) if name in self.ruleset: self.ruleset[name].append(rule) else: self.ruleset[name] = [rule] return self
[docs] def delete_rule(self, name: str): """Deletes a registered rule at self.ruleset Args: name (str): the rule name Raises: KeyError: if name is not in self.ruleset.keys() """ try: del self.ruleset[name] except KeyError as error: raise KeyError(f"{name} not found in rules dict") from error
def _fuzzyfy(self, measurements: Dict[str, Any]) -> Dict[str, Dict[str, float]]: """Fuzzyfy the crisp measurement data for all registered kernels. Result is returned \ to the user as a nested dictionary but also stored in each kernel object. Args: measurements for each registered registered input kernel \ (in the format {'kernel_name': data}) Raises: KeyError: if there's a mismatch between input_kernel_set.keys() and measurements.keys() Returns: Dict[str, Dict[str, float]]: dict('kernel_name': dict('function_member': value)) """ if self.input_kernel_set.keys() != measurements.keys(): raise KeyError( "Could not match the ruleset data to registered ruleset functions.\nruleset_data:" f" {measurements.keys()}\nruleset: {self.input_kernel_set.keys()}" ) data_len = set() for data in measurements.values(): data_len.add(np.asarray(data).size) assert len(data_len) == 1, "Fuzzification expect all measurement data to be of equal lenght" res = {} for kkey, kernel in self.input_kernel_set.items(): res[kkey] = kernel(measurements[kkey]) return res def _aggregate( self, ) -> Dict[str, np.ndarray]: # aggregation (running all rules) and returning one value per rule for rkey, rulelist in self.ruleset.items(): rule_res = np.asfarray([rule(self.input_kernel_set) for rule in rulelist]) assert len(rule_res) >= 1, f"rule {rkey} returned no value" if rule_res[0].size > 1: agg_actuation = np.asfarray( [ self.rule_agreggation.value(rule_res[:, col]) for col in range(rule_res.shape[1]) ] ) else: agg_actuation = self.rule_agreggation.value(rule_res) self.actuation_signal[rkey] = np.asfarray(agg_actuation) return self.actuation_signal def _accumulate(self, granularity: float) -> Tuple[np.ndarray, np.ndarray]: """ In the accumulation phase, all inference rules are joined to form a single shape. \ The method traverses the inference system with granularity A and run each of its KMF with \ max activation set to aggregated rule value for each KMF. \ This method is implemented for Linguistic Inference Systems only and will raise exception \ if run with any other defuzzyfication method. Args: granularity (float): the "step size" of the iterator function Raises: ValueError: if _accumulate is called by an engine not running linguistic inference sys Returns: Tuple[np.ndarray, np.ndarray]: a mapping of x_values to y_values as a tuple of ndarray. """ if self.inference_kernel is None: raise ValueError("Engine is missing the inference kernel system.") if self.defuzz_method != DefuzzEnum.LINGUISTIC: raise ValueError( f"self._accumulate is not valid for defuzzification method {self.defuzz_method}" ) sample_size = round( (self.inference_kernel.max_v - self.inference_kernel.min_v) / granularity ) x_range = np.linspace(self.inference_kernel.min_v, self.inference_kernel.max_v, sample_size) y_stack = None acc_array = np.array(list(self.actuation_signal.values())) if acc_array.ndim == 1: y_range = np.zeros(sample_size) for rule, func in self.inference_kernel.input_functions.items(): acc = self.actuation_signal[rule] y_proponent = func(x_range, acc) y_range = np.maximum(y_range, y_proponent) if y_stack is not None: y_stack = np.vstack((y_stack, y_range)) else: y_stack = y_range else: keys = list(self.actuation_signal.keys()) for acc_point in acc_array.T: y_range = np.zeros(sample_size) point = dict(zip(keys, acc_point)) print(point) for rule, func in self.inference_kernel.input_functions.items(): acc = point[rule] y_proponent = func(x_range, acc) y_range = np.maximum(y_range, y_proponent) if y_stack is not None: y_stack = np.vstack((y_stack, y_range)) else: y_stack = y_range # print(y_stack) return x_range, y_stack def _takagi_sugeno(self, data: Any): """Transform the fuzzy result to a numerical float value.""" return sum( func(**data) * self.actuation_signal[rule] for rule, func in self.inference_kernel.input_functions.items() ) / sum(self.actuation_signal.values())
[docs] def run_fuzz(self, measurements: Dict[str, Any]) -> Dict[str, np.ndarray]: """Passing a dictionary of data, runs the Engine and return a fuzzy output mapped to the \ inference system Args: measurements (Dict[str, Any]): Dictionary mapping each input kernel to its data Returns: Dict[str, np.ndarray]: the fuzzy output of the inference system. """ self._check_coverage() self._fuzzyfy(measurements) return self._aggregate()
[docs] def run_defuzz(self, measurements: Dict[str, Any], granularity: float = None) -> np.ndarray: """Passing a dictionary of data and granularity (mandatory for Linguistic Defuzz) \ returns a crisp result of the inference system. Args: measurements (Dict[str, Any]): Dictionary mapping each input kernel to its data granularity (float): iteration granularity. Required for Linguistic Defuzz only. Raises: NotImplementedError: If Defuzz method is not implemented. Returns: np.ndarray: crisp values of the inference system. """ self.run_fuzz(measurements) if self.defuzz_method == DefuzzEnum.LINGUISTIC: assert granularity is not None, "Linguistic Defuzz requires a granularity param" x_range, y_range = self._accumulate(granularity) self.defuzzy_res = _centroid(x_range, y_range) elif self.defuzz_method == DefuzzEnum.TAKAGI_SUGENO: self.defuzzy_res = self._takagi_sugeno(measurements) else: raise NotImplementedError(f"defuzzyfy for {self.defuzz_method} is not implemented") return self.defuzzy_res
[docs] def gen_surface(self, map_size: int, granularity: float): """Very expensive operation, if used with Linguistic Fuzzy Systems. Args: granularity (Union[float, Dict[str, float]]): [description] Returns: [type]: [description] """ raise NotImplementedError # TODO: gen_surface needs to be redone
def _inject_operands(self, rule: RuleBase): rule.operand_set = self.operands if isinstance(rule.a, RuleBase): self._inject_operands(rule.a) if isinstance(rule.b, RuleBase): self._inject_operands(rule.b) def _check_coverage(self) -> None: coverage = np.array([f.check_coverage() for f in self.input_kernel_set.values()]) if not coverage.all(): warn( UserWarning(f"Variable description is incomplete. Coverage check: {list(coverage)}") )
def _typecheck(variable: str, kernel: Kernel): if not isinstance(variable, str): raise TypeError(f"Expected type str for 'variable'. Got {type(variable)}") if not isinstance(kernel, Kernel): raise TypeError(f"Expected type Kernel for 'kernel'. Got {type(kernel)}") def _centroid(x_range, y_range: np.ndarray): """Transform the fuzzy result to a numerical float value.""" if y_range.ndim == 1: return _func1d(y_range, x_range) if y_range.ndim == 2: return np.array([_func1d(y_row, x_range) for y_row in y_range]) raise NotImplementedError("_centroid is not defined for arrays with 3 or more dimensions") def _func1d(y_row, x_values): return np.asfarray(np.sum(x_values * y_row) / np.sum(y_row))