import hashlib
import io
import shutil
import zipfile
from abc import ABC, abstractmethod
from os import listdir, makedirs, path
from typing import Iterable
import requests
from conmo.conf import Directory
class Dataset(ABC):
    """
    Abstract base class for a Dataset.

    This class is an abstract class from which other subclasses inherit and must not be instantiated directly.
    """

    def __init__(self, name: str) -> None:
        """
        Main constructor of the class.

        Parameters
        ----------
        name : str
            The name given to the dataset.
        """
        self.name = name
        # Every dataset gets its own folder, named after it, under Directory.DATA.
        self.dataset_dir = path.join(Directory.DATA, self.name)

    @abstractmethod
    def fetch(self, out_dir: str) -> None:
        """
        Fetch data to feed the pipeline.

        Parameters
        ----------
        out_dir : str
            Directory where the dataset will be stored.
        """

    @abstractmethod
    def dataset_files(self) -> Iterable:
        """
        Iterable of files included in the dataset.

        ``is_dataset_ready`` compares each entry against paths built as
        ``path.join(self.dataset_dir, <file name>)``, so entries must be
        expressed in that same form to be recognised.
        """

    def show_start_message(self) -> None:
        """
        Show starting step info message.
        """
        print("\n+++ Dataset {} +++".format(self.name))

    def is_dataset_ready(self) -> bool:
        """
        Check if dataset has been already loaded/downloaded and parsed to package format.

        Returns
        -------
        bool
            True only when the dataset folder exists, ``dataset_files()`` yields at
            least one entry, and every entry is a regular file directly inside the
            dataset folder. False otherwise.
        """
        # The dataset folder must exist and really be a directory; a plain file
        # at that path would make listdir() raise.
        if not path.isdir(self.dataset_dir):
            return False

        # Regular files directly inside the dataset folder (sub-folders are ignored).
        present_files = {
            path.join(self.dataset_dir, entry)
            for entry in listdir(self.dataset_dir)
            if path.isfile(path.join(self.dataset_dir, entry))
        }

        # dataset_files() is only promised to be an Iterable, so materialise it
        # before testing for emptiness: generators have no len().
        expected_files = list(self.dataset_files())
        if not expected_files:
            return False

        # Ready only if every expected file is present.
        return all(expected in present_files for expected in expected_files)
