from abc import ABC
from datetime import datetime
from os import makedirs, path
from typing import Iterable, Optional
from conmo.algorithms.algorithm import Algorithm
from conmo.conf import Directory
from conmo.datasets.dataset import Dataset
from conmo.metrics.metric import Metric
from conmo.preprocesses.preprocess import Preprocess
from conmo.splitters.splitter import Splitter
class Pipeline(ABC):
    """
    Ordered chain of steps executed inside one pipeline directory: dataset,
    optional splitter, optional preprocesses, algorithms and metrics.

    Every step receives the output directory of the previously executed step
    as its input directory and writes into its own sub-directory.
    """

    # Names of the per-step sub-directories created inside the pipeline directory.
    DATASET_FOLDER = "01_Dataset"
    SPLITTER_FOLDER = "02_Splitter"
    PREPROCESSES_FOLDER = "03_Preprocesses"
    ALGORITHMS_FOLDER = "04_Algorithms"
    METRICS_FOLDER = "05_Metrics"

    def __init__(self, dataset: "Dataset", splitter: Optional["Splitter"], preprocesses: Optional[Iterable["Preprocess"]], algorithms: Iterable["Algorithm"], metrics: Iterable["Metric"]) -> None:
        """
        Store the steps that make up the pipeline.

        Parameters
        ----------
        dataset: Dataset
            Object whose ``fetch(out_dir)`` method is called first.
        splitter: Optional[Splitter]
            Object whose ``split(in_dir, out_dir)`` method is called after the
            dataset step, or None to skip the step.
        preprocesses: Optional[Iterable[Preprocess]]
            Objects whose ``apply(in_dir, out_dir)`` method is called in order,
            or None to skip the step. The collection is traversed both when
            creating directories and when running, and ``len()`` is applied to
            it, so it must be sized and re-iterable (e.g. a list).
        algorithms: Iterable[Algorithm]
            Objects whose ``execute(idx, in_dir, out_dir)`` method is called in order.
        metrics: Iterable[Metric]
            Objects whose ``calculate(...)`` method is called in order.
        """
        self.dataset = dataset
        self.splitter = splitter
        self.preprocesses = preprocesses
        self.algorithms = algorithms
        self.metrics = metrics

    def run(self, pipe_dir: str, pipe_num: int, pipes: int) -> None:
        """
        Contains all the logic for the execution of a particular pipeline, creating intermediate
        directories for data passing and executing the relevant methods for each step.

        Parameters
        ----------
        pipe_dir: str
            Name of the current pipeline directory.
        pipe_num: int
            Index of the current pipeline (only used in the printed banner).
        pipes: int
            Total number of pipelines in the current experiment (only used in the printed banner).
        """
        print("\n**** START PIPELINE {:02}/{:02} ****".format(pipe_num, pipes))
        self.generate_dirs(pipe_dir)

        # Dataset
        out_dir = path.join(pipe_dir, self.DATASET_FOLDER)
        self.dataset.fetch(out_dir)

        # Splitter
        if self.splitter is not None:
            in_dir = out_dir
            out_dir = path.join(pipe_dir, self.SPLITTER_FOLDER)
            self.splitter.split(in_dir, out_dir)

        # Preprocesses: each one reads what the previous step wrote
        if self.preprocesses is not None:
            for idx, preprocess in enumerate(self.preprocesses):
                in_dir = out_dir
                out_dir = self._preprocess_dir(pipe_dir, idx, preprocess)
                preprocess.apply(in_dir, out_dir)

        # Algorithms: all of them share the same input and output directories
        in_dir = out_dir
        out_dir = path.join(pipe_dir, self.ALGORITHMS_FOLDER)
        algs = [algorithm.execute(idx, in_dir, out_dir)
                for idx, algorithm in enumerate(self.algorithms, start=1)]

        # Metrics: receive the values returned by the algorithms plus the
        # directory the algorithms read from and the one they wrote to
        last_preprocess_dir = in_dir
        algorithms_dir = out_dir
        metrics_dir = path.join(pipe_dir, self.METRICS_FOLDER)
        for idx, metric in enumerate(self.metrics, start=1):
            metric.calculate(idx, algs, last_preprocess_dir,
                             algorithms_dir, metrics_dir)

        print("\n**** END PIPELINE {:02}/{:02} ****\n".format(pipe_num, pipes))

    def generate_dirs(self, pipe_dir: str) -> None:
        """
        Auxiliary method to generate directories for each of the steps in the current pipeline.

        Directories that already exist are reused, matching the behaviour of
        ``Experiment.generate_dirs``, so a pipeline can be run again in the
        same directory without raising ``FileExistsError``.

        Parameters
        ----------
        pipe_dir: str
            Name of the pipe directory.
        """
        # First level: one directory per configured step
        stage_dirs = [path.join(pipe_dir, self.DATASET_FOLDER)]
        if self.splitter is not None:
            stage_dirs.append(path.join(pipe_dir, self.SPLITTER_FOLDER))
        if self.preprocesses is not None and len(self.preprocesses) > 0:
            stage_dirs.append(path.join(pipe_dir, self.PREPROCESSES_FOLDER))
            # Preprocesses: second level, one directory per preprocess
            stage_dirs.extend(
                self._preprocess_dir(pipe_dir, idx, preprocess)
                for idx, preprocess in enumerate(self.preprocesses))
        stage_dirs.append(path.join(pipe_dir, self.ALGORITHMS_FOLDER))
        stage_dirs.append(path.join(pipe_dir, self.METRICS_FOLDER))

        for stage_dir in stage_dirs:
            makedirs(stage_dir, exist_ok=True)

    def _preprocess_dir(self, pipe_dir: str, idx: int, preprocess) -> str:
        """Return the directory of the preprocess at zero-based position ``idx``."""
        return path.join(pipe_dir, self.PREPROCESSES_FOLDER, "{:02}_{}".format(
            idx + 1, preprocess.__class__.__name__))
