Spaces:
Sleeping
Sleeping
File size: 13,436 Bytes
4343907 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 |
"""
SAAP Cost Tracking API Endpoints
Provides real-time cost metrics and analytics for OpenRouter integration
"""
from datetime import datetime
from typing import Dict, List, Optional, Any
from fastapi import APIRouter, HTTPException, Query, Depends
from pydantic import BaseModel, Field
from ..services.cost_efficiency_logger import cost_efficiency_logger, CostAnalytics
from ..config.settings import get_settings
# Router for the cost-tracking endpoints: every route registered on it is
# served under the /api/v1/cost prefix and tagged "Cost Tracking".
router = APIRouter(prefix="/api/v1/cost", tags=["Cost Tracking"])
# Response Models
class CostSummaryResponse(BaseModel):
    """Response model for the cost summary endpoint (GET /api/v1/cost/summary)."""

    # --- Aggregate spend and request counts for the requested window ---
    total_cost_usd: float = Field(..., description="Total cost in USD")
    total_requests: int = Field(..., description="Total number of requests")
    successful_requests: int = Field(..., description="Number of successful requests")
    failed_requests: int = Field(..., description="Number of failed requests")

    # --- Derived ratios ---
    # success_rate is a fraction in [0, 1], not a percentage.
    success_rate: float = Field(..., description="Success rate (0-1)")
    average_cost_per_request: float = Field(..., description="Average cost per request")

    # --- Daily budget figures ---
    # daily_budget_used is on a 0-100 scale and can exceed 100 when over budget.
    daily_budget_used: float = Field(..., description="Daily budget utilization percentage")
    budget_remaining_usd: float = Field(..., description="Remaining budget in USD")

    # --- Breakdown and echo of the query window ---
    # Maps a provider name to a free-form dict of metrics for that provider.
    by_provider: Dict[str, Dict[str, Any]] = Field(..., description="Cost breakdown by provider")
    period_hours: int = Field(..., description="Time period in hours")
class CostAnalyticsResponse(BaseModel):
    """Response model for the detailed analytics endpoint (GET /api/v1/cost/analytics)."""

    # --- Window label and aggregate counts ---
    time_period: str = Field(..., description="Analysis time period")
    total_cost_usd: float = Field(..., description="Total cost")
    total_requests: int = Field(..., description="Total requests")
    successful_requests: int = Field(..., description="Successful requests")
    failed_requests: int = Field(..., description="Failed requests")
    average_cost_per_request: float = Field(..., description="Average cost per request")

    # --- Token throughput and latency ---
    total_tokens: int = Field(..., description="Total tokens processed")
    average_response_time: float = Field(..., description="Average response time in seconds")
    cost_per_1k_tokens: float = Field(..., description="Cost per 1000 tokens")
    tokens_per_second: float = Field(..., description="Processing speed in tokens/second")

    # --- Breakdowns (each maps a name to a cost figure) ---
    top_expensive_models: List[Dict[str, Any]] = Field(..., description="Most expensive models")
    cost_by_agent: Dict[str, float] = Field(..., description="Cost breakdown by agent")
    cost_by_provider: Dict[str, float] = Field(..., description="Cost breakdown by provider")

    # --- Budget, trend and efficiency ---
    daily_budget_utilization: float = Field(..., description="Daily budget usage percentage")
    cost_trend_24h: List[Dict[str, Any]] = Field(..., description="24-hour cost trend")
    efficiency_score: float = Field(..., description="Cost efficiency score (tokens per dollar)")
class PerformanceBenchmarkResponse(BaseModel):
    """Benchmark metrics for one provider/model entry (GET /api/v1/cost/benchmarks)."""

    # --- Identity of the benchmarked entry ---
    provider: str = Field(..., description="Provider name")
    model: str = Field(..., description="Model name")

    # --- Speed ---
    avg_response_time: float = Field(..., description="Average response time")
    tokens_per_second: float = Field(..., description="Tokens per second")

    # --- Cost and reliability ---
    cost_per_token: float = Field(..., description="Cost per token")
    # success_rate is a fraction in [0, 1], not a percentage.
    success_rate: float = Field(..., description="Success rate (0-1)")
    cost_efficiency_score: float = Field(..., description="Cost efficiency score")

    # --- Number of samples behind the figures above ---
    sample_size: int = Field(..., description="Number of samples")
