File size: 8,337 Bytes
d507388
412346b
2b884cd
d507388
453e0ef
2b884cd
 
d507388
 
 
 
 
 
2b884cd
d507388
 
 
 
 
 
 
 
 
 
 
2b884cd
d507388
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
453e0ef
d507388
 
 
453e0ef
 
d507388
 
453e0ef
 
d507388
 
 
 
 
 
 
 
453e0ef
d507388
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2b884cd
d507388
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
412346b
d507388
 
 
 
 
412346b
d507388
 
 
 
 
 
 
 
 
 
453e0ef
d507388
 
2b884cd
d507388
 
 
 
 
 
 
 
 
2b884cd
d507388
 
 
 
 
 
 
 
453e0ef
d507388
453e0ef
2b884cd
d507388
 
 
412346b
 
d507388
412346b
 
 
 
 
d507388
 
412346b
d507388
2b884cd
d507388
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
from fastapi import FastAPI, HTTPException
from fastapi.responses import RedirectResponse # Import RedirectResponse
import google.generativeai as genai
from fastapi.middleware.cors import CORSMiddleware
import requests
import os
import json
import gradio as gr
from dotenv import load_dotenv

# Load variables from a local .env file (python-dotenv) before any of the
# os.getenv() calls below read them.
load_dotenv()

# The ASGI application; the API routes and the Gradio UI are attached to it
# further down in this module.
app = FastAPI()

# Configure CORS: every origin, HTTP method and request header is accepted.
# NOTE(review): this is fully permissive — confirm that is intended before
# exposing the service beyond a demo environment.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# Initialize Gemini
# The key comes from the GEMINI_API_KEY environment variable. os.getenv()
# yields None when the variable is unset, and that value is handed to
# configure() unchanged (no validation happens here).
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
genai.configure(api_key=GEMINI_API_KEY)

# Define the tool for Gemini
# A single function, "get_weather", is declared with one required string
# argument, "location". process_with_tools() and execute_tool() dispatch on
# this exact name, so the three must stay in sync.
get_weather_tool = genai.protos.Tool(
    function_declarations=[
        genai.protos.FunctionDeclaration(
            name="get_weather",
            description="Get the current weather for a specified location.",
            parameters=genai.protos.Schema(
                type=genai.protos.Type.OBJECT,
                properties={
                    "location": genai.protos.Schema(type=genai.protos.Type.STRING, description="The city name to get weather for"),
                },
                required=["location"],
            ),
        )
    ]
)

# Initialize model with tools
model = genai.GenerativeModel('gemini-2.5-flash', tools=[get_weather_tool])
# OpenWeatherMap API key. Note that get_weather() re-reads the API_KEY
# environment variable on every call instead of using this constant.
API_KEY = os.getenv("API_KEY") # This is your OpenWeatherMap API key

# Weather API function
def get_weather(location: str) -> dict:
    """Fetch the current weather for *location* from OpenWeatherMap.

    The API key is read from the ``API_KEY`` environment variable on every
    call, and metric units are requested. Failures are reported through the
    return value rather than raised.

    Args:
        location: City name to look up; must be a non-blank string.

    Returns:
        The decoded JSON payload on success, otherwise a dict of the form
        ``{"error": "<message>"}``. Error messages never contain the API key.
    """
    api_key = os.getenv("API_KEY")
    if not api_key:
        print("Error: Weather API key not configured")
        return {"error": "Weather API key not configured"}

    # Reject unusable input up front instead of spending a network round trip
    # on a request the API can only answer with an error.
    if not isinstance(location, str) or not location.strip():
        print("Error: No location supplied to get_weather")
        return {"error": "No location provided for the weather lookup."}

    def _redact(text: str) -> str:
        # requests embeds the full request URL (query string included) in many
        # of its exception messages; strip the key before the text is logged
        # or handed back to the caller.
        return text.replace(api_key, "***")

    try:
        response = requests.get(
            # HTTPS, so the key carried in the query string is not sent in
            # clear text.
            "https://api.openweathermap.org/data/2.5/weather",
            params={"q": location.strip(), "appid": api_key, "units": "metric"},
            timeout=10,
        )
        response.raise_for_status()  # Turn 4xx/5xx answers into exceptions
        return response.json()
    except requests.exceptions.Timeout:
        print(f"Error fetching weather: Request to OpenWeatherMap timed out for {location}")
        return {"error": "Request to weather API timed out."}
    except requests.exceptions.RequestException as e:
        safe_message = _redact(str(e))
        print(f"Error fetching weather for {location}: {safe_message}")
        return {"error": f"Error connecting to weather API: {safe_message}"}
    except Exception as e:
        safe_message = _redact(str(e))
        print(f"Unexpected error in get_weather for {location}: {safe_message}")
        return {"error": safe_message}

