Source code for checklib

"""
This file contains the high-level interface for implementing verification item classes in library.py
"""

# %% Suppress future warning if needed
import warnings

warnings.simplefilter(action="ignore", category=FutureWarning)

# %% import packages
import datetime
from datetime import timedelta, date
from typing import List, Dict, Union
from abc import ABC, abstractmethod
import matplotlib.pyplot as plt
import seaborn as sns
import glob, json, os

# plt.style.use("ggplot")
import pandas as pd
from pandas.plotting import register_matplotlib_converters

# register_matplotlib_converters()


class CheckLibBase(ABC):
    """Abstract class defining interfaces for item-specific verification classes"""

    points = None
    result = pd.DataFrame()

    def __init__(self, df: pd.DataFrame, params=None, results_folder=None):
        full_df = df.copy(deep=True)
        if params is not None:
            for k, v in params.items():
                full_df[k] = v

        col_list = full_df.columns.values.tolist()
        if not set(self.points_list).issubset(set(col_list)):
            print(f"Dataset is not sufficient for running {self.__class__.__name__}")
            print(set(col_list))

        self.df = full_df[self.points_list]
        self.df.index = pd.to_datetime(self.df.index)
        self.df = self.df.sort_index()
        self.results_folder = results_folder
        self.verify()
        self.result.name = ""
        self.df["Verification Result"] = self.result

    @property
    def points_list(self) -> List[str]:
        return self.points

    @abstractmethod
    def check_bool(self) -> bool:
        """implementation of the checking boolean return"""
        pass

    @abstractmethod
    def check_detail(self) -> Dict:
        """implementation of the checking detailed return in Dict"""
        pass

    @abstractmethod
    def verify(self):
        """checking logic implementation, not for user"""
        pass

    @property
    def get_checks(self):
        return self.check_bool(), self.check_detail()

    def add_md(
        self,
        md_file_path,
        img_folder,
        relative_path_to_img_in_md,
        item_dict,
        plot_option=None,
        fig_size=(6.4, 4.8),
    ):
        outcome_bool, outcome_dict = self.get_checks
        img_folder = f"{img_folder}/VerificationCase{item_dict['no']}"
        relative_path_to_img_in_md = (
            f"{relative_path_to_img_in_md}/VerificationCase{item_dict['no']}"
        )
        if not os.path.exists(img_folder):
            os.makedirs(img_folder)
        self.results_folder = img_folder
        if plot_option is None:
            self.plot(plot_option="all-compact", fig_size=fig_size)
            self.plot(plot_option="all-expand", fig_size=fig_size)
            self.plot(plot_option="day-compact", fig_size=fig_size)
            self.plot(plot_option="day-expand", fig_size=fig_size)
        else:
            self.plot(plot_option=plot_option, fig_size=fig_size)

        image_list = glob.glob(f"{img_folder}/*.png")
        image_md_path_list = [
            x.replace(img_folder, relative_path_to_img_in_md) for x in image_list
        ]

        img_md = ""
        for i in range(len(image_list)):
            img_def_path = image_list[i]
            img_rel_path = image_md_path_list[i]
            img_md += f"""
![{img_def_path}]({img_rel_path})
"""

        md_content = f"""
## Results for Verification Case ID {item_dict['no']}

### Pass/Fail check result
{str(outcome_dict)}

### Result visualization
{img_md}

### Verification case definition
```
{json.dumps(item_dict, indent=2)}
```

---
"""
        if md_file_path is not None:
            with open(md_file_path, "a") as fw:
                fw.write(md_content)

        return {
            "md_content": md_content,
            "outcome_notes": outcome_dict,
            "model_file": item_dict["simulation_IO"]["idf"]
            .split("/")[-1]
            .split("\\")[-1]
            .replace(".idf", ""),
            "verification_class": item_dict["verification_class"],
        }

    def save_data(self, csv_path):
        self.df.to_csv(csv_path)
        return

    def plot(self, plot_option, plt_pts=None, fig_size=(6.4, 4.8)):
        """default plot function for showing result"""
        if plt_pts is None:
            plt_pts = self.df.columns.tolist()
        if plot_option is None:
            return

        plot_option = plot_option.strip().lower()
        plt.subplots()
        # filter out "Untested" to prevent an error when plotting
        self.result_filtered = self.result[self.result != "Untested"]
        if plot_option == "all-compact":
            self.all_plot_aio(plt_pts, fig_size)
        elif plot_option == "all-expand":
            self.all_plot_obo(plt_pts, fig_size)
        elif plot_option == "day-compact":
            self.day_plot_aio(plt_pts, fig_size)
        elif plot_option == "day-expand":
            self.day_plot_obo(plt_pts, fig_size)
        else:
            print("Invalid plot option!")
        plt.close("all")
        return

    def all_plot_aio(self, plt_pts, fig_size):
        """All in one plot of all samples"""
        plt.figure(figsize=fig_size)

        # flag
        ax1 = plt.subplot(2, 1, 1)
        sns.scatterplot(
            x=self.result_filtered.index, y=self.result_filtered, linewidth=0, s=1
        )
        plt.xlim([self.df.index[0], self.df.index[-1]])
        plt.ylim([-0.2, 1.2])
        plt.title(f"All samples Pass / Fail flag plot - {self.__class__.__name__}")

        # datapoints
        ax2 = plt.subplot(2, 1, 2)
        self.df[plt_pts].plot(ax=ax2)
        pt_nan = self.df.isnull().any().to_dict()
        for i, line in enumerate(ax2.get_lines()):
            line_label = line.get_label()
            if pt_nan[line_label]:
                line.set_marker(".")
        ax2.ticklabel_format(useOffset=False, axis="y")
        plt.title(f"All samples data points plot - {self.__class__.__name__}")
        plt.tight_layout()
        plt.savefig(f"{self.results_folder}/All_plot_aio.png")
        print()

    def all_plot_obo(self, plt_pts, fig_size):
        """One by one plot of all samples"""
        num_plots = len(plt_pts) + 1
        plt.figure(figsize=(fig_size[0], fig_size[1] * num_plots))

        # flag
        ax1 = plt.subplot(num_plots, 1, 1)
        sns.scatterplot(
            x=self.result_filtered.index, y=self.result_filtered, linewidth=0, s=1
        )
        plt.xlim([self.df.index[0], self.df.index[-1]])
        plt.ylim([-0.2, 1.2])
        plt.title(f"All samples Pass / Fail flag plot - {self.__class__.__name__}")

        # datapoints
        pt_nan = self.df.isnull().any().to_dict()
        i = 2
        for pt in plt_pts:
            try:
                axx = plt.subplot(num_plots, 1, i)
                if pt_nan[pt]:
                    self.df[pt].plot(ax=axx, marker=".")
                else:
                    # check if values in the series are boolean
                    if self.df[pt].apply(lambda x: isinstance(x, bool)).all():
                        self.df[pt] = self.df[pt].astype(int)
                    self.df[pt].plot(ax=axx)
                plt.title(f"All samples - {pt} - {self.__class__.__name__}")
                i += 1
                axx.ticklabel_format(useOffset=False, axis="y")
            except:
                print(f"{pt} cannot be plotted by itself, ignored in the plot.")

        plt.tight_layout()
        plt.savefig(f"{self.results_folder}/All_plot_obo.png")
        print()

    def calculate_plot_day(self):
        trueday = None
        truedaydf = None
        falseday = None
        falsedaydf = None
        mixday = None
        mixdaydf = None
        ratio = -0.5

        # Looking for day with most balanced pass/fail samples
        for one_day in self.daterange(
            date(self.df.index[0].year, self.df.index[0].month, self.df.index[0].day),
            date(
                self.df.index[-1].year, self.df.index[-1].month, self.df.index[-1].day
            ),
        ):
            daystr = f"{str(one_day.year)}-{str(one_day.month)}-{str(one_day.day)}"
            daydf = self.df.loc[daystr]
            day = self.result[daystr]
            if (trueday is None) and len(day[day == True]) > 0:
                trueday = day
                truedaydf = daydf
                # print("reach true")
                continue
            if (falseday is None) and len(day[day == False]) > 0:
                falseday = day
                falsedaydf = daydf
                # print("reach false")
                continue
            if len(day[day == False]) == 0 or len(day[day == True]) == 0:
                continue
            new_ratio = len(day[day == True]) / len(day) - 0.5
            if abs(new_ratio) < abs(ratio):
                ratio = new_ratio
                mixday = day
                mixdaydf = daydf

        if mixdaydf is None:
            plotdaydf = daydf
            plotday = day
        else:
            plotdaydf = mixdaydf
            plotday = mixday
        return plotday, plotdaydf

    def day_plot_aio(self, plt_pts, fig_size):
        """All in one plot for one day"""
        plt.figure(figsize=fig_size)
        plotday, plotdaydf = self.calculate_plot_day()
        plotday_filtered = plotday[plotday != "Untested"]

        # flag
        ax1 = plt.subplot(2, 1, 1)
        sns.scatterplot(x=plotday_filtered.index, y=plotday_filtered)
        plt.xlim([plotday.index[0], plotday.index[-1]])
        plt.ylim([-0.2, 1.2])
        plt.title(f"Example day Pass / Fail flag - {self.__class__.__name__}")

        # datapoints
        ax2 = plt.subplot(2, 1, 2)
        plotdaydf[plt_pts].plot(ax=ax2)
        pt_nan = plotdaydf.isnull().any().to_dict()
        for i, line in enumerate(ax2.get_lines()):
            line_label = line.get_label()
            if pt_nan[line_label]:
                line.set_marker(".")
        ax2.ticklabel_format(useOffset=False, axis="y")
        plt.title(f"Example day data points plot - {self.__class__.__name__}")
        plt.tight_layout()
        plt.savefig(f"{self.results_folder}/Day_plot_aio.png")
        print()

    def day_plot_obo(self, plt_pts, fig_size):
        """One by one plot for one day"""
        num_plots = len(plt_pts) + 1
        plt.figure(figsize=(fig_size[0], fig_size[1] * num_plots))
        plotday, plotdaydf = self.calculate_plot_day()
        plotday_filtered = plotday[plotday != "Untested"]

        # flag
        ax1 = plt.subplot(num_plots, 1, 1)
        sns.scatterplot(x=plotday_filtered.index, y=plotday_filtered)
        plt.xlim([plotday.index[0], plotday.index[-1]])
        plt.ylim([-0.2, 1.2])
        plt.title(f"Example day Pass / Fail flag plot - {self.__class__.__name__}")

        # datapoints
        pt_nan = plotdaydf.isnull().any().to_dict()
        i = 2
        for pt in plt_pts:
            try:
                axx = plt.subplot(num_plots, 1, i)
                if pt_nan[pt]:
                    plotdaydf[pt].plot(ax=axx, marker=".")
                else:
                    # check if values in the series are boolean
                    if self.df[pt].apply(lambda x: isinstance(x, bool)).all():
                        self.df[pt] = self.df[pt].astype(int)
                    plotdaydf[pt].plot(ax=axx)
                plt.title(f"Example day - {pt} - {self.__class__.__name__}")
                i += 1
                axx.ticklabel_format(useOffset=False, axis="y")
            except:
                print(f"{pt} cannot be plotted by itself, ignored in the plot.")

        plt.tight_layout()
        plt.savefig(f"{self.results_folder}/Day_plot_obo.png")
        print()

    def daterange(self, start_date, end_date):
        for n in range(int((end_date - start_date).days)):
            yield start_date + timedelta(n)

class RuleCheckBase(CheckLibBase):
    def check_bool(self) -> Union[bool, str]:
        if len(self.result[self.result == False]) > 0:
            return False
        elif len(self.result[self.result == True]) > 0:
            return True
        else:
            return "Untested"

[docs] def check_detail(self) -> Dict: output = { "Sample #": len(self.result), "Pass #": len(self.result[self.result == True]), "Fail #": len(self.result[self.result == False]), "Untested #": len(self.result[self.result == "Untested"]), "Verification Passed?": self.check_bool(), } print("Verification results dict: ") print(output) return output
class EconomizerIntegrationCompliance(RuleCheckBase):
    points = ["OA_min_sys", "OA_timestep", "Cool_sys_out"]

[docs] def verify(self): """Economizer Integration: Non-Integrated Economizer Operation "assertions_type": "fail", "assertion_level": "sample", "assertions": ["$OA_timestep > $OA_min_sys and $Cool_sys_out > 0"], """ # Logical Operators in Pandas are &, | and ~, and parentheses (...) is important! self.result = ~( (self.df["OA_timestep"] > self.df["OA_min_sys"]) & (self.df["Cool_sys_out"] > 0) )
class EconomizerHeatingCompliance(RuleCheckBase):
    points = ["OA_min_sys", "OA_timestep", "Heat_sys_out"]

[docs] def verify(self): self.result = ~( (self.df["OA_timestep"] > self.df["OA_min_sys"]) & (self.df["Heat_sys_out"] > 0) )
class HeatRecoveryCompliance(RuleCheckBase):
    points = ["OA_timestep", "Heat_rec", "Cool_rec", "OA_min_sys"]

[docs] def verify(self): self.result = ~( (self.df["OA_timestep"] > self.df["OA_min_sys"]) & ((self.df["Heat_rec"] > 0) | (self.df["Cool_rec"] > 0)) )
class SimultaneousHeatingCoolingCompliance(RuleCheckBase):
    points = ["Cool_sys_out", "Heat_sys_out"]

[docs] def verify(self): self.result = ~((self.df["Cool_sys_out"] > 0) & (self.df["Heat_sys_out"] > 0))
class HumidityWithinBoundaries(RuleCheckBase):
    points = ["Zone_hum", "Hum_up_bound", "Hum_low_bound"]

[docs] def verify(self): self.result = (self.df["Zone_hum"] >= self.df["Hum_up_bound"]) & ( self.df["Zone_hum"] <= self.df["Hum_low_bound"] )
class ContinuousDimmingCompliance(CheckLibBase):
    points = ["Electric_light_power"]
    flat_min_threshold = 60

    def check_bool(self) -> bool:
        if self.max_up_period > 60 and self.max_down_period > 60:
            return True
        return False

[docs] def check_detail(self) -> Dict: output = { "max_up_period": self.max_up_period, "max_down_period": self.max_down_period, "max_up_start": self.max_up_start, "max_up_end": self.max_up_end, "max_down_start": self.max_down_start, "max_down_end": self.max_down_end, } sns.scatterplot(x=self.df.index, y=self.df["Electric_light_power"]) plt.axvspan( output["max_up_start"], output["max_up_end"], color="red", alpha=0.3 ) plt.axvspan( output["max_down_start"], output["max_down_end"], color="green", alpha=0.3 ) plt.xlim([self.df.index[0], self.df.index[-1]]) plt.title(self.__class__.__name__) plt.show() return output
    def verify(self):
        max_up_period = 0
        max_down_period = 0
        trend_period = 0
        v_prev = None
        trend = None
        start_time_flat = None
        end_time_flat = None
        start_time = None
        end_time = None
        max_up_start = None
        max_up_end = None
        max_down_start = None
        max_down_end = None
        flat_flag = False

        # .items() replaces the deprecated Series.iteritems()
        for i, v in self.df["Electric_light_power"].items():
            if v_prev is None:
                v_prev = v
                start_time = i
                end_time = i
                continue
            if v > v_prev:
                if trend == -1:
                    start_time = i
                trend = 1
                flat_flag = False
                end_time = i
            if v < v_prev:
                if trend == 1:
                    start_time = i
                trend = -1
                flat_flag = False
                end_time = i
            if v == v_prev:
                if flat_flag:
                    end_time_flat = i
                else:
                    start_time_flat = i
                    flat_flag = True
                    v_prev = v
                    continue
                if (
                    self.delta_minutes(start_time_flat, end_time_flat)
                    >= self.flat_min_threshold
                ):
                    trend_period = 0
                    v_prev = v
                    start_time = i
                    end_time = i
                    continue

            trend_period = self.delta_minutes(start_time, end_time)
            if trend == 1 and trend_period > max_up_period:
                max_up_period = trend_period
                max_up_start = start_time
                max_up_end = end_time
            if trend == -1 and trend_period > max_down_period:
                max_down_period = trend_period
                max_down_start = start_time
                max_down_end = end_time
            v_prev = v

        self.max_up_start = max_up_start
        self.max_up_end = max_up_end
        self.max_down_start = max_down_start
        self.max_down_end = max_down_end
        self.max_up_period = max_up_period
        self.max_down_period = max_down_period

    def delta_minutes(
        self, start_time_flat: datetime.datetime, end_time_flat: datetime.datetime
    ) -> float:
        return (end_time_flat - start_time_flat).total_seconds() / 60

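# Illustrative sketch (not part of checklib): writing a markdown report for one
# verification case with add_md(). All paths and the case dictionary below are
# hypothetical; only the keys that add_md() actually reads ('no',
# 'simulation_IO' -> 'idf', and 'verification_class') are included.
def _example_report():  # hypothetical helper, never called on import
    df = pd.read_csv("example_timeseries.csv", index_col=0)  # needs "Cool_sys_out" and "Heat_sys_out"
    case = {
        "no": 1,
        "simulation_IO": {"idf": "example_model.idf"},
        "verification_class": "SimultaneousHeatingCoolingCompliance",
    }
    item = SimultaneousHeatingCoolingCompliance(df)
    summary = item.add_md(
        md_file_path="report.md",
        img_folder="./report_imgs",
        relative_path_to_img_in_md="report_imgs",
        item_dict=case,
        plot_option="all-compact",
    )
    return summary  # includes 'md_content', 'outcome_notes', 'model_file', 'verification_class'
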
def main():
    import json

    from tqdm import tqdm

    with open("../schema/simplified2items.json") as json_file:
        data = json.load(json_file)
    items = data["items"]

    from datetimeep import DateTimeEP

    # check dimming control example
    # df1 = DateTimeEP(
    #     pd.read_csv(
    #         "../resources/ASHRAE901_SchoolPrimary_STD2019_ElPaso/ASHRAE901_SchoolPrimary_STD2019_ElPaso.csv"
    #     )
    # ).transform()
    # dimming_item = items[0]
    # point_map = dimming_item["datapoints_source"]["output_variables"]
    # point_map_reverse = {value.strip(): key.strip() for key, value in point_map.items()}
    # new_df1 = df1.rename(str.strip, axis="columns")
    # new_df1 = new_df1.rename(columns=point_map_reverse)
    # cdc = ContinuousDimmingCompliance(new_df1["2000-07-21"]).get_checks
    # print(cdc)

    # check rule based examples
    df_rule = DateTimeEP(
        pd.read_csv(
            "../resources/ASHRAE901_Hospital_STD2016_Tampa/ASHRAE901_Hospital_STD2016_Tampa.csv"  # humidity
            # "../resources/ASHRAE901_SchoolPrimary_STD2004_ElPaso_Injected/eplusout.csv"  # non-int economizer
        )
    ).transform()

    # rule_items_id = [1]  # non-int economizer
    rule_items_id = [-1]  # humidity
    for item_id in rule_items_id:
        item = items[item_id]
        point_map = item["datapoints_source"]["output_variables"]
        point_map_reverse = {
            value.strip(): key.strip() for key, value in point_map.items()
        }
        new_df = df_rule.rename(str.strip, axis="columns")
        new_df = new_df.rename(columns=point_map_reverse)
        cls = globals()[item["verification_class"]]
        parameter = (
            item["datapoints_source"]["parameters"]
            if ("parameters" in item["datapoints_source"])
            else None
        )
        outcome = cls(new_df, parameter).get_checks
        print(f"{item['verification_class']}:")
        print(outcome)

if __name__ == "__main__":
    main()

# %%
#