Spaces:
Sleeping
Sleeping
| """ | |
| SAAP Cost Efficiency Logger - Advanced Cost Tracking & Analytics | |
| Monitors OpenRouter costs, performance metrics, and budget management | |
| """ | |
| import asyncio | |
| import json | |
| import logging | |
| from datetime import datetime, timedelta | |
| from typing import Dict, List, Optional, Any | |
| from dataclasses import dataclass, asdict | |
| from pathlib import Path | |
| import sqlite3 | |
| import aiosqlite | |
| from collections import defaultdict | |
| from ..config.settings import get_settings | |
# Initialize cost logging
# Two dedicated channels so cost events ("saap.cost") and performance
# metrics ("saap.performance") can be routed/filtered independently.
cost_logger = logging.getLogger("saap.cost")
performance_logger = logging.getLogger("saap.performance")
@dataclass
class CostAnalytics:
    """Comprehensive cost analytics for one reporting window.

    Built by CostEfficiencyLogger.get_cost_analytics(); the @dataclass
    decorator is required so the keyword-argument constructor calls work
    (the file already imports dataclass/asdict for this purpose).
    """
    time_period: str  # e.g. "24h"
    total_cost_usd: float
    total_requests: int
    successful_requests: int
    failed_requests: int
    average_cost_per_request: float
    total_tokens: int
    average_response_time: float  # seconds
    cost_per_1k_tokens: float
    tokens_per_second: float
    top_expensive_models: List[Dict[str, Any]]
    cost_by_agent: Dict[str, float]
    cost_by_provider: Dict[str, float]
    daily_budget_utilization: float  # percent of the daily budget used
    cost_trend_24h: List[Dict[str, Any]]
    efficiency_score: float  # Tokens per dollar
@dataclass
class PerformanceBenchmark:
    """Per (provider, model) performance benchmarking data.

    Built by CostEfficiencyLogger.get_performance_benchmarks(); the
    @dataclass decorator is required so the keyword-argument constructor
    call works.
    """
    provider: str
    model: str
    avg_response_time: float  # seconds
    tokens_per_second: float
    cost_per_token: float  # USD per token
    success_rate: float  # fraction in [0, 1]
    cost_efficiency_score: float  # tokens per dollar
    sample_size: int  # number of requests aggregated
class CostEfficiencyLogger:
    """Advanced cost tracking and analytics system"""

    def __init__(self):
        self.settings = get_settings()
        self.cost_db_path = "logs/saap_cost_tracking.db"
        self.analytics_cache = {}
        self.cost_alerts = []  # alert keys already fired today

        # Ensure logs directory exists
        Path("logs").mkdir(exist_ok=True)

        # Initialize database. The original unconditionally called
        # asyncio.create_task(), which raises RuntimeError when no event
        # loop is running — and this class IS instantiated at module import
        # time (see the module-level singleton). Schedule on the running
        # loop when there is one, otherwise initialize synchronously.
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            asyncio.run(self._initialize_database())
        else:
            # Keep a reference so the task is not garbage-collected early.
            self._init_task = loop.create_task(self._initialize_database())

        cost_logger.info("π° Cost Efficiency Logger initialized")
