Skip to content
Snippets Groups Projects
Commit caf7a77e authored by Iryna de Albuquerque Silva's avatar Iryna de Albuquerque Silva
Browse files

[FUNC] Adds show_graphview funcionality, which outputs the graph's description in JSON. Closes #160

parent fefa086d
No related branches found
No related tags found
3 merge requests!279v0.4.0,!253v0.4.0,!211Add show_graphview funcionality.
import os
import json
import builtins
import aidge_core
import numpy as np
def dfs(graph, node, visited_nodes, sorted_nodes) -> None:
"""
Performs the depth-first search algorithm for topological sorting.
:param graph: An unsorted GraphView of Aidge
:type graph: aidge_core.GraphView
:param node: The GraphView's Node that is being treated
:type node: aidge_core.Node
:param visited_nodes: List of nodes that have already been visited.
:type visited_nodes: list
:param sorted_nodes: List of nodes that have already been sorted.
:type sorted_nodes: list
"""
node_children = node.get_children()
visited_nodes.add(node)
for child in node_children:
if child not in visited_nodes:
dfs(graph, child, visited_nodes, sorted_nodes)
sorted_nodes.append(node)
# Make sure Producers are treated:
parents = []
for parent in node.get_parents():
try:
has_parents = parent.get_parents()
except AttributeError:
has_parents = False
if (not has_parents) and (parent not in sorted_nodes):
parents.append(parent)
parents.reverse()
sorted_nodes.extend(parents)
return None
def topological_sort(graph : aidge_core.GraphView) -> list:
"""
Performs topological sorting by applying depth-first search algorithm recursively.
:param graph: An unsorted GraphView of Aidge
:type graph: aidge_core.GraphView
:return: A list with the GraphView's sorted nodes.
:rtype: list
"""
input_nodes = graph.get_input_nodes()
visited_nodes = set()
sorted_nodes = []
for input in input_nodes:
if input not in visited_nodes:
dfs(input_nodes, input, visited_nodes, sorted_nodes)
sorted_nodes.reverse()
return sorted_nodes
def retrieve_operator_attrs(node : aidge_core.Node) -> dict:
"""
Returns the dictionary containing the attributes of a given Node.
:param graph: A Node in the list of sorted nodes.
:type graph: aidge_core.Node
:return: A dictionary with the Node's attributes.
:rtype: dict
"""
if node.get_operator().attr is not None:
node_attr_dict = node.get_operator().attr.dict()
for key,value in node_attr_dict.items():
if not type(value).__name__ in dir(builtins):
node_attr_dict[key] = value.name
else:
node_attr_dict = {}
return node_attr_dict
def create_dict(sorted_nodes : list, write_trainable_params_ext : bool, write_trainable_params_embed : bool, params_file_format : str, path_trainable_params : str) -> dict:
"""
Creates a dictionary to store the information of a given sorted GraphView.
:param sorted_nodes: A list with the GraphView's sorted nodes.
:type graph: list
:param write_trainable_params_ext: Whether or not to write the eventual trainable parameters of the Nodes in an external file.
:type write_trainable_params_ext: bool
:param write_trainable_params_embed: Whether or not to write the eventual trainable parameters of the Nodes in the same file as the dict (embed).
:type write_trainable_params_embed: bool
:param params_file_format: Format of the external file used to store the Nodes' trainable parameters. Options: 'npz' or 'json'.
:type params_file_format: str
:param path_trainable_params: Path of the external file used to store the Nodes' trainable parameters. Options: 'npz' or 'json'.
:type path_trainable_params: str
:return: A dictionary with the GraphView description.
:rtype: dict
"""
graphview_dict = {'graph': []}
for node in sorted_nodes:
if node is not None:
node_dict = {'name' : node.name(),
'optype' : node.get_operator().type(),
'nb_inputs' : node.get_operator().nb_inputs(),
'nb_outputs' : node.get_operator().nb_outputs()}
inputs = []
for input_idx in range(node.get_operator().nb_inputs()):
input_dict = {'dims' : node.get_operator().get_input(input_idx).dims(),
'data_type' : str(node.get_operator().get_input(input_idx).dtype()),
'data_format' : str(node.get_operator().get_input(input_idx).dformat())}
inputs.append(input_dict)
node_dict['inputs'] = inputs
outputs = []
for output_idx in range(node.get_operator().nb_outputs()):
output_dict = {'dims' : node.get_operator().get_output(output_idx).dims(),
'data_type' : str(node.get_operator().get_output(output_idx).dtype()),
'data_format' : str(node.get_operator().get_output(output_idx).dformat())}
outputs.append(output_dict)
node_dict['outputs'] = outputs
parents = node.get_parents()
if None in parents:
if parents[0] is None: parents.append(parents.pop(0))
else:
pass
parents_inputs = []
for parent in parents:
if parent is not None:
for output_idx in range(parent.get_operator().nb_outputs()):
for input_idx in range(node.get_operator().nb_inputs()):
if parent.get_operator().get_output(output_idx).dims() == node.get_operator().get_input(input_idx).dims():
parents_inputs.append((parent.name(), input_idx))
elif parent is None:
for input_idx in list(range(node.get_operator().nb_inputs())):
if input_idx not in [item[1] for item in parents_inputs]:
parents_inputs.append((None, input_idx))
parents_inputs.sort(key=lambda x: x[1])
node_dict['parents'] = parents_inputs
children_outputs = []
for child in node.get_children():
for input_idx in range(child.get_operator().nb_inputs()):
for output_idx in range(node.get_operator().nb_outputs()):
if child.get_operator().get_input(input_idx).dims() == node.get_operator().get_output(output_idx).dims():
children_outputs.append((child.name(), output_idx))
node_dict['children'] = children_outputs
# Check if my node is a metaop
attributes_dict = {}
if hasattr(node.get_operator(), 'get_micro_graph'):
attributes_dict['micro_graph'] = []
for micro_node in node.get_operator().get_micro_graph().get_nodes():
micro_node_dict = {'name' : micro_node.name(),
'optype' : micro_node.type()}
micro_node_attr_dict = retrieve_operator_attrs(micro_node)
micro_node_dict['attributes'] = micro_node_attr_dict
attributes_dict['micro_graph'].append(micro_node_dict)
else:
node_attr_dict = retrieve_operator_attrs(node)
attributes_dict.update(node_attr_dict)
node_dict['attributes'] = attributes_dict
if node.type() == 'Producer':
if write_trainable_params_ext and params_file_format=='npz':
np.savez_compressed(os.path.join(path_trainable_params, node.name()), **{node.name() : node.get_operator().get_output(0)})
node_dict['tensor_data'] = os.path.join(path_trainable_params, node.name() + '.npz')
elif write_trainable_params_ext and params_file_format=='json':
tensor = np.array(node.get_operator().get_output(0))
tensor_dict = {
node.name() :
{
'dims' : tensor.shape,
'data_type' : str(tensor.dtype),
'tensor_data' : tensor.tolist()
}
}
with open(os.path.join(path_trainable_params, node.name() + '.json'), 'w') as fp:
json.dump(tensor_dict, fp, indent=4)
node_dict['tensor_data'] = os.path.join(path_trainable_params, node.name() + '.json')
elif write_trainable_params_embed:
node_dict['tensor_data'] = np.array(node.get_operator().get_output(0)).tolist()
else:
pass
graphview_dict['graph'].append(node_dict)
else: # node is None
pass
return graphview_dict
def write_dict_json(graphview_dict : dict, json_path : str) -> None:
"""
Writes dictionary containing GraphView description to a JSON file.
:param graphview_dict: A dictionary with the GraphView description.
:type graphview_dict: dict
:param json_path: Path to write JSON file.
:type json_path: str.
"""
with open(json_path, 'w') as fp:
json.dump(graphview_dict, fp, indent=4)
return None
def gview_to_json(gview : aidge_core.GraphView, json_path : str, write_trainable_params_ext : bool = False, write_trainable_params_embed : bool = False, params_file_format : str = None) -> None:
"""
Generates the description for a GraphView in the JSON format.
:param graph: A GraphView of Aidge
:type graph: aidge_core.GraphView
:param json_path: Path to write JSON file.
:type json_path: str.
:param write_trainable_params_ext: Whether or not to write the eventual trainable parameters of the Nodes in an external file.
:type write_trainable_params_ext: bool
:param write_trainable_params_embed: Whether or not to write the eventual trainable parameters of the Nodes in the same file as the dict (embed).
:type write_trainable_params_embed: bool
:param params_file_format: Format of the external file used to store the Nodes' trainable parameters. Options: 'npz' or 'json'.
:type params_file_format: str
:param path_trainable_params: Path of the external file used to store the Nodes' trainable parameters. Options: 'npz' or 'json'.
:type path_trainable_params: str
"""
if write_trainable_params_ext:
dir_name, fname_ext = os.path.split(json_path)
fname = os.path.splitext(fname_ext)[0] + '_trainable_params'
path_trainable_params = os.path.join(dir_name, fname)
os.mkdir(path_trainable_params)
else:
path_trainable_params = ''
# Sort graphview
sorted_nodes = topological_sort(gview)
# Create dict from graphview
graphview_dict = create_dict(sorted_nodes, write_trainable_params_ext, write_trainable_params_embed, params_file_format, path_trainable_params)
# Write dict to json
write_dict_json(graphview_dict, json_path)
return None
import json
import tempfile
import unittest
import builtins
import aidge_core
from aidge_core.show_graphview import gview_to_json
def create_gview():
# Create a LeNet-like model
gview = aidge_core.sequential([aidge_core.PaddedConv2D(in_channels=1, out_channels=6, kernel_dims=[5,5], name='feature_feature_0_Conv', stride_dims=[1,1], padding_dims = [2,2,2,2]),
aidge_core.ReLU(name='feature_feature_1_Relu'),
aidge_core.MaxPooling2D(kernel_dims=[2,2], stride_dims=[2,2], ceil_mode=0, name='feature_feature_2_MaxPool'),
aidge_core.Conv2D(in_channels=6, out_channels=16, kernel_dims=[5,5], name='feature_feature_3_Conv', stride_dims=[1,1], dilation_dims = [1,1]),
aidge_core.ReLU(name='feature_feature_4_Relu'),
aidge_core.MaxPooling2D(kernel_dims=[2,2], stride_dims=[2,2], ceil_mode=0, name='feature_feature_5_MaxPool'),
aidge_core.FC(in_channels=400, out_channels=120, name='classifier_classifier_1_Gemm'),
aidge_core.ReLU(name='classifier_classifier_2_Relu'),
aidge_core.FC(in_channels=120, out_channels=84, name='classifier_classifier_3_Gemm'),
aidge_core.ReLU(name='classifier_classifier_4_Relu'),
aidge_core.FC(in_channels=84, out_channels=10, name='classifier_classifier_5_Gemm'),
])
# Fill Producers
for node in gview.get_nodes():
if node.type() == "Producer":
prod_op = node.get_operator()
value = prod_op.get_output(0)
value.set_backend("cpu")
tuple_out = node.output(0)[0]
if (tuple_out[0].type() == "Conv" or tuple_out[0].type() == "PaddedConv") and tuple_out[1]==1:
# Conv weight
aidge_core.xavier_uniform_filler(value)
elif tuple_out[0].type() == "Conv" and tuple_out[1]==2:
# Conv bias
aidge_core.constant_filler(value, 0.01)
elif tuple_out[0].type() == "FC" and tuple_out[1]==1:
# FC weight
aidge_core.normal_filler(value)
elif tuple_out[0].type() == "FC" and tuple_out[1]==2:
# FC bias
aidge_core.constant_filler(value, 0.01)
else:
pass
# Compile model
gview.forward_dims([[1, 1, 28, 28]])
gview.set_datatype(aidge_core.dtype.float32)
return gview
class test_show_gview(unittest.TestCase):
"""Test aidge show GraphView
"""
def setUp(self):
pass
def tearDown(self):
pass
def test_model_to_json(self):
gview = create_gview()
# Create temporary file to store JSON model description
model_description_file = tempfile.NamedTemporaryFile(mode="w+")
gview_to_json(gview, model_description_file.name)
# Load JSON
with open(model_description_file.name, 'r') as fp:
model_json = json.load(fp)
# Get list of nodes of Aidge graphview
gview_ranked_nodes = gview.get_ranked_nodes()[0]
# Iterate over ranked_nodes
self.assertEqual(len(gview_ranked_nodes), len(model_json['graph']))
for node_gview in gview_ranked_nodes:
for node_json in model_json['graph']:
if node_gview.name() == node_json['name']:
self.assertEqual(node_gview.get_operator().type(), node_json['optype'])
self.assertEqual(node_gview.get_operator().nb_inputs(), node_json['nb_inputs'])
self.assertEqual(node_gview.get_operator().nb_outputs(), node_json['nb_outputs'])
self.assertEqual(node_gview.get_operator().nb_inputs(), len(node_json['inputs']))
for input_idx in range(node_gview.get_operator().nb_inputs()):
self.assertEqual(node_gview.get_operator().get_input(input_idx).dims(), node_json['inputs'][input_idx]['dims'])
self.assertEqual(str(node_gview.get_operator().get_input(input_idx).dtype()), node_json['inputs'][input_idx]['data_type'])
self.assertEqual(str(node_gview.get_operator().get_input(input_idx).dformat()), node_json['inputs'][input_idx]['data_format'])
self.assertEqual(node_gview.get_operator().nb_outputs(), len(node_json['outputs']))
for output_idx in range(node_gview.get_operator().nb_outputs()):
self.assertEqual(node_gview.get_operator().get_output(output_idx).dims(), node_json['outputs'][output_idx]['dims'])
self.assertEqual(str(node_gview.get_operator().get_output(output_idx).dtype()), node_json['outputs'][output_idx]['data_type'])
self.assertEqual(str(node_gview.get_operator().get_output(output_idx).dformat()), node_json['outputs'][output_idx]['data_format'])
self.assertEqual(len(node_gview.get_parents()), len(node_json['parents']))
self.assertEqual(len(node_gview.get_children()), len(node_json['children']))
if not hasattr(node_gview.get_operator(), 'get_micro_graph'):
try:
self.assertEqual(len(node_gview.get_operator().attr.dict()), len(node_json['attributes']))
self.assertDictEqual(node_gview.get_operator().attr.dict(), node_json['attributes'])
except AttributeError:
self.assertIsNone(node_gview.get_operator().attr) and self.assertFalse(node_json['attributes'])
elif hasattr(node_gview.get_operator(), 'get_micro_graph'):
self.assertEqual(len(node_gview.get_operator().get_micro_graph().get_nodes()), len(node_json['attributes']['micro_graph']))
for micro_node_gview in node_gview.get_operator().get_micro_graph().get_nodes():
for micro_node_json in node_json['attributes']['micro_graph']:
if micro_node_gview.get_operator().type() == micro_node_json['optype']:
for key, value in micro_node_gview.get_operator().attr.dict().items():
if not type(value).__name__ in dir(builtins):
# Replace original value by its name (str) because value is of a type that could not be written to the JSON
# Cannot update this dict inplace : micro_node_gview.get_operator().attr.dict().update({key : value.name})
temp_mnode_dict = micro_node_gview.get_operator().attr.dict()
temp_mnode_dict.update({key : value.name})
self.assertDictEqual(temp_mnode_dict, micro_node_json['attributes'])
if __name__ == '__main__':
unittest.main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment