Spaces:
Sleeping
Sleeping
| """ | |
| SAAP Cost Tracking API Endpoints | |
| Provides real-time cost metrics and analytics for OpenRouter integration | |
| """ | |
| from datetime import datetime | |
| from typing import Dict, List, Optional, Any | |
| from fastapi import APIRouter, HTTPException, Query, Depends | |
| from pydantic import BaseModel, Field | |
| from ..services.cost_efficiency_logger import cost_efficiency_logger, CostAnalytics | |
| from ..config.settings import get_settings | |
# Router for the cost-tracking API: routes registered on it are served under
# the /api/v1/cost prefix and grouped under the "Cost Tracking" tag.
router = APIRouter(
    prefix="/api/v1/cost",
    tags=["Cost Tracking"],
)
| # Response Models | |
class CostSummaryResponse(BaseModel):
    """Cost summary payload returned by ``get_cost_summary``.

    Combines request/cost totals for the requested time window with the
    current daily budget utilization and a per-provider breakdown.
    """

    # --- Totals for the requested period ---
    total_cost_usd: float = Field(
        ..., description="Total cost in USD"
    )
    total_requests: int = Field(
        ..., description="Total number of requests"
    )
    successful_requests: int = Field(
        ..., description="Number of successful requests"
    )
    failed_requests: int = Field(
        ..., description="Number of failed requests"
    )
    success_rate: float = Field(
        ..., description="Success rate (0-1)"
    )
    average_cost_per_request: float = Field(
        ..., description="Average cost per request"
    )

    # --- Daily budget figures ---
    daily_budget_used: float = Field(
        ..., description="Daily budget utilization percentage"
    )
    budget_remaining_usd: float = Field(
        ..., description="Remaining budget in USD"
    )

    # --- Breakdown and echo of the query window ---
    by_provider: Dict[str, Dict[str, Any]] = Field(
        ..., description="Cost breakdown by provider"
    )
    period_hours: int = Field(
        ..., description="Time period in hours"
    )
class CostAnalyticsResponse(BaseModel):
    """Detailed cost analytics payload returned by ``get_cost_analytics``.

    Every field is copied from the same-named attribute of the analytics
    object produced by the cost efficiency logger.
    """

    # --- Period and request/cost totals ---
    time_period: str = Field(
        ..., description="Analysis time period"
    )
    total_cost_usd: float = Field(
        ..., description="Total cost"
    )
    total_requests: int = Field(
        ..., description="Total requests"
    )
    successful_requests: int = Field(
        ..., description="Successful requests"
    )
    failed_requests: int = Field(
        ..., description="Failed requests"
    )
    average_cost_per_request: float = Field(
        ..., description="Average cost per request"
    )

    # --- Token and throughput metrics ---
    total_tokens: int = Field(
        ..., description="Total tokens processed"
    )
    average_response_time: float = Field(
        ..., description="Average response time in seconds"
    )
    cost_per_1k_tokens: float = Field(
        ..., description="Cost per 1000 tokens"
    )
    tokens_per_second: float = Field(
        ..., description="Processing speed in tokens/second"
    )

    # --- Breakdowns ---
    top_expensive_models: List[Dict[str, Any]] = Field(
        ..., description="Most expensive models"
    )
    cost_by_agent: Dict[str, float] = Field(
        ..., description="Cost breakdown by agent"
    )
    cost_by_provider: Dict[str, float] = Field(
        ..., description="Cost breakdown by provider"
    )

    # --- Budget, trend and efficiency ---
    daily_budget_utilization: float = Field(
        ..., description="Daily budget usage percentage"
    )
    cost_trend_24h: List[Dict[str, Any]] = Field(
        ..., description="24-hour cost trend"
    )
    efficiency_score: float = Field(
        ..., description="Cost efficiency score (tokens per dollar)"
    )
class PerformanceBenchmarkResponse(BaseModel):
    """One performance benchmark entry for a provider/model pair.

    ``get_performance_benchmarks`` returns a list of these, one per
    benchmark reported by the cost efficiency logger.
    """

    # --- Identity of the benchmarked target ---
    provider: str = Field(
        ..., description="Provider name"
    )
    model: str = Field(
        ..., description="Model name"
    )

    # --- Measured metrics ---
    avg_response_time: float = Field(
        ..., description="Average response time"
    )
    tokens_per_second: float = Field(
        ..., description="Tokens per second"
    )
    cost_per_token: float = Field(
        ..., description="Cost per token"
    )
    success_rate: float = Field(
        ..., description="Success rate (0-1)"
    )
    cost_efficiency_score: float = Field(
        ..., description="Cost efficiency score"
    )

    # --- Size of the underlying sample ---
    sample_size: int = Field(
        ..., description="Number of samples"
    )
class BudgetStatusResponse(BaseModel):
    """Daily budget status payload returned by ``get_budget_status``.

    Reports the configured daily budget, how much of it has been spent,
    the alert threshold, and the derived warning/exceeded flags.
    """

    # --- Budget and spend ---
    daily_budget_usd: float = Field(
        ..., description="Daily budget limit"
    )
    current_daily_cost: float = Field(
        ..., description="Current daily cost"
    )
    budget_used_percentage: float = Field(
        ..., description="Budget usage percentage"
    )
    budget_remaining_usd: float = Field(
        ..., description="Remaining budget"
    )

    # --- Alerting ---
    alert_threshold_percentage: float = Field(
        ..., description="Alert threshold"
    )
    is_over_threshold: bool = Field(
        ..., description="Whether over alert threshold"
    )
    is_budget_exceeded: bool = Field(
        ..., description="Whether budget is exceeded"
    )

    # --- Projection ---
    estimated_requests_remaining: int = Field(
        ..., description="Estimated requests remaining in budget"
    )
| # API Endpoints | |
async def get_cost_summary(
    hours: int = Query(24, ge=1, le=168, description="Time period in hours (1-168)")
) -> CostSummaryResponse:
    """
    Get cost summary for specified time period

    Returns comprehensive cost metrics including:
    - Total costs and request counts
    - Success/failure rates
    - Budget utilization
    - Provider breakdowns

    Args:
        hours: Size of the analytics window in hours (validated to 1-168).

    Returns:
        A CostSummaryResponse built from the logger's analytics for the
        window plus the current daily cost and configured daily budget.

    Raises:
        HTTPException: 500 if analytics, daily cost or settings cannot be
            retrieved, or the response cannot be built.
    """
    try:
        analytics = await cost_efficiency_logger.get_cost_analytics(hours)
        daily_cost = await cost_efficiency_logger.get_daily_cost()
        daily_budget = get_settings().agents.daily_cost_budget

        budget_remaining = max(0, daily_budget - daily_cost)
        if daily_budget > 0:
            budget_used_percentage = (daily_cost / daily_budget) * 100
        else:
            # A non-positive budget cannot be divided by. Report the budget as
            # fully used once any cost has accrued instead of failing the
            # request with a ZeroDivisionError.
            budget_used_percentage = 100.0 if daily_cost > 0 else 0.0

        # Per-provider breakdown. Only the cost is available here; "requests"
        # and "tokens" are placeholders that are always 0.
        by_provider = {
            provider: {
                "cost": cost,
                "requests": 0,
                "tokens": 0,
            }
            for provider, cost in analytics.cost_by_provider.items()
        }

        total_requests = analytics.total_requests
        # Avoid dividing by zero when no requests were recorded in the window.
        success_rate = (
            analytics.successful_requests / total_requests
            if total_requests > 0
            else 0
        )

        return CostSummaryResponse(
            total_cost_usd=analytics.total_cost_usd,
            total_requests=total_requests,
            successful_requests=analytics.successful_requests,
            failed_requests=analytics.failed_requests,
            success_rate=success_rate,
            average_cost_per_request=analytics.average_cost_per_request,
            daily_budget_used=budget_used_percentage,
            budget_remaining_usd=budget_remaining,
            by_provider=by_provider,
            period_hours=hours,
        )
    except Exception as e:
        # Endpoint boundary: surface any failure as a 500 and keep the cause
        # chained for server-side tracebacks.
        raise HTTPException(status_code=500, detail=f"Failed to retrieve cost summary: {str(e)}") from e
async def get_cost_analytics(
    hours: int = Query(24, ge=1, le=168, description="Time period in hours (1-168)")
) -> CostAnalyticsResponse:
    """
    Get comprehensive cost analytics

    Provides detailed cost analysis including:
    - Token metrics and efficiency scores
    - Agent and provider breakdowns
    - Cost trends and expensive models
    - Performance metrics

    Each response field is read from the same-named attribute of the
    analytics object returned by the cost efficiency logger. Any failure is
    reported as an HTTP 500.
    """
    # Attributes copied one-to-one from the analytics object, in response order.
    copied_fields = (
        "time_period",
        "total_cost_usd",
        "total_requests",
        "successful_requests",
        "failed_requests",
        "average_cost_per_request",
        "total_tokens",
        "average_response_time",
        "cost_per_1k_tokens",
        "tokens_per_second",
        "top_expensive_models",
        "cost_by_agent",
        "cost_by_provider",
        "daily_budget_utilization",
        "cost_trend_24h",
        "efficiency_score",
    )
    try:
        analytics = await cost_efficiency_logger.get_cost_analytics(hours)
        payload = {}
        for field_name in copied_fields:
            payload[field_name] = getattr(analytics, field_name)
        return CostAnalyticsResponse(**payload)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to retrieve cost analytics: {str(e)}")
async def get_performance_benchmarks(
    hours: int = Query(24, ge=1, le=168, description="Time period in hours (1-168)")
) -> List[PerformanceBenchmarkResponse]:
    """
    Get performance benchmarks by provider and model

    Returns performance metrics for cost-efficiency analysis:
    - Response times and processing speeds
    - Cost per token comparisons
    - Success rates and efficiency scores

    Each benchmark reported by the cost efficiency logger is converted into
    one response entry, preserving the logger's order. Any failure is
    reported as an HTTP 500.
    """
    try:
        raw_benchmarks = await cost_efficiency_logger.get_performance_benchmarks(hours)

        entries = []
        for item in raw_benchmarks:
            entry = PerformanceBenchmarkResponse(
                provider=item.provider,
                model=item.model,
                avg_response_time=item.avg_response_time,
                tokens_per_second=item.tokens_per_second,
                cost_per_token=item.cost_per_token,
                success_rate=item.success_rate,
                cost_efficiency_score=item.cost_efficiency_score,
                sample_size=item.sample_size,
            )
            entries.append(entry)
        return entries
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to retrieve performance benchmarks: {str(e)}")
async def get_budget_status() -> BudgetStatusResponse:
    """
    Get current budget status and utilization

    Provides real-time budget monitoring:
    - Daily budget limits and usage
    - Alert thresholds and warnings
    - Estimated remaining capacity

    Returns:
        A BudgetStatusResponse derived from the configured daily budget and
        warning threshold, the current daily cost, and the average cost per
        request over the last 24 hours.

    Raises:
        HTTPException: 500 if settings, daily cost or analytics cannot be
            retrieved, or the response cannot be built.
    """
    try:
        settings = get_settings()
        daily_cost = await cost_efficiency_logger.get_daily_cost()
        daily_budget = settings.agents.daily_cost_budget

        if daily_budget > 0:
            budget_used_percentage = (daily_cost / daily_budget) * 100
        else:
            # A non-positive budget cannot be divided by. Report the budget as
            # fully used once any cost has accrued instead of failing the
            # request with a ZeroDivisionError.
            budget_used_percentage = 100.0 if daily_cost > 0 else 0.0
        budget_remaining = max(0, daily_budget - daily_cost)

        # The configured threshold is treated as a fraction of the budget and
        # scaled to a percentage for comparison and reporting.
        alert_threshold = settings.agents.warning_cost_threshold
        alert_threshold_percentage = alert_threshold * 100
        is_over_threshold = budget_used_percentage >= alert_threshold_percentage
        is_budget_exceeded = daily_cost >= daily_budget

        # Estimate remaining requests based on average cost over the last 24h.
        analytics = await cost_efficiency_logger.get_cost_analytics(24)
        avg_cost_per_request = analytics.average_cost_per_request
        estimated_requests_remaining = 0
        if avg_cost_per_request > 0 and budget_remaining > 0:
            estimated_requests_remaining = int(budget_remaining / avg_cost_per_request)

        return BudgetStatusResponse(
            daily_budget_usd=daily_budget,
            current_daily_cost=daily_cost,
            budget_used_percentage=budget_used_percentage,
            budget_remaining_usd=budget_remaining,
            alert_threshold_percentage=alert_threshold_percentage,
            is_over_threshold=is_over_threshold,
            is_budget_exceeded=is_budget_exceeded,
            estimated_requests_remaining=estimated_requests_remaining,
        )
    except Exception as e:
        # Endpoint boundary: surface any failure as a 500 and keep the cause
        # chained for server-side tracebacks.
        raise HTTPException(status_code=500, detail=f"Failed to retrieve budget status: {str(e)}") from e
