Spaces:
Running
Running
| import pandas as pd | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| from typing import Dict, Any | |
| import time | |
| import queue | |
| from freshqa.fresheval import FreshEval | |
| from src.api_key_rotator import get_rotator | |
| from src.utils import get_current_date_str | |
class FreshEvalParallel:
    """FreshEval wrapper that evaluates DataFrame rows concurrently.

    Rows are submitted to a ``ThreadPoolExecutor``; each row is evaluated
    with its own freshly constructed ``FreshEval`` instance.
    """

    def __init__(self, model: str = 'solar-pro2', max_workers: int = 4):
        """Store the evaluation settings.

        Args:
            model: Model name passed to every ``FreshEval`` instance.
            max_workers: Thread-pool size passed to ``ThreadPoolExecutor``.
        """
        self.model = model
        self.max_workers = max_workers

    def evaluate_dataframe(self, df: pd.DataFrame, mode: str, progress_queue: "queue.Queue[int] | None" = None, on_item_done=None) -> pd.DataFrame:
        """Evaluate every row of ``df`` in parallel.

        Args:
            df: Rows to evaluate. Each row (a pandas Series from
                ``iterrows``) is passed to ``FreshEval.evaluate_single_row``.
            mode: Evaluation mode, forwarded unchanged to
                ``evaluate_single_row``.
            progress_queue: If given, ``1`` is put on it exactly once per
                finished row, whether the evaluation succeeded or failed
                (used to drive UI progress, e.g. Gradio).
            on_item_done: Optional callback ``(position, result)`` invoked for
                each successfully evaluated row, where ``position`` is the
                row's 0-based position in ``df``. An exception raised by the
                callback is printed and does not alter the row's result.

        Returns:
            A DataFrame with one row per input row, in ``df``'s row order but
            with a fresh 0..n-1 index. A row whose evaluation raised becomes
            ``{'rating': 0, 'explanation': '평가 실패: <error>'}``.
        """
        current_date = get_current_date_str()

        # Positions are 0-based row positions; the original df index is not kept.
        worker_args = [(row, mode, current_date) for _, row in df.iterrows()]
        # Futures complete out of order, so fill a pre-sized list by position.
        results = [None] * len(worker_args)

        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            future_to_index = {
                executor.submit(self._evaluate_single_row_worker, args): i
                for i, args in enumerate(worker_args)
            }

            # Handle results in completion order.
            for future in as_completed(future_to_index):
                original_index = future_to_index[future]

                # Only the evaluation itself is guarded here; progress and
                # callback failures must not be reported as evaluation failures.
                try:
                    result = future.result()
                except Exception as e:
                    print(f"❌ 평가 실패 (행 {original_index}): {e}")
                    # Keep a placeholder row so the output stays aligned with df.
                    results[original_index] = {
                        'rating': 0,
                        'explanation': f"평가 실패: {str(e)}"
                    }
                    succeeded = False
                else:
                    results[original_index] = result
                    succeeded = True

                # Progress is reported exactly once per row, success or failure.
                if progress_queue is not None:
                    progress_queue.put(1)

                if succeeded and on_item_done is not None:
                    try:
                        on_item_done(original_index, result)
                    except Exception as cb_err:
                        # Best-effort callback: log and keep the evaluated result.
                        print(f"on_item_done callback failed (row {original_index}): {cb_err}")

        return pd.DataFrame(results)

    def _evaluate_single_row_worker(self, args: tuple) -> Dict[str, Any]:
        """Evaluate one row with a dedicated ``FreshEval`` instance.

        Runs on a pool thread. A new ``FreshEval`` is constructed per call,
        so ``FreshEval`` instances are never shared between threads.

        Args:
            args: ``(row, mode, current_date)`` as built by
                ``evaluate_dataframe``.

        Returns:
            The value returned by ``FreshEval.evaluate_single_row`` for the row.
        """
        row, mode, current_date = args
        # Start from a randomly chosen API key to spread load across keys.
        # NOTE(review): the original comment says FreshEval rotates to the next
        # key on HTTP 429 internally; that logic is not visible here — confirm.
        api_key = get_rotator().pick_key_random()
        worker_eval = FreshEval(model=self.model, api_key=api_key)
        # Delegate to the existing single-row evaluation.
        return worker_eval.evaluate_single_row(row, mode, current_date)
# 편의 함수 (convenience function)
def evaluate_dataframe_parallel(
        df: pd.DataFrame,
        mode: str,
        on_item_done=None,
        progress_queue: "queue.Queue[int] | None" = None,
        max_workers: int = 4,
        model: str = 'solar-pro2') -> pd.DataFrame:
    """Evaluate ``df`` in parallel using a one-off ``FreshEvalParallel``.

    Args:
        df: Rows to evaluate; forwarded to
            ``FreshEvalParallel.evaluate_dataframe``.
        mode: Evaluation mode; forwarded unchanged.
        on_item_done: Optional per-row callback ``(position, result)``;
            forwarded unchanged.
        progress_queue: Optional queue that receives ``1`` per finished row;
            forwarded unchanged.
        max_workers: Thread-pool size.
        model: Model name used for every ``FreshEval`` instance. Defaults to
            ``'solar-pro2'``, the value this function previously hard-coded.

    Returns:
        The DataFrame produced by ``FreshEvalParallel.evaluate_dataframe``.
    """
    parallel_eval = FreshEvalParallel(model=model, max_workers=max_workers)
    # Pass by keyword: this function orders on_item_done before
    # progress_queue, whereas the method takes them the other way round.
    return parallel_eval.evaluate_dataframe(
        df, mode, progress_queue=progress_queue, on_item_done=on_item_done
    )