"""
brick_compliance.py
====================================
Brick Compliance API
"""
import ast
import copy
import json
import logging
import os
import re
import sys
from collections import defaultdict
import brickschema
import yaml
import pathlib
from pydash import filter_, flatten_deep
# Absolute path of the directory that contains this module; used to locate
# the bundled `../schema/library.json` file.
PATH = pathlib.Path(__file__).parent.resolve()
# Regex applied to the SPARQL query statements: captures the variable name
# (without the leading `?`) of every `?<name> a brick:HVAC_Zone .` pattern.
HVAC_ZONE_NAME_PARSE_RE = r"\?(\w+) a brick:HVAC_Zone \."
# Keys allowed in a user-supplied `default_verification_case_values` dict
# (checked recursively, except inside the "parameters" entry).
CASE_KEY_NAMES = [
"no",
"run_simulation",
"simulation_IO",
"idf",
"idd",
"weather",
"output",
"ep_path",
"expected_result",
"parameters",
]
class BrickCompliance:
    """Query a Brick model for the datapoints required by verification library items.

    The class loads a Brick schema and a Brick instance into one graph, runs the
    SPARQL statements stored in the query statement file (or user supplied
    ones) and converts the query results into verification case dictionaries.
    """

    def __init__(
        self,
        brick_schema_path: str,
        brick_instance_path: str,
        query_statement_path: str = "./resources/brick/query_statement.yml",
        datapoint_name_conversion_path: str = "./resources/brick/verification_datapoint_info.yml",
        perform_reasoning: bool = False,
    ) -> None:
        """Instantiate a BrickCompliance object and load the given brick schema and brick instance.

        Args:
            brick_schema_path: str
                brick schema path (e.g., "../schema/Brick.ttl")
            brick_instance_path: str
                brick instance path (e.g., "../schema/brick_testing.ttl")
            query_statement_path: str
                the query statements file path. The default path is `./resources/brick/query_statement.yml`.
            datapoint_name_conversion_path: str
                yaml file holding the datapoint name conversion information. The default path is `./resources/brick/verification_datapoint_info.yml`.
            perform_reasoning: bool
                whether RDFS reasoning is run on the graph. The default value is False.

        Raises:
            TypeError: if an argument has the wrong type.
            FileNotFoundError: if the schema, instance, query statement, datapoint
                conversion or library json file does not exist.
        """
        # check arg types (the order of the checks decides which error is reported first)
        expected_arg_types = (
            ("brick_schema_path", brick_schema_path, str),
            ("brick_instance_path", brick_instance_path, str),
            ("datapoint_name_conversion_path", datapoint_name_conversion_path, str),
            ("query_statement_path", query_statement_path, str),
            ("perform_reasoning", perform_reasoning, bool),
        )
        for arg_name, arg_value, expected_type in expected_arg_types:
            if not isinstance(arg_value, expected_type):
                logging.error(
                    f"The `{arg_name}` argument type must be {expected_type.__name__}, but {type(arg_value)} type is provided."
                )
                raise TypeError(
                    f"{arg_name} must be {expected_type.__name__}, got {type(arg_value)}"
                )

        # check if the files exist in the given directory
        for arg_name, file_path, file_label in (
            ("brick_schema_path", brick_schema_path, "Brick schema"),
            ("brick_instance_path", brick_instance_path, "Brick instance"),
        ):
            if not os.path.exists(file_path):
                logging.error(
                    f"The file doesn't exist in the provided directory. Please make sure to provide a correct `{arg_name}`."
                )
                raise FileNotFoundError(f"{file_label} file not found: {file_path}")

        # define variables
        self.brick_schema_path = brick_schema_path
        self.brick_instance_path = brick_instance_path
        # verification item name -> list of {query variable: queried name} dicts
        self.queried_datapoint_all_dict = defaultdict(list)
        # every verification case produced by the query methods so far
        self.queried_result_in_verification_form = []
        # running value used for the "no" field of the generated cases
        self.idx = 1
        # template that is deep-copied for every generated verification case
        self.verification_case_dict = {
            "no": None,
            "run_simulation": None,
            "simulation_IO": {
                "idf": "",
                "idd": "",
                "weather": "",
                "output": "",
                "ep_path": "",
            },
            "expected_result": "",
            "datapoints_source": {},
            "verification_class": "",
        }

        # load brick schema and instance files
        self.g = brickschema.Graph(load_brick=False)
        self.g.load_file(self.brick_schema_path)
        if perform_reasoning:
            # reasoning on the graph, example: https://brickschema.org/tools/py-brickschema/
            # NOTE(review): this runs before the instance is parsed, so the
            # instance triples are presumably not part of the expansion -- confirm
            # that this order is intended.
            self.g.expand(profile="rdfs")
        self.g.parse(self.brick_instance_path, format="ttl")

        # load files needed for brick
        self.query_statement = self._load_resource_file(
            query_statement_path,
            yaml.safe_load,
            f"The query statement file isn't found. Please verify the {query_statement_path} path.",
            f"Query statement file not found: {query_statement_path}",
        )

        # find all the HVAC_ZONE brick class names used in the query statements
        self.hvac_zone_name_container = sorted(
            {
                zone_name
                for statement in self.query_statement.values()
                for zone_name in re.findall(HVAC_ZONE_NAME_PARSE_RE, statement)
            }
        )

        self.verification_datapoint_info = self._load_resource_file(
            datapoint_name_conversion_path,
            yaml.safe_load,
            f"The datapoint name conversion file isn't found. Please verify the {datapoint_name_conversion_path} path.",
            f"Datapoint name conversion file not found: {datapoint_name_conversion_path}",
        )
        self.library_json = self._load_resource_file(
            f"{PATH}/../schema/library.json",
            json.load,
            "The library json file isn't found. Please verify that the file exists in the `schema` folder.",
            f"Library json file not found: {PATH}/../schema/library.json",
        )

    @staticmethod
    def _load_resource_file(path, loader, log_message, error_message):
        """Open `path` as UTF-8 text and return `loader(file)`.

        A missing file is logged with `log_message` and re-raised as a
        FileNotFoundError carrying `error_message`, chained to the original error.
        """
        try:
            with open(path, "r", encoding="utf-8") as file:
                return loader(file)
        except FileNotFoundError as err:
            logging.error(log_message)
            raise FileNotFoundError(error_message) from err

    def validate_brick_instance(self):
        """Validate the loaded graph with its `validate()` method.

        Returns:
            valid: bool
                whether the validation passes/fails
            resultsText: str
                Message from the validation process
        """
        valid, _, resultsText = self.g.validate()
        return valid, resultsText

    def get_applicable_verification_lib_items(
        self,
        verification_lib_item_list: list = None,
    ):
        """Get the verification library items of `verification_lib_item_list` that are applicable to the brick instance.

        An item is applicable when at least one of its query results provides
        exactly the datapoints listed under `description_datapoints` for that
        item in the library json file. Each item is queried through
        `query_verification_case_datapoints`, so the results are also appended
        to `self.queried_result_in_verification_form`.

        Args:
            verification_lib_item_list: list
                list of verification item names to be tested. If None or an empty list is provided, all the available verification library items are tested.

        Returns: list
            applicable verification library item names without duplicates, in the order they were tested. None is returned if the argument is invalid.
        """
        # check arg types
        if verification_lib_item_list is None:
            verification_lib_item_list = []
        elif not isinstance(verification_lib_item_list, list):
            logging.error(
                f"The `verification_lib_item_list` argument type must be list, but {type(verification_lib_item_list)} type is provided."
            )
            return None

        # determine total verification lib items to iterate. If `verification_lib_item_list` is empty, all the available verification items are used
        if verification_lib_item_list:
            # check if the given verification lib item names are valid
            if not set(verification_lib_item_list).issubset(self.query_statement):
                logging.error(
                    "The given verification library item names are invalid. Please check the names again."
                )
                return None
            verification_lib_items = verification_lib_item_list
        else:
            verification_lib_items = list(self.query_statement)

        # find if queried datapoints are the same as the datapoints in the library.json. If not, warning message shows up
        # a dict is used as an insertion-ordered set so that the output order is deterministic
        applicable_items = {}
        for verification_lib_item in verification_lib_items:
            for query_result in self.query_verification_case_datapoints(
                verification_lib_item
            ):
                datapoints_source = query_result["datapoints_source"]
                if "idf_output_variables" in datapoints_source:
                    queried_datapoints = set(datapoints_source["idf_output_variables"])
                else:
                    queried_datapoints = set(datapoints_source["dev_settings"])
                lib_datapoints = set(
                    self.library_json[verification_lib_item]["description_datapoints"]
                )
                if queried_datapoints != lib_datapoints:
                    diff_datapoints = lib_datapoints - queried_datapoints
                    str_of_datapoints = ", ".join(str(item) for item in diff_datapoints)
                    logging.warning(
                        f"Please make sure {str_of_datapoints} datapoints are included in the verification case."
                    )
                else:
                    applicable_items[verification_lib_item] = None
        return list(applicable_items)

    def query_verification_case_datapoints(
        self,
        verification_item_lib_name: str,
        energyplus_naming_assembly: bool = True,
        default_verification_case_values: dict = None,
    ) -> list:
        """Query data point(s) for the given verification case.

        Args:
            verification_item_lib_name: str
                verification item name that will be queried. It must be a key of the query statement file.
            energyplus_naming_assembly: bool
                whether the queried datapoint is changed to E+ style variable name or not
            default_verification_case_values: dict
                default key values, given as a dict or as the string of a dict literal. The "run_simulation", "simulation_IO" ("idf", "idd", "weather", "output", "ep_path"), "expected_result" and "parameters" keys are read.

        Returns:
            list of verification case dictionaries built from every datapoint set
            queried so far for `verification_item_lib_name`. None is returned if
            an argument is invalid.
        """
        is_valid, default_verification_case_values = self._check_query_arguments(
            verification_item_lib_name,
            energyplus_naming_assembly,
            default_verification_case_values,
        )
        if not is_valid:
            return None

        # perform query
        query_result = self.g.query(self.query_statement[verification_item_lib_name])

        # organize query result
        queried_verification_datapoints = self._organize_query_results_helper(
            query_result,
            verification_item_lib_name,
            energyplus_naming_assembly,
            default_verification_case_values,
        )

        # add to the `self.queried_result_in_verification_form`
        self.queried_result_in_verification_form += queried_verification_datapoints
        return queried_verification_datapoints

    def query_with_customized_statement(
        self,
        custom_query_statement: str,
        verification_item_lib_name: str,
        energyplus_naming_assembly: bool = True,
        default_verification_case_values: dict = None,
    ) -> list:
        """Query datapoints with a customized query statement.

        After the query, a quality check compares the number of queried
        variables with the number of data points required by the verification
        library item and logs a warning when they differ.

        Args:
            custom_query_statement: str
                query statement written from users
            verification_item_lib_name: str
                verification library item of the query_statement
            energyplus_naming_assembly: bool
                whether to convert the queried datapoints' name to EnergyPlus style variable name.
            default_verification_case_values: dict
                default key values, given as a dict or as the string of a dict literal. The "run_simulation", "simulation_IO" ("idf", "idd", "weather", "output", "ep_path"), "expected_result" and "parameters" keys are read.

        Returns:
            queried result in the verification case format (list). None is
            returned if an argument is invalid.
        """
        # check arg types
        if not isinstance(custom_query_statement, str):
            logging.error(
                f"The `custom_query_statement` argument type must be str, but {type(custom_query_statement)} type is provided."
            )
            return None
        is_valid, default_verification_case_values = self._check_query_arguments(
            verification_item_lib_name,
            energyplus_naming_assembly,
            default_verification_case_values,
        )
        if not is_valid:
            return None

        # perform query
        query_result = self.g.query(custom_query_statement)

        # organize query result
        queried_verification_datapoints = self._organize_query_results_helper(
            query_result,
            verification_item_lib_name,
            energyplus_naming_assembly,
            default_verification_case_values,
        )

        # add to the `self.queried_result_in_verification_form`
        self.queried_result_in_verification_form += queried_verification_datapoints

        # quality check - whether the no. of queried variables == required no. of data points in the verification library item
        total_no_of_datapoints = len(
            self.library_json[verification_item_lib_name]["description_datapoints"]
        )
        for queried_datapoints in self.queried_datapoint_all_dict[
            verification_item_lib_name
        ]:
            # the HVAC zone variable is not a datapoint, so it is not counted
            has_hvac_zone = any(
                zone_name in queried_datapoints
                for zone_name in self.hvac_zone_name_container
            )
            if has_hvac_zone:
                if len(queried_datapoints) != total_no_of_datapoints + 1:
                    logging.warning(
                        f"The number of datapoints with the customized query statement is {len(queried_datapoints)-1} excluding the `hvac_zone` and the number of required datapoints for {verification_item_lib_name} verification item is {total_no_of_datapoints}. The two numbers must be the same."
                    )
            elif len(queried_datapoints) != total_no_of_datapoints:
                logging.warning(
                    f"The number of datapoints with the customized query statement is {len(queried_datapoints)} and the number of required datapoints for {verification_item_lib_name} verification item is {total_no_of_datapoints}. The two numbers must be the same."
                )
        return queried_verification_datapoints

    def _check_query_arguments(
        self,
        verification_item_lib_name,
        energyplus_naming_assembly,
        default_verification_case_values,
    ):
        """Validate the arguments shared by the two query methods.

        Returns:
            (is_valid, default_verification_case_values): `is_valid` is False
            when a problem was logged. On success the second item is the
            default values as a dict (parsed when a string was given) or None.
        """
        if not isinstance(verification_item_lib_name, str):
            logging.error(
                f"The `verification_item_lib_name` argument type must be str, but {type(verification_item_lib_name)} type is provided."
            )
            return False, None
        if not isinstance(energyplus_naming_assembly, bool):
            logging.error(
                f"The `energyplus_naming_assembly` argument type must be bool, but {type(energyplus_naming_assembly)} type is provided."
            )
            return False, None
        if default_verification_case_values is None:
            return True, None

        if isinstance(default_verification_case_values, str):
            # literal_eval only accepts Python literals, it does not run code
            try:
                default_verification_case_values = ast.literal_eval(
                    default_verification_case_values
                )
            except (ValueError, SyntaxError, TypeError) as err:
                logging.error(
                    f"The `default_verification_case_values` string can't be converted to a dict: {err}"
                )
                return False, None
        if not isinstance(default_verification_case_values, dict):
            logging.error(
                f"The `default_verification_case_values` argument type must be dict or str, but {type(default_verification_case_values)} type is provided."
            )
            return False, None

        # check if `default_verification_case_values` includes not allowed key(s)
        # NOTE(review): the sanity check only logs an error; the query still
        # runs afterwards. Confirm whether it should abort instead.
        self._default_verification_case_values_sanity_check_helper(
            default_verification_case_values
        )
        return True, default_verification_case_values

    def _organize_query_results_helper(
        self,
        query_result,
        verification_item_lib_name: str,
        energyplus_naming_assembly: bool,
        default_verification_case_values: dict = None,
    ):
        """Store the rows of `query_result` and return them as verification cases.

        Every row becomes a {query variable: name after the first "#"} dict
        that is appended to `self.queried_datapoint_all_dict[verification_item_lib_name]`
        unless an equal dict is already stored there.
        """
        # save the data point name in the instance
        stored_datapoint_dicts = self.queried_datapoint_all_dict[
            verification_item_lib_name
        ]
        for row in query_result:
            queried_datapoint_dict = {
                label: value.split("#")[1] for label, value in zip(row.labels, row)
            }
            if queried_datapoint_dict not in stored_datapoint_dicts:
                stored_datapoint_dicts.append(queried_datapoint_dict)

        # convert to the verification case format
        return self._convert_to_verification_case_format_helper(
            verification_item_lib_name,
            energyplus_naming_assembly,
            default_verification_case_values,
        )

    def _convert_to_verification_case_format_helper(
        self,
        verification_case_name: str,
        energyplus_naming_assembly: bool,
        default_verification_case_values: dict = None,
    ) -> list:
        """Build one verification case per datapoint dict stored for `verification_case_name`.

        Each case is a deep copy of `self.verification_case_dict`; the template
        itself is left untouched. `self.idx` is used for the "no" field and is
        incremented for every generated case.
        """
        if energyplus_naming_assembly:
            source_key, conversion_setting = "idf_output_variables", "EnergyPlus"
        else:
            source_key, conversion_setting = "dev_settings", "default"

        verification_case_saving_list = []
        for query_dict in self.queried_datapoint_all_dict[verification_case_name]:
            verification_case = copy.deepcopy(self.verification_case_dict)
            verification_case["no"] = self.idx
            self.idx += 1
            verification_case["verification_class"] = verification_case_name
            datapoint_info = self.verification_datapoint_info[verification_case_name][
                conversion_setting
            ]

            # the container is created on the copy only, so that a later call
            # with the other naming option doesn't inherit an empty entry
            datapoints = {}
            verification_case["datapoints_source"][source_key] = datapoints
            for key in query_dict:
                # prevent HVAC_ZONE class from going through this loop
                if key in self.hvac_zone_name_container:
                    continue
                point_info = datapoint_info[key]
                point_name = point_info["point"]
                try:
                    subject = query_dict[point_info["subject"]]
                except KeyError:
                    # no subject is defined or queried for this datapoint
                    subject = ""
                if energyplus_naming_assembly:
                    variable = point_info["variable"]
                else:
                    # with the default naming, "variable" names the query
                    # variable whose queried value is used
                    variable = query_dict[point_info["variable"]]
                datapoints[point_name] = {
                    "subject": subject,
                    "variable": variable,
                    "frequency": "",
                }

            # feed in the default values if exist
            if default_verification_case_values is not None:
                verification_case["run_simulation"] = default_verification_case_values[
                    "run_simulation"
                ]
                for key_name in ("idf", "idd", "weather", "output", "ep_path"):
                    verification_case["simulation_IO"][key_name] = (
                        default_verification_case_values["simulation_IO"][key_name]
                    )
                verification_case["expected_result"] = default_verification_case_values[
                    "expected_result"
                ]
                # copied so that the generated cases don't share one mutable object
                verification_case["datapoints_source"]["parameters"] = copy.deepcopy(
                    default_verification_case_values["parameters"]
                )
            verification_case_saving_list.append(verification_case)
        return verification_case_saving_list

    def _default_verification_case_values_sanity_check_helper(
        self, default_verification_case_values: dict
    ) -> None:
        """Log an error if `default_verification_case_values` has a key that is not in CASE_KEY_NAMES.

        Nested dictionaries are checked as well, except the value of the
        "parameters" key. Nothing is raised or returned.
        """

        def _check_specific_keys_in_nested_dict(keys, dictionary):
            for key, value in dictionary.items():
                if key not in keys:
                    return False
                if (
                    isinstance(value, dict)
                    and key != "parameters"
                    and not _check_specific_keys_in_nested_dict(keys, value)
                ):
                    return False
            return True

        if not _check_specific_keys_in_nested_dict(
            CASE_KEY_NAMES, default_verification_case_values
        ):
            logging.error(
                "`default_verification_case_values` dictionary includes NOT allowed key(s). Please verify the dictionary again."
            )