| async def _initialize_database(self): | |
| """Initialize SQLite database for cost tracking""" | |
| async with aiosqlite.connect(self.cost_db_path) as db: | |
| await db.execute(""" | |
| CREATE TABLE IF NOT EXISTS cost_metrics ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| timestamp TEXT NOT NULL, | |
| agent_id TEXT NOT NULL, | |
| provider TEXT NOT NULL, | |
| model TEXT NOT NULL, | |
| input_tokens INTEGER NOT NULL, | |
| output_tokens INTEGER NOT NULL, | |
| total_tokens INTEGER NOT NULL, | |
| cost_usd REAL NOT NULL, | |
| response_time_seconds REAL NOT NULL, | |
| request_success BOOLEAN NOT NULL, | |
| cost_per_1k_tokens REAL, | |
| tokens_per_second REAL, | |
| metadata TEXT | |
| ) | |
| """) | |
| await db.execute(""" | |
| CREATE INDEX IF NOT EXISTS idx_timestamp ON cost_metrics(timestamp) | |
| """) | |
| await db.execute(""" | |
| CREATE INDEX IF NOT EXISTS idx_agent_id ON cost_metrics(agent_id) | |
| """) | |
| await db.execute(""" | |
| CREATE INDEX IF NOT EXISTS idx_provider ON cost_metrics(provider) | |
| """) | |
| await db.commit() | |
    async def log_cost_metrics(self, metrics_data: Dict[str, Any]):
        """Log cost metrics to database and generate analytics

        Required keys in `metrics_data`: timestamp, agent_id, provider,
        model, input_tokens, output_tokens, total_tokens, cost_usd,
        response_time_seconds, request_success; optional: metadata (dict).

        NOTE(review): mutates the caller's dict in place by adding the
        derived 'cost_per_1k_tokens' and 'tokens_per_second' keys.
        """
        # Calculate derived metrics
        # (guarded against zero tokens / zero elapsed time)
        metrics_data['cost_per_1k_tokens'] = (
            metrics_data['cost_usd'] / (metrics_data['total_tokens'] / 1000)
            if metrics_data['total_tokens'] > 0 else 0
        )
        metrics_data['tokens_per_second'] = (
            metrics_data['total_tokens'] / metrics_data['response_time_seconds']
            if metrics_data['response_time_seconds'] > 0 else 0
        )
        # Store in database
        async with aiosqlite.connect(self.cost_db_path) as db:
            await db.execute("""
                INSERT INTO cost_metrics (
                    timestamp, agent_id, provider, model, input_tokens, output_tokens,
                    total_tokens, cost_usd, response_time_seconds, request_success,
                    cost_per_1k_tokens, tokens_per_second, metadata
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                metrics_data['timestamp'],
                metrics_data['agent_id'],
                metrics_data['provider'],
                metrics_data['model'],
                metrics_data['input_tokens'],
                metrics_data['output_tokens'],
                metrics_data['total_tokens'],
                metrics_data['cost_usd'],
                metrics_data['response_time_seconds'],
                metrics_data['request_success'],
                metrics_data['cost_per_1k_tokens'],
                metrics_data['tokens_per_second'],
                # Serialized so arbitrary metadata dicts fit a TEXT column.
                json.dumps(metrics_data.get('metadata', {}))
            ))
            await db.commit()
        # Real-time cost logging
        if metrics_data['request_success']:
            cost_logger.info(
                f"π° COST: {metrics_data['agent_id']} | "
                f"${metrics_data['cost_usd']:.6f} | "
                f"{metrics_data['total_tokens']} tokens | "
                f"{metrics_data['response_time_seconds']:.2f}s | "
                f"{metrics_data['tokens_per_second']:.1f} tok/s | "
                f"${metrics_data['cost_per_1k_tokens']:.4f}/1k"
            )
        else:
            # NOTE(review): message says "timeout" but any failure reaches
            # this branch — confirm failure causes with the caller.
            cost_logger.error(
                f"β FAILED: {metrics_data['agent_id']} | "
                f"{metrics_data['provider']} | "
                f"{metrics_data['response_time_seconds']:.2f}s timeout"
            )
        # Check for budget alerts
        await self._check_budget_alerts()
| async def _check_budget_alerts(self): | |
| """Check and generate budget alerts""" | |
| daily_cost = await self.get_daily_cost() | |
| daily_budget = self.settings.agents.daily_cost_budget | |
| usage_percentage = (daily_cost / daily_budget) * 100 | |
| # Generate alerts at specific thresholds | |
| thresholds = [50, 75, 90, 95, 100] | |
| for threshold in thresholds: | |
| if usage_percentage >= threshold: | |
| alert_key = f"daily_budget_{threshold}" | |
| if alert_key not in self.cost_alerts: | |
| self.cost_alerts.append(alert_key) | |
| if threshold < 100: | |
| cost_logger.warning( | |
| f"β οΈ BUDGET ALERT: ${daily_cost:.4f} / ${daily_budget} " | |
| f"({usage_percentage:.1f}%) - {threshold}% threshold reached" | |
| ) | |
| else: | |
| cost_logger.critical( | |
| f"π¨ BUDGET EXCEEDED: ${daily_cost:.4f} / ${daily_budget} " | |
| f"({usage_percentage:.1f}%) - Switching to free models!" | |
| ) | |
| break | |
    async def get_daily_cost(self) -> float:
        """Get current daily cost

        Sums cost_usd over today's successful requests. Returns 0.0 when
        there are no matching rows (SQL SUM() yields NULL -> None).
        NOTE(review): uses the local date and compares against
        date(timestamp) — assumes stored timestamps are local-time ISO
        strings; confirm against the writer of cost_metrics.
        """
        today = datetime.now().strftime('%Y-%m-%d')
        async with aiosqlite.connect(self.cost_db_path) as db:
            cursor = await db.execute("""
                SELECT SUM(cost_usd) FROM cost_metrics
                WHERE date(timestamp) = ? AND request_success = 1
            """, (today,))
            result = await cursor.fetchone()
            # `or 0.0` converts the NULL/None aggregate to a float.
            return result[0] or 0.0
| async def get_cost_analytics(self, hours: int = 24) -> CostAnalytics: | |
| """Generate comprehensive cost analytics""" | |
| cutoff_time = (datetime.now() - timedelta(hours=hours)).isoformat() | |
| async with aiosqlite.connect(self.cost_db_path) as db: | |
| # Basic metrics | |
| cursor = await db.execute(""" | |
| SELECT | |
| COUNT(*) as total_requests, | |
| SUM(CASE WHEN request_success = 1 THEN 1 ELSE 0 END) as successful_requests, | |
| SUM(CASE WHEN request_success = 0 THEN 1 ELSE 0 END) as failed_requests, | |
| SUM(cost_usd) as total_cost, | |
| SUM(total_tokens) as total_tokens, | |
| AVG(response_time_seconds) as avg_response_time | |
| FROM cost_metrics | |
| WHERE timestamp >= ? | |
| """, (cutoff_time,)) | |
| basic_stats = await cursor.fetchone() | |
| if not basic_stats or basic_stats[0] == 0: | |
| return self._empty_analytics(hours) | |
| total_requests, successful_requests, failed_requests, total_cost, total_tokens, avg_response_time = basic_stats | |
| # Cost by agent | |
| cursor = await db.execute(""" | |
| SELECT agent_id, SUM(cost_usd) as cost | |
| FROM cost_metrics | |
| WHERE timestamp >= ? AND request_success = 1 | |
| GROUP BY agent_id | |
| ORDER BY cost DESC | |
| """, (cutoff_time,)) | |
| cost_by_agent = {row[0]: row[1] for row in await cursor.fetchall()} | |
| # Cost by provider | |
| cursor = await db.execute(""" | |
| SELECT provider, SUM(cost_usd) as cost | |
| FROM cost_metrics | |
| WHERE timestamp >= ? AND request_success = 1 | |
| GROUP BY provider | |
| ORDER BY cost DESC | |
| """, (cutoff_time,)) | |
| cost_by_provider = {row[0]: row[1] for row in await cursor.fetchall()} | |
| # Top expensive models | |
| cursor = await db.execute(""" | |
| SELECT model, SUM(cost_usd) as total_cost, COUNT(*) as requests, | |
| AVG(cost_per_1k_tokens) as avg_cost_per_1k | |
| FROM cost_metrics | |
| WHERE timestamp >= ? AND request_success = 1 | |
| GROUP BY model | |
| ORDER BY total_cost DESC | |
| LIMIT 5 | |
| """, (cutoff_time,)) | |
| top_expensive_models = [ | |
| { | |
| 'model': row[0], | |
| 'total_cost': row[1], | |
| 'requests': row[2], | |
| 'avg_cost_per_1k_tokens': row[3] | |
| } | |
| for row in await cursor.fetchall() | |
| ] | |
| # Hourly cost trend (last 24 hours) | |
| cursor = await db.execute(""" | |
| SELECT | |
| strftime('%Y-%m-%d %H:00', timestamp) as hour, | |
| SUM(cost_usd) as cost, | |
| COUNT(*) as requests | |
| FROM cost_metrics | |
| WHERE timestamp >= datetime('now', '-24 hours') AND request_success = 1 | |
| GROUP BY strftime('%Y-%m-%d %H:00', timestamp) | |
| ORDER BY hour | |
| """, ()) | |
| cost_trend_24h = [ | |
| {'hour': row[0], 'cost': row[1], 'requests': row[2]} | |
| for row in await cursor.fetchall() | |
| ] | |
| # Calculate derived metrics | |
| average_cost_per_request = total_cost / total_requests if total_requests > 0 else 0 | |
| cost_per_1k_tokens = (total_cost / (total_tokens / 1000)) if total_tokens > 0 else 0 | |
| tokens_per_second = total_tokens / (avg_response_time * total_requests) if avg_response_time and total_requests > 0 else 0 | |
| efficiency_score = total_tokens / total_cost if total_cost > 0 else 0 | |
| # Daily budget utilization | |
| daily_cost = await self.get_daily_cost() | |
| daily_budget_utilization = (daily_cost / self.settings.agents.daily_cost_budget) * 100 | |
| return CostAnalytics( | |
| time_period=f"{hours}h", | |
| total_cost_usd=total_cost or 0, | |
| total_requests=total_requests or 0, | |
| successful_requests=successful_requests or 0, | |
| failed_requests=failed_requests or 0, | |
| average_cost_per_request=average_cost_per_request, | |
| total_tokens=total_tokens or 0, | |
| average_response_time=avg_response_time or 0, | |
| cost_per_1k_tokens=cost_per_1k_tokens, | |
| tokens_per_second=tokens_per_second, | |
| top_expensive_models=top_expensive_models, | |
| cost_by_agent=cost_by_agent, | |
| cost_by_provider=cost_by_provider, | |
| daily_budget_utilization=daily_budget_utilization, | |
| cost_trend_24h=cost_trend_24h, | |
| efficiency_score=efficiency_score | |
| ) | |
| def _empty_analytics(self, hours: int) -> CostAnalytics: | |
| """Return empty analytics object""" | |
| return CostAnalytics( | |
| time_period=f"{hours}h", | |
| total_cost_usd=0.0, | |
| total_requests=0, | |
| successful_requests=0, | |
| failed_requests=0, | |
| average_cost_per_request=0.0, | |
| total_tokens=0, | |
| average_response_time=0.0, | |
| cost_per_1k_tokens=0.0, | |
| tokens_per_second=0.0, | |
| top_expensive_models=[], | |
| cost_by_agent={}, | |
| cost_by_provider={}, | |
| daily_budget_utilization=0.0, | |
| cost_trend_24h=[], | |
| efficiency_score=0.0 | |
| ) | |
    async def get_performance_benchmarks(self, hours: int = 24) -> List[PerformanceBenchmark]:
        """Get performance benchmarks by provider and model

        Aggregates the last `hours` hours per (provider, model) pair.
        Pairs with fewer than 3 requests are dropped (HAVING clause) as
        statistically weak samples; results are ordered fastest-first by
        average tokens/second.
        """
        cutoff_time = (datetime.now() - timedelta(hours=hours)).isoformat()
        async with aiosqlite.connect(self.cost_db_path) as db:
            cursor = await db.execute("""
                SELECT
                    provider,
                    model,
                    AVG(response_time_seconds) as avg_response_time,
                    AVG(tokens_per_second) as avg_tokens_per_second,
                    AVG(cost_per_1k_tokens) as avg_cost_per_1k,
                    SUM(CASE WHEN request_success = 1 THEN 1 ELSE 0 END) * 100.0 / COUNT(*) as success_rate,
                    COUNT(*) as sample_size,
                    SUM(total_tokens) as total_tokens,
                    SUM(cost_usd) as total_cost
                FROM cost_metrics
                WHERE timestamp >= ?
                GROUP BY provider, model
                HAVING COUNT(*) >= 3
                ORDER BY avg_tokens_per_second DESC
            """, (cutoff_time,))
            benchmarks = []
            for row in await cursor.fetchall():
                provider, model, avg_response_time, avg_tokens_per_second, avg_cost_per_1k, success_rate, sample_size, total_tokens, total_cost = row
                # Calculate cost efficiency score (tokens per dollar)
                cost_efficiency_score = total_tokens / total_cost if total_cost > 0 else 0
                cost_per_token = total_cost / total_tokens if total_tokens > 0 else 0
                benchmarks.append(PerformanceBenchmark(
                    provider=provider,
                    model=model,
                    avg_response_time=avg_response_time,
                    # AVG over NULL-only rows yields None -> coerce to 0.
                    tokens_per_second=avg_tokens_per_second or 0,
                    cost_per_token=cost_per_token,
                    success_rate=success_rate / 100,  # Convert to decimal
                    cost_efficiency_score=cost_efficiency_score,
                    sample_size=sample_size
                ))
            return benchmarks
    async def generate_cost_report(self, hours: int = 24) -> str:
        """Generate detailed cost report

        Renders the analytics and benchmarks for the last `hours` hours as
        a single human-readable multi-line string; performs no I/O itself
        beyond the underlying analytics queries.
        """
        analytics = await self.get_cost_analytics(hours)
        benchmarks = await self.get_performance_benchmarks(hours)
        # Header and always-present summary sections.
        report_lines = [
            "=" * 60,
            f"π SAAP Cost Efficiency Report - Last {hours} Hours",
            "=" * 60,
            "",
            "π° COST SUMMARY:",
            f" Total Cost: ${analytics.total_cost_usd:.6f}",
            f" Requests: {analytics.total_requests} ({analytics.successful_requests} successful)",
            f" Average Cost/Request: ${analytics.average_cost_per_request:.6f}",
            f" Daily Budget Used: {analytics.daily_budget_utilization:.1f}%",
            "",
            "π’ TOKEN METRICS:",
            f" Total Tokens: {analytics.total_tokens:,}",
            f" Cost per 1K Tokens: ${analytics.cost_per_1k_tokens:.4f}",
            f" Tokens per Second: {analytics.tokens_per_second:.1f}",
            f" Efficiency Score: {analytics.efficiency_score:.1f} tokens/$",
            ""
        ]
        # Optional sections — emitted only when there is data to show.
        if analytics.cost_by_provider:
            report_lines.extend([
                "π’ COST BY PROVIDER:",
                *[f" {provider}: ${cost:.6f}" for provider, cost in analytics.cost_by_provider.items()],
                ""
            ])
        if analytics.cost_by_agent:
            report_lines.extend([
                "π€ COST BY AGENT:",
                # Top five agents; the dict is already cost-ordered by SQL.
                *[f" {agent}: ${cost:.6f}" for agent, cost in list(analytics.cost_by_agent.items())[:5]],
                ""
            ])
        if analytics.top_expensive_models:
            report_lines.extend([
                "πΈ TOP EXPENSIVE MODELS:",
                *[f" {model['model']}: ${model['total_cost']:.6f} ({model['requests']} requests)"
                  for model in analytics.top_expensive_models[:3]],
                ""
            ])
        if benchmarks:
            # Fixed-width table; model names truncated to fit the column.
            report_lines.extend([
                "β‘ PERFORMANCE BENCHMARKS:",
                f"{'Provider':<15} {'Model':<25} {'Speed (t/s)':<12} {'Cost/Token':<12} {'Success':<8}",
                "-" * 80
            ])
            for bench in benchmarks[:5]:
                report_lines.append(
                    f"{bench.provider:<15} {bench.model[:24]:<25} {bench.tokens_per_second:<12.1f} "
                    f"${bench.cost_per_token:<11.8f} {bench.success_rate:<8.1%}"
                )
            report_lines.append("")
        report_lines.extend([
            "=" * 60,
            f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
            "=" * 60
        ])
        return "\n".join(report_lines)
    async def cleanup_old_data(self, days_to_keep: int = 30):
        """Cleanup old cost tracking data

        Deletes cost_metrics rows older than `days_to_keep` days and logs
        how many rows were removed.
        """
        cutoff_date = (datetime.now() - timedelta(days=days_to_keep)).isoformat()
        async with aiosqlite.connect(self.cost_db_path) as db:
            cursor = await db.execute("""
                DELETE FROM cost_metrics WHERE timestamp < ?
            """, (cutoff_date,))
            # rowcount is read before commit; DELETE populates it immediately.
            deleted_rows = cursor.rowcount
            await db.commit()
        if deleted_rows > 0:
            cost_logger.info(f"π§Ή Cleaned up {deleted_rows} old cost records (>{days_to_keep} days)")
| def reset_daily_alerts(self): | |
| """Reset daily cost alerts (called at midnight)""" | |
| self.cost_alerts.clear() | |
| cost_logger.info("π Daily cost alerts reset") | |
# Global cost logger instance
# NOTE(review): constructed at import time. CostEfficiencyLogger.__init__
# calls asyncio.create_task(), which requires a running event loop — confirm
# this module is only imported from within a running loop, or guard the
# scheduling in __init__.
cost_efficiency_logger = CostEfficiencyLogger()