|
|
""" |
|
|
MCP Video Agent - HF Space with Modal Backend + Security |
|
|
Connects to Modal backend with authentication and rate limiting |
|
|
""" |
|
|
|
|
|
import os |
|
|
import gradio as gr |
|
|
import time |
|
|
import hashlib |
|
|
import base64 |
|
|
from datetime import datetime, timedelta |
|
|
from collections import defaultdict |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class RateLimiter:
    """Simple in-memory sliding-window rate limiter.

    Each user may have at most ``max_requests`` accepted calls to
    :meth:`is_allowed` within any trailing one-hour window. All state is held
    in this object, so it does not survive a process restart.
    """

    def __init__(self, max_requests_per_hour=10):
        """Create a limiter that accepts ``max_requests_per_hour`` per user."""
        self.max_requests = max_requests_per_hour
        # user_id -> list of datetimes at which a request was accepted
        self.requests = defaultdict(list)

    def _recent(self, user_id, now):
        """Return a new list of ``user_id``'s timestamps newer than one hour.

        Uses ``dict.get`` rather than indexing so that a pure lookup never
        inserts an empty entry into the underlying ``defaultdict``.
        """
        cutoff = now - timedelta(hours=1)
        return [t for t in self.requests.get(user_id, ()) if t > cutoff]

    def is_allowed(self, user_id):
        """Record a request for ``user_id`` if the quota permits it.

        Returns:
            True if the request was accepted (and counted), False if the user
            has already used the whole quota for the trailing hour.
        """
        now = datetime.now()
        recent = self._recent(user_id, now)

        if len(recent) >= self.max_requests:
            if recent:
                # Persist the pruned list so expired timestamps are dropped.
                self.requests[user_id] = recent
            else:
                # Nothing left to remember (only possible with a quota <= 0);
                # do not keep an empty entry around.
                self.requests.pop(user_id, None)
            return False

        recent.append(now)
        self.requests[user_id] = recent
        return True

    def get_remaining(self, user_id):
        """Return how many more requests ``user_id`` may make right now.

        This is a read-only query: it neither counts as a request nor creates
        an entry for users that have never been seen.
        """
        recent = self._recent(user_id, datetime.now())
        return max(0, self.max_requests - len(recent))
|
|
|
|
|
|
|
|
# Per-user hourly quota. Read from the MAX_REQUESTS_PER_HOUR environment
# variable (which must parse as an integer); defaults to 10 when unset.
MAX_REQUESTS_PER_HOUR = int(os.getenv("MAX_REQUESTS_PER_HOUR", "10"))

# Module-level limiter instance used by the request handler in this file.
rate_limiter = RateLimiter(MAX_REQUESTS_PER_HOUR)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import modal |
|
|
|
|
|
def get_modal_function(function_name):
    """Look up a function of the deployed ``mcp-video-agent`` Modal app.

    Args:
        function_name: Name of the function inside the deployed app.

    Returns:
        The handle returned by ``modal.Function.from_name``, or ``None`` if
        the lookup raised (the error is printed, not propagated).
    """
    # NOTE(review): depending on the Modal SDK version, from_name may resolve
    # lazily, in which case lookup failures would surface on the first
    # .remote() call instead of here -- confirm against the pinned version.
    try:
        return modal.Function.from_name("mcp-video-agent", function_name)
    except Exception as e:
        # Best-effort boundary: callers treat None as "backend unavailable".
        print(f"❌ Failed to connect to Modal: {e}")
        return None
|
|
|
|
|
def get_modal_volume():
    """Look up the ``video-storage`` Modal Volume used for file transfer.

    Returns:
        The handle returned by ``modal.Volume.from_name``, or ``None`` if the
        lookup raised (the error is printed, not propagated).
    """
    try:
        return modal.Volume.from_name("video-storage")
    except Exception as e:
        # Best-effort boundary: callers treat None as "volume unavailable".
        print(f"❌ Failed to connect to Modal Volume: {e}")
        return None