class RemoteDataset(Dataset):
    """
    Abstract base class for a RemoteDataset (downloadable).
    """

    def __init__(self, url: str, file_format: str, checksum: str, checksum_format: str) -> None:
        """
        Constructor of remote dataset. The dataset name is the name of the concrete class.

        Parameters
        ----------
        url : str
            Remote URL the dataset is downloaded from.
        file_format : str
            Format of the downloaded file. Only stored here; presumably consumed by
            ``extract_data`` (not defined in this class) -- confirm against subclasses.
        checksum : str
            Expected hex digest of the downloaded content.
        checksum_format : str
            Name of the hash algorithm of ``checksum``. Only ``'md5'`` is handled.
        """
        super().__init__(self.__class__.__name__)
        self.url = url
        self.file_format = file_format
        self.checksum = checksum
        self.checksum_format = checksum_format

    @abstractmethod
    def parse_to_package(self, raw_dir: str) -> None:
        """
        Parse raw dataset to package format. Data and labels must be saved in parquet format.
        More information about parquet format: https://parquet.apache.org/

        Parameters
        ----------
        raw_dir:
            Directory where the dataset was downloaded from its source.
        """

    @abstractmethod
    def feed_pipeline(self, out_dir: str) -> None:
        """
        Copy selected data file to pipeline step folder.
        """

    def fetch(self, out_dir: str) -> None:
        """
        Fetch data to feed the pipeline.

        Downloads and parses the dataset only when it is not already available
        in package format, then hands the data over to the pipeline step.

        Parameters
        ----------
        out_dir : str
            Directory where the dataset will be stored.
        """
        self.show_start_message()

        # Check if dataset is already downloaded and parsed to package format
        if not self.is_dataset_ready():
            # Make sure the dataset folder exists and download the dataset into
            # its "raw" sub-folder
            makedirs(self.dataset_dir, exist_ok=True)
            raw_dir = path.join(self.dataset_dir, "raw")
            self.download(raw_dir)

            # Parse downloaded files to package format
            print("Parsing downloaded files to package format")
            self.parse_to_package(raw_dir)

            # Remove raw files to save disk space (best effort: errors are ignored)
            shutil.rmtree(raw_dir, ignore_errors=True)

        # Copy data to pipeline step directory
        self.feed_pipeline(out_dir)

    def download(self, out_dir: str) -> None:
        """
        Download a Dataset from a remote URL.

        Parameters
        ----------
        out_dir : str
            Directory handed to ``extract_data`` together with the response.

        Raises
        ------
        ConnectionError
            If the HTTP response is not OK.
        RuntimeError
            If the downloaded content does not pass the checksum verification.
        """
        print("Downloading data from " + self.url)
        r = requests.get(self.url, stream=True)
        if not r.ok:
            raise ConnectionError(
                "An error occurred downloading {}.".format(self.name))
        if not self.check_checksum(r):
            # The message must be formatted before building the exception:
            # calling .format() on the exception object raised AttributeError
            # and masked the real problem.
            raise RuntimeError(
                "{} has a checksum differing from expected, file may be corrupted.".format(self.name))
        # NOTE(review): extract_data is not defined in this class; presumably it
        # is provided by concrete subclasses -- confirm.
        self.extract_data(r, out_dir)

    def check_checksum(self, response: object) -> bool:
        """
        Checks if the checksum of the downloaded file corresponds to the one provided in the class.
        For security and integrity reasons. Currently only the md5 algorithm is integrated.

        Parameters
        ----------
        response: Object
            Response object returned by the get method of the Requests library.

        Returns
        -------
        Boolean variable indicating whether the comparison of the hash with the checksum was successful or not.
        Any checksum format other than 'md5' cannot be verified and yields False.
        """
        if self.checksum_format != 'md5':
            # Unsupported algorithm: nothing can be verified, so report failure
            # explicitly instead of implicitly returning None.
            return False

        # MD5 checksum of the complete response body
        digest = hashlib.md5(response.content).hexdigest()
        return digest == self.checksum
class LocalDataset(Dataset):
    # There will be two functions: one to generate the dataframe in the data folder and
    # another to feed the pipeline from the name and the already created folder, so that
    # a dataframe does not always have to be passed as a parameter.
    """
    Abstract base class for a LocalDataset (loadable).
    """

    def __init__(self, path: str) -> None:
        """
        Constructor of local dataset. The dataset name is the name of the concrete class.

        Parameters
        ----------
        path : str
            Absolute path to the folder where the dataset is located in your disk.
            See the example of batteries_degradation.py.
        """
        super().__init__(self.__class__.__name__)
        # Inside this method the parameter shadows the module-level os ``path``
        # import; other methods still see the module.
        self.path = path

    @abstractmethod
    def load(self) -> None:
        """
        Parse raw dataset to package format. Data and labels must be saved in parquet format.
        More information about parquet format: https://parquet.apache.org/
        """

    @abstractmethod
    def feed_pipeline(self, out_dir: str) -> None:
        """
        Copy selected data file to pipeline step folder.

        Parameters
        ----------
        out_dir:
            Pipeline step folder the data is copied into; ``fetch`` forwards its own
            ``out_dir`` argument here.
        """

    def fetch(self, out_dir: str) -> None:
        """
        Fetch data to feed the pipeline.

        Loads the local dataset into package format only when it is not already
        available, then hands the data over to the pipeline step.

        Parameters
        ----------
        out_dir : str
            Directory where the dataset will be stored.
        """
        self.show_start_message()

        # Check if dataset is already in Conmo's datasets folder
        if not self.is_dataset_ready():
            # Make sure the dataset folder exists. The "raw" sub-folder is not
            # created here and is not passed to load().
            makedirs(self.dataset_dir, exist_ok=True)
            raw_dir = path.join(self.dataset_dir, "raw")

            # Parse local files to package format
            print("Parsing local files to package format")
            self.load()

            # Remove raw files to save disk space. Best effort: a missing folder
            # or any other error is ignored. Presumably only relevant when a
            # subclass's load() writes into "raw" -- confirm.
            shutil.rmtree(raw_dir, ignore_errors=True)

        # Copy data to pipeline step directory
        self.feed_pipeline(out_dir)