# Tool execution function
def execute_tool(tool_name: str, parameters: dict) -> str:
    """Run the tool called *tool_name* and return its outcome as plain text.

    Only ``"get_weather"`` is known. Its ``"location"`` parameter falls back
    to ``"London"`` when absent. Any failure is described in the returned
    string instead of being raised.
    """
    # Guard clause: anything other than the weather tool is rejected up front.
    if tool_name != "get_weather":
        return f"Unknown tool: {tool_name}"

    payload = get_weather(parameters.get("location", "London"))
    if "error" in payload:
        return payload["error"]

    try:
        city = payload.get("name", "Unknown City")
        country_code = payload.get("sys", {}).get("country", "Unknown Country")

        main_section = payload.get("main", {})
        temperature = main_section.get("temp")
        humidity_pct = main_section.get("humidity")
        feels_like = main_section.get("feels_like")

        wind_ms = payload.get("wind", {}).get("speed")

        conditions = payload.get("weather", [])
        if conditions:
            description = conditions[0].get("description", "N/A")
        else:
            description = "N/A"

        # All four numeric readings are required to build the report.
        readings = (temperature, humidity_pct, wind_ms, feels_like)
        if any(reading is None for reading in readings):
            return "Incomplete weather data received from API."

        wind_kmh = wind_ms * 3.6  # metres per second -> kilometres per hour

        report_lines = [
            f"Weather in {city}, {country_code}:",
            f"• Temperature: {temperature:.1f}°C",
            f"• Condition: {description.capitalize()}",
            f"• Humidity: {humidity_pct}%",
            f"• Wind: {wind_kmh:.1f} km/h",
            f"• Feels like: {feels_like:.1f}°C",
        ]
        return "\n".join(report_lines)
    except KeyError as missing:
        print(f"Error parsing weather data: Missing key {missing}. Response: {payload}")
        return f"Error parsing weather data: Missing key {missing}. Please check the API response structure."
    except Exception as failure:
        print(f"An unexpected error occurred during weather parsing: {str(failure)}")
        return f"An unexpected error occurred: {str(failure)}"

# AI processing with tool selection (using Function Calling)
def _extract_function_args(function_call) -> dict:
    """Return the arguments of a Gemini function call as a plain dict.

    Mapping-like ``args`` are copied into a dict; a JSON string is decoded
    and used only if it decodes to a dict. Anything else produces a warning
    on stdout and an empty dict.
    """
    if not hasattr(function_call, 'args'):
        return {}

    raw_args = function_call.args
    if hasattr(raw_args, 'items'):
        return dict(raw_args)

    if isinstance(raw_args, str):
        try:
            parsed_args = json.loads(raw_args)
        except json.JSONDecodeError:
            print(f"Warning: function_call.args was a string but not valid JSON: {raw_args}")
            return {}
        if isinstance(parsed_args, dict):
            return parsed_args
        print(f"Warning: function_call.args was string but not a dict after json.loads: {parsed_args}")
        return {}

    print(f"Warning: Unexpected type for function_call.args: {type(raw_args)}")
    return {}


