Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • eclipse/aidge/aidge_core
  • hrouis/aidge_core
  • mszczep/aidge_core
  • oantoni/aidge_core
  • cguillon/aidge_core
  • jeromeh/aidge_core
  • axelfarr/aidge_core
  • cmoineau/aidge_core
  • noamzerah/aidge_core
  • lrakotoarivony/aidge_core
  • silvanosky/aidge_core
  • maab05/aidge_core
  • mick94/aidge_core
  • lucaslopez/aidge_core_ll
  • wboussella/aidge_core
  • farnez/aidge_core
  • mnewson/aidge_core
17 results
Show changes
Showing
with 2088 additions and 61 deletions
import aidge_core
from abc import ABC, abstractmethod
class ExportNode(ABC):
"""Abstract class to interface node with export generation.
"""
@abstractmethod
def __init__(self, aidge_node: aidge_core.Node) -> None:
"""Create ExportNode and retieve attirubtes from ``aidge_node``:
- name: aidge Node name
- attributes: dictionnary of attributes of the aidge Operator linked to the node, attributes name follow aidge naming convention
- parameters: List of parameters node, order in the list is the same as the one defined by the aidge operator
"""
super().__init__()
self.node = aidge_node
self.operator = aidge_node.get_operator()
self.name = self.node.name()
self.attributes = {} # Attributes are auto fetched from aidge operators
if isinstance(self.operator, aidge_core.Attributes):
for attr_name in self.operator.get_attrs_name():
self.attributes[attr_name] = self.operator.get_attr(attr_name)
# rename is_leaf ?
self.is_last = len(self.node.get_children()) == 0
self.inputs = []
self.outputs = []
self.inputs_dims = []
self.outputs_dims = []
for idx, parent_node in enumerate(self.node.get_parents()):
self.inputs.append(parent_node)
if parent_node is not None:
self.inputs_dims.append(self.operator.get_input(idx).dims())
else:
self.inputs_dims.append(None)
for idx, child_node in enumerate(self.node.get_children()):
self.outputs.append(child_node)
# Dirty hot fix, change it quickly
self.outputs_dims.append(self.operator.get_output(0).dims())
@abstractmethod
def export(self, export_folder:str, list_configs:list):
"""Define how to export the node definition.
"""
pass
@abstractmethod
def forward(self, list_actions:list):
"""Define how to generate code to perform a forward pass.
"""
pass
from .node_export import ExportNode, ExportNodeCpp
from .code_generation import generate_file, generate_str, copy_file
from .export_registry import ExportLib
from .scheduler_export import scheduler_export
from .tensor_export import tensor_to_c, generate_input_file
from .generate_main import generate_main_cpp, generate_main_compare_cpp
from pathlib import Path
from jinja2 import Environment, FileSystemLoader, StrictUndefined
from typing import Union
import os
import shutil
def generate_file(file_path: Union[Path, str], template_path: Union[Path, str], **kwargs) -> None:
"""Generate a file at `file_path` using the jinja template located at `file_path`.
kwargs are used to fill the template.
:param file_path: path where to generate the file
:type file_path: pathlib.Path or str
:param template_path: Path to the template to use for code generation
:type template_path: pathlib.Path or str
"""
# Convert str -> Path for compatibility !
if isinstance(file_path, str):
file_path = Path(file_path)
if isinstance(template_path, str):
template_path = Path(template_path)
if not template_path.exists():
raise ValueError(f"Path to template {template_path} is not valid !")
# Make dir
file_path.parent.mkdir(parents=True, exist_ok=True)
# Generate file
with open(file_path, mode="w", encoding="utf-8") as file:
file.write(generate_str(template_path, **kwargs))
def generate_str(template_path: Union[Path, str], **kwargs) -> str:
"""Generate a string using the jinja template located at `file_path`.
kwargs are used to fill the template.
:param template_path: Path to the template to use for code generation
:type template_path: pathlib.Path or str
:return: A string of the interpreted template
:rtype: str
"""
# Convert str -> Path for compatibility !
if isinstance(template_path, str):
template_path = Path(template_path)
return Environment(loader=FileSystemLoader(
template_path.parent), undefined=StrictUndefined, keep_trailing_newline=True).get_template(template_path.name).render(kwargs)
def copy_file(filename, dst_folder):
# If directory doesn't exist, create it
if not os.path.exists(dst_folder):
os.makedirs(dst_folder)
shutil.copy(filename, dst_folder)
import numpy as np
import aidge_core
from typing import Dict
datatype_converter_aidge2c = {
aidge_core.dtype.float64 : "double",
aidge_core.dtype.float32 : "float",
aidge_core.dtype.float16 : "half_float::half",
aidge_core.dtype.int8 : "int8_t",
aidge_core.dtype.int16 : "int16_t",
aidge_core.dtype.int32 : "int32_t",
aidge_core.dtype.int64 : "int64_t",
aidge_core.dtype.uint8 : "uint8_t",
aidge_core.dtype.uint16 : "uint16_t",
aidge_core.dtype.uint32 : "uint32_t",
aidge_core.dtype.uint64 : "uint64_t"
}
def aidge2c(datatype):
"""Convert a aidge datatype to C type
If the type is not convertible to a C type (e.g. int4), return None and raise a warning.
:param datatype: Aidge datatype to convert
:type datatype: :py:object:`aidge_core.DataType`
:return: A string representing the C type
:rtype: string
"""
if datatype in datatype_converter_aidge2c:
return datatype_converter_aidge2c[datatype]
else:
raise ValueError(f"Unsupported {datatype} aidge datatype")
def aidge2export_type(datatype: aidge_core.dtype, conversion_map: Dict[aidge_core.dtype, str] = datatype_converter_aidge2c) -> str:
"""Convert a aidge datatype to the export type specified by the map passed in argument
If the aidge type is not convertible, that is to say, is not specified in the map, a value Error is raised.
:param datatype: Aidge datatype to convert
:type datatype: :py:object:`aidge_core.DataType`
:param conversion_map: Map that specify the conversion
:type conversion_map: Dict[:py:object:`aidge_core.DataType`, str]
:return: A string representing the export type
:rtype: string
"""
if datatype in conversion_map:
return conversion_map[datatype]
else:
raise ValueError(f"Unsupported type conversion {datatype} aidge datatype for export")
from typing import Dict, List
import aidge_core
from aidge_core.export_utils import ExportNode
class classproperty:
"""Helper class to define class-level properties.
Equivalent to applying both the ``@property`` and ``@classmethod`` decorators,
allowing methods to be accessed as class properties. These two decorators
are otherwise incompatible prior to Python 3.12.
See discussion: https://discuss.python.org/t/add-a-supported-read-only-classproperty-decorator-in-the-stdlib/18090/12
"""
def __init__(self, fget):
"""
:param fget: Function to be wrapped as a class property.
:type fget: Callable
"""
self.fget = fget
def __get__(self, instance, owner):
return self.fget(owner)
class ExportLib(aidge_core.OperatorImpl):
"""Aidge export library that manages a registry for operators and static files.
This class provides a structure for registering different operator types and
export nodes, facilitating access and management of these elements.
:ivar _name: The name of the export library, used for namespacing.
:ivar static_files: A dictionary mapping paths of static files to their target locations relative to the export root.
"""
# PUBLIC
# Lib name useful ?
# Help define namespace
_name: str = None
# key: Path where static file is
# Value: Path where to copy the file relative to the export root
static_files: Dict[str, str] = {}
# Main memory section
mem_section = None
# Custom forward generation jinja file
forward_template: str = None
forward_header_template: str = None
# PRIVATE
# Registry of exportNode, class level dictionary, shared across all ExportLib
_cls_export_node_registry = {}
def __init__(self, operator):
super(ExportLib, self).__init__(operator, self._name)
if self._name is None:
raise ValueError("ExportLib {self.__class__.__name__} does not define the attribute ``_name``.")
# TODO: This is a patch, setBackend method is used to set an ExportLib as an Implementation.
# But it also set the backend of the Tensor and we don't define a backend for Tensor.
# The following code block associate to the export backend the "cpu" implementation.
# This is tracked by the issue :
# https://gitlab.eclipse.org/eclipse/aidge/aidge_core/-/issues/178
aidge_core.register_Tensor([self._name, aidge_core.dtype.float32],
aidge_core.get_key_value_Tensor(["cpu", aidge_core.dtype.float32]))
aidge_core.register_Tensor([self._name, aidge_core.dtype.float16],
aidge_core.get_key_value_Tensor(["cpu", aidge_core.dtype.float16]))
aidge_core.register_Tensor([self._name, aidge_core.dtype.int8],
aidge_core.get_key_value_Tensor(["cpu", aidge_core.dtype.int8]))
aidge_core.register_Tensor([self._name, aidge_core.dtype.int16],
aidge_core.get_key_value_Tensor(["cpu", aidge_core.dtype.int16]))
aidge_core.register_Tensor([self._name, aidge_core.dtype.int32],
aidge_core.get_key_value_Tensor(["cpu", aidge_core.dtype.int32]))
aidge_core.register_Tensor([self._name, aidge_core.dtype.int64],
aidge_core.get_key_value_Tensor(["cpu", aidge_core.dtype.int64]))
aidge_core.register_Tensor([self._name, aidge_core.dtype.uint8],
aidge_core.get_key_value_Tensor(["cpu", aidge_core.dtype.uint8]))
aidge_core.register_Tensor([self._name, aidge_core.dtype.uint16],
aidge_core.get_key_value_Tensor(["cpu", aidge_core.dtype.uint16]))
aidge_core.register_Tensor([self._name, aidge_core.dtype.uint32],
aidge_core.get_key_value_Tensor(["cpu", aidge_core.dtype.uint32]))
aidge_core.register_Tensor([self._name, aidge_core.dtype.uint64],
aidge_core.get_key_value_Tensor(["cpu", aidge_core.dtype.uint64]))
aidge_core.register_Tensor([self._name, aidge_core.dtype.int4],
aidge_core.get_key_value_Tensor(["cpu", aidge_core.dtype.int4]))
aidge_core.register_Tensor([self._name, aidge_core.dtype.uint4],
aidge_core.get_key_value_Tensor(["cpu", aidge_core.dtype.uint4]))
aidge_core.register_Tensor([self._name, aidge_core.dtype.dual_int4],
aidge_core.get_key_value_Tensor(["cpu", aidge_core.dtype.dual_int4]))
aidge_core.register_Tensor([self._name, aidge_core.dtype.dual_uint4],
aidge_core.get_key_value_Tensor(["cpu", aidge_core.dtype.dual_uint4]))
@classproperty
def _export_node_registry(cls) -> Dict[str, List['ExportNode']]:
"""Define as a class property to access the registry at class level while keeping it at instance level.
:return: The export node registry specific to the class
:rtype: Dict[str, List[ExportNode]]
"""
return cls._cls_export_node_registry.setdefault(cls, {})
def get_available_impl_specs(self) -> List[aidge_core.ImplSpec]:
"""Override the virtual OperatorImpl method, in order to provide available
implementation specifications.
:return: List of implementation specification available for the type of operator.
:rtype: List[aidge_core.ImplSpec]
"""
if self.get_operator().type() in self._export_node_registry:
spec_vec = [i for i, _ in self._export_node_registry[self.get_operator().type()]]
return spec_vec
else:
return []
def get_export_node(self, spec: aidge_core.ImplSpec) -> ExportNode:
"""Given an :py:class:`aidge_core.ImplSpec`, return the ExportNode that is the closest match.
:param spec: Implementation specification to match
:type spec: :py:class:`aidge_core.ImplSpec
:return: The class ExportNode that is the closest match
:rtype: aidge_core.ImplSpec
"""
for registered_spec, export_node in self._export_node_registry[self.get_operator().type()]:
if registered_spec == spec:
return export_node
return None
@classmethod
def register(cls, op_type, spec):
"""Decorator to register an operator implementation for a specified operator type.
Registers an operator under a given operator type and specification,
adding it to the export library registry. This method supports both
single operator types (str) and lists of types (List[str]).
:param op_type: The operator type(s) to register.
:type op_type: Union[str, List[str]]
:param spec: Implementation specification for the operator.
:type spec: :py:class:``aidge_core.ImplSpec``
:return: A wrapper class that initializes the registered operator.
:rtype: Callable
"""
def decorator(operator):
class Wrapper(operator):
def __init__(self, *args, **kwargs):
return operator(*args, **kwargs)
type_list = []
if isinstance(op_type, list):
type_list = op_type
elif isinstance(op_type, str):
type_list = [op_type]
else:
raise TypeError("Argument type of register method should be of type 'List[str]' or 'str', got {type(type)}")
for type_name in type_list:
if (type_name not in cls._export_node_registry):
cls._export_node_registry[type_name] = []
cls._export_node_registry[type_name].append((spec, operator))
register_func: str = f"register_{type_name}Op"
# If operator is not defined, then it means we try to register a MetaOperator
if register_func not in dir(aidge_core):
raise ValueError(f"Operator of type: {type_name} is not declared as registrable!\nHint: If you try to register a MetaOperator use register_metaop instead.")
else:
# Equivalent to aidge_core.register_ConvOp("ExportLibX", ExportLibX)
aidge_core.__getattribute__(register_func)(cls._name, cls)
return Wrapper
return decorator
@classmethod
def register_metaop(cls, op_type, spec):
"""Decorator to register a MetaOperator with the export library.
Registers a MetaOperator under a given operator type and specification. This decorator
is intended for operator types that are grouped as meta operators.
:param op_type: Operator type(s) to register as a ``MetaOperator``.
:type op_type: Union[str, List[str]]
:param spec: Implementation specification for the MetaOperator.
:type spec: aidge_core.ImplSpec
:return: A wrapper class that initializes the registered MetaOperator.
:rtype: Callable
"""
def decorator(operator):
class Wrapper(operator):
def __init__(self, *args, **kwargs):
return operator(*args, **kwargs)
type_list = []
if isinstance(op_type, list):
type_list = op_type
elif isinstance(op_type, str):
type_list = [op_type]
else:
raise TypeError("Argument 'op_type' of register method should be of type 'List[str]' or 'str', got {type(type)}")
for type_name in type_list:
if (type_name not in cls._export_node_registry):
cls._export_node_registry[type_name] = []
cls._export_node_registry[type_name].append((spec, operator))
aidge_core.register_MetaOperatorOp([cls._name, type_name], cls)
spec.attrs.add_attr("type", type_name) # MetaOperator specs need to verify the type
return Wrapper
return decorator
@classmethod
def register_generic(cls, op_type, spec):
"""Decorator to register a GenericOperator with the export library.
Registers a GenericOperator under a given operator type and specification. This decorator
is intended for operator types that are grouped as meta operators.
:param op_type: Operator type(s) to register as a ``GenericOperator``.
:type op_type: Union[str, List[str]]
:param spec: Implementation specification for the GenericOperator.
:type spec: aidge_core.ImplSpec
:return: A wrapper class that initializes the registered GenericOperator.
:rtype: Callable
"""
def decorator(operator):
class Wrapper(operator):
def __init__(self, *args, **kwargs):
return operator(*args, **kwargs)
type_list = []
if isinstance(op_type, list):
type_list = op_type
elif isinstance(op_type, str):
type_list = [op_type]
else:
raise TypeError("Argument 'op_type' of register method should be of type 'List[str]' or 'str', got {type(type)}")
for type_name in type_list:
if (type_name not in cls._export_node_registry):
cls._export_node_registry[type_name] = []
cls._export_node_registry[type_name].append((spec, operator))
aidge_core.register_GenericOperatorOp([cls._name, type_name], cls)
spec.attrs.add_attr("type", type_name) # GenericOperator specs need to verify the type
return Wrapper
return decorator
import aidge_core
from pathlib import Path
from aidge_core.export_utils import generate_file, data_conversion
def generate_main_cpp(export_folder: str, graph_view: aidge_core.GraphView, inputs_tensor=None) -> None:
"""
Generate a C++ file to manage the forward pass of a model using the given graph structure.
This function extracts details from the :py:class:`aidge_core.graph_view` object, including input and output node names, data types,
and tensor sizes. It uses this data to populate a C++ file template (`main.jinja`), creating a file (`main.cpp`)
that call the `model_forward` function, which handles data flow and processing for the exported model.
This function also generate files containing input tensor if they have been set.
:param export_folder: Path to the folder where the generated C++ file (`main.cpp`) will be saved.
:type export_folder: str
:param graph_view: An instance of :py:class:`aidge_core.graph_view`, providing access to nodes and
ordered input/output data within the computational graph.
:type graph_view: aidge_core.graph_view
:param inputs_tensor: **For future** argument to provide tensor to use in the main function, not implemented yet!
:type inputs_tensor: None
:raises RuntimeError: If there is an inconsistency in the output arguments (names, data types, sizes),
indicating an internal bug in the graph representation.
"""
outputs_name: list[str] = []
outputs_dtype: list[str] = []
outputs_size: list[int] = []
inputs_name: list[str] = []
gv_inputs: list[tuple[aidge_core.Node, int]] = graph_view.get_ordered_inputs()
gv_outputs: list[tuple[aidge_core.Node, int]] = graph_view.get_ordered_outputs()
for in_node, in_idx in gv_inputs:
in_node_input, in_node_input_idx = in_node.input(in_idx)
in_name = f"{in_node.name()}_input_{in_idx}" if in_node_input is None else f"{in_node_input.name()}_output_{in_node_input_idx}"
inputs_name.append(in_name)
input_tensor = in_node.get_operator().get_input(in_idx)
if input_tensor is None or input_tensor.undefined() or not input_tensor.has_impl():
if inputs_tensor is not None:
aidge_core.Log.notice("No support for inputs_tensor argument yet.")
aidge_core.Log.notice(f"No input tensor set for {in_name}, main generated will not be functionnal after code generation.")
else:
aidge_core.Log.notice(f"No input tensor set for {in_name}, main generated will not be functionnal after code generation.")
else:
aidge_core.export_utils.generate_input_file(export_folder=export_folder, array_name=in_name, tensor=input_tensor)
for out_node, out_id in gv_outputs:
outputs_name.append(f"{out_node.name()}_output_{out_id}")
out_tensor = out_node.get_operator().get_output(out_id)
outputs_dtype.append(data_conversion.aidge2c(out_tensor.dtype()))
outputs_size.append(out_tensor.size())
if len(outputs_name) != len(outputs_dtype) or len(outputs_name) != len(outputs_size):
raise RuntimeError("FATAL: Output args list does not have the same length this is an internal bug.")
ROOT = Path(__file__).resolve().parents[0]
generate_file(
str(Path(export_folder) / "main.cpp"),
str(ROOT / "templates" / "main.jinja"),
func_name="model_forward",
inputs_name=inputs_name,
outputs_name=outputs_name,
outputs_dtype=outputs_dtype,
outputs_size=outputs_size
)
def generate_main_compare_cpp(export_folder: str, graph_view: aidge_core.GraphView, inputs_tensor=None) -> None:
"""
Generate a C++ file to manage the forward pass and compare the output of a model.
This function extracts details from the :py:class:`aidge_core.graph_view` object, including input and output node names, data types,
and tensor sizes. It uses this data to populate a C++ file template (`main.jinja`), creating a file (`main.cpp`)
that call the `model_forward` function, which handles data flow and processing for the exported model.
This function also generate files containing input tensor if they have been set.
:param export_folder: Path to the folder where the generated C++ file (`main.cpp`) will be saved.
:type export_folder: str
:param graph_view: An instance of :py:class:`aidge_core.graph_view`, providing access to nodes and
ordered input/output data within the computational graph.
:type graph_view: aidge_core.graph_view
:param inputs_tensor: **For future** argument to provide tensor to use in the main function, not implemented yet!
:type inputs_tensor: None
:raises RuntimeError: If there is an inconsistency in the output arguments (names, data types, sizes),
indicating an internal bug in the graph representation.
"""
outputs_name: list[str] = []
outputs_dtype: list[str] = []
outputs_size: list[int] = []
inputs_name: list[str] = []
gv_inputs: list[tuple[aidge_core.Node, int]] = graph_view.get_ordered_inputs()
gv_outputs: list[tuple[aidge_core.Node, int]] = graph_view.get_ordered_outputs()
for in_node, in_idx in gv_inputs:
in_node_input, in_node_input_idx = in_node.input(in_idx)
in_name = f"{in_node.name()}_input_{in_idx}" if in_node_input is None else f"{in_node_input.name()}_output_{in_node_input_idx}"
inputs_name.append(in_name)
input_tensor = in_node.get_operator().get_input(in_idx)
if input_tensor is None or input_tensor.undefined() or not input_tensor.has_impl():
if inputs_tensor is not None:
aidge_core.Log.notice("No support for inputs_tensor argument yet.")
aidge_core.Log.notice(f"No input tensor set for {in_name}, main generated will not be functionnal after code generation.")
else:
aidge_core.Log.notice(f"No input tensor set for {in_name}, main generated will not be functionnal after code generation.")
else:
aidge_core.export_utils.generate_input_file(export_folder=export_folder, array_name=in_name, tensor=input_tensor)
for out_node, out_id in gv_outputs:
out_name = f"{out_node.name()}_output_{out_id}"
outputs_name.append(out_name)
out_tensor = out_node.get_operator().get_output(out_id)
outputs_dtype.append(data_conversion.aidge2c(out_tensor.dtype()))
outputs_size.append(out_tensor.size())
if out_tensor is None or out_tensor.undefined() or not out_tensor.has_impl():
aidge_core.Log.notice(f"No input tensor set for {out_name}, main generated will not be functionnal after code generation.")
else:
aidge_core.export_utils.generate_input_file(export_folder=export_folder, array_name=out_name+"_expected", tensor=out_tensor)
if len(outputs_name) != len(outputs_dtype) or len(outputs_name) != len(outputs_size):
raise RuntimeError("FATAL: Output args list does not have the same length this is an internal bug.")
ROOT = Path(__file__).resolve().parents[0]
generate_file(
str(Path(export_folder) / "main.cpp"),
str(ROOT / "templates" / "main_compare.jinja"),
func_name="model_forward",
inputs_name=inputs_name,
outputs_name=outputs_name,
outputs_dtype=outputs_dtype,
outputs_size=outputs_size
)
This diff is collapsed.
import aidge_core
import os
import shutil
from pathlib import Path
from aidge_core.export_utils import ExportLib, generate_file, copy_file
from typing import List, Tuple
def scheduler_export(scheduler, export_folder_path: str, export_lib: ExportLib = None, memory_manager=None, memory_manager_args=None, test_mode=False) -> None:
"""Exports an aidge_core.Scheduler to C++ code.
This function generates files for a given computation graph, including forward-pass functions,
configuration headers, and the main API entry point for the exported model. It requires a
memory manager to allocate resources, and optionally an `ExportLib` instance to handle backend
configurations for node operators.
1. **Export Preparation**:
- Initializes export and DNN folders, checking that required memory management functions are defined.
- Retrieves peak memory usage and memory details for each node using the `memory_manager`.
2. **Configuration Generation**:
- Iterates over nodes scheduled by `scheduler`, configuring backends if `export_lib` is specified.
- Exports configuration headers and forward-pass actions for each node by invoking `op.export()`
and `op.forward()`, appending these to `list_configs` and `list_actions`, respectively.
- Collects information on input and output nodes, including their names, data types, and sizes.
3. **Code Generation**:
- Defines the forward-pass function, `model_forward`, with inputs and outputs based on node attributes.
- Generates the following files:
- **forward.cpp**: Implements the model forward pass using templates, applying configurations
and actions for each node.
- **forward.hpp**: Exports the forward API, defining inputs and outputs.
- **main.cpp**: Main entry file, serving as the model's forward-pass interface.
4. **Static File Export (Optional)**:
- If `export_lib` is specified, static files are copied to the export folder based on `export_lib`
specifications.
:param scheduler: Scheduler instance managing the computation graph.
Uses `graph_view` and `get_static_scheduling` methods
to retrieve the computation graph layout and ordered nodes.
:type scheduler: aidge_core.Scheduler
:param export_folder_path: Path to the folder where the generated export files will be saved.
Creates this folder, along with subdirectories for model and source files.
:type export_folder_path: str
:param export_lib: Library providing the backend implementation for node operators.
Defaults to None. If provided, each node's backend is set to the library's name.
:type export_lib: ExportLib, optional
:param memory_manager: Required function for managing memory allocation. It should take
`scheduler` and optional `memory_manager_args` as parameters, returning
`peak_mem` (peak memory usage) and `mem_info` (memory details for each node).
:type memory_manager: callable
:param memory_manager_args: Additional arguments passed to `memory_manager`. Defaults to an empty dictionary.
:type memory_manager_args: dict, optional
:param test_mode: Additional argument which may be used during forward generation.
:type test_mode: bool, optional
"""
graphview = scheduler.graph_view()
export_folder = Path().absolute() / export_folder_path
os.makedirs(str(export_folder), exist_ok=True)
dnn_folder = export_folder / "dnn"
os.makedirs(str(dnn_folder), exist_ok=True)
if memory_manager_args is None:
memory_manager_args = {}
if memory_manager is None:
raise ValueError("A memory manager is required (no default value yet).")
peak_mem, mem_info = memory_manager(
scheduler, **memory_manager_args)
# List of function call for forward.cpp
list_actions: List[str] = []
# List of headers for forward.cpp
list_configs: List[str] = []
inputs_name: List[str] = []
inputs_dtype: List[str] = []
outputs_name: List[str] = []
outputs_dtype: List[str] = []
outputs_size: List[int] = []
# List of aidge_core.Node ordered by scheduler
list_forward_nodes: List[aidge_core.Node] = scheduler.get_static_scheduling()
# If exportLib define use it
# else parse component in platform
# if export_lib is None:
# raise ValueError("Export need an ExportLib.")
for node in list_forward_nodes:
if export_lib is not None:
aidge_core.Log.debug(f"Setting backend {export_lib._name} to {node.name()}[{node.type()}].")
node.get_operator().set_backend(export_lib._name)
op_impl = node.get_operator().get_impl()
if op_impl is None:
raise RuntimeError(f"Operator {node.name()}[{node.type()}] doesn't have an implementation.")
if not isinstance(op_impl, ExportLib):
raise RuntimeError(f"Operator {node.name()}[{node.type()}] doesn't have an exportable backend ({op_impl}).")
is_input:bool = node in graphview.get_input_nodes()
is_output:bool = node in graphview.get_output_nodes()
if is_input:
flag_not_input = True
# GraphView.get_inputs_nodes() returns the nodes that have an Input set to None or not in the graph
# However, some inputs are Optional and thus the node may not be an input of the graph!
# So we need to check that at least one input of the nodes is not in the graph and not optional
# This is what the following code block is checking.
for idx, node_in in enumerate(node.inputs()):
optional:bool = node.get_operator().is_optional_input(idx)
# Note: node_in is a Tuple(Node, out_idx)
in_graph:bool = node_in[0] in graphview.get_nodes()
flag_not_input &= (in_graph or optional)
is_input = not flag_not_input
# Get operator current specs
required_specs = op_impl.get_required_spec()
# Get specs of the implementation that match current specs
specs = op_impl.get_best_match(required_specs)
# Retrieve said implementation
export_node = op_impl.get_export_node(specs)
if export_node is None:
raise RuntimeError(f"Could not find export node for {node.name()}[{node.type()}].")
# Instanciate ExportNode
op = export_node(node, mem_info[node])
# For configuration files
list_configs += op.export(dnn_folder)
# For forward file
list_actions += op.forward()
if is_input:
for idx, node_in in enumerate(node.inputs()):
if (node.get_operator().get_input(idx) is not None) and (node_in[0] not in graphview.get_nodes()):
inputs_name.append(op.attributes["in_name"][idx])
inputs_dtype.append(
op.attributes["in_cdtype"][idx]
)
if is_output:
for idx in range(len(node.outputs())):
outputs_name.append(op.attributes["out_name"][idx])
outputs_dtype.append(
op.attributes["out_cdtype"][idx]
)
outputs_size.append(op.attributes["out_size"][idx])
func_name = "model_forward"
ROOT = Path(__file__).resolve().parents[0]
forward_template = str(ROOT / "templates" / "forward.jinja")
if export_lib.forward_template != None:
forward_template = export_lib.forward_template
list_node_names = []
for node in list_forward_nodes:
if node.type() != "Producer":
list_node_names.append(node.name())
generate_file(
str(dnn_folder / "src" / "forward.cpp"),
forward_template,
func_name=func_name,
headers=set(list_configs),
actions=list_actions,
# Note: Graph may not have inputs, so we need to check with output
# In the future, we should remove this as it is not compatible
# with a mix precision approach.
mem_ctype=outputs_dtype[0], # Legacy behavior ...
mem_section=export_lib.mem_section,
peak_mem=peak_mem,
inputs_name=inputs_name,
inputs_dtype=inputs_dtype,
outputs_name=outputs_name,
outputs_dtype=outputs_dtype,
test_mode=test_mode,
list_node_names=list_node_names
)
forward_header_template = str(ROOT / "templates" / "forward_header.jinja")
if export_lib.forward_header_template != None:
forward_header_template = export_lib.forward_header_template
# Generate dnn API
generate_file(
str(dnn_folder / "include" / "forward.hpp"),
forward_header_template,
libraries=[],
func_name=func_name,
inputs_name=inputs_name,
inputs_dtype=inputs_dtype,
outputs_name=outputs_name,
outputs_dtype=outputs_dtype,
test_mode=test_mode
)
if len(outputs_name) != len(outputs_dtype) or len(outputs_name) != len(outputs_size):
raise RuntimeError("FATAL: Output args list does not have the same length this is an internal bug.")
if export_lib is not None:
# Copy all static files in the export
for source, destination in export_lib.static_files.items():
copy_file(source, str(export_folder / destination))
{#- For libraries #}
#include <stdint.h>
{# Design header of the array -#}
static const {{ data_t }} {{ name }}[{{ dims |join("*") }}] __attribute__((section("nn_data"))) = {
{{ values |join(", ") }}
};
#include <stdint.h>
#ifdef SAVE_OUTPUTS
#include <sys/types.h>
#include <sys/stat.h>
#endif
#include "include/forward.hpp"
// Layer & memory configurations
{%- for header in headers %}
#include "{{ header }}"
{%- endfor %}
// Memory block
{%- if mem_section == None %}
static {{mem_ctype}} mem[{{peak_mem}}];
{%- else %}
static {{mem_ctype}} mem[{{peak_mem}}] __attribute__((section("{{ mem_section }}")));
{%- endif %}
{# Forward function #}
{#- Support multiple inputs with different datatypes and multiple outputs with different datatypes -#}
void {{ func_name }} (
{%- for i in range(inputs_name | length) -%}
const {{ inputs_dtype[i] }}* {{ inputs_name[i] }},
{%- endfor -%}
{%- for o in range(outputs_name | length) -%}
{{ outputs_dtype[o] }}** {{ outputs_name[o] }}_ptr{% if not loop.last %}, {% endif %}
{%- endfor -%})
{
{%- for action in actions %}
{{ action }}
{%- endfor %}
{%- for output_name in outputs_name %}
*{{ output_name }}_ptr = {{ output_name }};
{%- endfor %}
}
#ifndef DNN_HPP
#define DNN_HPP
#ifdef __cplusplus
extern "C" {
#endif
{#- For libraries #}
{% for lib in libraries %}
#include <{{ lib }}>
{%- endfor %}
void {{ func_name }} (
{%- for i in range(inputs_name | length) -%}
const {{ inputs_dtype[i] }}* {{ inputs_name[i] }},
{%- endfor -%}
{%- for o in range(outputs_name | length) %}
{{ outputs_dtype[o] }}** {{ outputs_name[o] }}{% if not loop.last %}, {% endif %}
{%- endfor -%});
#ifdef __cplusplus
}
#endif
#endif /* DNN_HPP */
#include <iostream>
#include "forward.hpp"
{% for name in inputs_name %}
#include "{{ name }}.h"
{% endfor %}
{% set printf_formats = {
"double": "%lf",
"float": "%f",
"int8_t": "%hhd",
"int16_t": "%hd",
"int32_t": "%d",
"int64_t": "%lld",
"uint8_t": "%hhu",
"uint16_t": "%hu",
"uint32_t": "%u",
"uint64_t": "%llu"
} %}
int main()
{
// Initialize the output arrays
{%- for o in range(outputs_name | length) %}
{{ outputs_dtype[o] }}* {{ outputs_name[o] }} = nullptr;
{% endfor %}
// Call the forward function
{{ func_name }}({{ inputs_name|join(", ") }}{% if inputs_name %}, {% endif %}&{{ outputs_name|join(", &") }});
// Print the results of each output
{%- for o in range(outputs_name | length) %}
printf("{{ outputs_name[o] }}:\n");
for (int o = 0; o < {{ outputs_size[o] }}; ++o) {
printf("{{ printf_formats[outputs_dtype[o]] }} ", {{ outputs_name[o] }}[o]);
}
printf("\n");
{% endfor %}
return 0;
}
{% set printf_formats = {
"double": "%lf",
"float": "%f",
"int8_t": "%hhd",
"int16_t": "%hd",
"int32_t": "%d",
"int64_t": "%lld",
"uint8_t": "%hhu",
"uint16_t": "%hu",
"uint32_t": "%u",
"uint64_t": "%llu"
} %}
#include <cstdio> // printf
#include <cmath> // std::abs
#include "forward.hpp" // Exported forward
// Inputs
{% for name in inputs_name %}
#include "{{ name }}.h"
{% endfor %}
// Outputs
{% for name in outputs_name %}
#include "{{ name }}_expected.h"
{% endfor %}
int main()
{
// Initialize the output arrays
{%- for o in range(outputs_name | length) %}
{{ outputs_dtype[o] }}* {{ outputs_name[o] }} = nullptr;
{% endfor %}
const float abs_err = 0.001f;
const float rel_err = 0.0001f;
// Call the forward function
{{ func_name }}({{ inputs_name|join(", ") }}{% if inputs_name %}, {% endif %}&{{ outputs_name|join(", &") }});
int nb_identical;
int nb_out;
// Print the results of each output
{%- for o in range(outputs_name | length) %}
nb_identical = 0;
nb_out = 0;
printf("{{ outputs_name[o] }}:\n");
for (int o = 0; o < {{ outputs_size[o] }}; ++o) {
printf("Expected {{ printf_formats[outputs_dtype[o]] }} <-> Predicted {{ printf_formats[outputs_dtype[o]] }}\n", {{ outputs_name[o] }}_expected[o], {{ outputs_name[o] }}[o]);
if (std::abs({{ outputs_name[o] }}_expected[o] - {{ outputs_name[o] }}[o]) <= abs_err + rel_err * std::abs({{ outputs_name[o] }}_expected[o]))
nb_identical++;
nb_out++;
}
printf("\nNumber of equal outputs: %d / %d\n", nb_identical, nb_out);
{% endfor %}
return 0;
}
import os
from aidge_core.export_utils.code_generation import generate_file
from aidge_core.export_utils.data_conversion import aidge2c
from aidge_core import Tensor
from pathlib import Path
def tensor_to_c(tensor:Tensor)->str:
"""Given a :py:class:``aigd_core.Tensor``, return a C description of the tensor.
For example:
{
{1, 2},
{3, 4}
}
:param tensor: Tensor to transform to a string
:type tensor: Tensor
:return: String representation of a C array
:rtype: str
"""
return str(tensor)
def generate_input_file(export_folder:str,
array_name:str,
tensor:Tensor):
# If directory doesn't exist, create it
if not os.path.exists(export_folder):
os.makedirs(export_folder)
print(f"gen : {export_folder}/{array_name}.h")
ROOT = Path(__file__).resolve().parents[0]
generate_file(
file_path=f"{export_folder}/{array_name}.h",
template_path=str(ROOT / "templates" / "c_data.jinja"),
dims = tensor.dims(),
data_t = aidge2c(tensor.dtype()),
name = array_name,
values = list(tensor)
)
import os
import subprocess
import shutil
from pathlib import Path
import aidge_core
from typing import Tuple, List
import matplotlib.pyplot as plt
import aidge_core.mem_info
import numpy as np
# Default memory management, which can be used for development
def compute_default_mem_info(scheduler: aidge_core.Scheduler) -> Tuple[int, List]:
"""Basic memory management concatenate memory block, no memory reuse !
:param scheduler: Aidge scheduler
:type scheduler: :py:class:`aidge_core.Scheduler`
:return: The total memory size (in number of elements) and a list (of size nb node) of list (of size nb output) of dictionnary (size, offset)
:rtype: Tuple[int, list]
"""
mem_info = {}
mem_size = 0
# Exclude Producers and the last layers (because the results are stored outside the export)
for i, node in enumerate(scheduler.get_static_scheduling()):
if node.type() != "Producer":
node_mem_info = []
for out_id in range(node.get_nb_outputs()):
dims = node.get_operator().get_output(out_id).dims()
mem = 1
for dim in dims:
mem *= dim
# Add memeory info
node_mem_info.append({
"size": mem,
"offset": mem_size
})
# Increment offset for the next layer
mem_size += mem
mem_info[node] = node_mem_info
else:
mem_info[node] = [] # No meminfo for producer
return mem_size, mem_info
def log_meminfo(mem_manager:aidge_core.MemoryManager, path: Path, diplay_names:bool):
"""Generate a graph representing the memory allocation of each ouputs.
Block with the smae color correspond to the same memory plane.
:param mem_manager: Memory manager to log
:type mem_manager: aidge_core.memory_manager
:param path: Path where to save the figure
:type path: Path
:param diplay_names: If True Node names are diplayed alongside their block
:type diplay_names: bool
"""
max_lifetime = mem_manager.get_max_lifetime()
# peak_usage in kwords
peak_usage = mem_manager.get_peak_usage() / 1024
# Set figure size 1920x1080 px
plt.figure(figsize=(19.20, 10.80))
# Same color for each planes
colors = plt.cm.viridis(np.linspace(0, 1, len(mem_manager.get_planes()) + 1))
color_id = 1
for node, planes in mem_manager.get_planes().items():
for plane in planes:
cont_offset = plane.get_contiguous_offset()
cont_size = plane.get_contiguous_size()
allocated = plane.mem_space.allocated
released = plane.mem_space.released
is_released = released >= 0 and not plane.mem_space.dependencies
x_start = allocated
y_start = cont_offset / 1024.0
y_end = (cont_offset + cont_size) / 1024.0
x_end = max_lifetime if not is_released else released
plt.fill_betweenx(
[y_start, y_end],
x_start,
x_end + 1,
color=colors[color_id % len(colors)]
)
if diplay_names:
# Rotation for lisibility!
plt.text(x_end,y_end, node.name(), rotation=45)
color_id += 1
plt.xlim(0, max_lifetime + 1)
plt.ylim(0, peak_usage)
plt.axhline(y=peak_usage, color='red', linestyle='--')
plt.text(0, peak_usage, f'Peak usage = {peak_usage} KWords', color='red')
plt.xlabel("Time")
plt.ylabel("Memory usage (KWords)")
plt.title("Memory Usage Over Time")
plt.grid(True)
ax = plt.gca()
ax.spines['top'].set_visible(False)
ax.spines['right'].set_visible(False)
folder_path = path.parent
folder_path.mkdir(parents=True, exist_ok=True)
plt.savefig(path)
plt.close()
aidge_core.Log.notice(f"Generated memory management info at: {path}")
def generate_optimized_memory_info(scheduler: aidge_core.Scheduler, stats_folder: Path = None, wrapping: bool = False, auto_concat: bool = False, display_names: bool=True) -> Tuple[int, List[dict]]:
"""Generates optimized memory information for a computation graph managed by a scheduler.
This function analyzes the memory usage of a computation graph, determining the memory peak
and detailed memory management information for each node in the scheduler. It supports optional
wrapping of memory buffers and logs memory usage statistics to facilitate memory optimization.
:param scheduler: Scheduler instance that organizes the computation graph. It manages the
nodes and facilitates memory planning by invoking its `generate_memory` method.
:type scheduler: aidge_core.Scheduler
:param stats_folder: Directory path to store memory statistics and plots generated by `mem_manager`.
If provided as a string, it is converted to a `Path` object, default=None.
:type stats_folder: Path, optional
:param wrapping: Boolean flag to enable or disable wrap-around buffer optimization.
Defaults to `False`.
:type wrapping: bool, optional
:param auto_concat: Boolean flag to enable or disable auto-concatenation optimization.
Defaults to `False`.
:type auto_concat: bool, optional
:param diplay_names: If True Node names are diplayed in the memory plot alongside their block, defaults to False
:type diplay_names: bool, optional
:return: A tuple containing the peak memory size and a list of memory information for each
scheduled node. The memory information for each node includes details such as size,
offset, stride, length, count, and optional wrap-around details.
:rtype: Tuple[int, List[dict]]
"""
# The forward dims has to done outside the function
# Also supposed the generation of the scheduler has been performed outside
# Otherwise decomment the following line
# scheduler.generate_scheduling()
# Generate the memory manager
# So far, the Producers are not take in consideration in the meory manager => inc_producers=False
if auto_concat:
mem_manager = scheduler.generate_memory_auto_concat(
inc_producers=False, wrap_around_buffer=wrapping)
else:
mem_manager = scheduler.generate_memory(
inc_producers=False, wrap_around_buffer=wrapping)
# List of nodes which are connected at the input of the graph (None if input is not connected)
nodes_at_input = [n[0] for n in scheduler.graph_view().inputs()]
if stats_folder is not None:
log_meminfo(mem_manager, Path(stats_folder) / "memory_info.png", display_names)
# In the export, we currently use an unified memory buffer whose size
# is determined by the memory peak usage
mem_size = mem_manager.get_peak_usage()
mem_info = {}
mem_planes = mem_manager.get_planes()
for node in scheduler.get_static_scheduling():
node_mem_info = []
if node.type() == "Producer":
pass
elif node in nodes_at_input:
# Input memory management (suppose tensor ends with [:, channel, height, width]))
tensor = node.get_operator().get_output(0)
if tensor is None:
raise RuntimeError("Warning input producer not provided")
if len(tensor.dims()) < 3:
raise RuntimeError(
f"Input producer dimensions must be with [:, channel, height, width] but got {tensor.dims()} instead")
# TODO : use get_chan get_height and get_width function !
node_mem_info.append({
"size": tensor.dims()[-3], # Should be nb_channels
"offset": 0, # Suppose input data is stored outside the export function
# so the memory offset is not important to consider
"stride": tensor.dims()[-3], # Should be nb_channels
"length": tensor.dims()[-1], # Should be width
"count": tensor.dims()[-2], # Should be height
"cont_offset": 0, # Suppose input data is stored outside the export function
# so the memory offset is not important to consider
"cont_size": tensor.dims()[-1] * \
tensor.dims()[-2] * \
tensor.dims()[-3], # Size of input
"wrap_offset": 0, # No wrapping
"wrap_size": 0 # No wrapping
})
else:
for out_id in range(node.get_nb_outputs()):
plane = mem_planes[node][out_id]
node_mem_info.append({
"size": plane.size,
"offset": plane.get_contiguous_offset(),
"stride": plane.stride,
"length": plane.length,
"count": plane.count,
"cont_offset": plane.get_contiguous_offset(),
"cont_size": plane.get_contiguous_size(),
"wrap_offset": plane.get_wrapped_offset(),
"wrap_size": plane.get_wrapped_size()
})
mem_info[node] = node_mem_info
return mem_size, mem_info
import os
import json
import builtins
import aidge_core
import numpy as np
from pathlib import Path
from typing import Any, Dict, List, Optional
def _retrieve_operator_attrs(node : aidge_core.Node) -> Dict[str, Optional[Any]]:
"""
Returns the dictionary containing the attributes of a given Node.
:param graph: A Node in the list of ordered nodes.
:type graph: aidge_core.Node
:return: A dictionary with the Node's attributes.
:rtype: Dict[str, Optional[Any]]
"""
if node.get_operator().attr is not None:
node_attr_dict = node.get_operator().attr.dict()
for key,value in node_attr_dict.items():
if not type(value).__name__ in dir(builtins):
node_attr_dict[key] = value.name
else:
node_attr_dict = {}
return node_attr_dict
def _create_dict(ordered_nodes : List[aidge_core.Node], write_trainable_params_embed : bool, write_trainable_params_ext : bool, path_trainable_params : Path, params_file_format : str) -> Dict[str, Optional[Any]]:
"""
Creates a dictionary to store the information of a given ordered GraphView.
:param ordered_nodes: A list with the GraphView's ordered nodes.
:type graph: list
:param write_trainable_params_embed: Whether or not to write the eventual trainable parameters of the Nodes in the same file as the dict (embed).
:type write_trainable_params_embed: bool
:param write_trainable_params_ext: Whether or not to write the eventual trainable parameters of the Nodes in an external file.
:type write_trainable_params_ext: bool
:param path_trainable_params: Path of the external file used to store the Nodes' trainable parameters.
:type path_trainable_params: Path
:param params_file_format: Format of the external file used to store the Nodes' trainable parameters. Options: ``npz`` or ``json``. Default : ``json``. Requires ``write_trainable_params_ext``.
:type params_file_format: str
:return: A dictionary with the GraphView description.
:rtype: Dict[str, Optional[Any]]
"""
graphview_dict = {'graph': []}
for node in ordered_nodes:
if node is not None:
node_dict = {'name' : node.name(),
'optype' : node.get_operator().type(),
'nb_inputs' : node.get_operator().nb_inputs(),
'nb_outputs' : node.get_operator().nb_outputs()}
inputs = []
if node.get_operator().nb_inputs() > 0:
for input_idx in range(node.get_operator().nb_inputs()):
if node.get_operator().get_input(input_idx) is not None:
input_dict = {'dims' : node.get_operator().get_input(input_idx).dims(),
'data_type' : str(node.get_operator().get_input(input_idx).dtype()),
'data_format' : str(node.get_operator().get_input(input_idx).dformat())}
elif node.get_operator().get_input(input_idx) is None:
input_dict = {'dims' : None,
'data_type' : None,
'data_format' : None}
inputs.append(input_dict)
node_dict['inputs'] = inputs
outputs = []
if node.get_operator().nb_outputs() > 0:
for output_idx in range(node.get_operator().nb_outputs()):
if node.get_operator().get_output(output_idx) is not None:
output_dict = {'dims' : node.get_operator().get_output(output_idx).dims(),
'data_type' : str(node.get_operator().get_output(output_idx).dtype()),
'data_format' : str(node.get_operator().get_output(output_idx).dformat())}
elif node.get_operator().get_output(output_idx) is None:
output_dict = {'dims' : None,
'data_type' : None,
'data_format' : None}
outputs.append(output_dict)
node_dict['outputs'] = outputs
parents = node.get_parents()
if None in parents:
if parents[0] is None: parents.append(parents.pop(0))
else:
pass
parents_inputs = []
input_idx = 0
for parent in node.get_parents():
if parent is not None:
for children in parent.outputs():
for child in children:
if child[0] == node and child[1] == input_idx:
parents_inputs.append((parent.name(), input_idx))
elif parent is None:
if input_idx not in [item[1] for item in parents_inputs]:
parents_inputs.append((None, input_idx))
input_idx += 1
node_dict['parents'] = parents_inputs
children_outputs = []
output_idx = 0
for children in node.get_ordered_children():
for child in children:
if child is not None:
for parent in child.inputs():
if parent[0] == node and parent[1] == output_idx:
children_outputs.append((child.name(), output_idx))
output_idx += 1
node_dict['children'] = children_outputs
# Check if my node is a metaop
attributes_dict = {}
if isinstance(node.get_operator(), aidge_core.MetaOperatorOp):
attributes_dict['micro_graph'] = []
for micro_node in node.get_operator().get_micro_graph().get_nodes():
micro_node_dict = {'name' : micro_node.name(),
'optype' : micro_node.type()}
micro_node_attr_dict = _retrieve_operator_attrs(micro_node)
micro_node_dict['attributes'] = micro_node_attr_dict
attributes_dict['micro_graph'].append(micro_node_dict)
else:
node_attr_dict = _retrieve_operator_attrs(node)
attributes_dict.update(node_attr_dict)
node_dict['attributes'] = attributes_dict
if node.type() == 'Producer':
if write_trainable_params_ext:
params_file_format.casefold()
if params_file_format=='npz':
np.savez_compressed(Path(path_trainable_params, node.name()), **{node.name() : node.get_operator().get_output(0)})
node_dict['tensor_data'] = str(Path(path_trainable_params, node.name() + '.npz'))
elif params_file_format=='json':
tensor = np.array(node.get_operator().get_output(0))
tensor_dict = {
node.name() :
{
'dims' : tensor.shape,
'data_type' : str(tensor.dtype),
'tensor_data' : tensor.tolist()
}
}
with open(Path(path_trainable_params, node.name() + '.json'), 'w') as fp:
json.dump(tensor_dict, fp, indent=4)
node_dict['tensor_data'] = str(Path(path_trainable_params, node.name() + '.json'))
else:
raise Exception("File format to write trainable parameters not recognized.")
if write_trainable_params_embed:
node_dict['tensor_data'] = np.array(node.get_operator().get_output(0)).tolist()
else:
pass
graphview_dict['graph'].append(node_dict)
else: # node is None
pass
return graphview_dict
def _write_dict_json(graphview_dict : Dict[str, Optional[Any]], json_path : str) -> None:
"""
Writes dictionary containing GraphView description to a JSON file.
:param graphview_dict: A dictionary with the GraphView description.
:type graphview_dict: dict[str, int, float, bool, None]
:param json_path: Path to write JSON file.
:type json_path: str
"""
with open(json_path, 'w') as fp:
json.dump(graphview_dict, fp, indent=4)
return None
def gview_to_json(gview : aidge_core.GraphView, json_path : Path, write_trainable_params_embed : bool = False, write_trainable_params_ext : bool = False, params_file_format : str = 'json') -> None:
"""
Generates the description for a GraphView in the JSON format.
:param graph: A GraphView of Aidge.
:type graph: aidge_core.GraphView
:param json_path: Path to write JSON file.
:type json_path: Path
:param write_trainable_params_embed: Whether or not to write the eventual trainable parameters of the Nodes in the same file as the dict (embed).
:type write_trainable_params_embed: bool, optional
:param write_trainable_params_ext: Whether or not to write the eventual trainable parameters of the Nodes in an external file.
:type write_trainable_params_ext: bool, optional
:param params_file_format: Format of the external file used to store the Nodes' trainable parameters. Options: ``npz`` or ``json``. Default : ``json``. Requires ``write_trainable_params_ext``.
:type params_file_format: str, optional
"""
json_path = Path(json_path)
if not json_path.suffix:
if not json_path.is_dir():
json_path.mkdir(parents=True, exist_ok=True)
json_path = json_path.joinpath('model.json')
else:
if json_path.suffix != '.json':
raise Exception('If ``json_path`` contains a filename, it must be of JSON format.')
if not json_path.parent.is_dir():
json_path.parent.mkdir(parents=True, exist_ok=True)
if write_trainable_params_ext:
path_trainable_params = (json_path.parent).joinpath(json_path.stem + '_trainable_params/')
path_trainable_params.mkdir(parents=True, exist_ok=True)
else:
path_trainable_params = Path()
if isinstance(gview, aidge_core.GraphView):
# Sort GraphView in topological order
ordered_nodes = gview.get_ordered_nodes()
# Create dict from GraphView
graphview_dict = _create_dict(ordered_nodes, write_trainable_params_embed, write_trainable_params_ext, path_trainable_params, params_file_format)
# Write dict to JSON
_write_dict_json(graphview_dict, json_path)
else:
raise Exception("Graph must be an instance of aidge_core.GraphView.")
return None
\ No newline at end of file
import numpy as np
import aidge_core
def simplify_graph(graph: aidge_core.GraphView):
"""
Simplify a graph loaded from ONNX.
:param graph: The GraphView to simplify.
:type graph: aidge_core.GraphView
"""
def check_constant_producer(value):
def _check_constant_producer(node):
out = node.get_operator().get_output(0)
return (len(out) == 1 and np.isclose(out[0], value))
return _check_constant_producer
gm = aidge_core.SinglePassGraphMatching(graph)
gm.add_node_lambda("Constant_sqrt2", check_constant_producer(np.sqrt(2)))
gm.add_node_lambda("Constant_1", check_constant_producer(1))
gm.add_node_lambda("Constant_0_5", check_constant_producer(0.5))
# Linear [from PyTorch ONNX]
aidge_core.fuse_to_metaops(gm, "MatMul-*>Add", "Linear")
# LayerNorm [from PyTorch ONNX]
aidge_core.fuse_to_metaops(gm, "ReduceMean-*>Sub#1~>(Pow#1->ReduceMean-*>Add#1->Sqrt)-*>Div#1-*>Mul#1-*>Add#2;"
"Sub#1~*>Div#1;"
"Pow#1<1~Producer;"
"Add#1<*~Producer;"
"Mul#1<*~Producer;"
"Add#2<*~Producer;"
"Sub#1~>$", "LayerNorm")
# ScaledDotProductAttention [from PyTorch ONNX]
aidge_core.fuse_to_metaops(gm, "MatMul->Div#1->Softmax-*>MatMul;"
"Div#1<1~Producer", "ScaledDotProductAttention")
# MultiHeadAttention [from PyTorch ONNX]
aidge_core.fuse_to_metaops(gm, "ScaledDotProductAttention#1->Transpose->Reshape#1->Linear;"
"Reshape#1<1~Producer;"
"ScaledDotProductAttention#1<0-(Transpose<-Reshape#2<-Add#1);"
"ScaledDotProductAttention#1<1-(Transpose<-Reshape#3<-Add#2);"
"ScaledDotProductAttention#1<2-(Transpose<-Reshape#4<-Add#3);"
"Reshape#2<1~Producer;"
"Add#1<*-0-Split#1;"
"Add#2<*-1-Split#1;"
"Add#3<*-2-Split#1;"
"Split#1<-MatMul;"
"Split#1<1~Producer", "MultiHeadAttention")
# GeLU [from PyTorch ONNX]
aidge_core.fuse_to_metaops(gm, "Div#1->Erf->Add#1-*>Mul->Mul#2;"
"Div#1<1~Producer[Constant_sqrt2];"
"Add#1<*~Producer[Constant_1];"
"Mul#2<*~Producer[Constant_0_5]", "GeLU")
import matplotlib
import matplotlib.pyplot as plt
from functools import partial
import numpy as np
import aidge_core
class StaticAnalysisExt(aidge_core.StaticAnalysis):
def log_nb_params(self, filename, title=None, log_scale=False):
namePtrTable = self.get_graph().get_ranked_nodes_name("{0} ({1}#{3})");
nodes = self.get_graph().get_ordered_nodes()
series = []
legend = None
for node in nodes:
if node.type() == "Producer":
continue
name = namePtrTable[node]
series.append([name, self.get_nb_params(node)])
if title is None: title = "log_nb_params"
if filename is not None:
self._log_bar(series, filename, title, legend, log_scale)
return series
def log_params_size(self, filename, title=None, log_scale=False):
namePtrTable = self.get_graph().get_ranked_nodes_name("{0} ({1}#{3})");
nodes = self.get_graph().get_ordered_nodes()
series = []
legend = None
for node in nodes:
if node.type() == "Producer":
continue
name = namePtrTable[node]
series.append([name, self.log_params_size(node)])
if title is None: title = "log_params_size"
if filename is not None:
self._log_bar(series, filename, title, legend, log_scale)
return series
def log_nb_arithm_ops(self, filename, title=None, log_scale=False):
return self._log_callback(aidge_core.OperatorStats.get_nb_arithm_ops, filename, title, log_scale)
def log_nb_logic_ops(self, filename, title=None, log_scale=False):
return self._log_callback(aidge_core.OperatorStats.get_nb_logic_ops, filename, title, log_scale)
def log_nb_comp_ops(self, filename, title=None, log_scale=False):
return self._log_callback(aidge_core.OperatorStats.get_nb_comp_ops, filename, title, log_scale)
def log_nb_nl_ops(self, filename, title=None, log_scale=False):
return self._log_callback(aidge_core.OperatorStats.get_nb_nl_ops, filename, title, log_scale)
def log_nb_mac_ops(self, filename, title=None, log_scale=False):
return self._log_callback(aidge_core.OperatorStats.get_nb_mac_ops, filename, title, log_scale)
def log_nb_ops(self, filename, title=None, log_scale=False):
return self._log_callback(aidge_core.OperatorStats.get_nb_ops, filename, title, log_scale)
def log_nb_arithm_int_ops(self, filename, title=None, log_scale=False):
return self._log_callback(aidge_core.OperatorStats.get_nb_arithm_int_ops, filename, title, log_scale)
def log_nb_arithm_fp_ops(self, filename, title=None, log_scale=False):
return self._log_callback(aidge_core.OperatorStats.get_nb_arithm_fp_ops, filename, title, log_scale)
def log_nb_ops_by_type(self, filename, title=None, log_scale=False):
return self._log_callback([aidge_core.OperatorStats.get_nb_arithm_int_ops,
aidge_core.OperatorStats.get_nb_arithm_fp_ops,
aidge_core.OperatorStats.get_nb_logic_ops,
aidge_core.OperatorStats.get_nb_comp_ops,
aidge_core.OperatorStats.get_nb_nl_ops], filename, title, log_scale)
def _log_callback(self, callback, filename, title=None, log_scale=False):
"""
Log a statistic given by an OperatorStats callback member function.
Usage:
stats = StaticAnalysisExt(model)
stats.log_callback(aidge_core.OperatorStats.get_nb_params, "stats.png", "Nb params per operator")
:param func: OperatorStats member function to call.
:param filename: Output graph file name.
:type filename: str
:param title: Title of the graph.
:type title: str
"""
namePtrTable = self.get_graph().get_ranked_nodes_name("{0} ({1}#{3})");
nodes = self.get_graph().get_ordered_nodes()
series = []
legend = None
for node in nodes:
if node.type() == "Producer":
continue
stats = self.get_op_stats(node)
name = namePtrTable[node]
attr = {}
if type(node.get_operator()) is aidge_core.GenericOperatorOp:
# Display Generic Op in orange
attr = {'color': 'orange'}
elif not node.get_operator().is_atomic():
# Display Meta Op in bold
attr = {'fontweight': 'bold'}
elif node.type() not in aidge_core.get_keys_OperatorStats():
# Display unsupported operator in red labels
attr = {'color': 'red'}
if attr:
name = (name, attr)
if isinstance(callback, list):
series.append([name, [partial(cb, stats)() for cb in callback]])
legend = [cb.__name__ for cb in callback]
if title is None: title = str(legend)
else:
series.append([name, partial(callback, stats)()])
if title is None: title = callback.__name__
if title is None: title = str(callback)
if filename is not None:
self._log_bar(series, filename, title, legend, log_scale)
return series
def _log_bar(self, series, filename, title=None, legend=None, log_scale=False):
names, values = zip(*series)
names_only = [item[0] if isinstance(item, tuple) else item for item in names]
fig, ax = plt.subplots(figsize=(max(5, len(names)/4), 5))
plt.xlim(-0.5, len(names) - 0.5)
if isinstance(values[0], list):
series = [list(i) for i in zip(*values)]
bot = np.zeros(len(series[0]))
for i, serie in enumerate(series):
plt.bar(names_only, serie, bottom=bot)
bot += serie
else:
plt.bar(names_only, values)
if callable(getattr(ax.yaxis, 'minorticks_on', None)):
ax.yaxis.minorticks_on() # introduced in matplotlib 3.9.x
plt.grid(axis='y', which='major', linestyle='--', color='gray')
plt.grid(axis='y', which='minor', linestyle=':', color='lightgray')
formatter0 = matplotlib.ticker.EngFormatter(unit='')
ax.yaxis.set_major_formatter(formatter0)
plt.gca().set_axisbelow(True)
labels = plt.gca().get_xticks()
tick_labels = plt.gca().get_xticklabels()
for i, label in enumerate(labels):
if isinstance(names[i], tuple):
if 'color' in names[i][1]:
tick_labels[i].set_color(names[i][1]['color'])
elif 'fontweight' in names[i][1]:
tick_labels[i].set_fontweight(names[i][1]['fontweight'])
plt.xticks(rotation='vertical')
if log_scale: plt.yscale('log')
if title is not None: plt.title(title)
if legend is not None: plt.legend(legend)
plt.savefig(filename, bbox_inches='tight')
def _log_barh(self, series, filename, title=None, legend=None, log_scale=False):
names, values = zip(*series)
names_only = [item[0] if isinstance(item, tuple) else item for item in names]
fig, ax = plt.subplots(figsize=(10, max(5, len(names)/4)))
plt.ylim(-0.5, len(names) - 0.5)
if isinstance(values[0], list):
series = [list(i) for i in zip(*values)]
left = np.zeros(len(series[0]))
for i, serie in enumerate(series):
plt.barh(names_only, serie, left=left)
left += serie
else:
plt.barh(names_only, values)
if callable(getattr(ax.xaxis, 'minorticks_on', None)):
ax.xaxis.minorticks_on() # introduced in matplotlib 3.9.x
plt.grid(axis='x', which='major', linestyle='--', color='gray')
plt.grid(axis='x', which='minor', linestyle=':', color='lightgray')
formatter0 = matplotlib.ticker.EngFormatter(unit='')
ax.xaxis.set_major_formatter(formatter0)
plt.gca().set_axisbelow(True)
plt.gca().xaxis.set_label_position('top')
plt.gca().xaxis.tick_top()
labels = plt.gca().get_yticks()
tick_labels = plt.gca().get_yticklabels()
for i, label in enumerate(labels):
if isinstance(names[i], tuple):
if 'color' in names[i][1]:
tick_labels[i].set_color(names[i][1]['color'])
elif 'fontweight' in names[i][1]:
tick_labels[i].set_fontweight(names[i][1]['fontweight'])
if log_scale: plt.xscale('log')
if title is not None: plt.title(title)
if legend is not None: plt.legend(legend)
plt.savefig(filename, bbox_inches='tight')
#
# Do not add there auto import of submodules.
#
# The testing module contains utils and other tools
# related to tests, possibly reusable by other aidge
# components unit_tests.
#
# Import a specific module explicitly with for instance:
# import aidge_core.testing.utils
# or
# from aidge_core.testing.utils import (....,)
#
#
# Should provide some general utility functions for testing.
# For instance:
# - filesystem
# - os dependencies
# - unit tests setup
#
from .tree_cache import tree_update_from_cache
from .tree_utils import tree_move, tree_remove