Skip to content
Snippets Groups Projects
Commit 8aa14901 authored by Swetha Lakshmana Murthy's avatar Swetha Lakshmana Murthy
Browse files

Added: Template Code

parent 92d47df8
No related branches found
No related tags found
No related merge requests found
# Use a slim Python base image
FROM python:3.12-slim
ENV PYTHONUNBUFFERED=1
RUN apt-get update -y && \
apt-get install -y --no-install-recommends \
build-essential \
gcc \
libffi-dev \
libssl-dev \
git \
curl \
ca-certificates \
tzdata \
protobuf-compiler && \
apt-get clean && \
rm -rf /var/lib/apt/lists/*
# Set timezone
ARG TIMEZONE=Europe/Berlin
ENV TZ=$TIMEZONE
RUN ln -snf /usr/share/zoneinfo/$TIMEZONE /etc/localtime && echo $TIMEZONE > /etc/timezone
# Set working directory
WORKDIR /weather-agent
# Copy requirements and install
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy project files
COPY . .
# Compile .proto file
RUN python3 -m grpc_tools.protoc \
-I. \
--python_out=. \
--grpc_python_out=. \
agent.proto
# Run the server
CMD ["python3", "-u", "server.py"]
syntax = "proto3";
message Empty {
}
enum AgentType {
UNKNOWN = 0;
WEBSCRAPER = 1;
PLACES = 2;
WEATHER = 3;
NEWS = 4;
WIKI = 5;
URBAN_PLANNING = 6;
OTHER = 7;
YOUR_TOOL_NAME_ENUM = 8;
}
message FunctionSchemasResponse {
AgentType agent_type = 1;
repeated string function_schemas = 2;
}
message ToolRequest {
AgentType agent_type = 1;
string tool_name = 2;
string arguments_json = 3;
}
message ToolInvocationResult {
AgentType agent_type = 1;
repeated string results = 2;
}
service ToolService {
rpc SendFunctionSchemas (Empty) returns (stream FunctionSchemasResponse);
rpc InvokeTool (stream ToolRequest) returns (stream ToolInvocationResult);
}
#!/bin/bash
# Variables
IMAGE_NAME="your-tool-name"
REGISTRY_URL="cicd.ai4eu-dev.eu:7444"
REPO_PATH="tutorials/id-aiod-mcp"
TAG="latest"
# Step 1: Build the Docker image
echo "Building Docker image..."
docker build -t $IMAGE_NAME .
# Step 2: Tag the Docker image
echo "Tagging Docker image..."
docker tag $IMAGE_NAME $REGISTRY_URL/$REPO_PATH/$IMAGE_NAME:$TAG
# Step 3: Push the Docker image to the registry
echo "Pushing Docker image to registry..."
docker push $REGISTRY_URL/$REPO_PATH/$IMAGE_NAME:$TAG
# Check if the image was pushed successfully
if [ $? -eq 0 ]; then
echo "Docker image successfully pushed to $REGISTRY_URL/$REPO_PATH/$IMAGE_NAME:$TAG"
else
echo "Error: Docker push failed!"
exit 1
fi
\ No newline at end of file
from fastmcp import FastMCP
# Initialize MCP server
mcp = FastMCP("Your tool name")
print(f"Starting server {mcp.name}")
@mcp.tool()
def enter_tool_name(location_query: str):
"""YOur tool definition goes here"""
grpcio
grpcio-tools
grpc-interceptor
protobuf
multithreading
requests
fastmcp
import asyncio
import json
import grpc
from fastmcp import Client
from mcp_tool_name import mcp # Change this import accordingly
import agent_pb2
import agent_pb2_grpc
class ToolService(agent_pb2_grpc.ToolServiceServicer):
def __init__(self):
self.last_sent_function_schemas = None
async def SendFunctionSchemas(self, request, context):
print("Received request for SendFunctionSchemas (streaming).")
print(context.peer())
while True:
client = Client(mcp)
async with client:
available_tools = await client.list_tools()
serialized_schemas = []
for tool in available_tools:
function_schema_dict = {
"name": tool.name,
"description": tool.description or tool.name,
"parameters": {"type": "object", "properties": {}},
"required": [],
}
if hasattr(tool, "inputSchema") and tool.inputSchema:
if tool.inputSchema.get("type") == "object":
function_schema_dict["parameters"] = tool.inputSchema
serialized_schemas.append(json.dumps(function_schema_dict))
schema_update = agent_pb2.FunctionSchemasResponse(
agent_type=agent_pb2.YOUR_TOOL_NAME_ENUM, function_schemas=serialized_schemas # Add this to the ENUM in the protobuf file
)
# print("Sending schema update (heartbeat or change).")
yield schema_update
async def InvokeTool(self, request_iterator, context):
"""
Processes tool invocation requests from a client stream and returns tool results.
"""
print("📥 Received InvokeTool stream")
async for tool_request in request_iterator:
tool_name = tool_request.tool_name
agent_type = tool_request.agent_type # Enum value, e.g., 1 for WEBSCRAPER
arguments = json.loads(tool_request.arguments_json)
print(f"Invoking tool: {tool_name}")
print(f"Arguments as dict: {arguments}")
print(f"Agent Type: {agent_type}")
# Check if the agent type matches YOUR_TOOL_NAME_ENUM
if agent_type == agent_pb2.YOUR_TOOL_NAME_ENUM:
try:
client = Client(mcp)
async with client:
tool_result = await client.call_tool(tool_name, arguments)
print(f"---**---Tool invocation result: {tool_result}")
yield agent_pb2.ToolInvocationResult(
results=[str(tool_result)], agent_type=agent_type
)
except Exception as e:
print(f"Failed to invoke tool '{tool_name}': {str(e)}")
else:
print(f"Tool not allowed for agent type {agent_type}, skipping...")
print("Finished processing all InvokeTool requests.")
async def serve():
"""
Starts the gRPC server and listens for incoming requests.
"""
server = grpc.aio.server()
agent_pb2_grpc.add_ToolServiceServicer_to_server(ToolService(), server)
server.add_insecure_port("[::]:8061")
print("--*--gRPC server listening on port 8061...")
await server.start()
await server.wait_for_termination()
if __name__ == "__main__":
asyncio.run(serve())
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment