Source code for conmo.datasets.nasa_turbofan_degradation

import shutil
from os import path
from typing import Iterable

import numpy as np
import pandas as pd

from conmo.conf import File, Index, Label
from conmo.datasets.dataset import RemoteDataset


class NASATurbofanDegradation(RemoteDataset):
    """
    Remote dataset wrapper for the NASA turbofan degradation data.

    The archive contains four subdatasets (``FD001`` to ``FD004``). Each one
    is converted to a pair of parquet files (data and labels) by
    :meth:`parse_to_package`; one of them is selected at construction time
    and copied to the pipeline by :meth:`feed_pipeline`.
    """

    # Download parameters, forwarded unchanged to RemoteDataset.__init__
    # (presumably used there to fetch and verify the archive -- confirm
    # against the base class).
    URL = "https://ti.arc.nasa.gov/c/6/"
    FILE_FORMAT = "zip"
    CHECKSUM = "79a22f36e80606c69d0e9e4da5bb2b7a"
    CHECKSUM_FORMAT = "md5"

    # Number of sequences (units) in the train and test raw files of each
    # subdataset. These counts drive both the unit renumbering in
    # parse_to_package and the split built by sklearn_predefined_split.
    SUBDATASETS = {
        "FD001": {
            "train": 100,
            "test": 100
        },
        "FD002": {
            "train": 260,
            "test": 259
        },
        "FD003": {
            "train": 100,
            "test": 100
        },
        "FD004": {
            "train": 249,
            "test": 248
        }
    }

    # Column names assigned to the raw files after the unit and time columns.
    VARIABLES = ["setting_1", "setting_2", 'TRA', 'T2', 'T24', 'T30', 'T50',
                 'P2', 'P15', 'P30', 'Nf', 'Nc', 'epr', 'Ps30', 'phi', 'NRf',
                 'NRc', 'BPR', 'farB', 'htBleed', 'Nf_dmd', 'PCNfR_dmd',
                 'W31', 'W32']
    LABEL = Label.RUL
    SEQUENCE_COLUMN = 'unit_number'
    TIME_COLUMN = 'time_cycles'

    def __init__(self, subdataset: str) -> None:
        """
        Initialize the dataset and select the subdataset to use.

        Parameters
        ----------
        subdataset : str
            Key of ``SUBDATASETS`` identifying the subdataset to feed to the
            pipeline.

        Raises
        ------
        RuntimeError
            If ``subdataset`` is not a key of ``SUBDATASETS``. The check runs
            after the parent initializer has been called.
        """
        super().__init__(self.URL, self.FILE_FORMAT,
                         self.CHECKSUM, self.CHECKSUM_FORMAT)
        if subdataset not in self.SUBDATASETS:
            raise RuntimeError("Invalid selected subdataset")
        self.subdataset = subdataset

    def _package_file(self, subdataset: str, kind) -> str:
        """Return the path in ``dataset_dir`` of a packaged file of a subdataset."""
        return path.join(self.dataset_dir, "{}_{}".format(subdataset, kind))

    def _read_raw(self, raw_dir: str, prefix: str, subdataset: str, names: list) -> pd.DataFrame:
        """Read the header-less, whitespace-separated raw file ``<prefix>_<subdataset>.txt``."""
        # Raw string: '\s' is not a valid escape sequence in a normal literal.
        return pd.read_csv(path.join(raw_dir, prefix + "_" + subdataset + ".txt"),
                           sep=r'\s+', header=None, names=names)

    def dataset_files(self) -> Iterable:
        """
        List the packaged files of every subdataset.

        Returns
        -------
        list
            For each subdataset, in ``SUBDATASETS`` order, the path of its
            data file followed by the path of its labels file.
        """
        files = []
        for key in self.SUBDATASETS:
            files.append(self._package_file(key, File.DATA))
            files.append(self._package_file(key, File.LABELS))
        return files

    def parse_to_package(self, raw_dir: str) -> None:
        """
        Convert the raw text files of every subdataset to parquet files.

        For each subdataset the train and test files are merged into a single
        dataframe indexed by (sequence, time), and a labels dataframe with one
        row per sequence is generated. Both are written to ``dataset_dir``.

        Parameters
        ----------
        raw_dir : str
            Directory containing the ``train_*.txt``, ``test_*.txt`` and
            ``RUL_*.txt`` raw files.
        """
        columns = [self.SEQUENCE_COLUMN, self.TIME_COLUMN, *self.VARIABLES]

        for subdataset, sizes in self.SUBDATASETS.items():
            n_train = sizes['train']

            # Read raw files
            train = self._read_raw(raw_dir, "train", subdataset, columns)
            test = self._read_raw(raw_dir, "test", subdataset, columns)
            rul_test = self._read_raw(raw_dir, "RUL", subdataset, [self.LABEL])

            # Shift the unit numbers of the test subset past the train ones so
            # that they do not collide once both subsets are merged
            test.loc[:, self.SEQUENCE_COLUMN] += n_train

            # Generate dataframe with multiindex SEQUENCE > TIME
            data = pd.concat([train, test], ignore_index=True)
            data.set_index(
                [self.SEQUENCE_COLUMN, self.TIME_COLUMN], inplace=True)
            data.rename_axis(index={self.SEQUENCE_COLUMN: Index.SEQUENCE,
                                    self.TIME_COLUMN: Index.TIME}, inplace=True)
            data.sort_index(inplace=True)

            # Generate labels according to data indexes. Label-based slices
            # are inclusive: sequences up to n_train (train) get 0, and the
            # remaining ones (test) get the values read from the RUL file.
            labels = pd.DataFrame(
                index=data.index.unique(level=Index.SEQUENCE),
                columns=[self.LABEL])
            labels.loc[:n_train, self.LABEL] = 0
            labels.loc[n_train + 1:, self.LABEL] = \
                rul_test.loc[:, self.LABEL].to_numpy()

            # Save dataframes
            data.to_parquet(self._package_file(subdataset, File.DATA),
                            compression="gzip", index=True)
            labels.to_parquet(self._package_file(subdataset, File.LABELS),
                              compression="gzip", index=True)

    def feed_pipeline(self, out_dir: str) -> None:
        """
        Copy the packaged files of the selected subdataset to ``out_dir``.

        Parameters
        ----------
        out_dir : str
            Destination directory. The files are stored there under the
            ``File.DATA`` and ``File.LABELS`` names, without subdataset prefix.
        """
        shutil.copy(self._package_file(self.subdataset, File.DATA),
                    path.join(out_dir, File.DATA))
        shutil.copy(self._package_file(self.subdataset, File.LABELS),
                    path.join(out_dir, File.LABELS))

    def sklearn_predefined_split(self) -> Iterable[int]:
        """
        Generates array of indexes of same length as sequences to be used with 'PredefinedSplit'

        Returns
        -------
        array
            Array with one entry per sequence of the selected subdataset:
            -1 for the train sequences and 0 for the test sequences.
        """
        sizes = self.SUBDATASETS[self.subdataset]
        n_train = sizes['train']
        n_test = sizes['test']

        # One slot per sequence; every slot is overwritten below
        idx = np.empty(n_train + n_test, dtype=int)

        # Set first sequences as train and latest to test (order set when generating DataFrame)
        idx[:n_train] = -1
        idx[-n_test:] = 0
        return idx