Skip to content
Snippets Groups Projects
Commit 2e75bb53 authored by Swetha Lakshmana Murthy's avatar Swetha Lakshmana Murthy
Browse files

Restructured Folder

parent 7cb3ae07
No related branches found
No related tags found
No related merge requests found
Showing
with 661 additions and 16 deletions
syntax = "proto3";
package combined;
// Shared empty message
message Empty {}
// Shared enum AgentType
enum AgentType {
UNKNOWN = 0;
WEBSCRAPER = 1;
PLACES = 2;
WEATHER = 3;
NEWS = 4;
WIKI = 5;
URBAN_PLANNING = 6;
OTHER = 7;
}
// Shared messages
message FunctionSchemasResponse {
AgentType agent_type = 1;
repeated string function_schemas = 2;
}
message ToolRequest {
AgentType agent_type = 1;
string tool_name = 2;
string arguments_json = 3;
}
message ToolInvocationResult {
AgentType agent_type = 1;
repeated string results = 2;
}
// Unique message from llm.proto
message UserMessage {
string message = 1;
}
// Services
service ToolService {
rpc SendFunctionSchemas (Empty) returns (stream FunctionSchemasResponse);
rpc InvokeTool (stream ToolRequest) returns (stream ToolInvocationResult);
}
service LLM {
rpc GetUserQuery (stream UserMessage) returns (Empty);
rpc GetFuncSchemas (stream FunctionSchemasResponse) returns (stream ToolRequest);
rpc RecvCallTool (stream ToolInvocationResult) returns (stream ToolInvocationResult);
}
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# NO CHECKED-IN PROTOBUF GENCODE
# source: combined.proto
# Protobuf Python Version: 5.29.0
"""Generated protocol buffer code."""
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import runtime_version as _runtime_version
from google.protobuf import symbol_database as _symbol_database
from google.protobuf.internal import builder as _builder
_runtime_version.ValidateProtobufRuntimeVersion(
_runtime_version.Domain.PUBLIC,
5,
29,
0,
'',
'combined.proto'
)
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0e\x63ombined.proto\x12\x08\x63ombined\"\x07\n\x05\x45mpty\"\\\n\x17\x46unctionSchemasResponse\x12\'\n\nagent_type\x18\x01 \x01(\x0e\x32\x13.combined.AgentType\x12\x18\n\x10\x66unction_schemas\x18\x02 \x03(\t\"a\n\x0bToolRequest\x12\'\n\nagent_type\x18\x01 \x01(\x0e\x32\x13.combined.AgentType\x12\x11\n\ttool_name\x18\x02 \x01(\t\x12\x16\n\x0e\x61rguments_json\x18\x03 \x01(\t\"P\n\x14ToolInvocationResult\x12\'\n\nagent_type\x18\x01 \x01(\x0e\x32\x13.combined.AgentType\x12\x0f\n\x07results\x18\x02 \x03(\t\"\x1e\n\x0bUserMessage\x12\x0f\n\x07message\x18\x01 \x01(\t*t\n\tAgentType\x12\x0b\n\x07UNKNOWN\x10\x00\x12\x0e\n\nWEBSCRAPER\x10\x01\x12\n\n\x06PLACES\x10\x02\x12\x0b\n\x07WEATHER\x10\x03\x12\x08\n\x04NEWS\x10\x04\x12\x08\n\x04WIKI\x10\x05\x12\x12\n\x0eURBAN_PLANNING\x10\x06\x12\t\n\x05OTHER\x10\x07\x32\xa3\x01\n\x0bToolService\x12K\n\x13SendFunctionSchemas\x12\x0f.combined.Empty\x1a!.combined.FunctionSchemasResponse0\x01\x12G\n\nInvokeTool\x12\x15.combined.ToolRequest\x1a\x1e.combined.ToolInvocationResult(\x01\x30\x01\x32\xe3\x01\n\x03LLM\x12\x38\n\x0cGetUserQuery\x12\x15.combined.UserMessage\x1a\x0f.combined.Empty(\x01\x12N\n\x0eGetFuncSchemas\x12!.combined.FunctionSchemasResponse\x1a\x15.combined.ToolRequest(\x01\x30\x01\x12R\n\x0cRecvCallTool\x12\x1e.combined.ToolInvocationResult\x1a\x1e.combined.ToolInvocationResult(\x01\x30\x01\x62\x06proto3')
_globals = globals()
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'combined_pb2', _globals)
if not _descriptor._USE_C_DESCRIPTORS:
DESCRIPTOR._loaded_options = None
_globals['_AGENTTYPE']._serialized_start=344
_globals['_AGENTTYPE']._serialized_end=460
_globals['_EMPTY']._serialized_start=28
_globals['_EMPTY']._serialized_end=35
_globals['_FUNCTIONSCHEMASRESPONSE']._serialized_start=37
_globals['_FUNCTIONSCHEMASRESPONSE']._serialized_end=129
_globals['_TOOLREQUEST']._serialized_start=131
_globals['_TOOLREQUEST']._serialized_end=228
_globals['_TOOLINVOCATIONRESULT']._serialized_start=230
_globals['_TOOLINVOCATIONRESULT']._serialized_end=310
_globals['_USERMESSAGE']._serialized_start=312
_globals['_USERMESSAGE']._serialized_end=342
_globals['_TOOLSERVICE']._serialized_start=463
_globals['_TOOLSERVICE']._serialized_end=626
_globals['_LLM']._serialized_start=629
_globals['_LLM']._serialized_end=856
# @@protoc_insertion_point(module_scope)
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc
import warnings
import combined_pb2 as combined__pb2
GRPC_GENERATED_VERSION = '1.71.0'
GRPC_VERSION = grpc.__version__
_version_not_supported = False
try:
from grpc._utilities import first_version_is_lower
_version_not_supported = first_version_is_lower(GRPC_VERSION, GRPC_GENERATED_VERSION)
except ImportError:
_version_not_supported = True
if _version_not_supported:
raise RuntimeError(
f'The grpc package installed is at version {GRPC_VERSION},'
+ f' but the generated code in combined_pb2_grpc.py depends on'
+ f' grpcio>={GRPC_GENERATED_VERSION}.'
+ f' Please upgrade your grpc module to grpcio>={GRPC_GENERATED_VERSION}'
+ f' or downgrade your generated code using grpcio-tools<={GRPC_VERSION}.'
)
class ToolServiceStub(object):
"""Services
"""
def __init__(self, channel):
"""Constructor.
Args:
channel: A grpc.Channel.
"""
self.SendFunctionSchemas = channel.unary_stream(
'/combined.ToolService/SendFunctionSchemas',
request_serializer=combined__pb2.Empty.SerializeToString,
response_deserializer=combined__pb2.FunctionSchemasResponse.FromString,
_registered_method=True)
self.InvokeTool = channel.stream_stream(
'/combined.ToolService/InvokeTool',
request_serializer=combined__pb2.ToolRequest.SerializeToString,
response_deserializer=combined__pb2.ToolInvocationResult.FromString,
_registered_method=True)
class ToolServiceServicer(object):
"""Services
"""
def SendFunctionSchemas(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def InvokeTool(self, request_iterator, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def add_ToolServiceServicer_to_server(servicer, server):
rpc_method_handlers = {
'SendFunctionSchemas': grpc.unary_stream_rpc_method_handler(
servicer.SendFunctionSchemas,
request_deserializer=combined__pb2.Empty.FromString,
response_serializer=combined__pb2.FunctionSchemasResponse.SerializeToString,
),
'InvokeTool': grpc.stream_stream_rpc_method_handler(
servicer.InvokeTool,
request_deserializer=combined__pb2.ToolRequest.FromString,
response_serializer=combined__pb2.ToolInvocationResult.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'combined.ToolService', rpc_method_handlers)
server.add_generic_rpc_handlers((generic_handler,))
server.add_registered_method_handlers('combined.ToolService', rpc_method_handlers)
# This class is part of an EXPERIMENTAL API.
class ToolService(object):
"""Services
"""
@staticmethod
def SendFunctionSchemas(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_stream(
request,
target,
'/combined.ToolService/SendFunctionSchemas',
combined__pb2.Empty.SerializeToString,
combined__pb2.FunctionSchemasResponse.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
_registered_method=True)
@staticmethod
def InvokeTool(request_iterator,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.stream_stream(
request_iterator,
target,
'/combined.ToolService/InvokeTool',
combined__pb2.ToolRequest.SerializeToString,
combined__pb2.ToolInvocationResult.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
_registered_method=True)
class LLMStub(object):
"""Missing associated documentation comment in .proto file."""
def __init__(self, channel):
"""Constructor.
Args:
channel: A grpc.Channel.
"""
self.GetUserQuery = channel.stream_unary(
'/combined.LLM/GetUserQuery',
request_serializer=combined__pb2.UserMessage.SerializeToString,
response_deserializer=combined__pb2.Empty.FromString,
_registered_method=True)
self.GetFuncSchemas = channel.stream_stream(
'/combined.LLM/GetFuncSchemas',
request_serializer=combined__pb2.FunctionSchemasResponse.SerializeToString,
response_deserializer=combined__pb2.ToolRequest.FromString,
_registered_method=True)
self.RecvCallTool = channel.stream_stream(
'/combined.LLM/RecvCallTool',
request_serializer=combined__pb2.ToolInvocationResult.SerializeToString,
response_deserializer=combined__pb2.ToolInvocationResult.FromString,
_registered_method=True)
class LLMServicer(object):
"""Missing associated documentation comment in .proto file."""
def GetUserQuery(self, request_iterator, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def GetFuncSchemas(self, request_iterator, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def RecvCallTool(self, request_iterator, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def add_LLMServicer_to_server(servicer, server):
rpc_method_handlers = {
'GetUserQuery': grpc.stream_unary_rpc_method_handler(
servicer.GetUserQuery,
request_deserializer=combined__pb2.UserMessage.FromString,
response_serializer=combined__pb2.Empty.SerializeToString,
),
'GetFuncSchemas': grpc.stream_stream_rpc_method_handler(
servicer.GetFuncSchemas,
request_deserializer=combined__pb2.FunctionSchemasResponse.FromString,
response_serializer=combined__pb2.ToolRequest.SerializeToString,
),
'RecvCallTool': grpc.stream_stream_rpc_method_handler(
servicer.RecvCallTool,
request_deserializer=combined__pb2.ToolInvocationResult.FromString,
response_serializer=combined__pb2.ToolInvocationResult.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'combined.LLM', rpc_method_handlers)
server.add_generic_rpc_handlers((generic_handler,))
server.add_registered_method_handlers('combined.LLM', rpc_method_handlers)
# This class is part of an EXPERIMENTAL API.
class LLM(object):
"""Missing associated documentation comment in .proto file."""
@staticmethod
def GetUserQuery(request_iterator,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.stream_unary(
request_iterator,
target,
'/combined.LLM/GetUserQuery',
combined__pb2.UserMessage.SerializeToString,
combined__pb2.Empty.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
_registered_method=True)
@staticmethod
def GetFuncSchemas(request_iterator,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.stream_stream(
request_iterator,
target,
'/combined.LLM/GetFuncSchemas',
combined__pb2.FunctionSchemasResponse.SerializeToString,
combined__pb2.ToolRequest.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
_registered_method=True)
@staticmethod
def RecvCallTool(request_iterator,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.stream_stream(
request_iterator,
target,
'/combined.LLM/RecvCallTool',
combined__pb2.ToolInvocationResult.SerializeToString,
combined__pb2.ToolInvocationResult.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
_registered_method=True)
# server.py
from fastmcp import FastMCP
import random
import requests
from bs4 import BeautifulSoup
import re
......@@ -12,23 +12,28 @@ mcp = FastMCP("WebScraper")
print(f"Starting server {mcp.name}")
# Add a web-scraping tool
USER_AGENTS = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64)...",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)...",
# add several real user agents here to rotate
]
@mcp.tool()
def scrape_and_clean(url: str):
"""Scrapes and extracts structured content, headings, and text from a webpage."""
try:
import validators
if not validators.url(url):
return {"error": "Invalid URL format."}
headers = {"User-Agent": "Mozilla/5.0 (compatible; WebScraperBot/1.0)"}
response = requests.get(url, headers=headers, timeout=10)
headers = {
"User-Agent": random.choice(USER_AGENTS)
}
response = requests.get(url, headers=headers, timeout=15)
response.raise_for_status()
soup = BeautifulSoup(response.content, "html.parser")
# Remove unwanted tags
for tag in soup(["script", "style", "noscript", "iframe"]):
tag.decompose()
......@@ -54,10 +59,3 @@ def scrape_and_clean(url: str):
except Exception as e:
return {"error": str(e)}
@mcp.prompt()
def webscraper_prompt(mes_web_scraper: str) -> str:
"""Create an intelligent webscraper prompt."""
return f"Extract the main content, headings, and structured data from the following URL or query: '{mes_web_scraper}'."
......@@ -5,4 +5,5 @@ protobuf
multithreading
requests
fastmcp
bs4
\ No newline at end of file
bs4
validators
\ No newline at end of file
......@@ -41,7 +41,7 @@ class ToolService(agent_pb2_grpc.ToolServiceServicer):
agent_type=agent_pb2.WEBSCRAPER, function_schemas=serialized_schemas
)
print("🔄 Sending schema update (heartbeat or change).")
#print("Sending schema update (heartbeat or change).")
yield schema_update
async def InvokeTool(self, request_iterator, context):
......
#!/bin/bash
# Variables
IMAGE_NAME="mcp-to-do"
REGISTRY_URL="cicd.ai4eu-dev.eu:7444"
REPO_PATH="tutorials/id-aiod-mcp"
TAG="latest"
# Step 1: Build the Docker image
echo "Building Docker image..."
docker build -t $IMAGE_NAME .
# Step 2: Tag the Docker image
echo "Tagging Docker image..."
docker tag $IMAGE_NAME $REGISTRY_URL/$REPO_PATH/$IMAGE_NAME:$TAG
# Step 3: Push the Docker image to the registry
echo "Pushing Docker image to registry..."
docker push $REGISTRY_URL/$REPO_PATH/$IMAGE_NAME:$TAG
# Check if the image was pushed successfully
if [ $? -eq 0 ]; then
echo "Docker image successfully pushed to $REGISTRY_URL/$REPO_PATH/$IMAGE_NAME:$TAG"
else
echo "Error: Docker push failed!"
exit 1
fi
\ No newline at end of file
from fastmcp import FastMCP
import random
from datetime import datetime, timedelta
# Initialize MCP server
mcp = FastMCP("MyToDoList")
print(f"Starting server {mcp.name}")
task_names = [
"Buy groceries", "Read AI research paper", "Write blog post",
"Exercise", "Call Mom", "Plan vacation", "Fix bug in code",
"Prepare presentation", "Water plants", "Clean the kitchen"
]
def random_task():
name = random.choice(task_names)
priority = random.randint(1, 5) # 1 = highest, 5 = lowest
due_date = datetime.now() + timedelta(days=random.randint(0, 10))
return {
"task": name,
"priority": priority,
"due": due_date.strftime("%Y-%m-%d")
}
@mcp.tool()
def fetch_my_list():
"""Return a randomly generated list of tasks."""
sample_tasks = [random_task() for _ in range(random.randint(3, 6))]
return {"tasks": sample_tasks}
import asyncio
import json
import grpc
from fastmcp import Client
from mcp_to_do import mcp # Change this import accordingly
import agent_pb2
import agent_pb2_grpc
class ToolService(agent_pb2_grpc.ToolServiceServicer):
def __init__(self):
self.last_sent_function_schemas = None
async def SendFunctionSchemas(self, request, context):
print("Received request for SendFunctionSchemas (streaming).")
print(context.peer())
while True:
client = Client(mcp)
async with client:
available_tools = await client.list_tools()
serialized_schemas = []
for tool in available_tools:
function_schema_dict = {
"name": tool.name,
"description": tool.description or tool.name,
"parameters": {"type": "object", "properties": {}},
"required": [],
}
if hasattr(tool, "inputSchema") and tool.inputSchema:
if tool.inputSchema.get("type") == "object":
function_schema_dict["parameters"] = tool.inputSchema
serialized_schemas.append(json.dumps(function_schema_dict))
schema_update = agent_pb2.FunctionSchemasResponse(
agent_type=agent_pb2.OTHER, function_schemas=serialized_schemas # Add this to the ENUM in the protobuf file
)
# print("Sending schema update (heartbeat or change).")
yield schema_update
async def InvokeTool(self, request_iterator, context):
"""
Processes tool invocation requests from a client stream and returns tool results.
"""
print("📥 Received InvokeTool stream")
async for tool_request in request_iterator:
tool_name = tool_request.tool_name
agent_type = tool_request.agent_type # Enum value, e.g., 1 for WEBSCRAPER
arguments = json.loads(tool_request.arguments_json)
print(f"Invoking tool: {tool_name}")
print(f"Arguments as dict: {arguments}")
print(f"Agent Type: {agent_type}")
# Check if the agent type matches YOUR_TOOL_NAME_ENUM
if agent_type == agent_pb2.OTHER:
try:
client = Client(mcp)
async with client:
tool_result = await client.call_tool(tool_name, arguments)
print(f"---**---Tool invocation result: {tool_result}")
yield agent_pb2.ToolInvocationResult(
results=[str(tool_result)], agent_type=agent_type
)
except Exception as e:
print(f"Failed to invoke tool '{tool_name}': {str(e)}")
else:
print(f"Tool not allowed for agent type {agent_type}, skipping...")
print("Finished processing all InvokeTool requests.")
async def serve():
"""
Starts the gRPC server and listens for incoming requests.
"""
server = grpc.aio.server()
agent_pb2_grpc.add_ToolServiceServicer_to_server(ToolService(), server)
server.add_insecure_port("[::]:8061")
print("--*--gRPC server listening on port 8061...")
await server.start()
await server.wait_for_termination()
if __name__ == "__main__":
asyncio.run(serve())
# Use a slim Python base image
FROM python:3.12-slim
ENV PYTHONUNBUFFERED=1
RUN apt-get update -y && \
apt-get install -y --no-install-recommends \
build-essential \
gcc \
libffi-dev \
libssl-dev \
git \
curl \
ca-certificates \
tzdata \
protobuf-compiler && \
apt-get clean && \
rm -rf /var/lib/apt/lists/*
# Set timezone
ARG TIMEZONE=Europe/Berlin
ENV TZ=$TIMEZONE
RUN ln -snf /usr/share/zoneinfo/$TIMEZONE /etc/localtime && echo $TIMEZONE > /etc/timezone
# Set working directory
WORKDIR /weather-agent
# Copy requirements and install
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy project files
COPY . .
# Compile .proto file
RUN python3 -m grpc_tools.protoc \
-I. \
--python_out=. \
--grpc_python_out=. \
agent.proto
# Run the server
CMD ["python3", "-u", "server.py"]
syntax = "proto3";
message Empty {
}
enum AgentType {
UNKNOWN = 0;
WEBSCRAPER = 1;
PLACES = 2;
WEATHER = 3;
NEWS = 4;
WIKI = 5;
URBAN_PLANNING = 6;
OTHER = 7;
YOUR_TOOL_NAME_ENUM = 8;
}
message FunctionSchemasResponse {
AgentType agent_type = 1;
repeated string function_schemas = 2;
}
message ToolRequest {
AgentType agent_type = 1;
string tool_name = 2;
string arguments_json = 3;
}
message ToolInvocationResult {
AgentType agent_type = 1;
repeated string results = 2;
}
service ToolService {
rpc SendFunctionSchemas (Empty) returns (stream FunctionSchemasResponse);
rpc InvokeTool (stream ToolRequest) returns (stream ToolInvocationResult);
}
grpcio
grpcio-tools
grpc-interceptor
protobuf
multithreading
requests
fastmcp
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment