Source code for api.data_processing

"""
data_processing.py
====================================
Data Processing API
"""

import copy
import datetime
import logging
import os
import sys
import pandas as pd
import numpy as np
import matplotlib
import matplotlib.pyplot as plt

sys.path.append("..")
from constrain.epreader import CSVReader
from constrain.datetimeep import DateTimeEP
from typing import Union


class DataProcessing:
    def __init__(
        self,
        data_path: str = None,
        data_source: str = None,
        timestamp_column_name: str = None,
    ):
        """Instantiate a data processing object to load datasets and manipulate
        data before feeding it to the verification process.

        Args:
            data_path (str): Path to the data (CSV format) to be loaded for processing.
            data_source (str): Data source name. Use `EnergyPlus` or `Other`.
            timestamp_column_name (str): Name of the column header that contains
                the time series timestamps. Required when `data_source` is `Other`.
        """
        self.data = None
        if data_path is None:
            logging.error("A `data_path` argument should be provided.")
            return None
        if data_source is None:
            logging.error("A `data_source` argument should be provided.")
            return None

        # Check that the data file exists
        if os.path.isfile(data_path):
            try:
                if data_source.lower() == "energyplus":
                    # Use CSVReader to parse EnergyPlus timestamps
                    data = CSVReader(csv_file=data_path).getseries()
                    data = DateTimeEP(data, 2000).transform()
                    data.drop("Date/Time", inplace=True, axis=1)
                elif data_source.lower() == "other":
                    if timestamp_column_name is None:
                        logging.error(
                            "timestamp_column_name is required when data_source = 'Other'."
                        )
                        return None
                    data = pd.read_csv(data_path)
                    if timestamp_column_name not in data.columns:
                        logging.error(
                            f"The data does not contain a column header named {timestamp_column_name}."
                        )
                        return None
                    data.set_index(timestamp_column_name, inplace=True)
                    try:
                        data.index = pd.to_datetime(data.index)
                    except Exception:
                        logging.error(
                            f"The data in {timestamp_column_name} could not be converted to a Python datetime object. Make sure that the data is consistently defined as a set of date strings."
                        )
                        return None
                else:
                    logging.error(f"data_source = {data_source} is not allowed.")
                    return None
                self.data = data
            except Exception:
                logging.error(
                    f"An error occurred when opening {data_path}. Please make sure that the file can be opened, and/or that it contains the correct headers."
                )
                return None
        else:
            logging.error(f"The file {data_path} does not exist.")
            return None
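    # Usage sketch for the constructor (a hedged example, not part of the API:
    # "sensor_data.csv" and "Timestamp" are hypothetical names):
    #
    #   dp = DataProcessing(
    #       data_path="sensor_data.csv",
    #       data_source="Other",
    #       timestamp_column_name="Timestamp",
    #   )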
    def slice(
        self,
        start_time: datetime.datetime,
        end_time: datetime.datetime,
        inplace: bool = False,
    ) -> Union[None, pd.DataFrame]:
        """Discard any data before `start_time` and after `end_time`.

        Args:
            start_time (datetime): Python datetime object used as the slice start date of the data.
            end_time (datetime): Python datetime object used as the slice end date of the data.
            inplace (bool, optional): Modify the dataset directly. Defaults to False.

        Returns:
            pd.DataFrame: Modified dataset
        """
        if isinstance(start_time, datetime.datetime):
            if isinstance(end_time, datetime.datetime):
                if start_time > end_time:
                    logging.error(
                        "The end_time cannot be an earlier date than start_time."
                    )
                else:
                    data_slice = self.data[start_time:end_time]
                    if len(data_slice) == 0:
                        logging.warning("Data slice contains no sample.")
                    if inplace:
                        self.data = data_slice
                    else:
                        return data_slice.copy(
                            deep=True
                        )  # probably not necessary, just to be safe
            else:
                logging.error("The end_time argument is not a Python datetime object.")
        else:
            logging.error("The start_time argument is not a Python datetime object.")
        return None
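    # Usage sketch for slice() (dates are illustrative only):
    #
    #   sliced = dp.slice(
    #       start_time=datetime.datetime(2000, 1, 1),
    #       end_time=datetime.datetime(2000, 6, 30),
    #   )  # returns a copy; pass inplace=True to modify dp.data directly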
    def add_parameter(
        self, name: str = None, value: float = None, inplace: bool = False
    ) -> Union[None, pd.DataFrame]:
        """Add a parameter to `data`. The parameter will be added as a constant
        value for all indexes of `data`.

        Args:
            name (str): Name of the parameter.
            value (float): Value of the parameter.
            inplace (bool, optional): Modify the dataset directly. Defaults to False.

        Returns:
            pd.DataFrame: Modified dataset
        """
        if name is None:
            logging.error("A parameter name should be specified.")
            return None
        if value is None:
            logging.error("A parameter value should be specified.")
            return None
        if inplace:
            self.data[name] = value
        else:
            d = self.data.copy(
                deep=True
            )  # deep copy so self.data is not modified on the next line
            d[name] = value
            return d
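    # Usage sketch for add_parameter() (parameter name and value are illustrative):
    #
    #   d = dp.add_parameter(name="design_flow_rate", value=0.45)
    #   # d now contains a "design_flow_rate" column set to 0.45 for every row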
    def apply_function(
        self,
        variable_names: list = None,
        new_variable_name: str = None,
        function_to_apply: str = None,
        inplace: bool = False,
    ) -> Union[None, pd.DataFrame]:
        """Apply an aggregation function to a list of variables from the dataset.

        Args:
            variable_names (list): List of variables used as input to the function.
                All elements in variable_names need to be in self.data.columns.
            new_variable_name (str): Name of the new variable containing the result
                of the function for each timestamp.
            function_to_apply (str): Name of the function to apply. Choices are:
                `sum`, `min`, `max`, or `average` (or `mean`).
            inplace (bool, optional): Modify the dataset directly. Defaults to False.

        Returns:
            pd.DataFrame: Modified dataset
        """
        if variable_names is None:
            logging.error("A list of variables was not specified.")
            return None
        if isinstance(variable_names, list):
            if len(variable_names) == 0:
                logging.error("The variable name list is empty.")
                return None
            missing_variables = []
            for v in variable_names:
                if v not in list(self.data.columns):
                    missing_variables.append(v)
            if len(missing_variables) > 0:
                logging.error(
                    f"Variable name(s) {missing_variables} not in the dataset."
                )
                return None
        else:
            logging.error(
                f"A list of variable names should be passed as an argument, not a {type(variable_names)}."
            )
            return None
        if new_variable_name is None:
            logging.error("A new variable name should be provided.")
            return None
        if function_to_apply.lower() not in ["sum", "min", "max", "average", "mean"]:
            logging.error(
                f"The function to apply should be `sum`, `min`, `max`, `average`, or `mean`, not {function_to_apply.lower()}."
            )
            return None
        str_to_func = {
            "sum": sum,
            "min": min,
            "max": max,
            "average": np.mean,
            "mean": np.mean,
        }
        agg = self.data.loc[:, variable_names].apply(
            str_to_func[function_to_apply.lower()], axis=1
        )
        if inplace:
            self.data[new_variable_name] = agg
        else:
            d = self.data.copy(deep=True)
            d[new_variable_name] = agg
            return d
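    # Usage sketch for apply_function() ("zone1_temp" and "zone2_temp" are
    # hypothetical column names):
    #
    #   d = dp.apply_function(
    #       variable_names=["zone1_temp", "zone2_temp"],
    #       new_variable_name="average_zone_temp",
    #       function_to_apply="average",
    #   )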
    def summary(self) -> dict:
        """Provide a summary of the dataset.

        Returns:
            Dict: Dictionary containing the following information:
                1) Number of data points, 2) Resolution,
                3) For each variable: minimum, maximum, mean, standard deviation.
        """
        data_summary = {}
        data_summary["number_of_data_points"] = len(self.data)

        # Calculate the average timestamp difference, i.e., the average resolution,
        # and report it in seconds; total_seconds() is used so that resolutions
        # longer than a day are not truncated to the seconds component
        d = copy.deepcopy(self.data)
        d["date"] = self.data.index
        data_summary["average_resolution_in_second"] = (
            d["date"].diff().fillna(pd.Timedelta(seconds=0))[1:].mean().total_seconds()
        )
        d.drop("date", inplace=True, axis=1)

        data_summary["variables_summary"] = {}
        for v in list(d.columns):
            data_summary["variables_summary"][v] = {}
            data_summary["variables_summary"][v]["minimum"] = d[v].min()
            data_summary["variables_summary"][v]["maximum"] = d[v].max()
            data_summary["variables_summary"][v]["mean"] = d[v].mean()
            data_summary["variables_summary"][v]["standard_deviation"] = np.std(d[v])
        return data_summary
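    # Usage sketch for summary():
    #
    #   s = dp.summary()
    #   print(s["number_of_data_points"], s["average_resolution_in_second"])
    #   # s["variables_summary"][<column>] holds minimum, maximum, mean,
    #   # and standard_deviation for each column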
    def concatenate(
        self, datasets: list = None, axis: int = None, inplace: bool = False
    ) -> Union[None, pd.DataFrame]:
        """Concatenate datasets. Duplicated columns (for horizontal concatenation)
        or rows (for vertical concatenation) are kept. Column names (for vertical
        concatenation) or indexes (for horizontal concatenation) need to match exactly.

        Args:
            datasets (list): List of datasets (pd.DataFrame) to concatenate with `data`.
            axis (int): 1 or 0. 1 performs a vertical concatenation (appending rows)
                and 0 performs a horizontal concatenation (appending columns).
            inplace (bool, optional): Modify the dataset directly. Defaults to False.

        Returns:
            pd.DataFrame: Modified dataset
        """
        if not isinstance(datasets, list):
            logging.error(
                f"A list of datasets must be provided. The datasets argument that was passed is {type(datasets)}."
            )
            return None
        if len(datasets) == 0:
            logging.error("The list of datasets that was provided is empty.")
            return None
        if axis not in [0, 1]:
            logging.error("The axis argument should be either 1 or 0.")
            return None
        datasets = copy.deepcopy(datasets)
        datasets.insert(0, self.data)
        if axis == 1:
            # Argument validation: all datasets must share the same column headers
            datasets_columns = [sorted(list(d.columns)) for d in datasets]
            if not all(c == datasets_columns[0] for c in datasets_columns):
                logging.error("The datasets must contain the same column headers.")
                return None
            # Perform the vertical concatenation
            concatenated_datasets = pd.concat(datasets)
            concatenated_datasets.sort_index(axis="index", inplace=True)
        else:  # axis == 0
            # Argument validation: all datasets must share the same indexes
            datasets_indexes = [d.index for d in datasets]
            if not all(len(i) == len(datasets_indexes[0]) for i in datasets_indexes):
                logging.error("The datasets must have the same indexes.")
                return None
            if not all(all(i == datasets_indexes[0]) for i in datasets_indexes):
                logging.error("The datasets must have the same indexes.")
                return None
            # Perform the horizontal concatenation
            concatenated_datasets = datasets[0]
            for ds in datasets[1:]:
                concatenated_datasets = pd.concat(
                    [concatenated_datasets, ds], ignore_index=False, axis=1
                )
        if inplace:
            self.data = concatenated_datasets
        else:
            return concatenated_datasets
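    # Usage sketch for concatenate() (other_df is a hypothetical pd.DataFrame
    # sharing the same column headers as dp.data):
    #
    #   d = dp.concatenate(datasets=[other_df], axis=1)  # vertical: appends rows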
    def check(self) -> dict:
        """Perform a sanity check on the data.

        Returns:
            Dict: Dictionary showing the number of missing values for each variable
                as well as the outliers.
        """
        data_headers = list(self.data.columns)
        if len(data_headers) == 0:
            logging.error("The data does not include any headers.")
            return None
        check_summary = {}
        for c in data_headers:
            check_summary[c] = {}

            # Look for missing data
            missing_values_count = self.data[c].isnull().sum()
            check_summary[c]["number_of_missing_values"] = missing_values_count

            # Look for outliers: values more than 3x the standard deviation
            # away from the mean
            data = self.data[c].dropna()
            outliers = data[
                ~data.apply(lambda v: np.abs(v - data.mean()) / data.std() < 3)
            ]
            if len(outliers) == 0 or (data.std() == 0):
                outliers = None
            check_summary[c]["outliers"] = outliers
        return check_summary
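    # Usage sketch for check():
    #
    #   report = dp.check()
    #   # report[<column>]["number_of_missing_values"] and
    #   # report[<column>]["outliers"] for each column in dp.data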
    def fill_missing_values(
        self, method: str = None, variable_names: list = [], inplace: bool = False
    ) -> Union[None, pd.DataFrame]:
        """Fill missing values (NaN) in `data`.

        Args:
            method (str): Method to use to fill the missing values: 'linear'
                (treat values as equally spaced) or 'pad' (use existing values).
            variable_names (list, optional): List of variable names that need
                missing values to be filled. By default, fill all missing data in self.data.
            inplace (bool, optional): Modify the dataset directly. Defaults to False.

        Returns:
            pd.DataFrame: Modified dataset
        """
        if method not in ["linear", "pad"]:
            logging.error(f"The method should be either linear or pad, not {method}.")
            return None
        if not isinstance(variable_names, list):
            logging.error(
                f"A list of variable names must be provided. The variable_names argument that was passed is {type(variable_names)}."
            )
            return None
        if len(variable_names) == 0:
            variable_names = list(self.data.columns)
        missing_vars = []
        for v in variable_names:
            if v not in list(self.data.columns):
                missing_vars.append(v)
        if len(missing_vars) > 0:
            logging.error(f"Variable(s) {missing_vars} not included in the data.")
            return None
        d = copy.deepcopy(self.data)
        for v in variable_names:
            d[v].interpolate(method=method, inplace=True)
        if inplace:
            self.data = d
        else:
            return d
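    # Usage sketch for fill_missing_values():
    #
    #   d = dp.fill_missing_values(method="linear")  # interpolate all columns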
    def plot(
        self, variable_names: list = None, kind: str = None
    ) -> Union[matplotlib.axes.Axes, None]:
        """Create plots of time series data, or a scatter plot between two variables.

        Args:
            variable_names (list): List of variables to plot. The variables must be in the data.
            kind (str): Type of chart to plot, either 'timeseries' or 'scatter'.
                - If 'timeseries' is used, all variable names provided in `variable_names`
                  will be plotted against the index timestamp from `data`.
                - If 'scatter' is used, the first variable provided in the list will
                  be used as the x-axis; the others will be on the y-axis.

        Returns:
            matplotlib.axes.Axes: Matplotlib axes object
        """
        if not isinstance(variable_names, list):
            logging.error(
                f"A list of variable names must be provided. The variable_names argument that was passed is {type(variable_names)}."
            )
            return None
        if len(variable_names) == 0:
            logging.error("The list of variable names that was provided is empty.")
            return None
        if kind not in ["timeseries", "scatter"]:
            logging.error(
                f"The kind of plot should be either timeseries or scatter, not {kind}."
            )
            return None
        not_found_count = 0
        found = 0
        for v in variable_names:
            if v not in list(self.data.columns):
                logging.warning(f"{v} is not included in the data.")
                not_found_count += 1
            else:
                found += 1
        if not_found_count == len(variable_names):
            logging.error(
                "None of the specified variables were found in data, the plot cannot be generated."
            )
            return None
        elif found < 2 and kind == "scatter":
            logging.error("A scatter plot requires at least two variables.")
            return None

        # Create groups of variables to plot
        if kind == "timeseries":
            groups = list(variable_names)
        elif kind == "scatter":
            groups = [(variable_names[0], v) for v in variable_names[1:]]
        else:
            return None

        fig, ax = plt.subplots()
        for g in groups:
            if kind == "timeseries":
                ax.plot(
                    self.data.index,
                    self.data[g],
                    label=g,
                    marker="o",
                    linestyle="",
                    alpha=1 / len(groups),
                )
                plt.xlabel("Timestamp")
            elif kind == "scatter":
                ax.plot(
                    self.data[g[0]],
                    self.data[g[1]],
                    label=g[1],
                    marker="o",
                    linestyle="",
                    alpha=1 / len(groups),
                )
                plt.xlabel(g[0])
        ax.legend()
        plt.show()
        return ax
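    # Usage sketch for plot() (column names are hypothetical):
    #
    #   ax = dp.plot(variable_names=["zone1_temp", "zone2_temp"], kind="timeseries")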
    def downsample(
        self,
        frequency_type: str = None,
        number_of_periods: int = None,
        sampling_function: Union[dict, str] = None,
        inplace: bool = False,
    ) -> Union[None, pd.DataFrame]:
        """Downsample data.

        Args:
            frequency_type (str): Downsampling frequency. Either 'day', 'hour',
                'minute', or 'second'.
            number_of_periods (int): Number of periods used for downsampling.
                For instance, use 1 and a frequency_type of 'hour' to downsample
                the data to every hour.
            sampling_function (Union[dict, str], optional): Function to apply during
                downsampling, either 'mean' or 'sum', or a dictionary of key/value
                pairs where the keys correspond to all the variables in data and
                the values are either 'mean' or 'sum'. By default, the mean is used
                for downsampling.
            inplace (bool, optional): Modify the dataset directly. Defaults to False.

        Returns:
            pd.DataFrame: Modified dataset
        """
        frequency_mapping = {"day": "D", "hour": "H", "minute": "T", "second": "S"}
        frequency_in_seconds = {
            "day": 24 * 60 * 60,
            "hour": 60 * 60,
            "minute": 60,
            "second": 1,
        }
        if frequency_type is None:
            logging.error("A frequency_type argument must be provided.")
            return None
        if frequency_type not in frequency_mapping.keys():
            logging.error(
                f"{frequency_type} is not supported, please choose one of these: {frequency_mapping.keys()}."
            )
            return None
        if isinstance(number_of_periods, int):
            if number_of_periods < 1:
                logging.error("The number of periods should at least be 1.")
                return None
        else:
            logging.error(
                f"The number of periods should be specified as an integer, not {type(number_of_periods)}."
            )
            return None
        if sampling_function is None:
            logging.info(
                "Downsampling will be generated by applying the mean to each variable."
            )
            sampling_function = "mean"
        if isinstance(sampling_function, str):
            if sampling_function not in ["mean", "sum"]:
                logging.error("The sampling function should be either 'mean' or 'sum'.")
                return None
            sampling_function = dict.fromkeys(
                list(self.data.columns), sampling_function
            )
        elif isinstance(sampling_function, dict):
            if len(sampling_function) == 0:
                logging.error(
                    "The dictionary passed as the sampling_function argument cannot be empty."
                )
                return None
            for v in sampling_function.keys():
                if v not in self.data.columns:
                    logging.error(
                        f"{v} is not in data, downsampling cannot be performed."
                    )
                    return None
                if sampling_function[v] not in ["mean", "sum"]:
                    logging.error(
                        f"The sampling function for {v} should be either 'mean' or 'sum'."
                    )
                    return None
            for v in list(self.data.columns):
                if not (v in sampling_function.keys()):
                    logging.error(
                        f"{v} is not in the sampling function dictionary. All variables should be included."
                    )
                    return None
        else:
            logging.error(
                f"The sampling function should be either a string ('mean' or 'sum') or a dictionary mapping the variables of data to either 'mean' or 'sum'. A {type(sampling_function)} was passed as an argument."
            )
            return None

        # Check that we're not 'upsampling'
        d = copy.deepcopy(self.data)
        d["date"] = self.data.index
        average_resolution_in_second = (
            d["date"].diff().fillna(pd.Timedelta(seconds=0))[1:].mean().total_seconds()
        )
        if (
            average_resolution_in_second
            > frequency_in_seconds[frequency_type] * number_of_periods
        ):
            logging.error(
                "You are attempting to 'upsample': the frequency time chosen is lower than the average timestamp resolution."
            )
            return None

        # Replace strings by numpy functions
        for v in list(self.data.columns):
            if sampling_function[v] == "mean":
                sampling_function[v] = np.mean
            elif sampling_function[v] == "sum":
                sampling_function[v] = np.sum
        d = self.data.resample(
            f"{number_of_periods}{frequency_mapping[frequency_type]}"
        ).agg(sampling_function)
        if inplace:
            self.data = d
        else:
            return d
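    # Usage sketch for downsample() (illustrative arguments): downsample to
    # hourly means, keeping dp.data untouched:
    #
    #   hourly = dp.downsample(
    #       frequency_type="hour",
    #       number_of_periods=1,
    #       sampling_function="mean",
    #   )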