"""
workflow.py
====================================
Workflow API
"""
import glob
import sys, logging, json, os, datetime
from typing import Union
sys.path.append("./constrain")
sys.path.append("..")
from constrain.api import (
BrickCompliance,
VerificationLibrary,
DataProcessing,
VerificationCase,
Verification,
Reporting,
)
# helper
def timenow():
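    """Return the current local datetime."""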
return datetime.datetime.now()
def strnow():
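    """Return the current time formatted as HH:MM:SS."""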
return timenow().strftime("%H:%M:%S")
class WorkflowEngine:
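    """Engine that loads a workflow definition and executes its states in sequence."""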
def __init__(self, workflow, run_workflow_now=False):
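        """Instantiate the engine from a workflow definition.
        Args:
            workflow (Union[str, dict]): str path to the workflow json file or dict of the workflow definition.
            run_workflow_now (bool, optional): whether to run the workflow immediately after loading. Defaults to False.
        """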
self.end_state_name = None
self.start_state_name = None
self.payloads = {}
self.states = {}
self.workflow_dict = {}
self.running_sequence = []
# checking workflow validity is handed over to the API method.
if isinstance(workflow, str):
self.load_workflow_json(workflow)
if isinstance(workflow, dict):
self.workflow_dict = workflow
self.load_states()
if run_workflow_now:
self.run_workflow()
def validate(self, verbose: bool = False) -> bool:
"""function to be implemented to check for high level validity of the workflow definition"""
workflow_schema = {
"workflow_name": str,
"meta": {"author": str, "date": str, "version": str, "description": str},
"imports": list,
"states": {"*": dict},
}
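        # Illustrative sketch of a minimal workflow dict that satisfies workflow_schema
        # (hypothetical values, not taken from a shipped example):
        # {
        #     "workflow_name": "demo workflow",
        #     "meta": {"author": "A. Author", "date": "2024-01-01", "version": "1.0", "description": "demo"},
        #     "imports": ["datetime"],
        #     "states": {"report results": {"Type": "MethodCall", "MethodCall": "print", "Parameters": ["done"], "Start": "True", "End": "True"}},
        # }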
def _validate_workflow(schema, instance, verbose) -> bool:
# TODO: JXL this is duplicate from VerificationCase.validate_verification_case_structure._validate_case_structure_helper. Refactor needed.
for schema_key, schema_value in schema.items():
                # accommodate wildcard keys (arbitrary string key with a required value structure)
if schema_key == "*":
keys = list(instance.keys())
else:
keys = [schema_key]
for key in keys:
# check key match
if key not in instance:
logging.error(f"Missing required key '{key}' in {instance}")
return False
case_value = instance[key]
if isinstance(schema_value, dict):
# recursively check nested value
is_cases_valid = _validate_workflow(
schema_value, case_value, verbose
)
if not is_cases_valid:
return False
else:
# check leaf value match
# if require float, it can be int
if schema_value == float:
eligible_types = [int, float]
else:
eligible_types = [schema_value]
type_match_list = [
isinstance(case_value, sv) for sv in eligible_types
]
if not any(type_match_list):
logging.error(
f"The type of '{key}' key must be {schema_value}, but {type(case_value)} is provided."
)
return False
else:
if verbose:
print(
f"The type of {key} has the correct type {schema_value}"
)
return True
return _validate_workflow(workflow_schema, self.workflow_dict, verbose)
def import_package(self) -> None:
"""Import third party packages based on the "imports" element values of the workflow json.
E.g.: {
...
"imports": ["numpy as np","pandas as pd","datetime"],
...
}
"""
import_list = []
if "imports" in self.workflow_dict:
import_list = self.workflow_dict["imports"]
for line in import_list:
cp = line
if " as " in line:
cp = line.split(" as ")[-1]
exec(f"import {line}", globals())
def load_workflow_json(self, workflow_path: str) -> None:
"""Load workflow from a json workflow definition.
Args:
workflow_path (str): path to the workflow json file.
"""
with open(workflow_path) as f:
self.workflow_dict = json.load(f)
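    def save_workflow(self, json_path: str) -> None:
        """Save the workflow definition in `self.workflow_dict` to a json file.
        Note: this method is referenced by `Workflow.save`; a minimal json.dump based implementation is assumed here.
        Args:
            json_path (str): path to the json file to write.
        """
        with open(json_path, "w") as f:
            json.dump(self.workflow_dict, f, indent=4)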
def load_states(self) -> None:
"""load states from the workflow definition with some sanity checks."""
for state_name, state_dict in self.workflow_dict["states"].items():
# there should be no duplicate names
if state_name in self.states:
logging.error(
f"State name [{state_name}] is not unique in the loaded workflow. This is NOT allowed."
)
return None
self.states[state_name] = state_dict
if "Start" in state_dict and state_dict["Start"] == "True":
self.start_state_name = state_name
if "End" in state_dict and state_dict["End"] == "True":
self.end_state_name = state_name
continue
if (state_dict["Type"] != "Choice") and ("Next" not in state_dict):
logging.error(
f"Non-Choice state [{state_name} is not an End state and it has no Next state. This is NOT allowed."
)
def run_state(self, state_name: str) -> Union[None, str]:
"""Run a specific states by state_name. This is not a external facing method and is only supposed to be called by run_workflow.
Args:
state_name (str): name of the state to execute.
Returns:
Union[None, str]: name of the next state to run or None if there is no next state.
"""
state_dict = self.states[state_name]
state_type = state_dict["Type"]
next_state = None
if state_type == "MethodCall":
self.payloads = MethodCall(state_dict, self.payloads).run().get_payloads()
if "Next" in state_dict:
next_state = state_dict["Next"]
if state_type == "Choice":
next_state = Choice(state_dict, self.payloads).check_choices()
if (state_name != self.end_state_name) and (next_state is not None):
return next_state
else:
return None
def run_workflow(self, verbose=True, max_states: int = 1000) -> None:
"""Workflow runner with a maximum steps allowed setting.
Args:
max_states (int, optional): Maximum number of states to run allowed. Defaults to 1000.
"""
start_time = timenow()
print(f"Start running workflow at {start_time.strftime('%H:%M:%S')}")
if verbose:
print(f"Importing packages specified in workflow -- [{strnow()}]")
self.import_package()
current_state_name = self.start_state_name
if verbose:
print(
f"Start running workflow with start state [{self.start_state_name}] with maximum allowable number of states being [{max_states}] -- [{strnow()}]"
)
state_count = 0
touch_limit = False
while current_state_name is not None:
state_count += 1
if verbose:
print(
f"Running state {state_count}: [{current_state_name}] ...", end=" "
)
self.running_sequence.append(current_state_name)
current_state_name = self.run_state(current_state_name)
if verbose:
print(f"Done. -- [{strnow()}]")
if state_count > max_states:
e_msg = (
"Reaching maximum allowable number of states. Workflow terminated."
)
if verbose:
print(e_msg)
touch_limit = True
                logging.warning(e_msg)
break
end_time = timenow()
duration = end_time - start_time
print(
f"Workflow done at {end_time.strftime('%H:%M:%S')}, a total of {state_count} states were executed in {duration}."
)
def summarize_workflow_run(self) -> dict:
"""Summarize the states running sequence after a workflow is executed.
Returns:
dict: a summary dictionary with two keys: `total_states_executed` and `state_running_sequence`.
"""
print(
f"A total of {len(self.running_sequence)} states were executed with the following sequence:"
)
step_count = 0
for element in self.running_sequence:
step_count += 1
print(f"State {step_count}: {element}")
return {
"total_states_executed": step_count,
"state_running_sequence": self.running_sequence,
}
class Workflow:
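    """High-level workflow API that wraps `WorkflowEngine` to load, validate, save, and run workflow definitions."""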
def __init__(self, workflow: Union[str, dict] = None):
"""Instantiate a Workflow class object and load specified workflow as a `dict` in `self.workflow`
Args:
workflow (Union[str, dict], optional): str path to the workflow definition json file or dict of the actual workflow definition. Defaults to None.
"""
self.workflow_engine = None
if workflow is None:
logging.warning(
"Workflow is not provided at time of initialization. self.workflow_engine is set to None. Workflow can be added later by calling self.load_workflow(workflow)"
)
else:
self.load_workflow(workflow)
def load_workflow(self, workflow: Union[str, dict]) -> None:
"""Load workflow definition from a json file or dict to `self.workflow`.
Args:
workflow (Union[str, dict]): str path to the workflow definition json file or dict of the actual workflow definition.
"""
if self.workflow_engine is not None:
logging.warning(
"self.workflow_engine is not None, load_workflow will overwrite it with a new workflow."
)
self.workflow_engine = self.create_workflow_engine(workflow)
@staticmethod
def create_workflow_engine(
workflow: Union[str, dict]
) -> Union[None, WorkflowEngine]:
"""Instantiate a WorkflowEngine object with specified workflow definition.
Args:
            workflow (Union[str, dict]): str path to the workflow definition json file or dict of the actual workflow definition.
Returns:
Union[None, WorkflowEngine]: Instantiated WorkflowEngine object if provided workflow is valid; None otherwise.
"""
if (isinstance(workflow, str) and os.path.isfile(workflow)) or isinstance(
workflow, dict
):
workflow_engine = WorkflowEngine(workflow)
else:
logging.error(
"workflow needs to be either a str path to the workflow json file or a dict of the workflow definition."
)
workflow_engine = None
return workflow_engine
@staticmethod
def list_existing_workflows(workflow_dir: str = None) -> Union[dict, None]:
"""List existing workflows (defined as json files) under a specific directory path.
Args:
            workflow_dir (str, optional): path to the directory containing workflow definitions (including subdirectories). By default, points to the path of the example folder. @JXL TODO example folder to be specified
Returns:
Union[dict, None]: `dict` with keys being workflow names and values being a `Dict` with the following keys:
- `workflow_json_path`: path to the file of the workflow
- `workflow`: `Dict` of the workflow, loaded from the workflow json definition
"""
        if isinstance(workflow_dir, str) and os.path.isdir(workflow_dir):
if workflow_dir[-1] != "/":
workflow_dir = workflow_dir + "/"
found_files = glob.glob(f"{workflow_dir}**/*.json", recursive=True)
else:
logging.error(
"workflow_dir needs to point to a valid path containing potential workflow json files."
)
return None
num_found_files = len(found_files)
if num_found_files == 0:
logging.warning(f"There is no json file in {workflow_dir}")
return None
workflows = {}
for json_file_path in found_files:
temp_wfe = WorkflowEngine(json_file_path)
if temp_wfe.validate():
workflows[temp_wfe.workflow_dict["workflow_name"]] = {
"workflow_json_path": json_file_path,
"workflow": temp_wfe.workflow_dict,
                }
        return workflows
@staticmethod
    def validate_workflow_definition(workflow: Union[str, dict], verbose=False) -> bool:
        """Validate a workflow definition.
        Args:
            workflow (Union[str, dict]): If str, this is assumed to be the path to the workflow definition json file; if dict, this is assumed to be the loaded workflow definition.
            verbose (bool, optional): Verbose output for validate. Defaults to False.
        Returns:
            bool: True if the workflow definition is valid, False otherwise.
        """
        temp_workflow_engine = Workflow.create_workflow_engine(workflow)
        return temp_workflow_engine.validate(verbose)
    def validate(self, verbose=False) -> bool:
        """Validate `self.workflow`
        Args:
            verbose (bool, optional): Verbose output for validate. Defaults to False.
        Returns:
            bool: True if the workflow definition is valid, False otherwise.
        """
        return self.workflow_engine.validate(verbose)
def save(self, json_path: str = None) -> None:
"""Save the workflow as a json file.
Args:
json_path (str, optional): path to the file to be saved. Defaults to None.
"""
if self.workflow_engine is None:
logging.warning("self.workflow_engine is None, there is nothing to save.")
else:
if isinstance(json_path, str):
self.workflow_engine.save_workflow(json_path)
else:
logging.error("json_path needs to be str")
    def run_workflow(self, verbose: bool = False) -> None:
        """Execute the workflow defined in self.workflow
        Args:
            verbose (bool, optional): Whether to output detailed information. Defaults to False.
        """
        self.workflow_engine.run_workflow(verbose)
class MethodCall:
"""The MethodCall State class. This class also covers the `Embedded MethodCall` state type.
    A typical way to execute a MethodCall state would be: `self.payloads = MethodCall(state_dict, self.payloads).run().get_payloads()`
"""
def __init__(self, state_dict, payloads):
        self.payloads = payloads  # self.payloads takes in the input payloads, adds key-value pairs based on state_dict['Payloads'], and is made available through self.get_payloads()
self.parameters = {}
self.state_dict = state_dict
self.dollar = None # the return of the current method call will be stored in this variable.
self.build_parameters(state_dict)
def build_parameters(self, state_dict):
if isinstance(state_dict["Parameters"], dict):
for k, v in state_dict["Parameters"].items():
self.parameters[k] = self.build_param_value(v)
if isinstance(state_dict["Parameters"], list):
# some python built-in methods only accept positional arguments
self.parameters = [
self.build_param_value(x) for x in state_dict["Parameters"]
]
def build_param_value(self, v):
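        """Resolve a single parameter value.
        dict values are treated as `Embedded MethodCall` definitions; int and float values pass through unchanged; strings starting with "Payloads[" or the "+x " prefix are evaluated; all other strings are used literally.
        """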
if isinstance(v, dict):
# if parameter value is a dict, then it needs to be an embedded method call style
new_v = self.embedded_call(v)
return new_v
if isinstance(v, int) or isinstance(v, float):
return v
if isinstance(v, str):
# special string treatment
Payloads = (
self.payloads
) # to be used by "...Payloads['xxx']... in parameter value" TODO:JXL to be tested.
if v.split("[")[0] == "Payloads":
# only in this case we eval
return eval(v)
            elif v[:3] == "+x ":  # unique prefix indicating the rest of the string should be evaluated
return eval(v[3:])
else:
                # all other string parameter values are treated as literal strings, for clarity and security. More complicated parameters should use an embedded MethodCall
return v
def run(self):
Payloads = (
self.payloads
) # to be used by "...Payloads['xxx']... in self.state_dict["MethodCall"]"
method_call = eval(self.state_dict["MethodCall"])
# TODO JXL: handle non-method calls, maybe just (self.dollar = method_call)? Need tests
if isinstance(self.parameters, dict):
self.dollar = method_call(**self.parameters)
if isinstance(self.parameters, list):
self.dollar = method_call(*self.parameters)
return self
def get_method_return(self):
return self.dollar
def get_payloads(self):
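        """Assemble the updated payloads based on the state's "Payloads" element.
        In each string value, "$" refers to the return value of the current method call (stored in `self.dollar`), e.g. {"item_count": "$"}.
        """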
Payloads = self.payloads
if "Payloads" in self.state_dict:
for k, v in self.state_dict["Payloads"].items():
if isinstance(v, str):
# only eval if v is string
self.payloads[k] = eval(v.replace("$", "self.dollar"))
else:
self.payloads[k] = v
return self.payloads
def embedded_call(self, embedded_case_dict):
if embedded_case_dict["Type"] != "Embedded MethodCall":
logging.error(
f"If parameter value is a dict, it needs to be of type 'Embedded MethodCall'. Problematic state definition: {embedded_case_dict}"
)
return None
return MethodCall(embedded_case_dict, self.payloads).run().get_method_return()
class Choice:
"""The Choice state that check conditions to decide next step.
A typical use case of execute a Choice state would be: `next_state = Choice(state_dict, self.payloads).check_choices()`
"""
def __init__(self, state_dict, payloads):
self.payloads = payloads
self.state_dict = state_dict
def check_choices(self):
for choice in self.state_dict["Choices"]:
choice_return = self.check_choice(choice)
if isinstance(choice_return, str):
return choice_return
# if current choice does not give a next step, then check the next one
continue
# no valid next step has been identified. check if there is a default state, if not, log error
if "Default" in self.state_dict:
return self.state_dict["Default"]
else:
logging.error("Among all choices, no valid next step identified. Abort!")
return None
def check_choice(self, choice):
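        """Evaluate a single choice and return its "Next" state name if the choice matches, False if it does not, or None if the choice is malformed.
        A choice is either a 'leaf' comparison ({"Value": ..., "Equals": ..., "Next": ...}) or a logical expression with exactly one of the "ALL", "ANY", or "NONE" keys wrapping a list of leaf comparisons.
        """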
if not isinstance(choice, dict):
logging.error("choice has to be a dict")
return None
if "Value" not in choice:
# when 'Value' is not in choice, a logical expression key is expected
# check only one logic expression in the key
eligible_logic = ["ALL", "ANY", "NONE"]
keycheck_flag = [(ek in choice) for ek in eligible_logic]
if sum(keycheck_flag) != 1:
logging.error(
"For logical expression choices state, there needs to be exactly one key of either 'ALL', or 'ANY', or 'NONE'."
)
return None
if "ALL" in choice:
                flag_list = [self.get_choice_value(x) for x in choice["ALL"]]
choice_value = all(flag_list)
if "NONE" in choice:
flag_list = [self.get_choice_value(x) for x in choice["NONE"]]
choice_value = not any(flag_list)
if "ANY" in choice:
choice_value = False
for subchoice in choice["ANY"]:
if self.get_choice_value(subchoice):
choice_value = True
break
else:
# 'leaf' choice implementation
choice_value = self.get_choice_value(choice)
if choice_value:
return choice["Next"]
else:
return False
def get_choice_value(self, choice):
Payloads = self.payloads # to be used by "...Payloads['xxx']... in choice dict"
left = choice["Value"]
right = choice["Equals"]
# only eval when it is a string
if isinstance(left, str):
left = eval(choice["Value"])
if isinstance(right, str):
right = eval(choice["Equals"])
if left == right:
return True
else:
return False
def main():
WorkflowEngine("./tests/api/data/testworkflow.json")
if __name__ == "__main__":
main()