class Experiment(ABC):
    """
    Named group of pipelines that are run one after another, each inside its
    own directory under the experiment directory.
    """

    def __init__(self, pipelines: Iterable["Pipeline"], analytics: Iterable, name: Optional[str] = None):
        """
        Store the pipelines and analytics of the experiment and resolve its name.

        Parameters
        ----------
        pipelines: Iterable[Pipeline]
            Pipelines to run. ``len()`` is applied to the collection and it is
            traversed more than once, so it must be sized and re-iterable
            (e.g. a list).
        analytics: Iterable
            Stored on the instance; not used yet by ``launch``.
        name: Optional[str]
            Name of the experiment, used as its directory name. When omitted,
            the local time at construction formatted as
            ``%Y_%m_%d-%H_%M_%S`` is used.
        """
        self.pipelines = pipelines
        self.analytics = analytics
        # The timestamp is computed per instance: a default expression in the
        # signature would be evaluated only once, when the function is defined,
        # making every unnamed experiment in a process share the same name.
        if name is None:
            name = datetime.now().strftime('%Y_%m_%d-%H_%M_%S')
        self.name = name

    def launch(self):
        """
        Launches the current experiment: creates its directories and runs
        every pipeline in order.
        """
        print("\n##### EXPERIMENT {} #####".format(self.name))
        pipes_dirs = self.generate_dirs()

        # Pipelines
        n_pipes = len(self.pipelines)
        for pipe_num, (pipeline, pipe_dir) in enumerate(zip(self.pipelines, pipes_dirs), start=1):
            pipeline.run(pipe_dir, pipe_num, n_pipes)

        # Analytics
        # TODO

    def generate_dirs(self) -> Iterable[str]:
        """
        Generates directories both for the experiment itself and for the pipelines it contains.
        Directories that already exist are reused.

        Returns
        -------
        pipes_dirs: Iterable[str]
            List containing the directory of each pipeline of the current
            experiment, in pipeline order.
        """
        # Experiment dir
        exp_dir = path.join(Directory.EXPERIMENTS, self.name)
        makedirs(exp_dir, exist_ok=True)

        # Pipelines dirs: numbered from 01 in pipeline order
        pipes_dirs = []
        for pipe_num, _ in enumerate(self.pipelines, start=1):
            pipe_dir = path.join(exp_dir, "{:02}_Pipeline".format(pipe_num))
            makedirs(pipe_dir, exist_ok=True)
            pipes_dirs.append(pipe_dir)
        return pipes_dirs