# ko-freshqa-leaderboard / freshqa/fresheval_parallel.py
# jisubae
# feat: random key rotation
# ca1e416
import pandas as pd
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, Any
import time
import queue
from freshqa.fresheval import FreshEval
from src.api_key_rotator import get_rotator
from src.utils import get_current_date_str
class FreshEvalParallel:
    """Wrapper that runs FreshEval over the rows of a DataFrame in a thread pool.

    Attributes:
        model: Model name passed to every FreshEval instance that is created.
        max_workers: Number of threads used by ``evaluate_dataframe``.
    """

    def __init__(self, model: str = 'solar-pro2', max_workers: int = 4):
        """Store the model name and the thread-pool size."""
        self.model = model
        self.max_workers = max_workers

    def evaluate_dataframe(self, df: pd.DataFrame, mode: str, progress_queue: "queue.Queue[int] | None" = None, on_item_done=None) -> pd.DataFrame:
        """Evaluate every row of ``df`` concurrently and collect the results.

        Args:
            df: Rows to evaluate; each row is handed to a worker as-is.
            mode: Evaluation mode, forwarded unchanged to
                ``FreshEval.evaluate_single_row``.
            progress_queue: Optional queue that receives ``1`` once per
                finished row (successful or failed), so a consumer can
                track progress.
            on_item_done: Optional callable invoked as
                ``on_item_done(position, result)`` after each row that was
                evaluated successfully. It is not called for failed rows.
                An exception raised by the callback is logged and ignored.

        Returns:
            A DataFrame built from the per-row result dicts, in the same
            positional order as ``df``. A row whose evaluation raised is
            represented by ``{'rating': 0, 'explanation': <error text>}``.
        """
        total_rows = len(df)
        if total_rows == 0:
            # Nothing to evaluate: skip the date lookup and the thread pool.
            return pd.DataFrame([])

        current_date = get_current_date_str()
        worker_args = [(row, mode, current_date) for _, row in df.iterrows()]

        # Pre-sized so results can be stored by position even though
        # futures complete in arbitrary order.
        results = [None] * total_rows

        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            future_to_index = {
                executor.submit(self._evaluate_single_row_worker, args): i
                for i, args in enumerate(worker_args)
            }

            for future in as_completed(future_to_index):
                original_index = future_to_index[future]

                # Only the evaluation itself is guarded here, so that a
                # failure in the bookkeeping or the callback below can never
                # replace a good result with the failure placeholder.
                try:
                    result = future.result()
                    succeeded = True
                except Exception as e:
                    print(f"❌ 평가 실패 (행 {original_index}): {e}")
                    result = {
                        'rating': 0,
                        'explanation': f"평가 실패: {str(e)}"
                    }
                    succeeded = False

                results[original_index] = result

                # Exactly one progress tick per row, success or failure.
                if progress_queue is not None:
                    progress_queue.put(1)

                if succeeded and on_item_done:
                    try:
                        on_item_done(original_index, result)
                    except Exception as e:
                        # A broken callback must not discard the result or
                        # abort the remaining rows.
                        print(f"⚠️ on_item_done 콜백 실패 (행 {original_index}): {e}")

        return pd.DataFrame(results)

    def _evaluate_single_row_worker(self, args: tuple) -> Dict[str, Any]:
        """Evaluate one row with a freshly created FreshEval instance.

        Args:
            args: ``(row, mode, current_date)`` tuple prepared by
                ``evaluate_dataframe``.

        Returns:
            Whatever ``FreshEval.evaluate_single_row`` returns for the row.
        """
        row, mode, current_date = args

        # Each call starts from a randomly picked API key to spread load.
        # NOTE(review): the original author's comment says FreshEval rotates
        # to the next key by itself on HTTP 429; that logic is not visible
        # in this file -- confirm in FreshEval.
        api_key = get_rotator().pick_key_random()
        worker_eval = FreshEval(model=self.model, api_key=api_key)

        return worker_eval.evaluate_single_row(row, mode, current_date)
# Convenience function
def evaluate_dataframe_parallel(
        df: pd.DataFrame,
        mode: str,
        on_item_done=None,
        progress_queue: "queue.Queue[int] | None" = None,
        max_workers: int = 4,
        model: str = 'solar-pro2') -> pd.DataFrame:
    """Evaluate a DataFrame in parallel without building the wrapper by hand.

    Args:
        df: Rows to evaluate.
        mode: Evaluation mode, forwarded unchanged.
        on_item_done: Optional per-row callback, forwarded unchanged.
        progress_queue: Optional progress queue, forwarded unchanged.
        max_workers: Thread-pool size for the evaluation.
        model: Model name used for the evaluation. Defaults to the value
            that was previously hard-coded, so existing callers are
            unaffected.

    Returns:
        The DataFrame returned by ``FreshEvalParallel.evaluate_dataframe``.
    """
    parallel_eval = FreshEvalParallel(model=model, max_workers=max_workers)
    # Forward by keyword: this function's parameter order differs from the
    # method's, so positional forwarding is easy to get wrong.
    return parallel_eval.evaluate_dataframe(
        df,
        mode,
        progress_queue=progress_queue,
        on_item_done=on_item_done,
    )