|
|
|
|
|
def upload_to_modal_volume(local_path, remote_filename):
    """Upload a local file to the root of the Modal Volume.

    Args:
        local_path: Path of the file on this machine.
        remote_filename: Name to store the file under; it is written to
            ``/<remote_filename>`` on the volume.

    Returns:
        A ``(ok, message)`` tuple: ``(True, "Success")`` on success, otherwise
        ``(False, <reason>)``. Exceptions are caught and reported through the
        message rather than raised.
    """
    try:
        vol = get_modal_volume()
        if vol is None:
            return False, "Failed to connect to Modal Volume"

        # The upload is performed when the batch context manager exits.
        with vol.batch_upload() as batch:
            batch.put_file(local_path, f"/{remote_filename}")

        print(f"✅ Uploaded to Modal Volume: {remote_filename}")
        return True, "Success"
    except Exception as e:
        print(f"❌ Upload error: {e}")
        return False, str(e)
|
|
|
|
|
def download_from_modal_volume(remote_filename, local_path):
    """Download ``/<remote_filename>`` from the Modal Volume to ``local_path``.

    Args:
        remote_filename: Name of the file at the root of the volume.
        local_path: Destination path on this machine; any existing file there
            is replaced.

    Returns:
        True if the file was fully written, False otherwise. Exceptions are
        caught and printed rather than raised. On failure no partial file is
        left behind at ``local_path``.
    """
    try:
        vol = get_modal_volume()
        if vol is None:
            return False

        # Drop any stale copy first so a failed download cannot be mistaken
        # for a fresh one.
        if os.path.exists(local_path):
            os.remove(local_path)

        with open(local_path, 'wb') as f:
            for chunk in vol.read_file(f"/{remote_filename}"):
                f.write(chunk)

        print(f"✅ Downloaded from Modal Volume: {remote_filename}")
        return True
    except Exception as e:
        print(f"❌ Download error: {e}")
        # The stream may have failed midway; remove the truncated file so
        # callers never see a half-written download.
        try:
            if os.path.exists(local_path):
                os.remove(local_path)
        except OSError:
            pass
        return False
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Maps "<local temp path>_<md5 prefix>" to the filename already uploaded to
# the Modal Volume, so follow-up questions about the same upload skip the
# upload step. Process-local; entries are never evicted.
uploaded_videos_cache = {}


def _resolve_user_id(username, request):
    """Pick the key used for rate limiting.

    Prefers the authenticated username. When there is none (public access),
    falls back to the client host of the request so that unauthenticated
    visitors do not all share a single quota, and finally to "anonymous".
    """
    if username:
        return username
    # NOTE(review): behind a reverse proxy the client host may be the proxy's
    # address rather than the visitor's -- confirm for the deployment target.
    client = getattr(request, "client", None)
    return getattr(client, "host", None) or "anonymous"


def _md5_prefix(path, chunk_size=1024 * 1024):
    """Return the first 8 hex chars of the file's MD5, read in chunks.

    MD5 is used only to tell uploads apart, not for security.
    """
    digest = hashlib.md5()
    with open(path, 'rb') as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()[:8]


def _plain_text_box(text):
    """Wrap ``text`` in the styled <div> used when no audio can be shown."""
    return f"<div style='background: black; color: lime; padding: 20px; border-radius: 10px; white-space: normal; word-wrap: break-word;'>{text}</div>"


def _audio_response_markup(remaining, audio_base64, text):
    """Build the chat message embedding the audio player and the full text."""
    return f"""🎙️ **Audio Response** ({remaining} requests remaining this hour)

<audio controls autoplay style="width: 100%; margin: 10px 0; background: #f0f0f0; border-radius: 5px;">
<source src="data:audio/mpeg;base64,{audio_base64}" type="audio/mpeg">
</audio>

**📝 Full Text Response:**

<div style="background-color: #000000; color: #00ff00; padding: 25px; border-radius: 10px; font-family: 'Courier New', monospace; line-height: 1.8; font-size: 14px; white-space: normal; word-wrap: break-word; overflow-wrap: break-word; max-width: 100%;">
{text}
</div>"""


