Skip to content
Snippets Groups Projects
Commit 17fd23b2 authored by Swetha Lakshmana Murthy's avatar Swetha Lakshmana Murthy
Browse files

Added: Agentic Event Planner

parent 6ec48b84
No related branches found
No related tags found
No related merge requests found
Showing
with 1003 additions and 0 deletions
# Use a slim Python base image
FROM python:3.12-slim

# Unbuffered Python output so container logs appear in real time
ENV PYTHONUNBUFFERED=1

# System packages: compilers/headers for building wheels, protoc for gRPC codegen
RUN apt-get update -y && \
    apt-get install -y --no-install-recommends \
    build-essential \
    gcc \
    libffi-dev \
    libssl-dev \
    git \
    curl \
    ca-certificates \
    tzdata \
    protobuf-compiler && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/*

# Set timezone (overridable at build time via --build-arg TIMEZONE=...)
ARG TIMEZONE=Europe/Berlin
ENV TZ=$TIMEZONE
RUN ln -snf /usr/share/zoneinfo/$TIMEZONE /etc/localtime && echo $TIMEZONE > /etc/timezone

# Set working directory
WORKDIR /places-agent

# Copy requirements and install first, so the dependency layer is cached
# across source-code changes
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy project files
COPY . .

# Compile .proto file into agent_pb2.py / agent_pb2_grpc.py used by server.py
RUN python3 -m grpc_tools.protoc \
    -I. \
    --python_out=. \
    --grpc_python_out=. \
    agent.proto

# Run the server
CMD ["python3", "-u", "server.py"]
// Wire contract between the planner and each MCP agent.
syntax = "proto3";

// Empty request payload used to open the schema stream.
message Empty {
}

// Identifies which agent a schema batch or tool call belongs to.
enum AgentType {
  UNKNOWN = 0;
  EVENT = 1;
  PLACES = 2;
  WEATHER = 3;
}

// One batch of function schemas advertised by an agent.
message FunctionSchemasResponse {
  AgentType agent_type = 1;
  // Each entry is a JSON-serialized function-schema object.
  repeated string function_schemas = 2;
}

// A single tool invocation request from the planner.
message ToolRequest {
  AgentType agent_type = 1;
  string tool_name = 2;
  // JSON-encoded keyword arguments for the tool.
  string arguments_json = 3;
}

// Result lines produced by one tool invocation.
message ToolInvocationResult {
  AgentType agent_type = 1;
  repeated string results = 2;
}

// Streaming tool service implemented by every agent.
service ToolService {
  rpc SendFunctionSchemas (Empty) returns (stream FunctionSchemasResponse);
  rpc InvokeTool (stream ToolRequest) returns (stream ToolInvocationResult);
}
#!/bin/bash
# Build the places-agent image, tag it for the registry, and push it.
# BUGFIX: the original only checked the exit status of `docker push`; a failed
# build or tag still went on pushing whatever image was cached locally.
set -euo pipefail

# Variables
IMAGE_NAME="mcp-places-agent"
REGISTRY_URL="cicd.ai4eu-dev.eu:7444"
REPO_PATH="tutorials/mcp-agent"
TAG="latest"
FULL_IMAGE="$REGISTRY_URL/$REPO_PATH/$IMAGE_NAME:$TAG"

# Step 1: Build the Docker image (aborts the script on failure via set -e)
echo "Building Docker image..."
docker build -t "$IMAGE_NAME" .

# Step 2: Tag the Docker image
echo "Tagging Docker image..."
docker tag "$IMAGE_NAME" "$FULL_IMAGE"

# Step 3: Push the Docker image to the registry
echo "Pushing Docker image to registry..."
if docker push "$FULL_IMAGE"; then
    echo "Docker image successfully pushed to $FULL_IMAGE"
else
    echo "Error: Docker push failed!"
    exit 1
fi
\ No newline at end of file
# server.py
# MCP server exposing a places/attractions lookup tool backed by
# OpenStreetMap (Nominatim geocoding + Overpass API).
from fastmcp import FastMCP
import json
import requests
from geopy.geocoders import Nominatim

# Create MCP server
mcp = FastMCP("MapExplorer")
print(f"Starting server {mcp.name}")

# Initialize geolocator once at import time for reuse across calls
geolocator = Nominatim(user_agent="map_explorer_app")
@mcp.tool()
def places_and_attractions_by_address(address: str):
    """
    Given an address, returns nearby tourist attractions and local amenities
    using OpenStreetMap's Overpass API. Returns JSON summary only.

    Parameters:
        address: Free-form address resolvable by the Nominatim geocoder.

    Returns:
        JSON string with the geocoded location plus up to ``max_results``
        named attractions and amenities, or a JSON error object.
    """
    max_results = 20
    radius = 500  # search radius around the geocoded point, in meters
    geolocator = Nominatim(user_agent="place_finder")

    # Geocode the input address
    location = geolocator.geocode(address)
    if not location:
        return json.dumps({"error": f"Address '{address}' not found"}, indent=4)
    lat, lon = location.latitude, location.longitude

    def query_overpass(tag: str):
        # Fetch nodes, ways and relations carrying `tag` near (lat, lon).
        query = f"""
        [out:json];
        (
          node["{tag}"](around:{radius},{lat},{lon});
          way["{tag}"](around:{radius},{lat},{lon});
          relation["{tag}"](around:{radius},{lat},{lon});
        );
        out center;
        """
        try:
            # BUGFIX: use HTTPS and a timeout so a slow/unreachable Overpass
            # server cannot hang the tool indefinitely.
            res = requests.get(
                "https://overpass-api.de/api/interpreter",
                params={"data": query},
                timeout=30,
            )
            res.raise_for_status()
            return res.json().get("elements", [])
        except requests.exceptions.RequestException as e:
            return {"error": f"Error fetching {tag} data: {str(e)}"}

    def extract_elements(elements, tag_type):
        # Keep only named elements that expose usable coordinates.
        result_list = []
        for el in elements[:max_results]:
            tags = el.get("tags", {})
            name = tags.get("name")
            item_type = tags.get(tag_type, f"Unknown {tag_type.capitalize()}")
            # Ways/relations carry their coordinates under "center", not top level.
            lat_el = el.get("lat")
            if lat_el is None:
                lat_el = el.get("center", {}).get("lat")
            lon_el = el.get("lon")
            if lon_el is None:
                lon_el = el.get("center", {}).get("lon")
            # BUGFIX: explicit None checks — a coordinate of exactly 0.0 is
            # valid but falsy, so the original truthiness test dropped it.
            if name and lat_el is not None and lon_el is not None:
                result_list.append({
                    "name": name,
                    "type": item_type,
                    "latitude": lat_el,
                    "longitude": lon_el
                })
        return result_list

    # Query data; query_overpass returns a dict only on error
    attraction_elements = query_overpass("tourism")
    if isinstance(attraction_elements, dict):
        return json.dumps(attraction_elements, indent=4)
    amenity_elements = query_overpass("amenity")
    if isinstance(amenity_elements, dict):
        return json.dumps(amenity_elements, indent=4)

    # Extract results
    attractions = extract_elements(attraction_elements, "tourism")
    amenities = extract_elements(amenity_elements, "amenity")
    return json.dumps({
        "location": {"latitude": lat, "longitude": lon},
        "attractions": attractions,
        "local_amenities": amenities
    }, indent=4)
@mcp.prompt()
def map_prompt(mes_map: str) -> str:
    """Build the exploration prompt for places around the given address."""
    return (
        "Show interesting places and hidden gems near the address "
        f"'{mes_map}' within walking distance."
    )
grpcio
grpcio-tools
grpc-interceptor
protobuf
multithreading  # NOTE(review): not imported anywhere in this agent's code (it uses asyncio) — likely an accidental dependency; confirm and remove
requests
fastmcp
folium
geopy
\ No newline at end of file
import asyncio
import json
import grpc
from fastmcp import Client
from mcp_map import mcp
import agent_pb2
import agent_pb2_grpc
class ToolService(agent_pb2_grpc.ToolServiceServicer):
    """gRPC servicer exposing this agent's MCP tools (agent type PLACES)."""

    # Pause between schema updates so the stream acts as a periodic heartbeat
    # instead of a busy loop.
    SCHEMA_HEARTBEAT_SECONDS = 30

    def __init__(self):
        # Schemas sent in the previous iteration (None before the first send).
        self.last_sent_function_schemas = None

    async def SendFunctionSchemas(self, request, context):
        """Continuously stream the JSON function schemas of all MCP tools."""
        print("📥 Received request for SendFunctionSchemas (streaming).")
        print(context.peer())
        while True:
            client = Client(mcp)
            async with client:
                available_tools = await client.list_tools()
                print(available_tools)
                serialized_schemas = []
                for tool in available_tools:
                    # OpenAI-style function schema; parameters default to an
                    # empty object when the tool declares no input schema.
                    function_schema_dict = {
                        "name": tool.name,
                        "description": tool.description or tool.name,
                        "parameters": {
                            "type": "object",
                            "properties": {}
                        },
                        "required": []
                    }
                    if hasattr(tool, 'inputSchema') and tool.inputSchema:
                        if tool.inputSchema.get("type") == "object":
                            function_schema_dict["parameters"] = tool.inputSchema
                    serialized_schemas.append(json.dumps(function_schema_dict))
                schema_update = agent_pb2.FunctionSchemasResponse(
                    agent_type=agent_pb2.PLACES,
                    function_schemas=serialized_schemas
                )
                print("🔄 Sending schema update (heartbeat or change).")
                self.last_sent_function_schemas = serialized_schemas
                yield schema_update
            # BUGFIX: the original looped with no delay, re-listing tools and
            # re-sending schemas as fast as possible (a busy loop flooding the
            # stream). Throttle to a periodic heartbeat.
            await asyncio.sleep(self.SCHEMA_HEARTBEAT_SECONDS)

    async def InvokeTool(self, request_iterator, context):
        """
        Processes tool invocation requests from a client stream and returns tool results.
        """
        print("📥 Received InvokeTool stream")
        async for tool_request in request_iterator:
            tool_name = tool_request.tool_name
            agent_type = tool_request.agent_type  # AgentType enum value
            arguments = json.loads(tool_request.arguments_json)
            print(f"🔧 Invoking tool: {tool_name}")
            print(f"🧾 Arguments as dict: {arguments}")
            print(f"👤 Agent Type: {agent_type}")
            # Only serve requests addressed to this agent type.
            if agent_type == agent_pb2.PLACES:
                try:
                    client = Client(mcp)
                    async with client:
                        tool_result = await client.call_tool(tool_name, arguments)
                        print(f"✅ Tool invocation result: {tool_result}")
                        yield agent_pb2.ToolInvocationResult(results=[str(tool_result)], agent_type=agent_type)
                except Exception as e:
                    # BUGFIX: failures were swallowed silently, leaving the
                    # caller waiting for a result that never arrives; report
                    # the error back on the stream instead.
                    print(f"⚠️ Failed to invoke tool '{tool_name}': {str(e)}")
                    yield agent_pb2.ToolInvocationResult(
                        results=[f"Error invoking '{tool_name}': {e}"],
                        agent_type=agent_type,
                    )
            else:
                print(f"🚫 Tool not allowed for agent type {agent_type}, skipping...")
        print("✅ Finished processing all InvokeTool requests.")
async def serve():
    """Start the async gRPC ToolService on port 8061 and block until shutdown."""
    bind_address = "[::]:8061"
    grpc_server = grpc.aio.server()
    agent_pb2_grpc.add_ToolServiceServicer_to_server(ToolService(), grpc_server)
    grpc_server.add_insecure_port(bind_address)
    print("🚀 gRPC server listening on port 8061...")
    await grpc_server.start()
    await grpc_server.wait_for_termination()


if __name__ == "__main__":
    asyncio.run(serve())
# Use a slim Python base image
FROM python:3.12-slim

# Unbuffered Python output so container logs appear in real time
ENV PYTHONUNBUFFERED=1

# System packages: compilers/headers for building wheels, protoc for gRPC codegen
RUN apt-get update -y && \
    apt-get install -y --no-install-recommends \
    build-essential \
    gcc \
    libffi-dev \
    libssl-dev \
    git \
    curl \
    ca-certificates \
    tzdata \
    protobuf-compiler && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/*

# Set timezone (overridable at build time via --build-arg TIMEZONE=...)
ARG TIMEZONE=Europe/Berlin
ENV TZ=$TIMEZONE
RUN ln -snf /usr/share/zoneinfo/$TIMEZONE /etc/localtime && echo $TIMEZONE > /etc/timezone

# Set working directory
WORKDIR /weather-agent

# Copy requirements and install first, so the dependency layer is cached
# across source-code changes
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy project files
COPY . .

# Compile .proto file into agent_pb2.py / agent_pb2_grpc.py used by server.py
RUN python3 -m grpc_tools.protoc \
    -I. \
    --python_out=. \
    --grpc_python_out=. \
    agent.proto

# Run the server
CMD ["python3", "-u", "server.py"]
// Wire contract between the planner and each MCP agent.
syntax = "proto3";

// Empty request payload used to open the schema stream.
message Empty {
}

// Identifies which agent a schema batch or tool call belongs to.
enum AgentType {
  UNKNOWN = 0;
  EVENT = 1;
  PLACES = 2;
  WEATHER = 3;
}

// One batch of function schemas advertised by an agent.
message FunctionSchemasResponse {
  AgentType agent_type = 1;
  // Each entry is a JSON-serialized function-schema object.
  repeated string function_schemas = 2;
}

// A single tool invocation request from the planner.
message ToolRequest {
  AgentType agent_type = 1;
  string tool_name = 2;
  // JSON-encoded keyword arguments for the tool.
  string arguments_json = 3;
}

// Result lines produced by one tool invocation.
message ToolInvocationResult {
  AgentType agent_type = 1;
  repeated string results = 2;
}

// Streaming tool service implemented by every agent.
service ToolService {
  rpc SendFunctionSchemas (Empty) returns (stream FunctionSchemasResponse);
  rpc InvokeTool (stream ToolRequest) returns (stream ToolInvocationResult);
}
#!/bin/bash
# Build the weather-agent image, tag it for the registry, and push it.
# BUGFIX: the original only checked the exit status of `docker push`; a failed
# build or tag still went on pushing whatever image was cached locally.
set -euo pipefail

# Variables
IMAGE_NAME="mcp-weather-agent"
REGISTRY_URL="cicd.ai4eu-dev.eu:7444"
REPO_PATH="tutorials/mcp-agent"
TAG="latest"
FULL_IMAGE="$REGISTRY_URL/$REPO_PATH/$IMAGE_NAME:$TAG"

# Step 1: Build the Docker image (aborts the script on failure via set -e)
echo "Building Docker image..."
docker build -t "$IMAGE_NAME" .

# Step 2: Tag the Docker image
echo "Tagging Docker image..."
docker tag "$IMAGE_NAME" "$FULL_IMAGE"

# Step 3: Push the Docker image to the registry
echo "Pushing Docker image to registry..."
if docker push "$FULL_IMAGE"; then
    echo "Docker image successfully pushed to $FULL_IMAGE"
else
    echo "Error: Docker push failed!"
    exit 1
fi
\ No newline at end of file
# MCP server exposing a current-weather lookup tool backed by Nominatim
# geocoding and the Open-Meteo API.
# BUGFIX: removed the duplicated `import openmeteo_requests` and grouped the
# imports conventionally.
from fastmcp import FastMCP
from geopy.exc import GeocoderTimedOut, GeocoderUnavailable
from geopy.geocoders import Nominatim
from timezonefinder import TimezoneFinder
import openmeteo_requests
from openmeteo_sdk.Variable import Variable

# MCP setup
mcp = FastMCP("WeatherForecastService")
print(f"Starting server {mcp.name}")

# Initialize once at import time and reuse across tool calls
geolocator = Nominatim(user_agent="weather_forecast_range")
tz_finder = TimezoneFinder()
# Add a weather forecast tool
@mcp.tool()
def get_weather_forecast(location: str) -> dict:
    """
    Return current weather data for a named place.

    Parameters:
        location: Free-form place name resolvable by the Nominatim geocoder.

    Returns:
        Dict with coordinates, timezone info, and current temperature/humidity.
        (The original annotation said ``str`` but a dict was always returned.)

    Raises:
        ValueError: If the place cannot be geocoded.
    """
    # Step 1: Geocode the place name.
    # BUGFIX: the original rebound the `location` parameter to the geocode
    # result; on a geocoder timeout/outage it remained a plain string, slipped
    # past the None check, and crashed later on `.latitude`.
    geo = None
    try:
        geo = geolocator.geocode(location)
    except (GeocoderTimedOut, GeocoderUnavailable) as e:
        print(f"Error during geocoding: {e}.")
    if geo is None:
        raise ValueError(f"Location '{location}' not found.")

    # Step 2: Set up Open-Meteo parameters
    om = openmeteo_requests.Client()
    params = {
        "latitude": geo.latitude,
        "longitude": geo.longitude,
        "hourly": ["temperature_2m", "precipitation", "wind_speed_10m"],
        "current": ["temperature_2m", "relative_humidity_2m"]
    }

    # Step 3: Call the weather API (first response matches our single location)
    responses = om.weather_api("https://api.open-meteo.com/v1/forecast", params=params)
    response = responses[0]

    # Step 4: Extract current weather data. The current block exposes
    # variables positionally, so match on variable id + altitude.
    current = response.Current()
    current_variables = [current.Variables(i) for i in range(current.VariablesLength())]
    current_temperature_2m = next(
        x for x in current_variables
        if x.Variable() == Variable.temperature and x.Altitude() == 2
    )
    current_relative_humidity_2m = next(
        x for x in current_variables
        if x.Variable() == Variable.relative_humidity and x.Altitude() == 2
    )

    # Step 5: Return results. Use the resolved address string so the payload
    # stays JSON-serializable (a geopy Location object is not).
    return {
        "place": geo.address,
        "latitude": response.Latitude(),
        "longitude": response.Longitude(),
        "elevation": response.Elevation(),
        "timezone": response.Timezone(),
        "timezone_abbreviation": response.TimezoneAbbreviation(),
        "utc_offset_seconds": response.UtcOffsetSeconds(),
        "current_time": current.Time(),
        "temperature_2m": current_temperature_2m.Value(),
        "relative_humidity_2m": current_relative_humidity_2m.Value()
    }
@mcp.prompt()
def weather_prompt(mes_weather: str) -> str:
    """Build the prompt requesting current weather plus a 24h forecast."""
    return (
        "Retrieve detailed current weather and next 24h forecast for the "
        f"location mentioned in: '{mes_weather}'."
    )
grpcio
grpcio-tools
grpc-interceptor
protobuf
multithreading  # NOTE(review): not imported anywhere in this agent's code (it uses asyncio) — likely an accidental dependency; confirm and remove
requests
fastmcp
geopy
timezonefinder
openmeteo_requests
\ No newline at end of file
import asyncio
import json
import grpc
from fastmcp import Client
from mcp_weather import mcp
import agent_pb2
import agent_pb2_grpc
class ToolService(agent_pb2_grpc.ToolServiceServicer):
    """gRPC servicer exposing this agent's MCP tools (agent type WEATHER)."""

    # Pause between schema updates so the stream acts as a periodic heartbeat
    # instead of a busy loop.
    SCHEMA_HEARTBEAT_SECONDS = 30

    def __init__(self):
        # Schemas sent in the previous iteration (None before the first send).
        self.last_sent_function_schemas = None

    async def SendFunctionSchemas(self, request, context):
        """Continuously stream the JSON function schemas of all MCP tools."""
        print("📥 Received request for SendFunctionSchemas (streaming).")
        print(context.peer())
        while True:
            client = Client(mcp)
            async with client:
                available_tools = await client.list_tools()
                serialized_schemas = []
                for tool in available_tools:
                    # OpenAI-style function schema; parameters default to an
                    # empty object when the tool declares no input schema.
                    function_schema_dict = {
                        "name": tool.name,
                        "description": tool.description or tool.name,
                        "parameters": {
                            "type": "object",
                            "properties": {}
                        },
                        "required": []
                    }
                    if hasattr(tool, 'inputSchema') and tool.inputSchema:
                        if tool.inputSchema.get("type") == "object":
                            function_schema_dict["parameters"] = tool.inputSchema
                    serialized_schemas.append(json.dumps(function_schema_dict))
                schema_update = agent_pb2.FunctionSchemasResponse(
                    agent_type=agent_pb2.WEATHER,
                    function_schemas=serialized_schemas
                )
                print("🔄 Sending schema update (heartbeat or change).")
                self.last_sent_function_schemas = serialized_schemas
                yield schema_update
            # BUGFIX: the original looped with no delay, re-listing tools and
            # re-sending schemas as fast as possible (a busy loop flooding the
            # stream). Throttle to a periodic heartbeat.
            await asyncio.sleep(self.SCHEMA_HEARTBEAT_SECONDS)

    async def InvokeTool(self, request_iterator, context):
        """
        Processes tool invocation requests from a client stream and returns tool results.
        """
        print("📥 Received InvokeTool stream")
        async for tool_request in request_iterator:
            tool_name = tool_request.tool_name
            agent_type = tool_request.agent_type  # AgentType enum value
            arguments = json.loads(tool_request.arguments_json)
            print(f"🔧 Invoking tool: {tool_name}")
            print(f"🧾 Arguments as dict: {arguments}")
            print(f"👤 Agent Type: {agent_type}")
            # Only serve requests addressed to this agent type.
            if agent_type == agent_pb2.WEATHER:
                try:
                    client = Client(mcp)
                    async with client:
                        tool_result = await client.call_tool(tool_name, arguments)
                        print(f"✅ Tool invocation result: {tool_result}")
                        yield agent_pb2.ToolInvocationResult(results=[str(tool_result)], agent_type=agent_type)
                except Exception as e:
                    # BUGFIX: failures were swallowed silently, leaving the
                    # caller waiting for a result that never arrives; report
                    # the error back on the stream instead.
                    print(f"⚠️ Failed to invoke tool '{tool_name}': {str(e)}")
                    yield agent_pb2.ToolInvocationResult(
                        results=[f"Error invoking '{tool_name}': {e}"],
                        agent_type=agent_type,
                    )
            else:
                print(f"🚫 Tool not allowed for agent type {agent_type}, skipping...")
        print("✅ Finished processing all InvokeTool requests.")
async def serve():
    """Start the async gRPC ToolService on port 8061 and block until shutdown."""
    bind_address = "[::]:8061"
    grpc_server = grpc.aio.server()
    agent_pb2_grpc.add_ToolServiceServicer_to_server(ToolService(), grpc_server)
    grpc_server.add_insecure_port(bind_address)
    print("🚀 gRPC server listening on port 8061...")
    await grpc_server.start()
    await grpc_server.wait_for_termination()


if __name__ == "__main__":
    asyncio.run(serve())
# Use a slim Python base image
FROM python:3.12-slim

# Unbuffered Python output so container logs appear in real time
ENV PYTHONUNBUFFERED=1

# System packages: compilers/headers for building wheels, protoc for gRPC codegen
RUN apt-get update -y && \
    apt-get install -y --no-install-recommends \
    build-essential \
    gcc \
    libffi-dev \
    libssl-dev \
    git \
    curl \
    ca-certificates \
    tzdata \
    protobuf-compiler && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/*

# Set timezone (overridable at build time via --build-arg TIMEZONE=...)
ARG TIMEZONE=Europe/Berlin
ENV TZ=$TIMEZONE
RUN ln -snf /usr/share/zoneinfo/$TIMEZONE /etc/localtime && echo $TIMEZONE > /etc/timezone

# Set working directory
WORKDIR /event-details-agent

# Copy requirements and install first, so the dependency layer is cached
# across source-code changes
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy project files
COPY . .

# Compile .proto file into agent_pb2.py / agent_pb2_grpc.py used by server.py
RUN python3 -m grpc_tools.protoc \
    -I. \
    --python_out=. \
    --grpc_python_out=. \
    agent.proto

# Run the server
CMD ["python3", "-u", "server.py"]
// Wire contract between the planner and each MCP agent.
syntax = "proto3";

// Empty request payload used to open the schema stream.
message Empty {
}

// Identifies which agent a schema batch or tool call belongs to.
enum AgentType {
  UNKNOWN = 0;
  EVENT = 1;
  PLACES = 2;
  WEATHER = 3;
}

// One batch of function schemas advertised by an agent.
message FunctionSchemasResponse {
  AgentType agent_type = 1;
  // Each entry is a JSON-serialized function-schema object.
  repeated string function_schemas = 2;
}

// A single tool invocation request from the planner.
message ToolRequest {
  AgentType agent_type = 1;
  string tool_name = 2;
  // JSON-encoded keyword arguments for the tool.
  string arguments_json = 3;
}

// Result lines produced by one tool invocation.
message ToolInvocationResult {
  AgentType agent_type = 1;
  repeated string results = 2;
}

// Streaming tool service implemented by every agent.
service ToolService {
  rpc SendFunctionSchemas (Empty) returns (stream FunctionSchemasResponse);
  rpc InvokeTool (stream ToolRequest) returns (stream ToolInvocationResult);
}
#!/bin/bash
# Build the scraper-agent image, tag it for the registry, and push it.
# BUGFIX: the original only checked the exit status of `docker push`; a failed
# build or tag still went on pushing whatever image was cached locally.
set -euo pipefail

# Variables
IMAGE_NAME="mcp-scraper-agent"
REGISTRY_URL="cicd.ai4eu-dev.eu:7444"
REPO_PATH="tutorials/mcp-agent"
TAG="latest"
FULL_IMAGE="$REGISTRY_URL/$REPO_PATH/$IMAGE_NAME:$TAG"

# Step 1: Build the Docker image (aborts the script on failure via set -e)
echo "Building Docker image..."
docker build -t "$IMAGE_NAME" .

# Step 2: Tag the Docker image
echo "Tagging Docker image..."
docker tag "$IMAGE_NAME" "$FULL_IMAGE"

# Step 3: Push the Docker image to the registry
echo "Pushing Docker image to registry..."
if docker push "$FULL_IMAGE"; then
    echo "Docker image successfully pushed to $FULL_IMAGE"
else
    echo "Error: Docker push failed!"
    exit 1
fi
\ No newline at end of file
# server.py
# MCP server exposing a web-scraping tool (requests + BeautifulSoup).
from fastmcp import FastMCP
import requests
from bs4 import BeautifulSoup
import re
import json

# Create an MCP server
mcp = FastMCP("WebScraper")
print(f"Starting server {mcp.name}")
# Add a web-scraping tool
@mcp.tool()
def scrape_and_clean(url: str):
    """
    Scrapes and extracts structured content, headings, and text from a webpage.

    Parameters:
        url: Absolute URL of the page to fetch.

    Returns:
        JSON string with keys "text", "headings" and "structured_data", or an
        error message string on failure.
    """
    try:
        headers = {"User-Agent": "Mozilla/5.0 (compatible; WebScraperBot/1.0)"}
        response = requests.get(url, headers=headers, timeout=10)
        response.raise_for_status()
        soup = BeautifulSoup(response.content, "html.parser")

        # Extract structured data (JSON-LD) BEFORE stripping tags.
        # BUGFIX: the original decomposed all <script> tags first, so the
        # application/ld+json lookup below always found nothing.
        structured_data = []
        for script in soup.find_all('script', type='application/ld+json'):
            try:
                raw_data = script.string
                if raw_data:
                    data = json.loads(raw_data)
                    if isinstance(data, list):
                        structured_data.extend(data)
                    else:
                        structured_data.append(data)
            except (json.JSONDecodeError, TypeError):
                continue

        # Remove scripts, styles, noscripts, iframes before text extraction
        for tag in soup(["script", "style", "noscript", "iframe"]):
            tag.decompose()

        # Extract main page text (cleaned and trimmed)
        text = ' '.join(soup.stripped_strings)
        text = re.sub(r'\s+', ' ', text)[:12000]  # Trim to prevent overload

        # Extract headings
        headings = [h.get_text(strip=True) for h in soup.find_all(['h1', 'h2', 'h3'])]

        # BUGFIX: headings and structured_data were computed but discarded;
        # return all three pieces instead of the bare text.
        return json.dumps({
            "text": text,
            "headings": headings,
            "structured_data": structured_data
        })
    except Exception as e:
        # BUGFIX: the original returned the Exception object itself; return a
        # readable message string instead.
        return f"Error scraping '{url}': {e}"
@mcp.prompt()
def webscraper_prompt(mes_web_scraper: str) -> str:
    """Build the extraction prompt for a given URL or query string."""
    return (
        "Extract the main content, headings, and structured data from the "
        f"following URL or query: '{mes_web_scraper}'."
    )
grpcio
grpcio-tools
grpc-interceptor
protobuf
multithreading  # NOTE(review): not imported anywhere in this agent's code (it uses asyncio) — likely an accidental dependency; confirm and remove
requests
fastmcp
bs4
\ No newline at end of file
import asyncio
import json
import grpc
from fastmcp import Client
from mcp_web_scraper import mcp
import agent_pb2
import agent_pb2_grpc
class ToolService(agent_pb2_grpc.ToolServiceServicer):
    """gRPC servicer exposing this agent's MCP tools (agent type EVENT)."""

    # Pause between schema updates so the stream acts as a periodic heartbeat
    # instead of a busy loop.
    SCHEMA_HEARTBEAT_SECONDS = 30

    def __init__(self):
        # Schemas sent in the previous iteration (None before the first send).
        self.last_sent_function_schemas = None

    async def SendFunctionSchemas(self, request, context):
        """Continuously stream the JSON function schemas of all MCP tools."""
        print("📥 Received request for SendFunctionSchemas (streaming).")
        print(context.peer())
        while True:
            client = Client(mcp)
            async with client:
                available_tools = await client.list_tools()
                serialized_schemas = []
                for tool in available_tools:
                    # OpenAI-style function schema; parameters default to an
                    # empty object when the tool declares no input schema.
                    function_schema_dict = {
                        "name": tool.name,
                        "description": tool.description or tool.name,
                        "parameters": {
                            "type": "object",
                            "properties": {}
                        },
                        "required": []
                    }
                    if hasattr(tool, 'inputSchema') and tool.inputSchema:
                        if tool.inputSchema.get("type") == "object":
                            function_schema_dict["parameters"] = tool.inputSchema
                    serialized_schemas.append(json.dumps(function_schema_dict))
                schema_update = agent_pb2.FunctionSchemasResponse(
                    agent_type=agent_pb2.EVENT,
                    function_schemas=serialized_schemas
                )
                print("🔄 Sending schema update (heartbeat or change).")
                self.last_sent_function_schemas = serialized_schemas
                yield schema_update
            # BUGFIX: the original looped with no delay, re-listing tools and
            # re-sending schemas as fast as possible (a busy loop flooding the
            # stream). Throttle to a periodic heartbeat.
            await asyncio.sleep(self.SCHEMA_HEARTBEAT_SECONDS)

    async def InvokeTool(self, request_iterator, context):
        """
        Processes tool invocation requests from a client stream and returns tool results.
        """
        print("📥 Received InvokeTool stream")
        async for tool_request in request_iterator:
            tool_name = tool_request.tool_name
            agent_type = tool_request.agent_type  # AgentType enum value
            arguments = json.loads(tool_request.arguments_json)
            print(f"🔧 Invoking tool: {tool_name}")
            print(f"🧾 Arguments as dict: {arguments}")
            print(f"👤 Agent Type: {agent_type}")
            # Only serve requests addressed to this agent type.
            if agent_type == agent_pb2.EVENT:
                try:
                    client = Client(mcp)
                    async with client:
                        tool_result = await client.call_tool(tool_name, arguments)
                        print(f"✅ Tool invocation result: {tool_result}")
                        yield agent_pb2.ToolInvocationResult(results=[str(tool_result)], agent_type=agent_type)
                except Exception as e:
                    # BUGFIX: failures were swallowed silently, leaving the
                    # caller waiting for a result that never arrives; report
                    # the error back on the stream instead.
                    print(f"⚠️ Failed to invoke tool '{tool_name}': {str(e)}")
                    yield agent_pb2.ToolInvocationResult(
                        results=[f"Error invoking '{tool_name}': {e}"],
                        agent_type=agent_type,
                    )
            else:
                print(f"🚫 Tool not allowed for agent type {agent_type}, skipping...")
        print("✅ Finished processing all InvokeTool requests.")
async def serve():
    """Start the async gRPC ToolService on port 8061 and block until shutdown."""
    bind_address = "[::]:8061"
    grpc_server = grpc.aio.server()
    agent_pb2_grpc.add_ToolServiceServicer_to_server(ToolService(), grpc_server)
    grpc_server.add_insecure_port(bind_address)
    print("🚀 gRPC server listening on port 8061...")
    await grpc_server.start()
    await grpc_server.wait_for_termination()


if __name__ == "__main__":
    asyncio.run(serve())
# Use a slim Python base image
FROM python:3.12-slim

# Unbuffered Python output so container logs appear in real time
ENV PYTHONUNBUFFERED=1

# Install system dependencies: compilers/headers for building wheels,
# protoc for gRPC codegen
RUN apt-get update -y && \
    apt-get install -y --no-install-recommends \
    build-essential \
    gcc \
    libffi-dev \
    libssl-dev \
    git \
    curl \
    ca-certificates \
    tzdata \
    protobuf-compiler && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/*

# Set timezone (overridable at build time via --build-arg TIMEZONE=...)
ARG TIMEZONE=Europe/Berlin
ENV TZ=$TIMEZONE
RUN ln -snf /usr/share/zoneinfo/$TIMEZONE /etc/localtime && echo $TIMEZONE > /etc/timezone

# Create and set working directory
WORKDIR /planner

# Copy requirements and install dependencies first, so the dependency layer
# is cached across source-code changes
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy the rest of the project files
COPY . .

# Compile the protobuf file into planner_pb2.py / planner_pb2_grpc.py
RUN python3 -m grpc_tools.protoc \
    -I. \
    --python_out=. \
    --grpc_python_out=. \
    planner.proto

# Expose ports: 8061 gRPC, 8062 Flask UI
EXPOSE 8061 8062

# Run the gRPC + Flask server
CMD ["python3", "-u", "server.py"]
from flask import Flask, render_template, request, redirect, url_for, session, jsonify, render_template_string
import logging

# Initialize Flask app
app = Flask(__name__)
# NOTE(review): hard-coded secret key — sessions are forgeable if this ships;
# load it from an environment variable in production.
app.config["SECRET_KEY"] = "chat"  # Used for encrypting session data

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# This will hold the user input parameters (only the most recent query is
# kept; see the clear()+append() pattern in the routes below)
parameters = []

# This function will return the parameters list
def get_parameters():
    """Return the shared list holding the most recent user query."""
    return parameters
@app.route("/", methods=["GET", "POST"])
def chat():
    """
    Render the chat page; on POST, record the user's query and forward it to
    the shared chatbot instance, then redirect (Post/Redirect/Get) so a page
    refresh does not resubmit the form.
    """
    if request.method == "POST":
        user_query = request.form.get("user_query", "")
        if user_query:
            logger.info(f"User query added to parameters: {user_query}")
            parameters.clear()
            parameters.append(user_query)  # Keep only the newest query
            try:
                app.chatbot_instance.receive_new_query(
                    user_query
                )  # Using the shared instance
            except Exception as e:
                # CONSISTENCY FIX: log through the module logger like the
                # /ask route does, instead of print().
                logger.error("Error with chatbot_instance: %s", e)
        return redirect(url_for("chat"))

    # GET: render the transcript from the chat log file, if it exists yet.
    chat_history = []
    try:
        with open("chat_log.txt", "r", encoding="utf-8") as log_file:
            lines = log_file.readlines()
            chat_history = [line.strip() for line in lines if line.strip()]
    except FileNotFoundError:
        chat_history = ["No conversation history yet."]
    return render_template("chat.html", chat_history=chat_history)
@app.route("/ask", methods=["POST"])
def ask():
    """AJAX endpoint: record the user's query and hand it to the chatbot."""
    payload = request.get_json()
    user_query = payload.get("user_query", "")
    if user_query:
        logger.info(f"User query received via AJAX: {user_query}")
        parameters.clear()
        parameters.append(user_query)
        try:
            app.chatbot_instance.receive_new_query(user_query)
        except Exception as exc:
            logger.error("Error with chatbot_instance: %s", exc)
            return jsonify({"success": False, "error": str(exc)}), 500
    return jsonify({"success": True})
@app.route("/chat_history", methods=["GET"])
def chat_history():
    """Return the chat transcript as an HTML fragment (for frontend polling)."""
    try:
        with open("chat_log.txt", "r", encoding="utf-8") as log_file:
            lines = log_file.readlines()
            chat_history = [line.strip() for line in lines if line.strip()]
    except FileNotFoundError:
        # No log yet: show a friendly placeholder line instead of failing.
        chat_history = ["No conversation history yet."]
    # Lines containing 'User:' are styled as user messages, the rest as bot
    # messages. NOTE(review): log lines are injected with |safe — this assumes
    # chat_log.txt content is trusted/escaped upstream; confirm.
    html = render_template_string('''
    <h2><i class="fas fa-comments"></i> Chat History</h2>
    {% for line in chat_history %}
    <p class="chat-line {{ 'user' if 'User:' in line else 'bot' }}">
    <i class="fas {{ 'fa-user' if 'User:' in line else 'fa-robot' }}"></i>
    {{ line|safe }}
    </p>
    {% endfor %}
    ''', chat_history=chat_history)
    return html
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment