Spaces:
Runtime error
Runtime error
| import json | |
| import os | |
| from ast import literal_eval | |
| import pandas as pd | |
| import re | |
| from src.display.formatting import has_no_nan_values, make_clickable_model | |
| from src.display.utils import AutoEvalColumn, EvalQueueColumn | |
| from src.leaderboard.read_evals import get_raw_eval_results | |
| from src.about import ( | |
| nc_tasks, | |
| nr_tasks, | |
| lp_tasks, | |
| ) | |
| def sanitize_string(input_string): | |
| # Remove leading and trailing whitespace | |
| input_string = input_string.strip() | |
| # Remove leading whitespace on each line | |
| sanitized_string = re.sub(r'(?m)^\s+', '', input_string) | |
| return sanitized_string | |
| ''' | |
| def get_leaderboard_df(results_path: str, requests_path: str, cols: list, benchmark_cols: list) -> pd.DataFrame: | |
| """Creates a dataframe from all the individual experiment results""" | |
| raw_data = get_raw_eval_results(results_path, requests_path) | |
| all_data_json = [v.to_dict() for v in raw_data] | |
| df = pd.DataFrame.from_records(all_data_json) | |
| #df = df.sort_values(by=[AutoEvalColumn.average.name], ascending=False) | |
| #df = df[cols].round(decimals=2) | |
| # filter out if any of the benchmarks have not been produced | |
| #df = df[has_no_nan_values(df, benchmark_cols)] | |
| return raw_data, df | |
| ''' | |
| # Function to extract the numerical part before '+' | |
| def extract_x(value): | |
| return float(value.split('+')[0]) | |
| # Function to highlight the highest (or lowest) value based on X | |
| def make_bold(df, cols, ascending): | |
| df_highlight = df.copy() | |
| def apply_highlight(s): | |
| if ascending: | |
| max_idx = s.apply(extract_x).idxmin() | |
| else: | |
| max_idx = s.apply(extract_x).idxmax() | |
| return ['font-weight: bold' if i == max_idx else '' for i in range(len(s))] | |
| styler = df_highlight.style.apply(lambda x: apply_highlight(x) if x.name in cols else ['']*len(x), axis=0) | |
| return styler | |
| def format_number(num): | |
| return f"{num:.3f}" | |
| def get_leaderboard_df(EVAL_REQUESTS_PATH, task_type) -> pd.DataFrame: | |
| if task_type in ['Node Classification', 'Entity Classification']: | |
| ascending = False | |
| tasks = nc_tasks | |
| task_type = ['Node Classification', 'Entity Classification'] | |
| elif task_type in ['Node Regression', 'Entity Regression']: | |
| ascending = True | |
| tasks = nr_tasks | |
| task_type = ['Node Regression', 'Entity Regression'] | |
| elif task_type in ['Link Prediction', 'Recommendation']: | |
| ascending = False | |
| tasks = lp_tasks | |
| task_type = ['Link Prediction', 'Recommendation'] | |
| model_result_filepaths = [] | |
| for root,_, files in os.walk(EVAL_REQUESTS_PATH): | |
| if len(files) == 0 or any([not f.endswith(".json") for f in files]): | |
| continue | |
| for file in files: | |
| model_result_filepaths.append(os.path.join(root, file)) | |
| model_res = [] | |
| for model in model_result_filepaths: | |
| import json | |
| with open(model) as f: | |
| out = json.load(f) | |
| if ('task' in out) and (out['task'] in task_type): | |
| model_res.append(out) | |
| for model in model_res: | |
| model["test"] = literal_eval(model["test"].split('}')[0]+'}') | |
| model["valid"] = literal_eval(model["valid"].split('}')[0]+'}') | |
| #model["params"] = int(model["params"]) | |
| model['submitted_time'] = model['submitted_time'].split('T')[0] | |
| #model['paper_url'] = '[Link](' + model['paper_url'] + ')' | |
| #model['github_url'] = '[Link](' + model['github_url'] + ')' | |
| name2short_name = {task.value.benchmark: task.value.benchmark for task in tasks} | |
| for model in model_res: | |
| model.update({ | |
| name2short_name[i]: (f"{format_number(model['test'][i][0])} ± {format_number(model['test'][i][1])}" if i in model['test'] else '-') | |
| for i in name2short_name | |
| }) | |
| columns_to_show = ['model', 'author', 'email', 'paper_url', 'github_url', 'submitted_time'] + list(name2short_name.values()) | |
| # Check if model_res is empty | |
| if len(model_res) > 0: | |
| df_res = pd.DataFrame([{col: model[col] for col in columns_to_show} for model in model_res]) | |
| else: | |
| # Initialize an empty DataFrame with the desired columns | |
| df_res = pd.DataFrame(columns=columns_to_show) | |
| #df_res = pd.DataFrame([{col: model[col] for col in columns_to_show} for model in model_res]) | |
| ranks = df_res[list(name2short_name.values())].rank(ascending = ascending) | |
| df_res.rename(columns={'model': 'Model', 'author': 'Author', 'email': 'Email', 'paper_url': 'Paper URL', 'github_url': 'Github URL', 'submitted_time': 'Time'}, inplace=True) | |
| df_res.Model.replace('Relbench User Study', 'Human Data Scientist', inplace=True) | |
| df_res['Average Rank⬆️'] = ranks.mean(axis=1) | |
| df_res.sort_values(by='Average Rank⬆️', ascending=True, inplace=True) | |
| #df_res = make_bold(df_res, list(name2short_name.values()), ascending = ascending) | |
| print(df_res) | |
| return df_res | |
| def get_evaluation_queue_df(save_path: str, cols: list) -> list[pd.DataFrame]: | |
| """Creates the different dataframes for the evaluation queues requestes""" | |
| entries = [entry for entry in os.listdir(save_path) if not entry.startswith(".")] | |
| all_evals = [] | |
| for entry in entries: | |
| if ".json" in entry: | |
| file_path = os.path.join(save_path, entry) | |
| with open(file_path) as fp: | |
| data = json.load(fp) | |
| data[EvalQueueColumn.model.name] = make_clickable_model(data["model"]) | |
| data[EvalQueueColumn.revision.name] = data.get("revision", "main") | |
| all_evals.append(data) | |
| elif ".md" not in entry: | |
| # this is a folder | |
| sub_entries = [e for e in os.listdir(f"{save_path}/{entry}") if not e.startswith(".")] | |
| for sub_entry in sub_entries: | |
| file_path = os.path.join(save_path, entry, sub_entry) | |
| with open(file_path) as fp: | |
| data = json.load(fp) | |
| data[EvalQueueColumn.model.name] = make_clickable_model(data["model"]) | |
| data[EvalQueueColumn.revision.name] = data.get("revision", "main") | |
| all_evals.append(data) | |
| pending_list = [e for e in all_evals if e["status"] in ["PENDING", "RERUN"]] | |
| running_list = [e for e in all_evals if e["status"] == "RUNNING"] | |
| finished_list = [e for e in all_evals if e["status"].startswith("FINISHED") or e["status"] == "PENDING_NEW_EVAL"] | |
| df_pending = pd.DataFrame.from_records(pending_list, columns=cols) | |
| df_running = pd.DataFrame.from_records(running_list, columns=cols) | |
| df_finished = pd.DataFrame.from_records(finished_list, columns=cols) | |
| return df_finished[cols], df_running[cols], df_pending[cols] | |