def process_interaction(user_message, history, video_file, username, request: gr.Request):
    """
    Core chatbot logic with Modal backend and security.

    Generator used as a Gradio event handler. It appends the user's message
    and a placeholder assistant message to ``history`` and then repeatedly
    replaces that placeholder with progress updates and, finally, the answer.

    Steps: rate-limit check, video presence and size check (100MB), upload to
    the Modal Volume (skipped when the same upload is cached), remote video
    analysis, then remote text-to-speech whose audio file is downloaded and
    embedded as base64. If the audio step fails the text answer is still shown.

    Args:
        user_message: The question typed by the user.
        history: Chat history as a list of ``{"role", "content"}`` dicts, or None.
        video_file: Local path of the uploaded video, or None.
        username: Authenticated username (may be empty for public access).
        request: The Gradio request, used to identify unauthenticated clients.

    Yields:
        The updated history list after each progress step.
    """
    if history is None:
        history = []

    user_id = _resolve_user_id(username, request)

    # Show the user's message plus a placeholder that later steps overwrite.
    history = history + [
        {"role": "user", "content": user_message},
        {"role": "assistant", "content": "⏳ Processing your request..."},
    ]
    yield history

    # --- Rate limiting (a successful check consumes one request) ---
    if not rate_limiter.is_allowed(user_id):
        remaining = rate_limiter.get_remaining(user_id)
        history[-1] = {"role": "assistant", "content": f"⚠️ Rate limit exceeded. You have {remaining} requests remaining this hour. Please try again later."}
        yield history
        return

    remaining = rate_limiter.get_remaining(user_id)
    print(f"💡 User {user_id}: {remaining} requests remaining this hour")

    # --- Input validation ---
    if video_file is None:
        history[-1] = {"role": "assistant", "content": "⚠️ Please upload a video first!"}
        yield history
        return

    local_path = video_file

    file_size_mb = os.path.getsize(local_path) / (1024 * 1024)
    if file_size_mb > 100:
        history[-1] = {"role": "assistant", "content": f"❌ Video too large! Size: {file_size_mb:.1f}MB. Please upload a video smaller than 100MB."}
        yield history
        return

    # --- Upload (or reuse a previous upload of the same file) ---
    file_hash = _md5_prefix(local_path)
    timestamp = int(time.time())
    unique_filename = f"video_{timestamp}_{file_hash}.mp4"
    cache_key = f"{local_path}_{file_hash}"

    if cache_key not in uploaded_videos_cache:
        history[-1] = {"role": "assistant", "content": f"📤 Uploading video ({file_size_mb:.1f}MB)... This may take a moment."}
        yield history

        try:
            success, error_msg = upload_to_modal_volume(local_path, unique_filename)

            if not success:
                history[-1] = {"role": "assistant", "content": f"❌ Upload failed: {error_msg}"}
                yield history
                return

            uploaded_videos_cache[cache_key] = unique_filename
            print(f"✅ Video uploaded: {unique_filename}")

            # Short pause before the backend reads the file
            # (presumably to let the volume write settle -- TODO confirm).
            time.sleep(1)

        except Exception as e:
            history[-1] = {"role": "assistant", "content": f"❌ Upload error: {str(e)}"}
            yield history
            return
    else:
        unique_filename = uploaded_videos_cache[cache_key]
        history[-1] = {"role": "assistant", "content": "♻️ Using cached video..."}
        yield history

    # --- Video analysis ---
    history[-1] = {"role": "assistant", "content": "🤖 Analyzing video with Gemini..."}
    yield history

    try:
        analyze_fn = get_modal_function("_internal_analyze_video")
        if analyze_fn is None:
            history[-1] = {"role": "assistant", "content": "❌ Failed to connect to Modal backend. Please check deployment."}
            yield history
            return

        text_response = analyze_fn.remote(user_message, video_filename=unique_filename)
    except Exception as e:
        text_response = f"❌ Analysis error: {str(e)}"

    full_text_response = text_response

    # A response carrying an error/warning marker is shown as-is, without TTS.
    if "❌" not in text_response and "⚠️" not in text_response:
        history[-1] = {"role": "assistant", "content": "🗣️ Generating audio response..."}
        yield history

        try:
            speak_fn = get_modal_function("_internal_speak_text")
            if speak_fn is None:
                history[-1] = {"role": "assistant", "content": f"⚠️ TTS unavailable.\n\n{_plain_text_box(full_text_response)}"}
                yield history
                return

            audio_filename = f"audio_{unique_filename.replace('.mp4', '.mp3')}"
            speak_fn.remote(text_response, audio_filename=audio_filename)

            # Pause before the first download attempt
            # (presumably waiting for the audio to appear on the volume -- TODO confirm).
            time.sleep(3)
            local_audio = f"/tmp/{audio_filename}"

            # Remove any stale local copy from an earlier question.
            if os.path.exists(local_audio):
                os.remove(local_audio)

            max_retries = 3
            for attempt in range(max_retries):
                success = download_from_modal_volume(audio_filename, local_audio)

                # Files of 1000 bytes or less are treated as incomplete audio.
                if success and os.path.exists(local_audio) and os.path.getsize(local_audio) > 1000:
                    break

                if os.path.exists(local_audio):
                    os.remove(local_audio)

                # No point waiting after the final attempt.
                if attempt < max_retries - 1:
                    time.sleep(2)

            if os.path.exists(local_audio) and os.path.getsize(local_audio) > 1000:
                with open(local_audio, 'rb') as f:
                    audio_bytes = f.read()
                audio_base64 = base64.b64encode(audio_bytes).decode()

                # The audio is now embedded in the message; drop the temp file
                # so /tmp does not grow with every answered question.
                try:
                    os.remove(local_audio)
                except OSError:
                    pass

                response_content = _audio_response_markup(remaining, audio_base64, full_text_response)

                history[-1] = {"role": "assistant", "content": response_content}
                yield history
            else:
                history[-1] = {"role": "assistant", "content": f"⚠️ Audio generation incomplete.\n\n{_plain_text_box(full_text_response)}"}
                yield history

        except Exception as e:
            history[-1] = {"role": "assistant", "content": f"❌ Audio error: {str(e)}\n\n{_plain_text_box(full_text_response)}"}
            yield history
    else:
        history[-1] = {"role": "assistant", "content": text_response}
        yield history
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Login credentials are taken from the environment. The username defaults to
# "admin"; leaving GRADIO_PASSWORD unset means no password is configured.
GRADIO_USERNAME = os.environ.get("GRADIO_USERNAME", "admin")
GRADIO_PASSWORD = os.environ.get("GRADIO_PASSWORD")


