import pandas as pd from concurrent.futures import ThreadPoolExecutor, as_completed from typing import Dict, Any import time import queue from freshqa.fresheval import FreshEval from src.api_key_rotator import get_rotator from src.utils import get_current_date_str class FreshEvalParallel: """병렬 처리를 위한 FreshEval 래퍼 클래스""" def __init__(self, model: str = 'solar-pro2', max_workers: int = 4): self.model = model self.max_workers = max_workers def evaluate_dataframe(self, df: pd.DataFrame, mode: str, progress_queue: "queue.Queue[int] | None" = None, on_item_done=None) -> pd.DataFrame: """병렬 처리를 통한 데이터프레임 평가 (진행률 표시)""" current_date = get_current_date_str() total_rows = len(df) # print(f"🚀 {mode} 모드 평가 시작: {total_rows}개 행, {self.max_workers}개 워커") # 워커별 인자 준비 worker_args = [] for index, row in df.iterrows(): worker_args.append((row, mode, current_date)) # 병렬 처리 (진행률 표시) results = [None] * total_rows # 미리 크기 할당 completed_count = 0 start_time = time.time() with ThreadPoolExecutor(max_workers=self.max_workers) as executor: # 모든 작업 제출 future_to_index = { executor.submit(self._evaluate_single_row_worker, args): i for i, args in enumerate(worker_args) } # 완료된 작업들을 순서대로 처리 for future in as_completed(future_to_index): original_index = future_to_index[future] try: result = future.result() results[original_index] = result completed_count += 1 # progress_queue에 진행률 반영 (Gradio UI 업데이트용) if progress_queue is not None: progress_queue.put(1) # on_item_done 콜백 호출 (옵션) if on_item_done: on_item_done(original_index, result) # 진행률 표시 progress_percent = (completed_count / total_rows) * 100 elapsed_time = time.time() - start_time # 10% 단위로 표시 (최소 10개 단위 보장) # total_rows // 10 = 10%에 해당하는 개수 (예: 3000행 → 300개) # max(1, ...)으로 최소 1개마다 보장 print_interval = max(10, total_rows // 10) # 최소 10개, 10% 단위 if (completed_count % print_interval == 0 or completed_count == total_rows): remaining_time = (elapsed_time / completed_count) * (total_rows - completed_count) if completed_count > 0 else 0 # print(f"📊 {mode} 모드 진행률: {progress_percent:.1f}% ({completed_count}/{total_rows}) - " # f"경과: {elapsed_time:.1f}초, 예상 남은 시간: {remaining_time:.1f}초") pass except Exception as e: print(f"❌ 평가 실패 (행 {original_index}): {e}") # 실패한 경우 기본값 반환 results[original_index] = { 'rating': 0, 'explanation': f"평가 실패: {str(e)}" } completed_count += 1 # 실패해도 큐에 진행률 반영 if progress_queue is not None: progress_queue.put(1) total_time = time.time() - start_time # print(f"✅ {mode} 모드 평가 완료: {total_time:.1f}초 소요") return pd.DataFrame(results) def _evaluate_single_row_worker(self, args: tuple) -> Dict[str, Any]: """워커 함수 - 각 워커마다 독립적인 FreshEval 인스턴스 생성""" row, mode, current_date = args # 각 워커마다 랜덤한 키로 시작 (부하 분산) # 429 에러 발생 시 FreshEval 내부에서 자동으로 다음 키로 순차 로테이션 api_key = get_rotator().pick_key_random() worker_eval = FreshEval(model=self.model, api_key=api_key) # 기존 evaluate_single_row 메서드 사용 return worker_eval.evaluate_single_row(row, mode, current_date) # 편의 함수 def evaluate_dataframe_parallel( df: pd.DataFrame, mode: str, on_item_done=None, progress_queue: "queue.Queue[int] | None" = None, max_workers: int = 4) -> pd.DataFrame: """병렬 처리를 통한 데이터프레임 평가 (편의 함수)""" parallel_eval = FreshEvalParallel(model='solar-pro2', max_workers=max_workers) return parallel_eval.evaluate_dataframe(df, mode, progress_queue, on_item_done)