import ast
import copy
import json
import logging
import os
import re
import sys
from collections import defaultdict
import brickschema
import yaml
from pydash import filter_, flatten_deep
# make the package root (one level up) importable for project-local modules
sys.path.append("..")
# captures the SPARQL variable name declared as a brick:HVAC_Zone in a query
# statement, e.g. "?zone a brick:HVAC_Zone ." -> "zone"
HVAC_ZONE_NAME_PARSE_RE = r"\?(\w+) a brick:HVAC_Zone \."
# keys allowed (including in nested dicts, except under "parameters") in a
# `default_verification_case_values` dictionary; enforced by
# `_default_verification_case_values_sanity_check_helper`
CASE_KEY_NAMES = [
"no",
"run_simulation",
"simulation_IO",
"idf",
"idd",
"weather",
"output",
"ep_path",
"expected_result",
"parameters",
]
[docs]
class BrickCompliance:
def __init__(
self,
brick_schema_path: str,
brick_instance_path: str,
query_statement_path: str = "./resources/brick/query_statement.yml",
datapoint_name_conversion_path: str = "./resources/brick/verification_datapoint_info.yml",
perform_reasoning: bool = False,
) -> None:
"""Instantiate a BrickCompliance class object and load specified brick schema and brick instance.
Args:
brick_schema_path: str
brick schema path (e.g., "../schema/Brick.ttl")
brick_instance_path: str
brick instance path (e.g., "../schema/brick_testing.ttl")
query_statement_path: `str`
the query statements file path. The default path is `./resources/brick/query_statement.yml`.
datapoint_name_conversion_path: str
the datapoint conversion name saving yaml file. The default path nis `./resources/brick/verification_datapoint_info.yml`.
perform_reasoning: `bool` argument whether reasoning is performed to the given instance. The default boolean value is False.
"""
# check arg types
if not isinstance(brick_schema_path, str):
logging.error(
f"The `brick_schema_path` argument type must be str, but {type(brick_schema_path)} type is provided."
)
return None
if not isinstance(brick_instance_path, str):
logging.error(
f"The `brick_instance_path` argument type must be str, but {type(brick_instance_path)} type is provided."
)
return None
if not isinstance(datapoint_name_conversion_path, str):
logging.error(
f"The `datapoint_name_conversion_path` argument type must be str, but {type(datapoint_name_conversion_path)} type is provided."
)
return None
if not isinstance(perform_reasoning, bool):
logging.error(
f"The `perform_reasoning` argument type must be bool, but {type(perform_reasoning)} type is provided."
)
return None
# check if the files exist in the given directory
if not os.path.exists(brick_schema_path):
logging.error(
f"The file doesn't exist in the provided directory. Please make sure to provide a correct `brick_schema_path`."
)
return None
if not os.path.exists(brick_instance_path):
logging.error(
f"The file doesn't exist in the provided directory. Please make sure to provide a correct `brick_instance_path`."
)
return None
# define variables
self.brick_schema_path = brick_schema_path
self.brick_instance_path = brick_instance_path
self.queried_datapoint_all_dict = defaultdict(list)
self.queried_result_in_verification_form = []
self.idx = 1
self.verification_case_dict = {
"no": None,
"run_simulation": None,
"simulation_IO": {
"idf": "",
"idd": "",
"weather": "",
"output": "",
"ep_path": "",
},
"expected_result": "",
"datapoints_source": {},
"verification_class": "",
}
# load brick schema and instance files
self.g = brickschema.Graph(load_brick=False)
self.g.load_file(self.brick_schema_path)
if perform_reasoning:
self.g.expand(
profile="rdfs"
) # reasoning on the graph, example: https://brickschema.org/tools/py-brickschema/
self.g.parse(self.brick_instance_path, format="ttl")
# load files needed for brick
try:
with open(query_statement_path, "r") as file:
self.query_statement = yaml.safe_load(file)
except FileNotFoundError:
logging.error(
f"The query statement file isn't found. Please verify the {query_statement_path} path."
)
return None
# find all the HVAC_ZONE brick class names
self.hvac_zone_name_container = list(
set(
flatten_deep(
[
re.findall(
HVAC_ZONE_NAME_PARSE_RE,
self.query_statement[verification_lib_name],
)
for verification_lib_name in self.query_statement
]
)
)
)
try:
with open(datapoint_name_conversion_path, "r") as file:
self.verification_datapoint_info = yaml.safe_load(file)
except FileNotFoundError:
logging.error(
f"The datapoint name conversion file isn't found. Please verify the {datapoint_name_conversion_path} path."
)
return None
try:
with open("./schema/library.json", "r") as file:
self.library_json = json.load(file)
except FileNotFoundError:
logging.error(
f"The library json file isn't found. Please verify that the file exists in the `schema` folder."
)
return None
[docs]
def validate_brick_instance(self):
"""Validate a brick instance against the brick schema.
Returns:
valid: bool
whether the validation passes/fails
resultsText: str
Message from the validation process
"""
valid, _, resultsText = self.g.validate()
return valid, resultsText
[docs]
def get_applicable_verification_lib_items(
self,
verification_lib_item_list: list = None,
):
"""Get applicable control verification library items among the `verification_item_lib_name` list from the brick instance.
Args:
verification_item_lib_name: list
list of verification item names to be tested. If empty list is provided, all the available verification library items are tested.
Returns: list
list that includes available verification library item names from the given brick instance.
"""
# check arg types
if verification_lib_item_list is None:
verification_lib_item_list = []
elif not isinstance(verification_lib_item_list, list):
logging.error(
f"The `verification_lib_item_list` argument type must be list, but {type(verification_lib_item_list)} type is provided."
)
return None
# determine total verification lib items to iterate. If `verification_lib_item_list` is empty, all the available verification items are used
if verification_lib_item_list:
# check if the given verification lib item names are valid
if not set(verification_lib_item_list).issubset(list(self.query_statement)):
logging.error(
f"The given verification library item names are invalid. Please check the names again."
)
return None
verification_lib_items = verification_lib_item_list
else:
verification_lib_items = list(self.query_statement)
# find if queried datapoints are the same as the datapoints in the library.json. If not, warning meesage shows up
available_verification_item_list = []
for verification_lib_item in verification_lib_items:
for query_result in self.query_verification_case_datapoints(
verification_lib_item
):
try:
queried_datapoints = set(
query_result["datapoints_source"]["idf_output_variables"]
)
except KeyError:
queried_datapoints = set(
query_result["datapoints_source"]["dev_settings"]
)
lib_datapoints = set(
self.library_json[verification_lib_item]["description_datapoints"]
)
if queried_datapoints != lib_datapoints:
diff_datapoints = lib_datapoints - queried_datapoints
str_of_datapoints = ", ".join(str(item) for item in diff_datapoints)
logging.warning(
f"Please make sure {str_of_datapoints} datapoints are included in the verification case."
)
else:
available_verification_item_list.append(verification_lib_item)
return list(set(available_verification_item_list))
[docs]
def query_verification_case_datapoints(
self,
verification_item_lib_name: str,
energyplus_naming_assembly: bool = True,
default_verification_case_values: dict = None,
) -> list:
"""
Query data point(s) for the given verification case.
Args:
verification_item_lib_name: str
verification item name(s) that will be queried
energyplus_naming_assembly: bool
whether the queried datapoint is changed to E+ style variable name or not
default_verification_case_values: dict
default key values. ("no", "run_simulation", "idf", "idd", "weather", "output", "ep_path", "expected_result", "parameters",) keys must exist.
Returns:
self.queried_datapoint_all_dict: dict
dictionary that includes verification item as a key and queried data point list as a value
"""
# check arg types
if not isinstance(verification_item_lib_name, str):
logging.error(
f"The `verification_item_lib_name` argument type must be str, but {type(verification_item_lib_name)} type is provided."
)
return None
if not isinstance(energyplus_naming_assembly, bool):
logging.error(
f"The `energyplus_naming_assembly` argument type must be bool, but {type(energyplus_naming_assembly)} type is provided."
)
return None
if (
not isinstance(default_verification_case_values, (dict, str))
and default_verification_case_values is not None
):
logging.error(
f"The `default_verification_case_values` argument type must be dict or str, but {type(default_verification_case_values)} type is provided."
)
return None
elif isinstance(default_verification_case_values, str):
default_verification_case_values = ast.literal_eval(
default_verification_case_values
)
# check if `default_verification_case_values` includes not allowed key(s)
if default_verification_case_values is not None:
self._default_verification_case_values_sanity_check_helper(
default_verification_case_values
)
# perform query
query_result = self.g.query(self.query_statement[verification_item_lib_name])
# organize query result
queried_verification_datapoints = self._organize_query_results_helper(
query_result,
verification_item_lib_name,
energyplus_naming_assembly,
default_verification_case_values,
)
# add to the `self.queried_result_in_verification_form`
self.queried_result_in_verification_form += queried_verification_datapoints
return queried_verification_datapoints
[docs]
def query_with_customized_statement(
self,
custom_query_statement: str,
verification_item_lib_name: str,
energyplus_naming_assembly: bool = True,
default_verification_case_values: dict = None,
) -> list:
"""Query datapoints with a customized query statement. When implemented, the quality check of the `query_statement` is done by checking whether the number of queried variables are the same as the required number of data points in the verification library item.
Args:
custom_query_statement: str
query statement written from users
verification_item_lib_name: str
verification library item of the query_statement
energyplus_naming_assembly: bool
whether to convert the queried datapoints' name to EnergyPlus style variable name.
default_verification_case_values: dict
default key values. ("no", "run_simulation", "idf", "idd", "weather", "output", "ep_path", "expected_result", "parameters",) keys must exist.
Returns:
queried result in the verification case format. `str` message from the `query_statement`'s quality check result.
"""
# check arg types
if not isinstance(custom_query_statement, str):
logging.error(
f"The `custom_query_statement` argument type must be str, but {type(custom_query_statement)} type is provided."
)
return None
if not isinstance(verification_item_lib_name, str):
logging.error(
f"The `verification_item_lib_name` argument type must be str, but {type(verification_item_lib_name)} type is provided."
)
return None
if not isinstance(energyplus_naming_assembly, bool):
logging.error(
f"The `energyplus_naming_assembly` argument type must be bool, but {type(energyplus_naming_assembly)} type is provided."
)
return None
if (
not isinstance(default_verification_case_values, (dict, str))
and default_verification_case_values is not None
):
logging.error(
f"The `default_verification_case_values` argument type must be dict or str, but {type(default_verification_case_values)} type is provided."
)
return None
elif isinstance(default_verification_case_values, str):
default_verification_case_values = ast.literal_eval(
default_verification_case_values
)
# check if `default_verification_case_values` includes not allowed key(s)
if default_verification_case_values is not None:
self._default_verification_case_values_sanity_check_helper(
default_verification_case_values
)
# perform query
query_result = self.g.query(custom_query_statement)
# organize query result
queried_verification_datapoints = self._organize_query_results_helper(
query_result,
verification_item_lib_name,
energyplus_naming_assembly,
default_verification_case_values,
)
# add to the `self.queried_result_in_verification_form`
self.queried_result_in_verification_form += queried_verification_datapoints
# quality check - whether the no. of queried variables == required no. of data points in the verification library item
total_no_of_datapoints = len(
self.library_json[verification_item_lib_name]["description_datapoints"]
)
for queried_datapoints in self.queried_datapoint_all_dict[
verification_item_lib_name
]:
if filter_(
self.hvac_zone_name_container, lambda x: x in queried_datapoints
):
if len(queried_datapoints) != total_no_of_datapoints + 1:
logging.warning(
f"The number of datapoints with the customized query statement is {len(queried_datapoints)-1} excluding the `hvac_zone` and the number of required datapoints for {verification_item_lib_name} verification item is {total_no_of_datapoints}. The two numbers must be the same."
)
else:
if len(queried_datapoints) != total_no_of_datapoints:
logging.warning(
f"The number of datapoints with the customized query statement is {len(queried_datapoints)} and the number of required datapoints for {verification_item_lib_name} verification item is {total_no_of_datapoints}. The two numbers must be the same."
)
return queried_verification_datapoints
def _organize_query_results_helper(
self,
query_result,
verification_item_lib_name: str,
energyplus_naming_assembly: bool,
default_verification_case_values: dict = None,
):
# save the data point name in the instance
for row in query_result:
queried_datapoint_dict = {}
for i in range(len(row)):
queried_datapoint_dict[list(row.labels)[i]] = row[i].split("#")[1]
ver_item_datapoint_ver_item_lib = self.queried_datapoint_all_dict[
verification_item_lib_name
]
if queried_datapoint_dict not in ver_item_datapoint_ver_item_lib:
ver_item_datapoint_ver_item_lib.append(queried_datapoint_dict)
# convert to the verification case format
result = self._convert_to_verification_case_format_helper(
verification_item_lib_name,
energyplus_naming_assembly,
default_verification_case_values,
)
return result
def _convert_to_verification_case_format_helper(
self,
verification_case_name: str,
energyplus_naming_assembly: str,
default_verification_case_values: dict = None,
) -> list:
if energyplus_naming_assembly:
self.verification_case_dict["datapoints_source"][
"idf_output_variables"
] = {}
else:
self.verification_case_dict["datapoints_source"]["dev_settings"] = {}
verification_case_saving_list = []
for index, query_dict in enumerate(
self.queried_datapoint_all_dict[verification_case_name]
):
verification_case_dict_copy = copy.deepcopy(self.verification_case_dict)
verification_case_dict_copy["no"] = self.idx
self.idx += 1
verification_case_dict_copy["verification_class"] = verification_case_name
conversion_setting = (
"EnergyPlus" if energyplus_naming_assembly else "default"
)
datapoint_info = self.verification_datapoint_info[verification_case_name][
conversion_setting
]
for key, value in query_dict.items():
if (
key not in self.hvac_zone_name_container
): # prevent HVAC_ZONE class from going through this loop
point_nonmen = datapoint_info[key]["point"]
if energyplus_naming_assembly:
verification_case_dict_copy["datapoints_source"][
"idf_output_variables"
][point_nonmen] = {}
else:
verification_case_dict_copy["datapoints_source"][
"dev_settings"
][point_nonmen] = {}
datapoint_ver_case_idx = self.queried_datapoint_all_dict[
verification_case_name
][index]
try:
subject = datapoint_ver_case_idx[datapoint_info[key]["subject"]]
except KeyError:
subject = None
if energyplus_naming_assembly:
verification_case_dict_copy["datapoints_source"][
"idf_output_variables"
][point_nonmen].update(
{
"subject": "" if subject is None else subject,
"variable": datapoint_info[key]["variable"],
"frequency": "",
}
)
else:
verification_case_dict_copy["datapoints_source"][
"dev_settings"
][point_nonmen].update(
{
"subject": "" if subject is None else subject,
"variable": datapoint_ver_case_idx[
datapoint_info[key]["variable"]
],
"frequency": "",
}
)
# feed in the default values if exist
if default_verification_case_values is not None:
verification_case_dict_copy["run_simulation"] = (
default_verification_case_values["run_simulation"]
)
for key_name in (
"idf",
"idd",
"weather",
"output",
"ep_path",
):
verification_case_dict_copy["simulation_IO"][key_name] = (
default_verification_case_values["simulation_IO"][key_name]
)
verification_case_dict_copy["expected_result"] = (
default_verification_case_values["expected_result"]
)
verification_case_dict_copy["datapoints_source"]["parameters"] = (
default_verification_case_values["parameters"]
)
verification_case_saving_list.append(verification_case_dict_copy)
return verification_case_saving_list
def _default_verification_case_values_sanity_check_helper(
self, default_verification_case_values: dict
) -> None:
def _check_specific_keys_in_nested_dict(keys, dictionary):
for key in dictionary:
if key not in keys:
return False
if isinstance(dictionary[key], dict) and key != "parameters":
if not _check_specific_keys_in_nested_dict(keys, dictionary[key]):
return False
return True
if not _check_specific_keys_in_nested_dict(
CASE_KEY_NAMES, default_verification_case_values
):
logging.error(
f"`default_verification_case_values` dictionary includes NOT allowed key(s). Please verify the dictionary again."
)
return None