async def get_cost_report(
    hours: int = Query(24, ge=1, le=168, description="Time period in hours (1-168)")
) -> Dict[str, Any]:
    """
    Generate detailed cost efficiency report

    Returns a formatted text report with:
    - Cost summaries and token metrics
    - Provider and agent breakdowns
    - Performance benchmarks
    - Efficiency recommendations

    Args:
        hours: Size of the reporting window in hours (validated to 1-168).

    Returns:
        A dict with the report produced by the cost efficiency logger
        ("report"), an ISO-8601 generation timestamp ("generated_at") and the
        requested window as an integer ("time_period_hours").

    Raises:
        HTTPException: 500 if the report cannot be generated.
    """
    try:
        report = await cost_efficiency_logger.generate_cost_report(hours)
        # "time_period_hours" is an int, so the return type is Dict[str, Any]
        # rather than Dict[str, str]; a str-only annotation would not match
        # the payload when used for response validation.
        return {
            "report": report,
            "generated_at": datetime.now().isoformat(),
            "time_period_hours": hours,
        }
    except Exception as e:
        # Endpoint boundary: surface any failure as a 500 and keep the cause
        # chained for server-side tracebacks.
        raise HTTPException(status_code=500, detail=f"Failed to generate cost report: {str(e)}") from e
async def reset_daily_costs() -> Dict[str, str]:
    """
    Reset daily cost tracking (admin function)

    Should be called at midnight to reset daily budgets and alerts.

    Reads the current daily cost, resets the logger's daily alerts, and
    returns a confirmation containing the pre-reset cost and a timestamp.
    Any failure is reported as an HTTP 500.
    """
    try:
        # Capture the cost first so the response can report the pre-reset value.
        cost_before_reset = await cost_efficiency_logger.get_daily_cost()

        # NOTE: only the alerts are reset here; per the original comment, the
        # cost tracking reset is expected to be handled by the enhanced agent
        # manager.
        cost_efficiency_logger.reset_daily_alerts()

        confirmation = {"message": "Daily costs and alerts reset successfully"}
        confirmation["previous_daily_cost"] = f"${cost_before_reset:.6f}"
        confirmation["reset_at"] = datetime.now().isoformat()
        return confirmation
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to reset daily costs: {str(e)}")
async def cleanup_old_data(
    days_to_keep: int = Query(30, ge=7, le=365, description="Days of data to keep (7-365)")
) -> Dict[str, Any]:
    """
    Clean up old cost tracking data

    Removes cost records older than specified days to manage database size.

    Args:
        days_to_keep: Retention window in days (validated to 7-365), passed
            through to the cost efficiency logger's cleanup.

    Returns:
        A dict with a confirmation message ("message"), the retention window
        as an integer ("days_kept") and an ISO-8601 timestamp ("cleanup_at").

    Raises:
        HTTPException: 500 if the cleanup fails.
    """
    try:
        await cost_efficiency_logger.cleanup_old_data(days_to_keep)
        # "days_kept" is an int, so the return type is Dict[str, Any] rather
        # than Dict[str, str]; a str-only annotation would not match the
        # payload when used for response validation.
        return {
            "message": "Old cost data cleanup completed",
            "days_kept": days_to_keep,
            "cleanup_at": datetime.now().isoformat(),
        }
    except Exception as e:
        # Endpoint boundary: surface any failure as a 500 and keep the cause
        # chained for server-side tracebacks.
        raise HTTPException(status_code=500, detail=f"Failed to cleanup old data: {str(e)}") from e
| # WebSocket endpoint for real-time cost monitoring would go here | |
| # This could stream live cost updates to the frontend dashboard |