# AI processing with tool selection (using Function Calling)
def process_with_tools(query: str) -> str:
    """Answer *query* with Gemini, running the weather tool when requested.

    The model is asked once. If the first usable part of its reply is text,
    that text is returned. If it is a ``get_weather`` function call, the tool
    is executed and its output is sent back to the model, whose final text is
    returned.

    Args:
        query: The user's message.

    Returns:
        The model's answer, or a human-readable error/diagnostic string. Any
        exception raised along the way is caught and reported as
        ``"AI Error: ..."``.
    """
    try:
        response = model.generate_content(query)

        if not (response.candidates and response.candidates[0].content.parts):
            return "No coherent response from AI (neither text nor function call)."

        model_turn = response.candidates[0].content
        for part in model_turn.parts:
            if part.function_call:
                function_name = part.function_call.name
                function_args = _extract_function_args(part.function_call)

                if function_name != "get_weather":
                    return f"Unknown tool requested by AI: {function_name}"

                print(f"Gemini requested tool: {function_name} with args: {function_args}")
                tool_result = execute_tool("get_weather", function_args)
                print(f"Tool execution result: {tool_result}")

                # Seed the chat with the exchange that already happened rather
                # than sending the query a second time: a fresh generation is
                # an extra model call and may not contain the function call
                # this tool result is answering.
                chat_session = model.start_chat(
                    history=[
                        {"role": "user", "parts": [query]},
                        model_turn,
                    ]
                )
                response_with_tool_output = chat_session.send_message(
                    genai.protos.Part(
                        function_response=genai.protos.FunctionResponse(
                            name="get_weather",
                            response={"result": tool_result}
                        )
                    )
                )
                return response_with_tool_output.text

            if part.text:
                return part.text

        return "No coherent response from AI (neither text nor function call)."

    except Exception as e:
        print(f"Error during AI processing in process_with_tools: {e}")
        return f"AI Error: {str(e)}"

# FastAPI endpoints
@app.post("/api/query")
async def handle_query(payload: dict):
    # POST /api/query: expects a JSON object with a non-blank string "query"
    # and answers {"response": <text produced by process_with_tools>}.
    # Responds 400 when "query" is missing, blank or not a string.
    # (Documented with comments rather than a docstring because FastAPI
    # publishes a route's docstring as its OpenAPI description.)
    import asyncio  # function-scope import keeps this handler self-contained

    query = payload.get("query", "")
    if not isinstance(query, str) or not query.strip():
        raise HTTPException(status_code=400, detail="Query is required")

    # process_with_tools performs blocking network calls (Gemini and the
    # weather API); run it in a worker thread so the event loop stays free to
    # serve other requests in the meantime.
    response = await asyncio.to_thread(process_with_tools, query)
    return {"response": response}

# Gradio UI
def gradio_interface(query: str):
    # Callback for the Gradio UI: forwards the submitted message to
    # process_with_tools() and hands its reply back for display.
    reply = process_with_tools(query)
    return reply

# Gradio front end: one text box in, one text box out; every submission is
# handled by gradio_interface().
gradio_app = gr.Interface(
    fn=gradio_interface,
    inputs=gr.Textbox(label="Your Message"),
    outputs=gr.Textbox(label="MCP Response"),
    title="MCP Server",
    description="Multi-Component Processing Server with Gemini AI and Weather Tools"
)

# Mount Gradio on FastAPI
# The UI is served under /ui. `app` is rebound to the value returned by
# mount_gradio_app, so the routes declared after this line attach to that
# object.
app = gr.mount_gradio_app(app, gradio_app, path="/ui")

# === NEW CODE STARTS HERE ===
# Redirect the root path to the Gradio UI
@app.get("/")
async def redirect_to_gradio():
    # Visitors of the root path are sent on to the Gradio UI at /ui.
    ui_redirect = RedirectResponse(url="/ui")
    return ui_redirect

# Health check endpoint; served at /health_status because "/" redirects to the UI
@app.get("/health_status")
def health_check():
    # Static liveness report: a fixed status plus the names of the components
    # this server wires together. Nothing is actually probed here.
    component_names = ["fastapi", "gemini", "weather-api", "gradio"]
    return {"status": "active", "components": component_names}
# === NEW CODE ENDS HERE ===

if __name__ == "__main__":
    # uvicorn is imported only when the module is executed as a script.
    import uvicorn

    # Listen on the port named by the PORT environment variable, falling back
    # to 7860, and bind to all network interfaces.
    listen_port = int(os.environ.get("PORT", "7860"))
    uvicorn.run(app, host="0.0.0.0", port=listen_port)