Commit 471315a7 authored by Swetha Lakshmana Murthy

New tutorial for agentic workflow

parent 291e322a
cicd.ai4eu-dev.eu:7444/tutorials/multi-agent-new/chatbot-stream
# Use a slim Python base image
FROM python:3.10-slim
ENV PYTHONUNBUFFERED=1
RUN apt-get update -y && \
apt-get install -y --no-install-recommends \
build-essential \
gcc \
libffi-dev \
libssl-dev \
git \
curl \
ca-certificates \
tzdata \
protobuf-compiler && \
apt-get clean && \
rm -rf /var/lib/apt/lists/*
# Set timezone
ARG TIMEZONE=Europe/Berlin
ENV TZ=$TIMEZONE
RUN ln -snf /usr/share/zoneinfo/$TIMEZONE /etc/localtime && echo $TIMEZONE > /etc/timezone
# Set working directory
WORKDIR /chatbot
# Copy requirements and install
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy project files
COPY . .
# Compile .proto file
RUN python3 -m grpc_tools.protoc \
--proto_path=. \
--python_out=. \
--grpc_python_out=. \
chatbot.proto
# Run the server
CMD ["python3", "-u", "server.py"]
from flask import Flask, render_template, request, redirect, url_for
import logging
# Initialize Flask app
app = Flask(__name__)
app.config["SECRET_KEY"] = "chat"  # Used for signing Flask session data
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# This will hold the user input parameters
parameters = []
# This function will return the parameters list
def get_parameters():
return parameters
@app.route("/", methods=["GET", "POST"])
def chat():
if request.method == "POST":
user_query = request.form.get("user_query", "")
if user_query:
logger.info(f"User query added to parameters: {user_query}")
parameters.clear()
parameters.append(user_query) # Add the new query to the parameters list
try:
app.chatbot_instance.receive_new_query(
user_query
) # Using the shared instance
            except Exception as e:
                logger.error(f"Error with chatbot_instance: {e}")
return redirect(url_for("chat"))
chat_history = []
try:
with open("chat_log.txt", "r", encoding="utf-8") as log_file:
lines = log_file.readlines()
chat_history = [line.strip() for line in lines if line.strip()]
except FileNotFoundError:
chat_history = ["No conversation history yet."]
return render_template("chat.html", chat_history=chat_history)
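The user/bot styling in chat.html keys off the literal "User:" and "Bot:" prefixes that server.py writes to chat_log.txt. A minimal sketch of the expected log format (the sample lines are illustrative only):

# Sketch: chat.html classifies each line by its "User:"/"Bot:" prefix,
# so anything appended to chat_log.txt should follow this shape.
with open("chat_log.txt", "a", encoding="utf-8") as f:
    f.write("User: Is it sunny in Rome?\n")
    f.write("Bot: <b>Rome</b>: mostly sunny this week.\n")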
#!/bin/bash
# Variables
IMAGE_NAME="chatbot-stream"
REGISTRY_URL="cicd.ai4eu-dev.eu:7444"
REPO_PATH="tutorials/multi-agent-new"
TAG="latest"
# Step 1: Build the Docker image
echo "Building Docker image..."
docker build -t $IMAGE_NAME .
# Step 2: Tag the Docker image
echo "Tagging Docker image..."
docker tag $IMAGE_NAME $REGISTRY_URL/$REPO_PATH/$IMAGE_NAME:$TAG
# Step 3: Push the Docker image to the registry
echo "Pushing Docker image to registry..."
docker push $REGISTRY_URL/$REPO_PATH/$IMAGE_NAME:$TAG
# Check if the image was pushed successfully
if [ $? -eq 0 ]; then
echo "Docker image successfully pushed to $REGISTRY_URL/$REPO_PATH/$IMAGE_NAME:$TAG"
else
echo "Error: Docker push failed!"
exit 1
fi
syntax = "proto3";
message Empty {
}
message UserQuery {
string text = 1;
}
message AgentResponse {
string text = 1;
}
service Chat {
rpc requestUserQueriesToPlanner(Empty) returns (stream UserQuery);
rpc processAgentResponsesFromPlanner(stream AgentResponse) returns (Empty);
}
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: chatbot.proto
"""Generated protocol buffer code."""
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
b'\n\rchatbot.proto"\x07\n\x05\x45mpty"\x19\n\tUserQuery\x12\x0c\n\x04text\x18\x01 \x01(\t"\x1d\n\rAgentResponse\x12\x0c\n\x04text\x18\x01 \x01(\t2y\n\x04\x43hat\x12\x33\n\x1brequestUserQueriesToPlanner\x12\x06.Empty\x1a\n.UserQuery0\x01\x12<\n processAgentResponsesFromPlanner\x12\x0e.AgentResponse\x1a\x06.Empty(\x01\x62\x06proto3'
)
_EMPTY = DESCRIPTOR.message_types_by_name["Empty"]
_USERQUERY = DESCRIPTOR.message_types_by_name["UserQuery"]
_AGENTRESPONSE = DESCRIPTOR.message_types_by_name["AgentResponse"]
Empty = _reflection.GeneratedProtocolMessageType(
"Empty",
(_message.Message,),
{
"DESCRIPTOR": _EMPTY,
"__module__": "chatbot_pb2",
# @@protoc_insertion_point(class_scope:Empty)
},
)
_sym_db.RegisterMessage(Empty)
UserQuery = _reflection.GeneratedProtocolMessageType(
"UserQuery",
(_message.Message,),
{
"DESCRIPTOR": _USERQUERY,
"__module__": "chatbot_pb2",
# @@protoc_insertion_point(class_scope:UserQuery)
},
)
_sym_db.RegisterMessage(UserQuery)
AgentResponse = _reflection.GeneratedProtocolMessageType(
"AgentResponse",
(_message.Message,),
{
"DESCRIPTOR": _AGENTRESPONSE,
"__module__": "chatbot_pb2",
# @@protoc_insertion_point(class_scope:AgentResponse)
},
)
_sym_db.RegisterMessage(AgentResponse)
_CHAT = DESCRIPTOR.services_by_name["Chat"]
if _descriptor._USE_C_DESCRIPTORS == False:
DESCRIPTOR._options = None
_EMPTY._serialized_start = 17
_EMPTY._serialized_end = 24
_USERQUERY._serialized_start = 26
_USERQUERY._serialized_end = 51
_AGENTRESPONSE._serialized_start = 53
_AGENTRESPONSE._serialized_end = 82
_CHAT._serialized_start = 84
_CHAT._serialized_end = 205
# @@protoc_insertion_point(module_scope)
# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
"""Client and server classes corresponding to protobuf-defined services."""
import grpc
import chatbot_pb2 as chatbot__pb2
class ChatStub(object):
"""Missing associated documentation comment in .proto file."""
def __init__(self, channel):
"""Constructor.
Args:
channel: A grpc.Channel.
"""
self.requestUserQueriesToPlanner = channel.unary_stream(
"/Chat/requestUserQueriesToPlanner",
request_serializer=chatbot__pb2.Empty.SerializeToString,
response_deserializer=chatbot__pb2.UserQuery.FromString,
)
self.processAgentResponsesFromPlanner = channel.stream_unary(
"/Chat/processAgentResponsesFromPlanner",
request_serializer=chatbot__pb2.AgentResponse.SerializeToString,
response_deserializer=chatbot__pb2.Empty.FromString,
)
class ChatServicer(object):
"""Missing associated documentation comment in .proto file."""
def requestUserQueriesToPlanner(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details("Method not implemented!")
raise NotImplementedError("Method not implemented!")
def processAgentResponsesFromPlanner(self, request_iterator, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details("Method not implemented!")
raise NotImplementedError("Method not implemented!")
def add_ChatServicer_to_server(servicer, server):
rpc_method_handlers = {
"requestUserQueriesToPlanner": grpc.unary_stream_rpc_method_handler(
servicer.requestUserQueriesToPlanner,
request_deserializer=chatbot__pb2.Empty.FromString,
response_serializer=chatbot__pb2.UserQuery.SerializeToString,
),
"processAgentResponsesFromPlanner": grpc.stream_unary_rpc_method_handler(
servicer.processAgentResponsesFromPlanner,
request_deserializer=chatbot__pb2.AgentResponse.FromString,
response_serializer=chatbot__pb2.Empty.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler("Chat", rpc_method_handlers)
server.add_generic_rpc_handlers((generic_handler,))
# This class is part of an EXPERIMENTAL API.
class Chat(object):
"""Missing associated documentation comment in .proto file."""
@staticmethod
def requestUserQueriesToPlanner(
request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None,
):
return grpc.experimental.unary_stream(
request,
target,
"/Chat/requestUserQueriesToPlanner",
chatbot__pb2.Empty.SerializeToString,
chatbot__pb2.UserQuery.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
)
@staticmethod
def processAgentResponsesFromPlanner(
request_iterator,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None,
):
return grpc.experimental.stream_unary(
request_iterator,
target,
"/Chat/processAgentResponsesFromPlanner",
chatbot__pb2.AgentResponse.SerializeToString,
chatbot__pb2.Empty.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
)
import grpc
import chatbot_pb2
import chatbot_pb2_grpc
def run():
with grpc.insecure_channel("localhost:8061") as channel:
stub = chatbot_pb2_grpc.ChatStub(channel)
print("🌐 Connected to gRPC server")
try:
responses = stub.requestUserQueriesToPlanner(chatbot_pb2.Empty())
for response in responses:
print(f"✅ Received user query: {response.text}")
except grpc.RpcError as e:
print(f"gRPC Error: {e.code()} - {e.details()}")
if __name__ == "__main__":
run()
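The test client above exercises only the server-streaming RPC. A companion sketch for the client-streaming half, processAgentResponsesFromPlanner, which pushes canned agent responses into the chatbot (the sample texts are placeholders standing in for real planner output):

import grpc
import chatbot_pb2
import chatbot_pb2_grpc

def send_responses():
    with grpc.insecure_channel("localhost:8061") as channel:
        stub = chatbot_pb2_grpc.ChatStub(channel)

        def response_generator():
            # Placeholder responses standing in for real planner output
            for text in ["**Rome**: start at the Colosseum.", "Pack an umbrella."]:
                yield chatbot_pb2.AgentResponse(text=text)

        # stream_unary RPC: the stub consumes the iterator and returns Empty
        stub.processAgentResponsesFromPlanner(response_generator())

if __name__ == "__main__":
    send_responses()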
grpcio==1.38.0
grpcio-tools==1.38.0
grpc-interceptor
protobuf==3.16.0
Flask
import grpc
from concurrent import futures
import logging
import threading
import queue
# Import generated classes
import chatbot_pb2
import chatbot_pb2_grpc
import re
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
port = 8061
class Chatbot(chatbot_pb2_grpc.ChatServicer):
def __init__(self):
super().__init__()
self.allow_send = False
self.query_text = None
self.log_queue = queue.Queue()
self.lock = threading.Lock()
self.condition = threading.Condition(self.lock)
self.log_thread = threading.Thread(target=self.log_writer)
self.log_thread.daemon = True
self.log_thread.start()
def log_writer(self):
"""Background thread that writes logs asynchronously."""
while True:
query = self.log_queue.get()
try:
with open("chat_log.txt", "a", encoding="utf-8") as log_file:
log_file.write(f"User: {query}\n")
print(f"User query written to chat_log.txt: {query}")
except Exception as e:
print(f"Error writing to log file: {e}")
self.log_queue.task_done()
def receive_new_query(self, query_text):
"""Receives new user query and notifies the streaming generator."""
print(f"receive_new_query called with: {query_text}")
with self.condition:
self.query_text = query_text
self.allow_send = True
self.condition.notify()
def requestUserQueriesToPlanner(self, request, context):
print("⚡ requestUserQueriesToPlanner started (streaming begins)")
while context.is_active():
with self.condition:
while not (self.allow_send and self.query_text):
print("Waiting for new query...")
self.condition.wait(timeout=5)
if not context.is_active():
print("⚠️ Client disconnected.")
return
current_query = self.query_text
self.allow_send = False
self.query_text = None
print(f"🔄 Yielding query: {current_query}")
self.log_queue.put(current_query)
yield chatbot_pb2.UserQuery(text=current_query)
print("❗ Exiting streaming loop - client context is no longer active.")
def processAgentResponsesFromPlanner(self, request_iterator, context):
print("Connected to processAgentResponsesFromPlanner....")
for response in request_iterator:
print(f"Received bot response: {response.text}")
# Format the bot response (you can use HTML tags here)
formatted_response = self.format_bot_response(response.text)
with open("chat_log.txt", "a", encoding="utf-8") as log_file:
log_file.write(f"Bot: {formatted_response}\n")
return chatbot_pb2.Empty()
def format_bot_response(self, response_text):
formatted_response = re.sub(r"\*\*(.*?)\*\*", r"<b>\1</b>", response_text)
formatted_response = formatted_response.replace("\n", "<br>")
formatted_response = re.sub(
r"(https?://[^\s]+)",
r'<a href="\1" target="_blank">\1</a>',
formatted_response,
)
formatted_response = re.sub(r"\d+\.\s", "", formatted_response)
formatted_response = formatted_response.replace("- ", "")
return formatted_response
chatbot_instance = Chatbot() # Global and shared
def serve():
# gRPC Server setup
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
chatbot_pb2_grpc.add_ChatServicer_to_server(chatbot_instance, server)
server.add_insecure_port(f"[::]:{port}")
logger.info(f"Starting streaming server on port {port}")
server.start()
threading.Thread(target=app_run).start()
server.wait_for_termination()
def app_run():
from app import app
app.chatbot_instance = chatbot_instance
app.run(host="0.0.0.0", port=8062, debug=False)
if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
serve()
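The handoff between Flask and the gRPC stream above is a plain condition-variable producer/consumer. Stripped of the gRPC details, the pattern looks like this (a standalone sketch, not part of the tutorial code):

import threading

condition = threading.Condition()
pending = []

def producer(text):
    # Flask side (receive_new_query): enqueue a query and wake the generator.
    with condition:
        pending.append(text)
        condition.notify()

def consumer():
    # gRPC side (requestUserQueriesToPlanner): block until a query arrives.
    with condition:
        while not pending:
            condition.wait(timeout=5)
        return pending.pop(0)

threading.Thread(target=producer, args=("Is it sunny in Rome?",)).start()
print(consumer())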
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Travel Advisor</title>
<!-- Leaflet.js CSS -->
<link rel="stylesheet" href="https://unpkg.com/leaflet@1.7.1/dist/leaflet.css" />
<!-- Font Awesome for icons -->
<link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/font-awesome/6.5.0/css/all.min.css">
<style>
body, html {
height: 100%;
margin: 0;
font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
}
/* Create the map container to be the background */
#map {
position: absolute;
top: 0;
left: 0;
right: 0;
bottom: 0;
z-index: -1; /* Ensure the map is behind everything else */
}
.overlay {
background: rgba(255, 255, 255, 0.85);
min-height: 100vh;
padding: 40px 20px;
display: flex;
flex-direction: column;
align-items: center;
z-index: 1;
}
h1 {
font-size: 2.5em;
margin-bottom: 10px;
color: #2c3e50;
}
form {
display: flex;
background: #fff;
padding: 15px 20px;
border-radius: 10px;
box-shadow: 0 2px 10px rgba(0, 0, 0, 0.15);
width: 100%;
max-width: 600px;
margin-bottom: 30px;
gap: 10px;
}
input[type="text"] {
flex: 1;
padding: 12px;
font-size: 16px;
border: 1px solid #ccc;
border-radius: 6px;
}
button {
background-color: #3498db;
color: white;
border: none;
border-radius: 6px;
padding: 12px 20px;
font-size: 16px;
cursor: pointer;
transition: background-color 0.3s ease;
}
button:hover {
background-color: #2980b9;
}
.chat-box {
background: white;
overflow-y: auto;
padding: 15px;
border-radius: 8px;
width: 100%;
max-width: 600px;
box-shadow: 0 2px 10px rgba(0, 0, 0, 0.1);
font-size: 14px; /* Reduced font size for better readability */
word-wrap: break-word; /* Ensures words break appropriately */
}
.chat-line {
padding: 8px 12px; /* Reduced padding */
margin: 8px 0; /* Added margin for separation between lines */
border-radius: 8px;
display: flex;
flex-direction: column; /* Ensure messages are stacked vertically */
align-items: flex-start; /* Align text to the left */
gap: 8px; /* Reduced gap between text and icon */
white-space: pre-wrap; /* Ensures line breaks are respected */
max-width: 100%; /* Ensures lines don’t break out of the container */
word-wrap: break-word; /* Allow words to wrap within the container */
overflow-wrap: break-word; /* Forces long words to break and wrap */
word-break: break-all; /* Breaks long words at any point */
}
.user {
text-align: right;
background-color: #d0f0fd;
align-self: flex-end;
}
.bot {
text-align: left;
background-color: #fef1e6;
align-self: flex-start;
}
.chat-line i {
margin-right: 8px; /* Reduced icon margin */
color: #555;
}
/* Smaller font size for mobile screens */
@media (max-width: 600px) {
.chat-box {
font-size: 12px; /* Even smaller text on smaller screens */
}
}
</style>
</head>
<body>
<div id="map"></div> <!-- Map container for interactive background -->
<div class="overlay">
<h1><i class="fas fa-plane-departure"></i> Travel Advisor</h1>
<form method="POST">
<input type="text" id="user_query" name="user_query" placeholder="Ask right away!..." required>
<button type="submit"><i class="fas fa-paper-plane"></i></button>
</form>
<div class="chat-box">
<h2><i class="fas fa-comments"></i> Chat History</h2>
{% for line in chat_history %}
<p class="chat-line {{ 'user' if 'User:' in line else 'bot' }}">
<i class="fas {{ 'fa-user' if 'User:' in line else 'fa-robot' }}"></i>
<!-- Using the |safe filter to render HTML content -->
{{ line|safe }}
</p>
{% endfor %}
</div>
</div>
<script src="https://unpkg.com/leaflet@1.7.1/dist/leaflet.js"></script>
<script>
var map = L.map('map').setView([51.505, -0.09], 2);
L.tileLayer('https://{s}.tile.openstreetmap.org/{z}/{x}/{y}.png', {
attribution: '&copy; <a href="https://www.openstreetmap.org/copyright">OpenStreetMap</a> contributors'
}).addTo(map);
L.marker([51.505, -0.09]).addTo(map)
.bindPopup("<b>Welcome!</b><br>Ask me about your next trip.")
.openPopup();
</script>
<script>
window.onload = function() {
const chatBox = document.querySelector('.chat-box');
chatBox.scrollTop = chatBox.scrollHeight;
};
</script>
</body>
</html>
cicd.ai4eu-dev.eu:7444/tutorials/multi-agent-new/food-rec-agent-s
# Use a slim Python base image
FROM python:3.10-slim
ENV PYTHONUNBUFFERED=1
RUN apt-get update -y && \
apt-get install -y --no-install-recommends \
build-essential \
gcc \
libffi-dev \
libssl-dev \
git \
curl \
ca-certificates \
tzdata \
protobuf-compiler && \
apt-get clean && \
rm -rf /var/lib/apt/lists/*
# Set timezone
ARG TIMEZONE=Europe/Berlin
ENV TZ=$TIMEZONE
RUN ln -snf /usr/share/zoneinfo/$TIMEZONE /etc/localtime && echo $TIMEZONE > /etc/timezone
# Set working directory
WORKDIR /food-agent
# Copy requirements and install
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy project files
COPY . .
# Compile .proto file
RUN python3 -m grpc_tools.protoc \
--proto_path=. \
--python_out=. \
--grpc_python_out=. \
agent.proto
# Run the server
CMD ["python3", "-u", "server.py"]
syntax = "proto3";
message Empty {
}
message UserQuery {
string text = 1;
}
message AgentRequest {
string text = 1;
AgentType agent_type = 2;
}
enum AgentType {
UNKNOWN = 0;
WEATHER = 1;
PLACES = 2;
FOOD = 3;
MAP = 4;
}
message AgentResponse {
string text = 1;
}
service Agent {
rpc evaluateAgentRequestsFromPlanner(stream AgentRequest) returns (stream AgentResponse);
}
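The planner component is not part of this commit; a hypothetical sketch of how it might tag requests with an AgentType so each agent can filter the shared stream (the keyword heuristics below are illustrative assumptions, not the tutorial's planner logic):

import agent_pb2

def make_request(text: str) -> agent_pb2.AgentRequest:
    # Hypothetical routing: pick an AgentType from simple keyword matches.
    lowered = text.lower()
    if "eat" in lowered or "food" in lowered:
        agent_type = agent_pb2.FOOD
    elif "weather" in lowered or "sunny" in lowered:
        agent_type = agent_pb2.WEATHER
    elif "see" in lowered or "visit" in lowered:
        agent_type = agent_pb2.PLACES
    else:
        agent_type = agent_pb2.UNKNOWN
    return agent_pb2.AgentRequest(text=text, agent_type=agent_type)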
#!/bin/bash
# Variables
IMAGE_NAME="food-rec-agent-s"
REGISTRY_URL="cicd.ai4eu-dev.eu:7444"
REPO_PATH="tutorials/multi-agent-new"
TAG="latest"
# Step 1: Build the Docker image
echo "Building Docker image..."
docker build -t $IMAGE_NAME .
# Step 2: Tag the Docker image
echo "Tagging Docker image..."
docker tag $IMAGE_NAME $REGISTRY_URL/$REPO_PATH/$IMAGE_NAME:$TAG
# Step 3: Push the Docker image to the registry
echo "Pushing Docker image to registry..."
docker push $REGISTRY_URL/$REPO_PATH/$IMAGE_NAME:$TAG
# Check if the image was pushed successfully
if [ $? -eq 0 ]; then
echo "Docker image successfully pushed to $REGISTRY_URL/$REPO_PATH/$IMAGE_NAME:$TAG"
else
echo "Error: Docker push failed!"
exit 1
fi
import grpc
import agent_pb2
import agent_pb2_grpc
def run():
    channel = grpc.insecure_channel("localhost:8061")
    stub = agent_pb2_grpc.AgentStub(channel)
    user_query = input("Please enter your query (e.g., 'What should I eat in Rome?'): ")

    # The Agent service only exposes the bidirectional streaming RPC
    # evaluateAgentRequestsFromPlanner, so the query is wrapped in an
    # AgentRequest tagged FOOD and sent as a one-element stream.
    def request_generator():
        yield agent_pb2.AgentRequest(text=user_query, agent_type=agent_pb2.FOOD)

    for response in stub.evaluateAgentRequestsFromPlanner(request_generator()):
        print("Response from agent:", response.text)
    channel.close()
if __name__ == "__main__":
run()
grpcio==1.38.0
grpcio-tools==1.38.0
grpc-interceptor
protobuf==3.16.0
openai
requests
import grpc
import json
from concurrent import futures
import agent_pb2
import agent_pb2_grpc
from tool_calling.tools import get_food_recommendations, tools
from tool_calling.llm_client import client, deployment_name
def run_conversation(prompt: str) -> str:
messages = [{"role": "user", "content": prompt}]
response = client.chat.completions.create(
model=deployment_name, messages=messages, tools=tools, tool_choice="required"
)
response_message = response.choices[0].message
messages.append(response_message)
print("\nModel's Response:")
print(response_message)
if response_message.tool_calls:
for tool_call in response_message.tool_calls:
function_name = tool_call.function.name
function_args = json.loads(tool_call.function.arguments)
print("------------------TOOL CALLING---------------------------\n")
print(f"\nFunction Call: {function_name}")
print(f"Arguments: {function_args}")
print("------------------TOOL CALLING---------------------------\n")
if function_name == "get_food_recommendations":
function_response = get_food_recommendations(
place=function_args.get("place")
)
else:
function_response = json.dumps({"error": "Unknown function"})
messages.append(
{
"tool_call_id": tool_call.id,
"role": "tool",
"name": function_name,
"content": function_response,
}
)
else:
print("No tool calls were made by the model.")
# Second API call: Get final response
final_response = client.chat.completions.create(
model=deployment_name,
messages=messages,
)
return final_response.choices[0].message.content
class FoodRecAgent(agent_pb2_grpc.AgentServicer):
def __init__(self):
self.latest_food_info = None
def evaluateAgentRequestsFromPlanner(self, request_iterator, context):
for request in request_iterator:
if request.agent_type == agent_pb2.FOOD:
print(f"\nReceived query: {request.text}")
food_info = run_conversation(request.text)
if food_info is None:
food_info = "No food rec. info available."
self.latest_food_info = food_info
print(f"Food Rec. info set to: {self.latest_food_info}")
response = agent_pb2.AgentResponse(text=food_info)
yield response
def serve():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
agent_pb2_grpc.add_AgentServicer_to_server(FoodRecAgent(), server)
port = 8061
server.add_insecure_port(f"[::]:{port}")
print(f"Food Rec. agent server started on port {port}...")
server.start()
server.wait_for_termination()
if __name__ == "__main__":
serve()
from openai import OpenAI
import os
from base64 import b64encode
platform = "OpenAI"
# "Llama-3.3-70B-Instruct"
# "gpt-4o-mini"
deployment_name = "gpt-4o-mini"
if platform == "GenAI":
# Initialize OpenAI client
genai_username = os.getenv("IAIS_GENAI_USERNAME")
genai_password = os.getenv("IAIS_GENAI_PASSWORD")
token_string = f"{genai_username}:{genai_password}"
token_bytes = b64encode(token_string.encode())
client = OpenAI(
api_key="xxxx",
default_headers={"Authorization": f"Basic {token_bytes.decode()}"},
base_url="https://genai.iais.fraunhofer.de/api/v2",
)
elif platform == "OpenAI":
    # The OpenAI client reads OPENAI_API_KEY from the environment;
    # fail fast with a clear message if it is missing.
    if not os.getenv("OPENAI_API_KEY"):
        raise EnvironmentError("OPENAI_API_KEY environment variable is not set")
    # os.environ["OPENAI_ORG_ID"] = os.getenv("OPENAI_ORG_ID")
    client = OpenAI()
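A quick smoke test for the configured client (assumes a valid key for the chosen platform; the prompt is arbitrary):

# Smoke test: send a trivial prompt through the configured client.
from tool_calling.llm_client import client, deployment_name

resp = client.chat.completions.create(
    model=deployment_name,
    messages=[{"role": "user", "content": "Reply with the single word: pong"}],
)
print(resp.choices[0].message.content)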
import requests
import json
tools = [
{
"type": "function",
"function": {
"name": "get_food_recommendations",
"description": "Fetches food recommendations for a given place using the OpenFoodFacts API.",
"parameters": {
"type": "object",
"properties": {
"place": {
"type": "string",
"description": "The place to search for food recommendations.",
},
"limit": {
"type": "integer",
"description": "The maximum number of food recommendations to return.",
"default": 20,
},
},
"required": ["place"],
"additionalProperties": False,
},
},
}
]
import requests
import json
from typing import List
def get_food_recommendations(place: str, limit: int = 20) -> str:
"""
Fetches food product recommendations from OpenFoodFacts based on a given place.
Args:
place (str): The place or keyword to search for food-related items.
limit (int): Maximum number of recommendations to return.
Returns:
str: A JSON-formatted string containing the status and either data or error message.
"""
api_url = "https://world.openfoodfacts.org/cgi/search.pl"
params = {"search_terms": place, "action": "process", "json": 1, "page_size": 100}
try:
response = requests.get(api_url, params=params, timeout=10)
response.raise_for_status()
except requests.RequestException as e:
return json.dumps(
{"status": "error", "message": f"Error fetching data: {str(e)}"}, indent=4
)
data = response.json()
products = data.get("products", [])
valid_categories = {
"foods",
"beverages",
"snacks",
"dairy",
"cheese",
"meat",
"sweets",
"drinks",
"biscuits",
"cereals",
"bread",
"pasta",
"rice",
"spreads",
"chocolates",
"cookies",
"candies",
"sauces",
"soups",
"juices",
"alcoholic beverages",
}
recommendations: List[str] = []
for product in products:
name = product.get("product_name", "").strip()
categories = product.get("categories_tags", [])
if name and any(
cat.replace("en:", "") in valid_categories for cat in categories
):
recommendations.append(name)
if len(recommendations) >= limit:
break
if recommendations:
return json.dumps({"status": "success", "data": recommendations}, indent=4)
else:
return json.dumps(
{
"status": "error",
"message": f"No valid food recommendations found for '{place}'.",
},
indent=4,
)
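A quick way to exercise the helper directly (requires network access to OpenFoodFacts; the place and limit are arbitrary):

if __name__ == "__main__":
    # Prints a JSON string with either "data" or an error message.
    print(get_food_recommendations("Rome", limit=5))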