class BudgetStatusResponse(BaseModel):
    """Response model for the daily budget status endpoint (GET /api/v1/cost/budget)."""

    # --- Budget limit versus current spend ---
    daily_budget_usd: float = Field(..., description="Daily budget limit")
    current_daily_cost: float = Field(..., description="Current daily cost")
    # Percentage on a 0-100 scale; can exceed 100 once the budget is overrun.
    budget_used_percentage: float = Field(..., description="Budget usage percentage")
    budget_remaining_usd: float = Field(..., description="Remaining budget")

    # --- Alerting state ---
    # Reported on the same 0-100 scale as budget_used_percentage.
    alert_threshold_percentage: float = Field(..., description="Alert threshold")
    is_over_threshold: bool = Field(..., description="Whether over alert threshold")
    is_budget_exceeded: bool = Field(..., description="Whether budget is exceeded")

    # --- Projection based on the average cost per request ---
    estimated_requests_remaining: int = Field(..., description="Estimated requests remaining in budget")
# API Endpoints
@router.get("/summary", response_model=CostSummaryResponse)
async def get_cost_summary(
    hours: int = Query(24, ge=1, le=168, description="Time period in hours (1-168)")
) -> CostSummaryResponse:
    """
    Get cost summary for specified time period

    Returns comprehensive cost metrics including:

    - Total costs and request counts
    - Success/failure rates
    - Budget utilization
    - Provider breakdowns

    Budget figures are derived from the logger's daily cost and the configured
    daily budget, not from the `hours` window. When the configured daily
    budget is zero or negative, utilization is reported as 100%.

    Per-provider `requests` and `tokens` are placeholders and are always 0;
    only `cost` is populated.

    Responds with HTTP 500 if the metrics cannot be retrieved.
    """
    try:
        analytics = await cost_efficiency_logger.get_cost_analytics(hours)
        daily_cost = await cost_efficiency_logger.get_daily_cost()
        daily_budget = get_settings().agents.daily_cost_budget

        budget_remaining = max(0, daily_budget - daily_cost)

        # A zero budget used to raise ZeroDivisionError here, turning every
        # call into an HTTP 500. A non-positive budget leaves nothing to
        # spend, so report it as fully used instead.
        if daily_budget > 0:
            budget_used_percentage = (daily_cost / daily_budget) * 100
        else:
            budget_used_percentage = 100.0

        # Only the cost is available per provider; request and token counts
        # are fixed placeholders.
        by_provider = {
            provider: {"cost": cost, "requests": 0, "tokens": 0}
            for provider, cost in analytics.cost_by_provider.items()
        }

        # Avoid dividing by zero when no requests were recorded in the window.
        total_requests = analytics.total_requests
        if total_requests > 0:
            success_rate = analytics.successful_requests / total_requests
        else:
            success_rate = 0.0

        return CostSummaryResponse(
            total_cost_usd=analytics.total_cost_usd,
            total_requests=total_requests,
            successful_requests=analytics.successful_requests,
            failed_requests=analytics.failed_requests,
            success_rate=success_rate,
            average_cost_per_request=analytics.average_cost_per_request,
            daily_budget_used=budget_used_percentage,
            budget_remaining_usd=budget_remaining,
            by_provider=by_provider,
            period_hours=hours
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to retrieve cost summary: {str(e)}")
@router.get("/analytics", response_model=CostAnalyticsResponse)
async def get_cost_analytics(
    hours: int = Query(24, ge=1, le=168, description="Time period in hours (1-168)")
) -> CostAnalyticsResponse:
    """
    Get comprehensive cost analytics

    Provides detailed cost analysis including:

    - Token metrics and efficiency scores
    - Agent and provider breakdowns
    - Cost trends and expensive models
    - Performance metrics

    Every response field is copied unchanged from the analytics object
    returned by the cost efficiency logger for the requested window.

    Responds with HTTP 500 if the analytics cannot be retrieved.
    """
    # Attributes copied one-to-one from the analytics object into the
    # response, listed in the response model's field order.
    copied_fields = (
        "time_period",
        "total_cost_usd",
        "total_requests",
        "successful_requests",
        "failed_requests",
        "average_cost_per_request",
        "total_tokens",
        "average_response_time",
        "cost_per_1k_tokens",
        "tokens_per_second",
        "top_expensive_models",
        "cost_by_agent",
        "cost_by_provider",
        "daily_budget_utilization",
        "cost_trend_24h",
        "efficiency_score",
    )
    try:
        analytics = await cost_efficiency_logger.get_cost_analytics(hours)
        payload = {name: getattr(analytics, name) for name in copied_fields}
        return CostAnalyticsResponse(**payload)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Failed to retrieve cost analytics: {str(exc)}")
@router.get("/benchmarks", response_model=List[PerformanceBenchmarkResponse])
async def get_performance_benchmarks(
    hours: int = Query(24, ge=1, le=168, description="Time period in hours (1-168)")
) -> List[PerformanceBenchmarkResponse]:
    """
    Get performance benchmarks by provider and model

    Returns performance metrics for cost-efficiency analysis:

    - Response times and processing speeds
    - Cost per token comparisons
    - Success rates and efficiency scores

    One response item is produced for each benchmark returned by the cost
    efficiency logger, in the same order.

    Responds with HTTP 500 if the benchmarks cannot be retrieved.
    """
    try:
        raw_benchmarks = await cost_efficiency_logger.get_performance_benchmarks(hours)

        converted: List[PerformanceBenchmarkResponse] = []
        for entry in raw_benchmarks:
            # Straight field-for-field copy into the response model.
            converted.append(
                PerformanceBenchmarkResponse(
                    provider=entry.provider,
                    model=entry.model,
                    avg_response_time=entry.avg_response_time,
                    tokens_per_second=entry.tokens_per_second,
                    cost_per_token=entry.cost_per_token,
                    success_rate=entry.success_rate,
                    cost_efficiency_score=entry.cost_efficiency_score,
                    sample_size=entry.sample_size
                )
            )
        return converted
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Failed to retrieve performance benchmarks: {str(exc)}")
@router.get("/budget", response_model=BudgetStatusResponse)
async def get_budget_status() -> BudgetStatusResponse:
    """
    Get current budget status and utilization

    Provides real-time budget monitoring:

    - Daily budget limits and usage
    - Alert thresholds and warnings
    - Estimated remaining capacity

    The configured warning threshold is treated as a fraction of the daily
    budget and is reported multiplied by 100. When the configured daily
    budget is zero or negative, utilization is reported as 100%.

    The remaining-request estimate divides the remaining budget by the
    average cost per request over the last 24 hours; it is 0 when either
    value is not positive.

    Responds with HTTP 500 if the budget status cannot be retrieved.
    """
    try:
        settings = get_settings()
        daily_cost = await cost_efficiency_logger.get_daily_cost()
        daily_budget = settings.agents.daily_cost_budget

        # A zero budget used to raise ZeroDivisionError here, turning every
        # call into an HTTP 500. A non-positive budget leaves nothing to
        # spend, so report it as fully used, which is consistent with
        # is_budget_exceeded below.
        if daily_budget > 0:
            budget_used_percentage = (daily_cost / daily_budget) * 100
        else:
            budget_used_percentage = 100.0
        budget_remaining = max(0, daily_budget - daily_cost)

        # The threshold setting is a fraction; scale it to compare with the
        # percentage computed above.
        alert_threshold = settings.agents.warning_cost_threshold
        is_over_threshold = budget_used_percentage >= (alert_threshold * 100)
        is_budget_exceeded = daily_cost >= daily_budget

        # Estimate remaining requests based on average cost
        analytics = await cost_efficiency_logger.get_cost_analytics(24)
        avg_cost_per_request = analytics.average_cost_per_request
        estimated_requests_remaining = 0
        if avg_cost_per_request > 0 and budget_remaining > 0:
            estimated_requests_remaining = int(budget_remaining / avg_cost_per_request)

        return BudgetStatusResponse(
            daily_budget_usd=daily_budget,
            current_daily_cost=daily_cost,
            budget_used_percentage=budget_used_percentage,
            budget_remaining_usd=budget_remaining,
            alert_threshold_percentage=alert_threshold * 100,
            is_over_threshold=is_over_threshold,
            is_budget_exceeded=is_budget_exceeded,
            estimated_requests_remaining=estimated_requests_remaining
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to retrieve budget status: {str(e)}")
@router.get("/report")
async def get_cost_report(
    hours: int = Query(24, ge=1, le=168, description="Time period in hours (1-168)")
) -> Dict[str, Any]:
    """
    Generate detailed cost efficiency report

    Returns a formatted text report with:

    - Cost summaries and token metrics
    - Provider and agent breakdowns
    - Performance benchmarks
    - Efficiency recommendations

    The response contains the report under `report`, the generation time as
    an ISO 8601 string under `generated_at`, and the requested window as an
    integer under `time_period_hours`.

    Responds with HTTP 500 if the report cannot be generated.
    """
    # NOTE: the return annotation is Dict[str, Any], not Dict[str, str].
    # FastAPI validates the response against the return annotation when no
    # response_model is given, and `time_period_hours` is an int, which is
    # rejected for a str-valued mapping under pydantic v2.
    try:
        report = await cost_efficiency_logger.generate_cost_report(hours)
        return {
            "report": report,
            "generated_at": datetime.now().isoformat(),
            "time_period_hours": hours
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to generate cost report: {str(e)}")
@router.post("/reset-daily")
async def reset_daily_costs() -> Dict[str, str]:
    """
    Reset daily cost tracking (admin function)

    Should be called at midnight to reset daily budgets and alerts.

    This endpoint reads the current daily cost for reporting and then resets
    the logger's daily alerts; it does not itself clear the cost figure.

    Responds with HTTP 500 if the reset fails.
    """
    try:
        # Capture the figure first so it can be echoed back to the caller.
        cost_before_reset = await cost_efficiency_logger.get_daily_cost()

        # Only alerts are reset here (cost tracking reset should be handled
        # by the enhanced agent manager).
        cost_efficiency_logger.reset_daily_alerts()

        confirmation = {
            "message": "Daily costs and alerts reset successfully",
            "previous_daily_cost": f"${cost_before_reset:.6f}",
            "reset_at": datetime.now().isoformat()
        }
        return confirmation
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"Failed to reset daily costs: {str(exc)}")
@router.delete("/cleanup")
async def cleanup_old_data(
    days_to_keep: int = Query(30, ge=7, le=365, description="Days of data to keep (7-365)")
) -> Dict[str, Any]:
    """
    Clean up old cost tracking data

    Removes cost records older than specified days to manage database size.

    The response contains a confirmation under `message`, the retention
    window as an integer under `days_kept`, and the completion time as an
    ISO 8601 string under `cleanup_at`.

    Responds with HTTP 500 if the cleanup fails.
    """
    # NOTE: the return annotation is Dict[str, Any], not Dict[str, str].
    # FastAPI validates the response against the return annotation when no
    # response_model is given, and `days_kept` is an int, which is rejected
    # for a str-valued mapping under pydantic v2.
    try:
        await cost_efficiency_logger.cleanup_old_data(days_to_keep)
        return {
            "message": "Old cost data cleanup completed",
            "days_kept": days_to_keep,
            "cleanup_at": datetime.now().isoformat()
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to cleanup old data: {str(e)}")
# A WebSocket endpoint for real-time cost monitoring would go here.
# It could stream live cost updates to the frontend dashboard.