import re
import pandas as pd
from openai import OpenAI
from typing import List, Dict, Any, Tuple
import time
import random
from config import Config
from src.utils import get_current_date_str


class FreshEval:
    def __init__(self, model: str = 'solar-pro2', api_key: str = None):
        self.model = model
        self.api_key = api_key or Config.UPSTAGE_API_KEY
        self.client = OpenAI(
            api_key=self.api_key,
            base_url="https://api.upstage.ai/v1/solar"
        )
        self.temperature = 0.0
        self.max_tokens = 256
        self.chat_completions = True
        # Larger models get more retrieval budget
        if model.startswith('gpt-4') or model.startswith('solar'):
            self.num_organic_results = 15
            self.num_related_questions = 3
            self.num_questions_and_answers = 3
            self.num_retrieved_evidences = 15
        else:
            self.num_organic_results = 15
            self.num_related_questions = 2
            self.num_questions_and_answers = 2
            self.num_retrieved_evidences = 5

    def _is_rate_limit_error(self, error: Exception) -> bool:
        """Detect 429 (rate-limit) errors."""
        error_str = str(error)
        error_type = type(error).__name__
        # 1. Check the HTTP status code
        if hasattr(error, 'response') and hasattr(error.response, 'status_code'):
            if error.response.status_code == 429:
                # print(f"HTTP 429 error detected: {error.response.status_code}")
                return True
        # 2. Text-based detection (fallback)
        error_lower = error_str.lower()
        if ("429" in error_lower or
                "rate" in error_lower or
                "limit" in error_lower or
                "too_many_requests" in error_lower or
                "request limit" in error_lower):
            # print("429 error detected from error text")
            return True
        return False
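    # Hand-traced examples (hypothetical error messages, for illustration):
    #   _is_rate_limit_error(Exception("Error code: 429 - too_many_requests"))  # True
    #   _is_rate_limit_error(Exception("Invalid API key"))                      # False
    # Note the substring checks are deliberately broad: any message containing
    # "rate" or "limit" also matches, which can over-trigger key rotation.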

    def call_llm_api(self, prompt: str, current_date: str) -> str:
        """Call the LLM API (supports key rotation and exponential backoff)."""
        from src.api_key_rotator import get_rotator
        rotator = get_rotator()
        num_keys = len(rotator.keys)
        base_delay = 3.0

        def _make_api_call(eval_instance: FreshEval) -> str:
            """Helper that performs the actual API request."""
            if eval_instance.chat_completions:
                # Chat completions API
                response = eval_instance.client.chat.completions.create(
                    model=eval_instance.model,
                    temperature=eval_instance.temperature,
                    max_tokens=eval_instance.max_tokens,
                    messages=[
                        {
                            "role": "system",
                            "content": (
                                f"You are a helpful assistant. Respond as concisely as possible. Knowledge cutoff: {current_date}."
                            )
                        },
                        {
                            "role": "user",
                            "content": "What's today's date?"
                        },
                        {
                            "role": "assistant",
                            "content": f"Today is {current_date} in Pacific Standard Time."
                        },
                        {
                            "role": "user",
                            "content": prompt
                        }
                    ],
                )
                return response.choices[0].message.content
            else:
                # Completions API
                response = eval_instance.client.completions.create(
                    model=eval_instance.model,
                    temperature=eval_instance.temperature,
                    max_tokens=eval_instance.max_tokens,
                    prompt=prompt,
                )
                return response.choices[0].text

        # Start with the current key
        current_key = self.api_key
        current_instance = FreshEval(model=self.model, api_key=current_key)

        # Single key: rely on exponential backoff alone
        if num_keys == 1:
            max_retries = 7
            for attempt in range(max_retries):
                try:
                    return _make_api_call(current_instance)
                except Exception as e:
                    if self._is_rate_limit_error(e):
                        if attempt < max_retries - 1:
                            # Exponential backoff with jitter
                            delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
                            time.sleep(delay)
                            continue
                    raise e
            # If we never returned, every retry failed
            raise Exception("call_llm_api: maximum retries exceeded")

        # Two or more keys: rotate keys, with a short wait before each switch.
        # Cycle through the keys until one succeeds (at most 3 full passes).
        max_attempts = num_keys * 3
        key_attempt_count = 0
        for attempt in range(max_attempts):
            try:
                return _make_api_call(current_instance)  # return immediately on success
            except Exception as e:
                if self._is_rate_limit_error(e):
                    key_attempt_count += 1
                    # Wait 2 seconds before switching to the next key
                    time.sleep(2)
                    current_key = rotator.pick_key()
                    # print("Switching API key")
                    current_instance = FreshEval(model=self.model, api_key=current_key)
                    continue  # keep trying with the next key
                else:
                    # Non-429 errors propagate immediately
                    raise
        # All keys exhausted across multiple passes
        raise Exception(f"429 errors on every API key (tried up to {max_attempts} times)")

    def call_fresheval(self, mode: str, question: str, evaluation: str, current_date: str) -> str:
        """Run one FreshEval judgment call."""
        fresheval_question = f'\nquestion: {question}{evaluation}'
        # Prefer the prompt body (prefix + demos) configured via environment variables
        env_prompt_body = None
        if mode == 'Relaxed':
            env_prompt_body = Config.FRESHQA_PROMPT_RELAXED
        elif mode == 'Strict':
            env_prompt_body = Config.FRESHQA_PROMPT_STRICT
        if env_prompt_body and str(env_prompt_body).strip():
            base_prompt = str(env_prompt_body).strip()
        else:
            raise ValueError(f"No evaluation prompt is configured for mode '{mode}'.")
        fresheval_prompt = base_prompt + fresheval_question
        # Evaluate
        answer = self.call_llm_api(fresheval_prompt, current_date)
        return answer

    def extract_ratings(self, response: str) -> Tuple[bool, Dict[str, str]]:
        """Extract the rating from an evaluation response."""
        def _clean(text: str) -> str:
            # Strip surrounding decoration/whitespace, drop inline markup, lowercase
            text = re.sub(r'^[*`_~\s]+|[*`_~\s]+$', '', text)
            text = re.sub(r'[*`_~]', '', text)
            return text.strip().strip('.').strip().lower()

        def _judge(val: str):
            """
            Judge correct/incorrect from a string:
            - any 'incorrect' -> FALSE, unconditionally
            - 'partially correct' is ambiguous -> None
            - 'correct' -> TRUE
            """
            if re.search(r'(?i)\bincorrect\b', val):
                return 'FALSE'
            if re.search(r'(?i)\bpartial(?:ly)?\s+correct\b', val):
                return None
            if re.search(r'(?i)\bcorrect\b', val):
                return 'TRUE'
            return None

        def _from_label(block_label: str):
            """
            Given a label (e.g. 'Final Evaluation' or 'Evaluation'):
            - try a same-line capture first
            - on failure, scan from the label up to the next blank line
              for a verdict keyword
            """
            # Same-line capture: label (with optional decoration/colon) through end of line
            same_line = re.search(
                rf'(?i){block_label}\s*(?:[*`_~]*\s*:\s*|:\s*[*`_~]*)\s*([^\r\n]+)',
                response
            )
            if same_line:
                val = _clean(same_line.group(1))
                j = _judge(val)
                if j is not None:
                    return j
            # Locate the label alone (value wrapped onto the next line) and
            # scan up to the next blank line (or section break)
            pos = re.search(
                rf'(?i){block_label}\s*(?:[*`_~]*\s*:\s*|:\s*[*`_~]*)',
                response
            )
            if pos:
                tail = response[pos.end():]
                # Only read up to the next blank line (consecutive newlines)
                # or section start, so we never scan too far
                m_stop = re.search(r'\n\s*\n', tail)
                segment = tail[:m_stop.start()] if m_stop else tail[:300]  # safe upper bound
                seg_clean = _clean(segment)
                j = _judge(seg_clean)
                if j is not None:
                    return j
            return None

        # 1) 'Final Evaluation' takes priority
        final_judgement = _from_label(r'final\s+evaluation')
        if final_judgement:
            return True, {'rating': final_judgement}
        # 2) Plain 'Evaluation'
        eval_judgement = _from_label('evaluation')
        if eval_judgement:
            return True, {'rating': eval_judgement}
        # 3) Fallback: explicit 'credited' sentence
        if re.search(r'(?i)thus,\s*the\s*response\s*is\s*credited\b', response):
            return True, {'rating': 'TRUE'}
        if re.search(r'(?i)thus,\s*the\s*response\s*is\s*not\s*credited\b', response):
            return True, {'rating': 'FALSE'}
        # 4) Failure
        return False, {'rating': None}
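    # Hand-traced examples of the extraction rules above (illustrative only):
    #   extract_ratings("Final Evaluation: correct")   -> (True, {'rating': 'TRUE'})
    #   extract_ratings("**Evaluation:** incorrect")   -> (True, {'rating': 'FALSE'})
    #   extract_ratings("no verdict given")            -> (False, {'rating': None})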

    def evaluate_single_row(self, row: pd.Series, mode: str, current_date: str) -> Dict[str, Any]:
        """Evaluate a single row."""
        question = row['question']
        response = row['model_response']
        # Collect up to 10 reference answers (columns answer_0..answer_9)
        correct_answers = [row[f'answer_{i}'] for i in range(10)]
        correct_answers = [str(x) for x in correct_answers if pd.notna(x) and str(x).strip()]
        # If model_response is empty or NaN, mark the row as wrong and return immediately
        if pd.isna(response) or (isinstance(response, str) and response.strip() == ''):
            # print('model_response is empty; treating as rating=0')
            row_dict = row.to_dict()
            row_dict['rating'] = 0
            row_dict['explanation'] = "model_response is empty"
            return row_dict
        # Build the evaluation template
        evaluation_template = (
            "\ncorrect answer(s): {correct_answers}"
            "\nresponse: {response}"
            "\ncomment: "
        )
        evaluation = evaluation_template.format(
            correct_answers=' | '.join(correct_answers),
            response=response,
        )
        # Evaluate
        fresheval_response = self.call_fresheval(
            mode=mode,
            question=question,
            evaluation=evaluation,
            current_date=current_date
        )
        is_valid_eval, eval_result = self.extract_ratings(fresheval_response)
        # if is_valid_eval:
        #     print('done')
        # Cap re-evaluation attempts (at most 5)
        max_retries = 5
        retry_count = 0
        # Retry loop
        while not is_valid_eval and retry_count < max_retries:
            retry_count += 1
            # print(f'Invalid evaluation, re-evaluating... ({retry_count}/{max_retries})\n response: {fresheval_response}')
            fresheval_response = self.call_fresheval(
                mode=mode,
                question=question,
                evaluation=evaluation,
                current_date=current_date
            )
            is_valid_eval, eval_result = self.extract_ratings(fresheval_response)
            # if is_valid_eval:
            #     print('done')
        # Fall back to defaults once the retry limit is exceeded
        if not is_valid_eval:
            # print(f'Max retries ({max_retries}) exceeded. Using default: rating=0')
            eval_result = {'rating': 0}
            fresheval_response = "Default rating: retry limit exceeded"
        row_dict = row.to_dict()
        row_dict['rating'] = eval_result['rating']
        row_dict['explanation'] = fresheval_response
        # DEBUG: print details only when the rating is FALSE
        # if eval_result['rating'] == 'FALSE':
        #     print(f"\n{'='*80}")
        #     print(f"Question rated FALSE")
        #     print(f"  Mode: {mode}")
        #     print(f"  Question: {question}")
        #     print(f"  Correct Answers: {' | '.join(correct_answers)}")
        #     print(f"  Model Response: {response}")
        #     print(f"\n  LLM evaluation response:")
        #     print(f"  {fresheval_response}")
        #     print(f"  Final rating: {eval_result['rating']}")
        #     print(f"{'='*80}\n")
        return row_dict

    def evaluate_dataframe(self, df: pd.DataFrame, mode: str) -> pd.DataFrame:
        """Evaluate a DataFrame row by row."""
        freshevals = []
        current_date = get_current_date_str()
        len_df = len(df)
        for index, row in df.iterrows():
            # Progress counter assumes a default RangeIndex, so index + 1 is 1-based
            print(f'{mode} evaluation in progress... {index + 1}/{len_df}')
            row_dict = self.evaluate_single_row(row, mode, current_date)
            freshevals.append(row_dict)
        return pd.DataFrame(freshevals)
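

if __name__ == '__main__':
    # Minimal usage sketch with hypothetical file names, not part of the
    # original module. Assumes the input CSV has the columns read above
    # ('question', 'model_response', 'answer_0'..'answer_9') and that Config
    # supplies UPSTAGE_API_KEY plus the Relaxed/Strict prompt bodies.
    df = pd.read_csv('model_outputs.csv')
    evaluator = FreshEval(model='solar-pro2')
    rated = evaluator.evaluate_dataframe(df, mode='Relaxed')
    print(rated['rating'].value_counts())
    rated.to_csv('fresheval_results.csv', index=False)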