Commit 56506f56 authored by Cyril Moineau's avatar Cyril Moineau
Browse files

Merge branch 'allowNoInputProducer' into 'dev'

Export refactor

See merge request eclipse/aidge/aidge_export_arm_cortexm!11
parents 6f3ee49b 10868393
Showing changes with 354 additions and 2476 deletions
import re
import os
import shutil
from pathlib import Path
import numpy as np
from aidge_core.export_utils.data_conversion import aidge2c
from aidge_core.export_utils.code_generation import *
from aidge_export_arm_cortexm.utils import (ROOT, AVAILABLE_BOARDS, has_board, \
OPERATORS_REGISTRY, supported_operators)
import aidge_core
import aidge_export_arm_cortexm.operators
from aidge_export_arm_cortexm.utils.scheduler import topological_sort
from aidge_export_arm_cortexm.utils.generation import get_functions_from_c_file, get_functions_from_c_folder, get_filenames_from_folder
from aidge_export_arm_cortexm.utils.converter import *
from aidge_export_arm_cortexm.memory import *
from aidge_export_arm_cortexm.utils import (ROOT, AVAILABLE_BOARDS, has_board)
from aidge_export_arm_cortexm.export_registry import ExportLibAidgeARM
# from aidge_export_arm_cortexm.utils.converter import numpy_dtype2ctype
from aidge_core.mem_info import compute_default_mem_info, generate_optimized_memory_info
from aidge_core.export_utils import scheduler_export
BOARD_PATH: Path = ROOT / "boards"
BOARDS_MAP: dict[str, Path] = {
"stm32h7" : BOARD_PATH / "stm32" / "H7",
}
def export(export_folder_name,
graphview,
scheduler = None,
board:str ="stm32h7",
library:str = "aidge",
mem_wrapping = False):
# Create export directory
export_folder = Path().absolute() / export_folder_name
os.makedirs(str(export_folder), exist_ok=True)
# Create dnn directory
dnn_folder = export_folder / "dnn"
os.makedirs(str(dnn_folder), exist_ok=True)
# Determine which board the user wants
# to select correct config
if has_board(board):
board_path = AVAILABLE_BOARDS[board]
else:
raise ValueError(f"{board} not found in the package. Please among those boards: {list(AVAILABLE_BOARDS.keys())}")
# Copy all static files in the export
shutil.copytree(board_path, str(export_folder), dirs_exist_ok=True)
# For N2D2 library, copy static folder to export/include
if library == "n2d2":
dnn_include_folder = dnn_folder / "include"
os.makedirs(str(dnn_include_folder), exist_ok=True)
shutil.copytree(str(ROOT / "_N2D2" / "static"), str(dnn_include_folder), dirs_exist_ok=True)
# Create statistics directory
stats_folder = export_folder / "statistics"
os.makedirs(str(stats_folder), exist_ok=True)
# Sort layers according to a scheduler
if not isinstance(scheduler, aidge_core.Scheduler):
# No scheduler provided by the user, use the default one
list_forward_nodes = topological_sort(graphview)
mem_size, mem_info = compute_default_mem_info(list_forward_nodes)
else:
list_forward_nodes = scheduler.get_static_scheduling()
mem_size, mem_info = generate_optimized_memory_info(stats_folder, scheduler, mem_wrapping)
# Set some lists of elements for generating forward file
list_actions = []
list_configs = []
# Export layer configurations
for node in list_forward_nodes:
if node.type() == "Producer":
# We do not treat Producers here but in the nodes which use them
continue
if node.type() in supported_operators():
op = OPERATORS_REGISTRY[node.type()](node, board, library)
# Export the configuration
list_configs = op.export(dnn_folder, list_configs)
# Add forward kernel
list_actions = op.forward(list_actions)
else:
print(f"Warning: {node.type()} is not supported in the export.\nPlease add the implementation.")
# Generate the memory file
generate_file(
str(dnn_folder / "memory" / "mem_info.h"),
str(ROOT / "templates" / "memory" / "mem_info.jinja"),
mem_size = mem_size,
mem_info_legends = MEMORY_INFO_TEMPLATE,
mem_info = mem_info,
mem_alignment = 1 # Fixed memory alignment so far, feel free to adapt it
)
scheduler_export(
scheduler,
export_folder_name,
ExportLibAidgeARM,
memory_manager=generate_optimized_memory_info,
memory_manager_args={"stats_folder": f"{export_folder_name}/stats", "wrapping": mem_wrapping }
)
list_configs.append("memory/mem_info.h")
# Get entry nodes
# It assumes the entry nodes are Producers with constant=false
# Store the datatype & name
list_inputs_name = []
first_element_added = False
for node in graphview.get_nodes():
if node.type() == "Producer":
if not first_element_added:
export_type = aidge2c(node.get_operator().get_output(0).dtype())
list_inputs_name.append((export_type, node.name()))
first_element_added = True
if not node.get_operator().attr.constant:
export_type = aidge2c(node.get_operator().get_output(0).dtype())
list_inputs_name.append((export_type, node.name()))
gen_board_files(export_folder_name, board)
# Get output nodes
# Store the datatype & name, like entry nodes
list_outputs_name = []
for node in graphview.get_nodes():
if len(node.get_children()) == 0:
if node.get_operator().attr.has_attr('dtype'):
# Temporary fix because it is impossible to set the DataType of a generic operator
export_type = aidge2c(node.get_operator().attr.dtype)
else:
export_type = aidge2c(node.get_operator().get_output(0).dtype())
def supported_boards() -> list[str]:
return list(BOARDS_MAP.keys())
list_outputs_name.append((export_type, node.name()))
if library == "n2d2":
forward_file = "forward.cpp"
else:
forward_file = "forward.c"
# Generate forward file
generate_file(
str(dnn_folder / "src" / forward_file),
str(ROOT / "templates" / "network" / "network_forward.jinja"),
headers=set(list_configs),
actions=list_actions,
inputs= list_inputs_name,
outputs=list_outputs_name
)
def gen_board_files(path:str, board:str)->None:
if board not in supported_boards():
raise ValueError(f"Board {board} is not supported, supported board are:\n\t-{'\n\t-'.join(supported_boards())}")
# Generate dnn internal API
if library == "aidge":
# For Aidge, parse all kernels source code and retrieve function prototypes
generate_file(
str(dnn_folder / "include" / "network_functions.h"),
str(ROOT / "templates" / "network" / "network_prototypes.jinja"),
libraries=[],
functions=get_functions_from_c_folder(str(dnn_folder / "src" / "kernels")),
)
elif library == "n2d2":
# For N2D2, parse all the files in include/kernel/ and retrieve the names of the files
generate_file(
str(dnn_folder / "include" / "network_functions.h"),
str(ROOT / "templates" / "network" / "network_prototypes.jinja"),
libraries=[],
files=[str(Path("kernels") / x) for x in get_filenames_from_folder(str(dnn_folder / "include" / "kernels"), r'^.*\.hpp$')],
)
if isinstance(path, str): path = Path(path)
# Create dnn directory if it does not exist
dnn_folder = path / "dnn"
os.makedirs(str(dnn_folder), exist_ok=True)
# Generate dnn API
generate_file(
str(dnn_folder / "include" / "dnn.h"),
str(ROOT / "templates" / "network" / "dnn_header.jinja"),
libraries=["stdint.h"],
functions=get_functions_from_c_file(str(dnn_folder / "src" / forward_file)),
)
# Determine which board the user wants
# to select correct config
# Copy all static files in the export
shutil.copytree(BOARDS_MAP[board], str(path), dirs_exist_ok=True)
# For N2D2 library, copy static folder to export/include
dnn_include_folder = dnn_folder / "include"
os.makedirs(str(dnn_include_folder), exist_ok=True)
shutil.copytree(str(ROOT / "_N2D2" / "static"), str(dnn_include_folder), dirs_exist_ok=True)
from aidge_core.export_utils import ExportLib
class ExportLibAidgeARM(ExportLib):
_name="aidge_arm"
# TODO ugly fix for Tensor registration issue...
import aidge_core
aidge_core.register_Tensor(["aidge_arm", aidge_core.dtype.float32],
aidge_core.get_key_value_Tensor(["cpu", aidge_core.dtype.float32]))
class ExportLibCMSISNN(ExportLib):
_name="export_cmsisnn"
import os
import shutil
from typing import List
from pathlib import Path
import aidge_core
import aidge_backend_cpu
# for each layer,
# name [size, stride, length, count, contiguous offset, contiguous size, wrapping offset, wrapping size]
# true values [nb_outputs, nb_outputs, width, width, offset start, total size, 0, 0]
# Example:
#define ENV_MEM_SIZE 3
#define ENV_MEM_STRIDE 3
#define ENV_MEM_LENGTH 224
#define ENV_MEM_COUNT 224
#define ENV_MEM_CONT_OFFSET 0
#define ENV_MEM_CONT_SIZE 150528
#define ENV_MEM_WRAP_OFFSET 0
#define ENV_MEM_WRAP_SIZE 0
MEMORY_INFO_TEMPLATE = ["layer_name", "size", "stride", "length", "count", "cont_offset", "cont_size", "wrap_offset", "wrap_size"]
# for each layer, name: [size, offset start] (old style)
# Example:
#define ENV_MEM_SIZE 3
#define ENV_OFFSET 0
# MEMORY_INFO_TEMPLATE = ["layer_name", "size", "offset"]
# Default memory management, which can be used for development
def compute_default_mem_info(scheduler: aidge_core.Scheduler):
list_forward_nodes = scheduler
mem_info = []
mem_size = 0
# Exclude Producers and the last layers (because the results are stored outside the export)
for i, node in enumerate(list_forward_nodes):
if node.type() != "Producer" and node.type() != "Reshape":
# if node.type() != "Producer":
if len(node.get_children()) != 0:
dims = node.get_operator().get_output(0).dims()
mem = 1
for dim in dims:
mem *= dim
# Add memory info
# Only size and cont_offset matter
mem_info.append([node.name(), mem, 0, 0, 0, mem_size, mem, 0, 0])
# Increment offset for the next layer
mem_size += mem
return mem_size, mem_info
def generate_optimized_memory_info(stats_folder: Path,
scheduler: aidge_core.Scheduler,
wrapping:bool = False):
# The forward dims have to be computed outside this function
# It is also assumed that the scheduling has been generated outside
# Otherwise, uncomment the following line
# scheduler.generate_scheduling()
# Generate the memory manager
# So far, Producers are not taken into account by the memory manager => inc_producers=False
mem_manager = scheduler.generate_memory(inc_producers=False, wrap_around_buffer=wrapping)
# In the export, we currently use a unified memory buffer whose size
# is determined by the memory peak usage
mem_size = mem_manager.get_peak_usage()
mem_info = []
mem_planes = mem_manager.get_planes()
for node in scheduler.get_static_scheduling():
# Skip memory management for the parameter producers
if node.type() == "Producer":
if node.get_operator().attr.constant:
continue
else:
# Input memory management (assumes the tensor dims end with [:, channel, height, width])
tensor = node.get_operator().get_output(0)
if tensor is None:
raise RuntimeError("Input producer not provided")
if len(tensor.dims()) < 3:
raise RuntimeError("Input producer dimensions must match [:, channel, height, width]")
name = node.name()
size = tensor.dims()[-3] # Should be nb_channels
stride = tensor.dims()[-3] # Should be nb_channels
length = tensor.dims()[-1] # Should be width
count = tensor.dims()[-2] # Should be height
cont_offset = 0 # Assume input data is stored outside the export function,
# so the memory offset does not matter here
cont_size = tensor.dims()[-1] * tensor.dims()[-2] * tensor.dims()[-3] # Size of input
wrap_offset = 0 # No wrapping
wrap_size = 0 # No wrapping
# elif node.type() != "Reshape":
else:
plane = mem_planes[node][0]
name = node.name()
size = plane.size
stride = plane.stride
length = plane.length
count = plane.count
cont_offset = plane.get_contiguous_offset()
cont_size = plane.get_contiguous_size()
wrap_offset = plane.get_wrapped_offset()
wrap_size = plane.get_wrapped_size()
mem_info.append([name, size, stride, length, count,
cont_offset, cont_size, wrap_offset, wrap_size])
# Use gnuplot to generate the log
try:
os.makedirs(str(stats_folder / "graph"), exist_ok=True)
mem_manager.log("memory_info")
os.chmod("memory_info_plot.gnu", 0o777)
os.system("./memory_info_plot.gnu")
shutil.move("memory_info", str(stats_folder / "graph"/ "memory_info"))
shutil.move("memory_info_plot.png", str(stats_folder / "graph" / "memory_info_plot.png"))
os.remove("memory_info_plot.gnu")
except Exception:
print("Please install gnuplot if you want the memory plot from MemoryManager.")
return mem_size, mem_info
\ No newline at end of file
This diff is collapsed.
import os
import shutil
import numpy as np
from pathlib import Path
from jinja2 import Environment, FileSystemLoader
from aidge_core import ExportNode
from aidge_export_arm_cortexm.utils import ROOT, operator_register
##############################################
############## Export functions ##############
##############################################
def generate_file(filename, templatename, **kwargs):
# Get directory name of the file
dirname = os.path.dirname(filename)
# If directory doesn't exist, create it
if not os.path.exists(dirname):
os.makedirs(dirname)
# Get directory name and name of the template
template_dir = os.path.dirname(templatename)
template_name = os.path.basename(templatename)
# Select template
template = Environment(loader=FileSystemLoader(template_dir)).get_template(template_name)
# Generate file
content = template.render(kwargs)
with open(filename, mode="w", encoding="utf-8") as message:
message.write(content)
def generate_action(template_path, **kwargs):
dirname = os.path.dirname(template_path)
filename = os.path.basename(template_path)
template = Environment(loader=FileSystemLoader(dirname)).get_template(filename)
return template.render(kwargs)
def copyfile(filename, dst_folder):
# If directory doesn't exist, create it
if not os.path.exists(dst_folder):
os.makedirs(dst_folder)
shutil.copy(filename, dst_folder)
def export_to_static(name, array, filepath):
# Get directory name of the file
dirname = os.path.dirname(filepath)
# If directory doesn't exist, create it
if not os.path.exists(dirname):
os.makedirs(dirname)
generate_file(
filepath,
str(ROOT) + "/templates/data/data_static.jinja",
dims = array.shape,
data_t = "float",
name = name,
values = array.tolist()
)
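# Minimal usage sketch for export_to_static (array contents and output path are
# illustrative; assumes this module is importable as aidge_export_arm_cortexm.operators):
#
#   import numpy as np
#   from aidge_export_arm_cortexm.operators import export_to_static
#
#   weights = np.ones((4, 4), dtype=np.float32)
#   # Emits a C header declaring a static float array named "fc1_weights"
#   export_to_static("fc1_weights", weights.reshape(-1), "export/dnn/parameters/fc1_weights.h")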
##############################################
################### Utils ####################
##############################################
def get_node_parents(node):
parents = []
for parent in node.get_parents():
if parent.type() != "Producer":
parents.append(parent)
return parents
def get_producer_parents(node):
parents = []
for parent in node.get_parents():
if parent.type() == "Producer":
parents.append(parent)
return parents
##############################################
################### Actions ##################
##############################################
def set_up_output(name, datatype):
return f"{datatype}* {name} = ({datatype}*) mem + {name.upper()}_OFFSET;"
##############################################
############## Operators helper ##############
##############################################
@operator_register("Add")
class Add(ExportNode):
def __init__(self, node, board, dataformat, library):
# Copy dims for first input
node.get_operator().get_output(0).resize(node.get_operator().get_input(0).dims())
super().__init__(node)
self.board = board
self.library = library
self.dataformat = dataformat
def export(self, export_folder:str, list_configs:list):
# Copying kernel into export
# Find a more generic system for future dev
if self.library == "aidge":
if self.dataformat == "float32":
copyfile(str(ROOT / "kernels" / "ElemWise" / "Add" / "aidge_add_float32.c"),
str(Path(export_folder) / "src" / "kernels"))
# Add to config list the include of configurations
list_configs.append(f"layers/{self.name}.h")
# Export configuration file
generate_file(
f"{export_folder}/layers/{self.name}.h",
str(ROOT / "templates" / "configuration" / "elemwise.jinja"),
name=self.name,
elemwise_op="\"ADD\"",
nb_inputs=np.prod(self.inputs_dims[0]),
nb_outputs=np.prod(self.outputs_dims[0]))
return list_configs
def forward(self, list_actions:list):
if not self.is_last:
list_actions.append(set_up_output(self.name, "float"))
list_actions.append(generate_action(
str(ROOT / "templates" / "kernel" / "elemwise.jinja"),
name=self.name,
elemwise_type="add",
dataformat="float32", # Only this choice so far
input1_name=self.inputs[0].name(),
input2_name=self.inputs[1].name(),
output_name=self.name
))
return list_actions
@operator_register("Sub")
class Sub(ExportNode):
def __init__(self, node, board, dataformat, library):
# Copy dims for first input
node.get_operator().get_output(0).resize(node.get_operator().get_input(0).dims())
super().__init__(node)
self.board = board
self.library = library
self.dataformat = dataformat
def export(self, export_folder:str, list_configs:list):
# Copying kernel into export
# Find a more generic system for future dev
if self.library == "aidge":
if self.dataformat == "float32":
copyfile(str(ROOT / "kernels" / "ElemWise" / "Sub" / "aidge_sub_float32.c"),
str(Path(export_folder) / "src" / "kernels"))
# Add to config list the include of configurations
list_configs.append(f"layers/{self.name}.h")
# Export configuration file
generate_file(
f"{export_folder}/layers/{self.name}.h",
str(ROOT / "templates" / "configuration" / "elemwise.jinja"),
name=self.name,
elemwise_op="\"SUB\"",
nb_inputs=np.prod(self.inputs_dims[0]),
nb_outputs=np.prod(self.outputs_dims[0]))
return list_configs
def forward(self, list_actions:list):
if not self.is_last:
list_actions.append(set_up_output(self.name, "float"))
list_actions.append(generate_action(
str(ROOT / "templates" / "kernel" / "elemwise.jinja"),
name=self.name,
elemwise_type="sub",
dataformat="float32", # Only this choice so far
input1_name=self.inputs[0].name(),
input2_name=self.inputs[1].name(),
output_name=self.name
))
return list_actions
@operator_register("Mul")
class Mul(ExportNode):
def __init__(self, node, board, dataformat, library):
# Copy dims for first input
node.get_operator().get_output(0).resize(node.get_operator().get_input(0).dims())
super().__init__(node)
self.board = board
self.library = library
self.dataformat = dataformat
def export(self, export_folder:str, list_configs:list):
# Copying kernel into export
# Find a more generic system for future dev
if self.library == "aidge":
if self.dataformat == "float32":
copyfile(str(ROOT / "kernels" / "ElemWise" / "Mul" / "aidge_mul_float32.c"),
str(Path(export_folder) / "src" / "kernels"))
# Add to config list the include of configurations
list_configs.append(f"layers/{self.name}.h")
# Export configuration file
generate_file(
f"{export_folder}/layers/{self.name}.h",
str(ROOT / "templates" / "configuration" / "elemwise.jinja"),
name=self.name,
elemwise_op="\"MUL\"",
nb_inputs=np.prod(self.inputs_dims[0]),
nb_outputs=np.prod(self.outputs_dims[0]))
return list_configs
def forward(self, list_actions:list):
if not self.is_last:
list_actions.append(set_up_output(self.name, "float"))
list_actions.append(generate_action(
str(ROOT / "templates" / "kernel" / "elemwise.jinja"),
name=self.name,
elemwise_type="mul",
dataformat="float32", # Only this choice so far
input1_name=self.inputs[0].name(),
input2_name=self.inputs[1].name(),
output_name=self.name
))
return list_actions
@operator_register("Div")
class Div(ExportNode):
def __init__(self, node, board, dataformat, library):
# Copy dims for first input
node.get_operator().get_output(0).resize(node.get_operator().get_input(0).dims())
super().__init__(node)
self.board = board
self.library = library
self.dataformat = dataformat
def export(self, export_folder:str, list_configs:list):
# Copying kernel into export
# Find a more generic system for future dev
if self.library == "aidge":
if self.dataformat == "float32":
copyfile(str(ROOT / "kernels" / "ElemWise" / "Div" / "aidge_div_float32.c"),
str(Path(export_folder) / "src" / "kernels"))
# Add to config list the include of configurations
list_configs.append(f"layers/{self.name}.h")
# Export configuration file
generate_file(
f"{export_folder}/layers/{self.name}.h",
str(ROOT / "templates" / "configuration" / "elemwise.jinja"),
name=self.name,
elemwise_op="\"DIV\"",
nb_inputs=np.prod(self.inputs_dims[0]),
nb_outputs=np.prod(self.outputs_dims[0]))
return list_configs
def forward(self, list_actions:list):
if not self.is_last:
list_actions.append(set_up_output(self.name, "float"))
list_actions.append(generate_action(
str(ROOT / "templates" / "kernel" / "elemwise.jinja"),
name=self.name,
elemwise_type="div",
dataformat="float32", # Only this choice so far
input1_name=self.inputs[0].name(),
input2_name=self.inputs[1].name(),
output_name=self.name
))
return list_actions
@operator_register("Gemm")
class Gemm(ExportNode):
def __init__(self, node, board, dataformat, library):
w_dims = node.get_operator().get_input(1).dims()
node.get_operator().get_output(0).resize([w_dims[1]])
super().__init__(node)
self.board = board
self.library = library
self.dataformat = dataformat
def export(self, export_folder:str, list_configs:list):
# Copying kernel into export
# Find a more generic system for future dev
if self.library == "aidge":
if self.dataformat == "float32":
copyfile(str(ROOT / "kernels" / "FullyConnected" / "aidge_fc_float32.c"),
str(Path(export_folder) / "src" / "kernels"))
# Add to config list the include of configurations
list_configs.append(f"layers/{self.name}.h")
# Export configuration file
generate_file(
f"{export_folder}/layers/{self.name}.h",
str(ROOT / "templates" / "configuration" / "fullyconnected.jinja"),
name=self.name,
nb_channels=self.inputs_dims[0][0],
nb_outputs=self.outputs_dims[0][0],
biases_size=self.outputs_dims[0][0])
return list_configs
def forward(self, list_actions:list):
if not self.is_last:
list_actions.append(set_up_output(self.name, "float"))
list_actions.append(generate_action(
str(ROOT / "templates" / "kernel" / "fullyconnected.jinja"),
name=self.name,
dataformat="float32", # Only this choice so far
input_name=self.inputs[0].name(),
weight_name=self.inputs[1].name(),
bias_name=self.inputs[2].name(),
output_name=self.name
))
return list_actions
@operator_register("Atan")
class Atan(ExportNode):
def __init__(self, node, board, dataformat, library):
# Copy dims for first input
node.get_operator().get_output(0).resize(node.get_operator().get_input(0).dims())
super().__init__(node)
self.board = board
self.library = library
self.dataformat = dataformat
def export(self, export_folder:str, list_configs:list):
# Copying kernel into export
# Find a more generic system for future dev
if self.library == "aidge":
if self.dataformat == "float32":
copyfile(str(ROOT / "kernels" / "Activation" / "Atan" / "aidge_atan_float32.c"),
str(Path(export_folder) / "src" / "kernels"))
# Add to config list the include of configurations
list_configs.append(f"layers/{self.name}.h")
# Export configuration file
generate_file(
f"{export_folder}/layers/{self.name}.h",
str(ROOT / "templates" / "configuration" / "activation.jinja"),
name=self.name,
activation_type="\"ATAN\"",
nb_inputs=np.prod(self.inputs_dims[0]),
nb_outputs=np.prod(self.outputs_dims[0]))
return list_configs
def forward(self, list_actions:list):
if not self.is_last:
list_actions.append(set_up_output(self.name, "float"))
list_actions.append(generate_action(
str(ROOT / "templates" / "kernel" / "activation.jinja"),
name=self.name,
activation_type="atan",
dataformat="float32", # Only this choice so far
input_name=self.inputs[0].name(),
output_name=self.name
))
return list_actions
@operator_register("Slice")
class Slice(ExportNode):
def __init__(self, node, board, dataformat, library):
self.axes = node.get_operator().attr.axes
self.starts = node.get_operator().attr.starts
self.ends = node.get_operator().attr.ends
# Compute output dims
out_dims = [self.ends[x-1] - self.starts[x-1] for x in self.axes]
node.get_operator().get_output(0).resize(out_dims)
super().__init__(node)
self.board = board
self.library = library
self.dataformat = dataformat
def export(self, export_folder:str, list_configs:list):
# Copying kernel into export
# Find a more generic system for future dev
if self.library == "aidge":
if self.dataformat == "float32":
copyfile(str(ROOT / "kernels" / "Slice" / "aidge_slice_float32.c"),
str(Path(export_folder) / "src" / "kernels"))
# Add to config list the include of configurations
list_configs.append(f"layers/{self.name}.h")
# Export configuration file
generate_file(
f"{export_folder}/layers/{self.name}.h",
str(ROOT / "templates" / "configuration" / "slice.jinja"),
name=self.name,
axes=self.axes,
starts=self.starts,
ends=self.ends,
nb_inputs=np.prod(self.inputs_dims[0]),
nb_outputs=np.prod(self.outputs_dims[0]))
return list_configs
def forward(self, list_actions:list):
if not self.is_last:
list_actions.append(set_up_output(self.name, "float"))
list_actions.append(generate_action(
str(ROOT / "templates" / "kernel" / "slice.jinja"),
name=self.name,
dataformat="float32", # Only this choice so far
input_name=self.inputs[0].name(),
output_name=self.name
))
return list_actions
@operator_register("Concat")
class Concat(ExportNode):
def __init__(self, node, board, dataformat, library):
self.axis = node.get_operator().attr.axis
out_dims = node.get_operator().get_input(0).dims()
out_dims[self.axis - 1] = 0
for parent in node.get_parents():
out_dims[self.axis - 1] += parent.get_operator().get_output(0).dims()[self.axis - 1]
node.get_operator().get_output(0).resize(out_dims)
super().__init__(node)
self.board = board
self.library = library
self.dataformat = dataformat
def export(self, export_folder:str, list_configs:list):
# Copying kernel into export
# Find a more generic system for future dev
if self.library == "aidge":
if self.dataformat == "float32":
copyfile(str(ROOT / "kernels" / "Concat" / "aidge_concat_float32.c"),
str(Path(export_folder) / "src" / "kernels"))
# Add to config list the include of configurations
list_configs.append(f"layers/{self.name}.h")
# Get all input size
list_input_size = []
for i in range(len(self.inputs)):
list_input_size.append(np.prod(self.node.get_operator().get_input(i).dims()))
# Export configuration file
generate_file(
f"{export_folder}/layers/{self.name}.h",
str(ROOT / "templates" / "configuration" / "concat.jinja"),
name=self.name,
nb_inputs=len(self.node.get_parents()),
axis=self.axis,
list_input_size=list_input_size,
output_size=np.sum(list_input_size)
)
return list_configs
def forward(self, list_actions:list):
if not self.is_last:
list_actions.append(set_up_output(self.name, "float"))
list_input_names = []
for i in range(len(self.inputs)):
list_input_names.append(self.inputs[i].name())
list_actions.append(generate_action(
str(ROOT / "templates" / "kernel" / "concat.jinja"),
name=self.name,
dataformat="float32",
nb_inputs=len(self.inputs),
list_in_names=list_input_names,
output_name=self.name,
))
return list_actions
@operator_register("Producer")
class Producer(ExportNode):
"""
If the export operators are standardized,
this class should simply inherit from ProducerCPP.
"""
def __init__(self, node, board, dataformat, library):
super().__init__(node)
self.board = board
self.library = library
self.dataformat = dataformat
self.values = np.array(self.operator.get_output(0))
def export(self, export_folder:str, list_configs:list):
list_configs.append(f"parameters/{self.name}.h")
export_to_static(self.name,
self.values.reshape(-1),
f"{export_folder}/parameters/{self.name}.h")
return list_configs
def forward(self, list_actions:list):
return list_actions
#ifndef MEM_INFO_H
#define MEM_INFO_H
#define MEMORY_SIZE {{ mem_size }}
#define MEMORY_ALIGNMENT {{ mem_alignment }}
{% for i in range(mem_info|length) -%}
{%- set layer_name = mem_info[i][0] %}
/* {{layer_name}} memory */
{% for j in range(1, mem_info[i]|length) %}
#define {{ layer_name|upper }}_MEM_{{ mem_info_legends[j]|upper }} {{ mem_info[i][j] }}
{%- endfor %}
{% endfor %}
#endif /* MEM_INFO_H */
{#- For name header -#}
#ifndef DNN_H
#define DNN_H
#ifdef __cplusplus
extern "C" {
#endif
{#- For libraries #}
{% for lib in libraries %}
#include <{{ lib }}>
{%- endfor %}
{% for func in functions %}
{{ func }}
{% endfor %}
#ifdef __cplusplus
}
#endif
#endif /* DNN_H */
\ No newline at end of file
{#- For libraries -#}
#include <stdint.h>
#include "dnn.h"
#include "network_functions.h"
// Layer & memory configurations
{%- for header in headers %}
#include "{{ header }}"
{%- endfor %}
{# mem has the datatype of the first input #}
{#- Change here to improve it -#}
{% if inputs[0][0] %}
static {{inputs[0][0]}} mem[MEMORY_SIZE];
{% else %}
static float mem[MEMORY_SIZE];
{% endif %}
{# Forward function #}
{#- Support multiple inputs with different datatypes and multiple outputs with different datatypes -#}
void model_forward({% for inp in inputs %}const {{inp[0]}}* {{inp[1]}}, {% endfor %}{% for out in outputs %}{{out[0]}}* {{out[1]}}{{ ", " if not loop.last else "" }}{% endfor %})
{
{%- for action in actions %}
{{ action }}
{%- endfor %}
}
{#- For name header -#}
#ifndef NETWORK_FUNCTIONS_HPP
#define NETWORK_FUNCTIONS_HPP
{#- For libraries #}
{% for lib in libraries %}
#include <{{ lib }}>
{%- endfor %}
{% for file in files %}
#include "{{ file }}"
{%- endfor %}
{% for func in functions %}
{{ func }}
{% endfor %}
#endif /* NETWORK_FUNCTIONS_HPP */
\ No newline at end of file
from pathlib import Path
import os
# Constants
FILE = Path(__file__).resolve()
ROOT = FILE.parents[1]
def get_all_available_boards():
boards = {}
directory_path = Path(str(ROOT / "boards"))
for subfolder in directory_path.rglob('*'):
if subfolder.is_dir() and \
subfolder.name != "__pycache__" and \
(subfolder.parent / '__init__.py').exists() and \
not (subfolder / '__init__.py').exists():
# Get relative path to boards directory
relpath = str(subfolder.relative_to(directory_path))
# Get board name
board_name = relpath.replace('/', '').replace('\\', '')
boards[board_name.lower()] = str(subfolder)
return boards
AVAILABLE_BOARDS = get_all_available_boards()
def has_board(board_name: str) -> bool:
return board_name.lower() in AVAILABLE_BOARDS.keys()
OPERATORS_REGISTRY = {}
def operator_register(*args):
key_list = [arg for arg in args]
def decorator(operator):
class Wrapper(operator):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
for key in key_list:
OPERATORS_REGISTRY[key] = operator
return Wrapper
return decorator
def supported_operators():
return list(OPERATORS_REGISTRY.keys())
import numpy as np
import aidge_core
def numpy_dtype2ctype(dtype):
if dtype == np.int8:
return "int8_t"
elif dtype == np.uint8:
return "uint8_t"
elif dtype == np.int16:
return "int16_t"
elif dtype == np.int32:
return "int32_t"
elif dtype == np.int64:
return "int64_t"
elif dtype == np.float32:
return "float"
elif dtype == np.float64:
return "double"
# Add more dtype mappings as needed
else:
raise ValueError(f"Unsupported {dtype} dtype")
def aidge_datatype2ctype(datatype):
if datatype == aidge_core.dtype.int8:
return "int8_t"
elif datatype == aidge_core.dtype.uint8:
return "uint8_t"
elif datatype == aidge_core.dtype.int32:
return "int32_t"
elif datatype == aidge_core.dtype.int64:
return "int64_t"
elif datatype == aidge_core.dtype.float32:
return "float"
elif datatype == aidge_core.dtype.float64:
return "double"
# Add more dtype mappings as needed
else:
raise ValueError(f"Unsupported {datatype} aidge dtype")
def aidge_datatype2dataformat(datatype):
if datatype == aidge_core.dtype.int8:
return "int8"
elif datatype == aidge_core.dtype.int32:
return "int32"
elif datatype == aidge_core.dtype.int64:
return "int64"
elif datatype == aidge_core.dtype.float32:
return "float32"
elif datatype == aidge_core.dtype.float64:
return "float64"
# Add more dtype mappings as needed
else:
raise ValueError(f"Unsupported {datatype} aidge dtype")
import re
import os
import shutil
from jinja2 import Environment, FileSystemLoader
def get_functions_from_c_file(file_path):
functions = []
pattern = r'\w+\s+(\w+)\s*\(([^)]*)\)\s*{'
keyword = ['else', 'for', 'if', 'while', 'do']
with open(file_path, 'r') as file:
file_content = file.read()
matches = re.findall(pattern, file_content)
for match in matches:
function_name = match[0]
if function_name in keyword:
continue
arguments = match[1].split(',')
arguments = [arg.strip() for arg in arguments]
return_type = get_return_type(file_content, function_name)
function_string = f"{return_type} {function_name}({', '.join(arguments)});"
functions.append(function_string)
return functions
def get_return_type(file_content, function_name):
pattern = rf'\w+\s+{function_name}\s*\([^)]*\)\s*{{'
return_type = re.search(pattern, file_content).group()
return_type = return_type.split()[0].strip()
return return_type
def get_functions_from_c_folder(folder_path):
functions = []
for root, _, files in os.walk(folder_path):
for file in files:
functions += get_functions_from_c_file(os.path.join(root, file))
return functions
def get_filenames_from_folder(folder_path: str, pattern: str = r'.*'):
# Ensure the provided folder path exists
if not os.path.isdir(folder_path):
raise ValueError(f"The provided folder path '{folder_path}' does not exist.")
# Compile the regex pattern
regex = re.compile(pattern)
# List all files and directories in the provided folder path
all_entries = os.listdir(folder_path)
# Use a regex pattern to filter only filenames (excluding directories)
filenames = [entry for entry in all_entries if os.path.isfile(os.path.join(folder_path, entry)) and regex.match(entry)]
return filenames
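# Minimal usage sketch, mirroring how the N2D2 branch of the export collects kernel
# headers (the folder path is illustrative):
#
#   from aidge_export_arm_cortexm.utils.generation import get_filenames_from_folder
#
#   headers = get_filenames_from_folder("export/dnn/include/kernels", r'^.*\.hpp$')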
def copyfile(filename, dst_folder):
# If directory doesn't exist, create it
if not os.path.exists(dst_folder):
os.makedirs(dst_folder)
shutil.copy(filename, dst_folder)
def topological_sort(graphview):
"""Take an Aidge Graphview
and returns a list of nodes topologically sorting
"""
nodes = graphview.get_nodes()
result = []
visited = set()
visiting = set() # To track nodes being currently visited
def visit(node):
if node in visiting:
raise ValueError("Graph contains a cycle")
if node in visited:
return
visiting.add(node)
for parent in node.get_parents():
if parent and parent in nodes:
visit(parent)
visiting.remove(node)
visited.add(node)
result.append(node)
for node in nodes:
visit(node)
return result
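# Minimal sketch of how the default (scheduler-less) export path uses this helper;
# forward_layers is a hypothetical wrapper added only for illustration:
#
#   def forward_layers(graphview):
#       """Return the forward nodes of a GraphView, Producers excluded."""
#       nodes = topological_sort(graphview)
#       # Producers are handled by the nodes that consume them, not exported directly
#       return [n for n in nodes if n.type() != "Producer"]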
# Examples on how to use the Aidge ARM CortexM Export module
This folder contains some examples on how to use the `Aidge ARM CortexM Export` module in your projects.
- [LeNet export for MNIST dataset](./export_LeNet/)
Feel free to propose your own contributions to this module!
\ No newline at end of file
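For quick orientation, here is a condensed sketch of the export flow demonstrated in the LeNet notebook below (model file, shapes, and board are taken from that notebook):

``` python
import aidge_core
import aidge_onnx
import aidge_export_arm_cortexm

# Load the ONNX model and drop the Flatten node, which the export does not need
model = aidge_onnx.load_onnx("lenet_mnist.onnx")
aidge_core.remove_flatten(model)

# Freeze the parameter Producers so they are exported as constants
for node in model.get_nodes():
    if node.type() == "Producer":
        node.get_operator().set_attr("Constant", True)

# Provide an input Producer and forward the dimensions
input_node = aidge_core.Producer([1, 1, 28, 28], "input")
input_node.add_child(model)
model.add(input_node)
model.compile("cpu", aidge_core.DataType.Float32)

# Schedule the graph and generate the STM32H7 export
scheduler = aidge_core.SequentialScheduler(model)
scheduler.generate_scheduling()
aidge_export_arm_cortexm.export("lenet_export_fp32", model, scheduler, board="stm32h7")
```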
%% Cell type:markdown id: tags:
# Export a MNIST model to an ARM Cortex-M standalone project
%% Cell type:code id: tags:
``` python
%pip install requests numpy ipywidgets ipycanvas
```
%% Cell type:markdown id: tags:
## Download the model
%% Cell type:code id: tags:
``` python
import os
import requests
```
%% Cell type:code id: tags:
``` python
# Download the ONNX file if it is not already present
if not os.path.isfile("./lenet_mnist.onnx"):
response = requests.get("https://huggingface.co/vtemplier/LeNet_MNIST/resolve/main/lenet_mnist.onnx?download=true")
if response.status_code == 200:
with open("lenet_mnist.onnx", 'wb') as f:
f.write(response.content)
print("ONNX model downloaded successfully.")
else:
print("Failed to download ONNX model. Status code:", response.status_code)
```
%% Cell type:markdown id: tags:
## Load the model in Aidge and manipulate it
%% Cell type:code id: tags:
``` python
import aidge_core
import aidge_backend_cpu
import aidge_onnx
import aidge_export_cpp
import aidge_export_arm_cortexm
```
%% Cell type:code id: tags:
``` python
model = aidge_onnx.load_onnx("lenet_mnist.onnx")
```
%% Cell type:code id: tags:
``` python
# Remove the Flatten node, which is useless in this export
aidge_core.remove_flatten(model)
# Freeze the model by setting its parameter Producers to constant
for node in model.get_nodes():
if node.type() == "Producer":
node.get_operator().set_attr("Constant", True)
# Create Producer Node for the Graph
input_node = aidge_core.Producer([1, 1, 28, 28], "input")
input_node.add_child(model)
model.add(input_node)
# Configuration for the model + forward dimensions
model.compile("cpu", aidge_core.DataType.Float32)
```
%% Cell type:code id: tags:
``` python
# Generate scheduling of the model
scheduler = aidge_core.SequentialScheduler(model)
scheduler.generate_scheduling()
```
%% Cell type:markdown id: tags:
## Export the model
%% Cell type:code id: tags:
``` python
aidge_export_arm_cortexm.export("lenet_export_fp32", model, scheduler, board="stm32h7")
```
%% Cell type:markdown id: tags:
### Draw your own number
%% Cell type:code id: tags:
``` python
from ipywidgets import HBox, VBox, Button, Layout
from ipycanvas import RoughCanvas, hold_canvas
img_name = "my_number.png"
canvas = RoughCanvas(width=28, height=28, sync_image_data=True)
button_gen = Button(description="Generate PNG")
button_clear = Button(description="Clear")
drawing = False
position = None
shape = []
def on_erase_button_clicked(b):
canvas.clear()
def on_generate_button_clicked(b):
try:
canvas.to_file(img_name)
print(f"Image generated to {img_name} !")
except:
print("Draw a number before generating the image.")
button_clear.on_click(on_erase_button_clicked)
button_gen.on_click(on_generate_button_clicked)
def on_mouse_down(x, y):
global drawing
global position
global shape
drawing = True
position = (x, y)
shape = [position]
def on_mouse_move(x, y):
global drawing
global position
global shape
if not drawing:
return
with hold_canvas():
canvas.stroke_line(position[0], position[1], x, y)
position = (x, y)
shape.append(position)
def on_mouse_up(x, y):
global drawing
global position
global shape
drawing = False
with hold_canvas():
canvas.stroke_line(position[0], position[1], x, y)
shape = []
canvas.on_mouse_down(on_mouse_down)
canvas.on_mouse_move(on_mouse_move)
canvas.on_mouse_up(on_mouse_up)
canvas.stroke_style = "#000000"
VBox((canvas, HBox((button_gen, button_clear))),
layout=Layout(height='auto', width="300px"))
```
%% Cell type:markdown id: tags:
### Generate inputs for testing the model from your drawing
%% Cell type:code id: tags:
``` python
try:
number_np = canvas.get_image_data()
# We got a numpy array with the shape of (28,28,4)
# Transform it to (28,28)
x = number_np[:, :, 3].astype("float32")
# Convert from [0, 255] to [0, 1] and export it
aidge_export_cpp.generate_input_file(export_folder="lenet_export_fp32",
array_name="inputs",
array=x / 255)
except:
print("Please draw a number in the previous cell before running this one.")
```
%% Cell type:markdown id: tags:
### Compile the export and test it
%% Cell type:code id: tags:
``` python
!cd lenet_export_fp32 && make build_image_docker && make build_docker
```