def authenticate(username, password):
    """Authenticate users - only if password is set.

    Args:
        username: Username submitted on the login form.
        password: Password submitted on the login form.

    Returns:
        True when no password is configured, or when both values match the
        configured credentials; False otherwise (including non-string input).
    """
    import hmac  # local import: only this function needs it

    if GRADIO_PASSWORD is None:
        # No password configured: every login attempt is accepted.
        return True

    if not isinstance(username, str) or not isinstance(password, str):
        return False

    # Constant-time comparison so response timing does not reveal how much of
    # a guess was correct. Encode to bytes because compare_digest rejects
    # non-ASCII str. Both checks are evaluated before combining them.
    user_ok = hmac.compare_digest(
        username.encode("utf-8"), GRADIO_USERNAME.encode("utf-8")
    )
    password_ok = hmac.compare_digest(
        password.encode("utf-8"), GRADIO_PASSWORD.encode("utf-8")
    )
    return user_ok and password_ok
|
|
|
|
|
# Page layout and event wiring for the app.
with gr.Blocks(title="🎥 MCP Video Agent") as demo:
    gr.Markdown("# 🎥 MCP Video Agent")
    gr.Markdown("**🎉 MCP 1st Birthday Hackathon** | Track: MCP in Action (Consumer & Creative)")

    # Static help text; only the rate limit figure is interpolated.
    gr.Markdown(f"""
    ### ⚡ Key Innovation: Smart Frame Caching

    **First Query**: Video is analyzed deeply and cached (8-12 seconds)
    **Follow-up Queries**: Instant responses using cached context (2-3 seconds, 90% cost reduction!)
    **Cache Duration**: 1 hour - ask multiple questions without reprocessing

    ---

    ### 📖 How to Use

    1. **Upload** a video (MP4, max 100MB)
    2. **Ask** your first question - video will be analyzed and cached
    3. **Continue** asking follow-up questions - experience the speed boost!
    4. **Listen** to voice responses (powered by ElevenLabs TTS)

    **Pro Tip**: After your first question, try asking 2-3 more to see how fast cached responses are!

    ---

    ### 🛡️ Fair Usage Policy

    - **Rate Limit**: {MAX_REQUESTS_PER_HOUR} requests per hour per user
    - **Video Size**: Max 100MB
    - **Shared Resources**: This is a Hackathon demo - please use responsibly

    ---

    ### 🔧 Tech Stack

    - **Gemini 2.5 Flash**: Multimodal video analysis + Context Caching
    - **Modal**: Serverless backend + Persistent storage
    - **ElevenLabs**: Neural text-to-speech
    - **Gradio 6.0**: Interactive UI

    **Sponsor Tech Used**: ✅ Modal | ✅ Google Gemini | ✅ ElevenLabs
    """)

    # Per-session identity, filled in on page load and passed to the handler.
    username_state = gr.State("")

    with gr.Row():
        with gr.Column(scale=1):
            video_input = gr.Video(label="📹 Upload Video (MP4)", sources=["upload"])
            gr.Markdown("**Supported:** MP4, max 100MB")

        with gr.Column(scale=2):
            chatbot = gr.Chatbot(label="💬 Conversation", height=500)
            msg = gr.Textbox(
                label="Your question...",
                placeholder="What is this video about?",
                lines=2,
            )
            submit_btn = gr.Button("🚀 Send", variant="primary")

    gr.Examples(
        examples=[
            ["What is happening in this video?"],
            ["Describe the main content of this video."],
            ["What are the key visual elements?"],
        ],
        inputs=msg,
    )

    def set_username(request: gr.Request):
        """Return the identity to rate-limit this session by.

        Uses the logged-in username when there is one. Without a login the
        request carries no username, so fall back to the client host (and
        then to "anonymous") instead of handing every visitor the same
        empty identity.
        """
        username = getattr(request, "username", None)
        if username:
            return username
        client = getattr(request, "client", None)
        return getattr(client, "host", None) or "anonymous"

    demo.load(set_username, None, username_state)

    # The Send button and pressing Enter in the textbox run the same handler.
    handler_inputs = [msg, chatbot, video_input, username_state]

    submit_btn.click(
        process_interaction,
        inputs=handler_inputs,
        outputs=[chatbot],
    )

    msg.submit(
        process_interaction,
        inputs=handler_inputs,
        outputs=[chatbot],
    )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Login is enforced only when a non-empty password is configured;
    # otherwise the app is public and relies on rate limiting alone.
    if GRADIO_PASSWORD:
        auth_config = authenticate
        print(f"🔒 Authentication enabled. Username: {GRADIO_USERNAME}")
    else:
        auth_config = None
        print("🌐 Public access enabled (no authentication required)")
        print(" Rate limiting active to prevent abuse")
        print(f" Limit: {MAX_REQUESTS_PER_HOUR} requests/hour per user")

    demo.launch(
        auth=auth_config,
        show_error=True,
        share=False,
    )
|
|
|
|
|
|