Source code for conmo.splitters.sklearn_splitter

from typing import Iterable, Optional, Union

import numpy as np
import pandas as pd
from sklearn.model_selection import (GroupKFold, GroupShuffleSplit, KFold,
                                     LeaveOneGroupOut, LeaveOneOut,
                                     LeavePGroupsOut, LeavePOut,
                                     PredefinedSplit, RepeatedKFold,
                                     RepeatedStratifiedKFold, ShuffleSplit,
                                     StratifiedKFold, StratifiedShuffleSplit,
                                     TimeSeriesSplit)

from conmo.conf import Index
from conmo.splitters.splitter import Splitter


[docs]class SklearnSplitter(Splitter):
[docs] def __init__(self, splitter: Union[GroupKFold, GroupShuffleSplit, KFold, LeaveOneGroupOut, LeavePGroupsOut, LeaveOneOut, LeavePOut, PredefinedSplit, RepeatedKFold, RepeatedStratifiedKFold, ShuffleSplit, StratifiedKFold, StratifiedShuffleSplit, TimeSeriesSplit], groups: Optional[Iterable[int]] = None) -> None: self.splitter = splitter self.groups = groups
[docs] def split(self, in_dir: str, out_dir: str) -> None: self.show_start_message() data, labels = self.load_input(in_dir) # Check previous splitting if self.already_splitted(data) == True or self.already_splitted(labels) == True: raise RuntimeError("Dataset already splitted.") # Extract sequences of data (both DATA and LABELS must be equal) if (data.index.get_level_values(Index.SEQUENCE).unique() != labels.index.get_level_values(Index.SEQUENCE).unique()).any(): raise RuntimeError( "Sequence indexes of DATA and LABELS does not match.") sequences = data.index.get_level_values(Index.SEQUENCE).unique() # Split data by calling sklearn split function over sequences data_col = [] data_index = [] labels_col = [] labels_index = [] for fold_idx, (train_idx, test_idx) in enumerate(self.splitter.split(sequences, groups=self.groups)): # Sklearn split function returns indexes of sequences, not sequences directly, so must access their value first (sequences[idx]) # This loop provides both FOLD (fold_idx) of the current split and the TRAIN/TEST (train_idx, test_idx) sets # DATA data_col_fold, data_index_fold = self.extract_fold( data, sequences, fold_idx+1, train_idx, test_idx) data_col.extend(data_col_fold) data_index.extend(data_index_fold) # LABELS labels_col_fold, labels_index_fold = self.extract_fold( labels, sequences, fold_idx+1, train_idx, test_idx) labels_col.extend(labels_col_fold) labels_index.extend(labels_index_fold) # Generate output DataFrames data = self.to_dataframe(data, data_col, data_index) labels = self.to_dataframe(labels, labels_col, labels_index) self.save_output(out_dir, data, labels)
[docs] def extract_fold(self, df: pd.DataFrame, sequences: np.ndarray, fold: int, train_idx: np.ndarray, test_idx: np.ndarray) -> (np.ndarray, np.ndarray): # Generate array of column values of the fold data = np.concatenate([df.loc[sequences[train_idx], :].to_numpy( ), df.loc[sequences[test_idx], :].to_numpy()]) # Extract number of samples of each sequence (1 if no 'time' index is present) (1D array of each sequence count) train_samples = df.loc[sequences[train_idx], :].groupby(level=Index.SEQUENCE).size().to_numpy() test_samples = df.loc[sequences[test_idx], :].groupby(level=Index.SEQUENCE).size().to_numpy() df_index = df.index.to_frame() # Generate levels of the new dataframe individually # Time level, if present, must contain the original values if Index.TIME in df_index: time_level = np.concatenate([df_index.loc[sequences[train_idx], Index.TIME].to_numpy( ), df_index.loc[sequences[test_idx], Index.TIME].to_numpy()]) # Sequence, Set and Fold levels must be generated for each fold sequence_level = np.repeat([np.arange( 1, train_idx.shape[0]+1), np.arange(1, test_idx.shape[0]+1)], np.concatenate([train_samples, test_samples])) # Convert 1D array to unique value train_samples = train_samples.sum() test_samples = test_samples.sum() # Set and Fold levels set_level = np.repeat([Index.SET_TRAIN, Index.SET_TEST], [ train_samples, test_samples]) fold_level = np.repeat(fold, train_samples + test_samples) # Generate array of index values of the fold if Index.TIME in df_index: index = np.concatenate((fold_level.reshape((-1,1)), set_level.reshape((-1,1)), sequence_level.reshape((-1,1)), time_level.reshape((-1,1))), axis=1, dtype=object) else: index = np.concatenate((fold_level.reshape((-1,1)), set_level.reshape((-1,1)), sequence_level.reshape((-1,1))), axis=1, dtype=object) return data, index
[docs] def to_dataframe(self, df: pd.DataFrame, data: np.ndarray, index: np.ndarray) -> pd.DataFrame: # Generate MultiIndex columns = [Index.FOLD, Index.SET] columns.extend(df.index.names) # To include previous index columns index = pd.DataFrame(index, columns=columns) # Generate DataFrame df = pd.DataFrame(data, columns=df.columns, index=pd.MultiIndex.from_frame(index)) df.sort_index(inplace=True) return df