Source code for conmo.algorithms.keras_autoencoder

from typing import Union

import numpy as np
import pandas as pd
import tensorflow as tf
import tensorflow.keras as keras
from scipy.stats import chi2
from tensorflow.keras import layers, losses

from conmo.conf import Index, Label
from conmo.algorithms.algorithm import AnomalyDetectionThresholdBasedAlgorithm


class KerasAutoencoder(AnomalyDetectionThresholdBasedAlgorithm):
    """Threshold-based anomaly detector built on a dense Keras autoencoder.

    The network has one hidden ``Dense`` layer (ReLU) of ``encoding_dim``
    units and one output ``Dense`` layer (sigmoid) with as many units as
    input features. A test sample is flagged as anomalous when its
    reconstruction loss is strictly greater than the anomaly threshold.
    """

    def __init__(self, encoding_dim: int = 32, optimizer: str = 'Adam', loss_f: str = 'mse', epochs: int = 2, batch_size: int = 64, random_seed: int = 11, threshold_mode: str = 'chi2', threshold_value: Union[int, float, None] = 0.95):
        """Store the hyper-parameters and seed TensorFlow.

        Parameters
        ----------
        encoding_dim:
            Number of units of the hidden (encoded) layer.
        optimizer:
            Keras optimizer identifier used when compiling the model.
        loss_f:
            Keras loss identifier used to compute the reconstruction error
            of the test set.
        epochs:
            Number of training epochs passed to ``Model.fit``.
        batch_size:
            Batch size passed to ``Model.fit``.
        random_seed:
            Seed passed to ``tf.random.set_seed``.
        threshold_mode:
            Thresholding strategy. ``'chi2'`` is handled by this class; any
            other value is delegated to the parent class.
        threshold_value:
            Value forwarded to the parent class; in ``'chi2'`` mode it is
            used as the probability given to ``chi2.ppf``.
        """
        super().__init__(threshold_mode, threshold_value)
        self.encoding_dim = encoding_dim
        self.optimizer = optimizer
        self.loss_f = loss_f
        self.epochs = epochs
        self.batch_size = batch_size
        self.random_seed = random_seed

        # Set random seed (this is TensorFlow's global seed, so it affects
        # the whole process, not only this instance).
        tf.random.set_seed(random_seed)

    def fit_predict(self, data_train: pd.DataFrame, data_test: pd.DataFrame, labels_train: pd.DataFrame, labels_test: pd.DataFrame) -> pd.DataFrame:
        """Train the autoencoder and label the test set.

        Parameters
        ----------
        data_train:
            Training samples, one row per sample and one column per feature.
        data_test:
            Test samples; also passed to ``Model.fit`` as validation data.
        labels_train:
            Not used by this method.
        labels_test:
            Test labels. Their index becomes the index of the output, and
            they decide whether the output is per sequence or per time step.

        Returns
        -------
        pd.DataFrame
            Single ``Label.ANOMALY`` column indexed like ``labels_test``.
        """
        n_features = data_train.shape[1]

        # Build the model
        # This is our input
        input_img = keras.Input(shape=(n_features,))
        # "encoded" is the encoded representation of the input
        encoded = layers.Dense(self.encoding_dim, activation='relu')(input_img)
        # "decoded" is the lossy reconstruction of the input
        decoded = layers.Dense(n_features, activation='sigmoid')(encoded)
        # This model maps an input to its reconstruction
        autoencoder = keras.Model(input_img, decoded)

        # Compile the model with the optimizer configured in the constructor
        # (it used to be hard-coded, so the ``optimizer`` argument had no
        # effect).
        # NOTE(review): the training loss is fixed to binary cross-entropy;
        # ``self.loss_f`` is only used below to score the test set. Confirm
        # whether training should use ``self.loss_f`` as well.
        autoencoder.compile(optimizer=self.optimizer,
                            loss='binary_crossentropy')

        # TRAIN SET
        history = autoencoder.fit(data_train, data_train,
                                  epochs=self.epochs,
                                  batch_size=self.batch_size,
                                  shuffle=True,
                                  validation_data=(data_test, data_test))

        # TEST SET
        # Calculate cutoff (anomaly_threshold)
        anomaly_threshold = self.find_anomaly_threshold(
            history.history['loss'], n_features)

        # Perform classification on test set
        recons = autoencoder.predict(data_test)
        recons_err = losses.get(self.loss_f)(data_test, recons).numpy()

        # Detect anomalies: strictly above the threshold means anomalous
        test_error = pd.DataFrame(
            recons_err, index=data_test.index, columns=['loss'])
        test_error.loc[:, Label.ANOMALY] = test_error.loc[:, 'loss'] > anomaly_threshold

        # Generate output dataframe
        if self.labels_per_sequence(labels_test):
            # Only labels per SEQUENCE: a sequence is anomalous as soon as
            # one of its samples is
            output = test_error.groupby(level=Index.SEQUENCE)[
                Label.ANOMALY].any()
        else:
            # Labels per TIME
            output = test_error.loc[:, Label.ANOMALY]

        output = pd.DataFrame(output, index=labels_test.index, columns=[
                              Label.ANOMALY])
        return output

    def find_anomaly_threshold(self, values: np.ndarray, n_features: int) -> float:
        """Return the cutoff above which a reconstruction loss is anomalous.

        Parameters
        ----------
        values:
            Loss values forwarded to the parent implementation when the
            threshold mode is not ``'chi2'``; ignored in ``'chi2'`` mode.
        n_features:
            Degrees of freedom of the chi-squared distribution in ``'chi2'``
            mode.

        Returns
        -------
        float
            In ``'chi2'`` mode, ``chi2.ppf(self.threshold_value,
            df=n_features)``; otherwise whatever the parent class returns.
        """
        if self.threshold_mode == 'chi2':
            return chi2.ppf(self.threshold_value, df=n_features)
        # The parent's result must be returned: without the ``return`` this
        # branch produced None.
        return super().find_anomaly_threshold(values)