# Author: Firqa Aqila Noor Arasyi
# Date: 2023-12-04

import os
import io
import json
import pandas as pd
import streamlit as st
from stqdm import stqdm
from ast import literal_eval
from tempfile import NamedTemporaryFile
from json_repair import repair_json
import PyPDF2
import pdf2image
import pytesseract
import nltk  # required for the nltk.download("punkt") call below
from utils import *
from schema import *
from summ import get_summ
from datetime import datetime
import time
import base64
import string
import random
import numpy as np
from langchain.llms import OpenAI
from langchain.chains import RetrievalQA
from langchain.vectorstores import Chroma
from langchain.chat_models import ChatOpenAI
from langchain.document_loaders import TextLoader
from chromadb.utils import embedding_functions
from unstructured.partition.pdf import partition_pdf
from unstructured.staging.base import elements_to_json
from langchain.text_splitter import CharacterTextSplitter
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.chains import create_extraction_chain
from Bio import Entrez

nltk.download("punkt")

os.environ['OPENAI_API_KEY'] = os.getenv("OPENAI_API_KEY")
Entrez.email = os.getenv("ENTREZ_EMAIL")
Entrez.api_key = os.getenv("ENTREZ_API_KEY")
fold = -1
buffer = io.BytesIO()

@st.cache_data
def convert_df(df):
    return df.to_csv().encode("utf-8")
# Function to create a download link for an Excel file
# def create_excel_download_link(df, file_name):
#     output = io.BytesIO()
#     with pd.ExcelWriter(output, engine='xlsxwriter') as writer:
#         df.to_excel(writer, sheet_name='Sheet1', index=False)
#     excel_data = output.getvalue()
#     st.download_button(label="Download Excel File", data=excel_data, key=file_name, file_name=f"{file_name}.xlsx")
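# A minimal sketch (an illustration, not part of the original app flow) of the
# parsing pattern the extraction loops below repeat inline: repair the model's
# JSON-ish output line with json_repair, then evaluate it into a Python object.
# The name `_parse_llm_line` and its argument are hypothetical.
def _parse_llm_line(line: str):
    repaired = repair_json(line)      # fixes quotes, trailing commas, etc.
    parsed = literal_eval(repaired)   # safely evaluate the repaired string
    # The chains sometimes emit a one-element list instead of a bare object.
    return parsed[0] if isinstance(parsed, list) and parsed else parsed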
class Journal:
    def __init__(self, name, bytes):
        self.name = name
        self.bytes = bytes

    def __repr__(self):
        return f"Journal(name='{self.name}', bytes='{self.bytes}')"

llm = ChatOpenAI(temperature=0, model="gpt-4-0125-preview")
textex_chain = create_extraction_chain(textex_schema, llm)
tablex_chain = create_extraction_chain(tablex_schema, llm)
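# textex_schema and tablex_schema come from the star-import of schema above, and
# helpers such as clean_text, replace_quotes, split_text, get_valid_year,
# upper_abbreviation and sample_size_postproc come from utils. Each
# create_extraction_chain call builds a LangChain extraction chain whose
# .run(text) returns a list of schema-shaped dicts.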
st.set_page_config(page_title="NutriGenMe Paper Extractor")
st.title("NutriGenMe - Paper Extraction")
st.markdown("<div style='text-align: left; color: white; font-size: 16px'>In its latest version, the app can extract essential information from papers, including tables in both horizontal and vertical orientations, images, and plain text.</div><br>", unsafe_allow_html=True)

uploaded_files = st.file_uploader("Upload Paper(s) here :", type="pdf", accept_multiple_files=True)
if uploaded_files:
    st.warning("""
    Warning! Before proceeding, please take a moment to review the following:

    Certain guidelines apply when using this application, particularly if you intend to extract information from tables, whether they are oriented horizontally or vertically.
    - If you intend to run multiple PDFs through Horizontal Table Extraction, ensure that all of your PDF files use a horizontal table format
    - If you plan to run multiple PDFs through Vertical Table Extraction, ensure that all of your PDF files use a vertical table format
    """, icon="⚠️")

col1, col2, col3 = st.columns(3)
if uploaded_files:
    journals = []
    strategy = "hi_res"
    model_name = "yolox"
    on_h, on_v, on_t = None, None, None
    parseButtonH, parseButtonV, parseButtonT = None, None, None
    # if uploaded_files:
    with col1:
        if on_v or on_t:
            on_h = st.toggle("Horizontal Table Extraction", disabled=True)
        else:
            on_h = st.toggle("Horizontal Table Extraction")
        if on_h:
            chunk_size_h = st.selectbox(
                'Token amount per process :',
                (120000, 96000, 64000, 32000), key='table_h'
            )
            parseButtonH = st.button("Get Result", key='table_H')
    with col2:
        if on_h or on_t:
            on_v = st.toggle("Vertical Table Extraction", disabled=True)
        else:
            on_v = st.toggle("Vertical Table Extraction")
        if on_v:
            chunk_size_v = st.selectbox(
                'Token amount per process :',
                (120000, 96000, 64000, 32000), key='table_v'
            )
            parseButtonV = st.button("Get Result", key='table_V')
    with col3:
        if on_h or on_v:
            on_t = st.toggle("Text Extraction ", disabled=True)
        else:
            on_t = st.toggle("Text Extraction ")
        if on_t:
            chunk_size_t = st.selectbox(
                'Token amount per process :',
                (120000, 96000, 64000, 32000), key='no_table'
            )
            parseButtonT = st.button("Get Result", key="no_Table")
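    # Horizontal pipeline, as implemented below: OCR every page with
    # pytesseract, run the entity-extraction chain over text chunks, summarize
    # with get_summ, partition the PDF for tables with unstructured, then ask a
    # RetrievalQA chain for gene/SNP/disease records.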
    if on_h:
        if parseButtonH:
            with st.status("Extraction in progress ...", expanded=True) as status:
                st.write("Getting Result ...")
                csv = pd.DataFrame()
                for uploaded_file in stqdm(uploaded_files):
                    with NamedTemporaryFile(dir='.', suffix=".pdf") as pdf:
                        pdf.write(uploaded_file.getbuffer())
                        # st.write(pdf.name)
                        L = []

                        # Entity Extraction
                        st.write("✅ Extracting Entities ...")
                        bytes_data = uploaded_file.read()
                        journal = Journal(uploaded_file.name, bytes_data)
                        images = pdf2image.convert_from_bytes(journal.bytes)
                        extracted_text = ""
                        for image in images[:-1]:
                            text = pytesseract.image_to_string(image)
                            text = clean_text(text)
                            extracted_text += text + " "
                        text = replace_quotes(extracted_text)
                        text_chunk = split_text(text, chunk_size_h)
                        chunkdf = []
                        for i, chunk in enumerate(text_chunk):
                            inp = chunk
                            try:
                                # Assuming tablex_chain.run(inp)[0] returns a dictionary
                                original_dict = tablex_chain.run(inp)[0]
                                # Convert the dictionary to a JSON string
                                json_str = json.dumps(original_dict)
                                # Replace single quotes with double quotes in the JSON string
                                json_str_fixed = json_str.replace("'", '"')
                                # Use literal_eval to safely evaluate the JSON string as a Python dictionary
                                fixed_dict = literal_eval(json_str_fixed)
                                # Create a DataFrame from the fixed dictionary
                                df = pd.DataFrame(fixed_dict, index=[0]).fillna('')
                            except Exception:
                                try:
                                    df = pd.DataFrame(literal_eval(str(json.dumps(tablex_chain.run(inp)[0])).replace("\'", '\"')), index=[0]).fillna('')
                                except Exception:
                                    try:
                                        df = pd.DataFrame(literal_eval(str(json.dumps(tablex_chain.run(inp)[0]) + ']').replace("\'", '\"')), index=[0]).fillna('')
                                    except SyntaxError:
                                        df = pd.DataFrame(literal_eval('[' + str(json.dumps(tablex_chain.run(inp)[0]) + ']').replace("\'", '\"')), index=[0]).fillna('')
                            # df = pd.DataFrame(repair_json(tablex_chain.run(inp)[0]))
                            chunkdf.append(df)
                        concat = pd.concat(chunkdf, axis=0).reset_index().drop('index', axis=1).fillna('')
                        st.write("✅ Entities Extraction Done ..")
                        time.sleep(0.1)
                        st.write("✅ Generating Summary ...")
                        summary = get_summ(pdf.name)
                        st.write("✅ Generating Summary Done ..")
                        time.sleep(0.1)
                        st.write("✅ Table Extraction in progress ...")

                        # Table Extraction
                        # L = []
                        output_list = []
                        try:
                            elements = partition_pdf(filename=pdf.name, strategy=strategy, infer_table_structure=True, model_name=model_name)
                        except Exception:
                            # Fall back to the default layout model if yolox is unavailable
                            elements = partition_pdf(filename=pdf.name, strategy=strategy, infer_table_structure=True)
                        with NamedTemporaryFile(dir=".", suffix=".json") as f:
                            elements_to_json(elements, filename=f"{f.name.split('/')[-1]}")
                            json_file_path = os.path.abspath(f.name)  # Get the absolute file path
                            with open(json_file_path, "r", encoding="utf-8") as jsonfile:
                                data = json.load(jsonfile)
                            extracted_elements = []
                            for entry in data:
                                if entry["type"] == "Table":
                                    extracted_elements.append(entry["metadata"]["text_as_html"])
                            with NamedTemporaryFile(dir='.', suffix='.txt') as txt_file:
                                text_file_path = os.path.abspath(txt_file.name)
                                with open(text_file_path, "w", encoding="utf-8") as txtfile:
                                    for element in extracted_elements:
                                        txtfile.write(element + "\n\n")
                                loader = TextLoader(text_file_path)
                                documents = loader.load()
                                # split it into chunks
                                text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=200, separator="\n")
                                docs = text_splitter.split_documents(documents)
                                embeddings = OpenAIEmbeddings()
                                db = Chroma.from_documents(docs, embeddings)
                                llm_table = ChatOpenAI(model_name="gpt-4-0125-preview", temperature=0)
                                qa_chain = RetrievalQA.from_chain_type(llm_table, retriever=db.as_retriever())
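                                # The extracted table HTML is embedded into a short-lived Chroma
                                # collection (deleted after the questions run) so the QA chain can
                                # retrieve only the chunks relevant to each question.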
                                # List of questions
                                questions = [
                                    """Mention all genes / locus name with respective rsID / SNP and potential diseases in a curly brackets like this:
                                    Example 1 : {"Genes" : "FTO", "SNPs" : "rs9939609", "Diseases" : "Obesity"}
                                    """,
                                    """Mention all genes / locus name with respective potential diseases in a curly brackets like this:
                                    Example 2 : {"Genes" : "FTO", "SNPs" : "" (if not available), "Diseases" : "Obesity"}
                                    """,
                                    """Mention all rsIDs / SNPs / Variant with respective potential diseases / traits in a curly brackets like this:
                                    Example 3 : {"Genes" : "", "SNPs" : "rs9939609", "Diseases" : "Obesity"}
                                    """
                                ]
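                                # Each prompt asks for one JSON object per line; the parsing loops
                                # below (# 1, # 2, # 3) rely on that one-record-per-line shape when
                                # they split each result on '\n'.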
                                try:
                                    for query in questions:
                                        response = qa_chain({"query" : query})
                                        output_list.append(response)
                                except Exception as e:
                                    pass
                                db.delete_collection()
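                                # Each answer line is repaired with repair_json and evaluated with
                                # literal_eval; the KeyError/SyntaxError/ValueError handlers below
                                # cover the malformed shapes the model occasionally returns.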
                                # 1
                                for i in range(len(output_list[0]['result'].split('\n'))):
                                    # st.write(output_list[0]['result'].split('\n'))
                                    if output_list[0]['result'].split('\n')[i] != "":
                                        try:
                                            row = literal_eval(repair_json(output_list[0]['result'].split('\n')[i]))[0]
                                            st.write(row)
                                            row = {**row, **{
                                                'Title' : concat['title'][0],
                                                'Authors' : concat['authors'][0] if 'authors' in list(concat.columns) else '',
                                                'Publisher Name' : concat['publisher_name'][0] if 'publisher_name' in list(concat.columns) else '',
                                                'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'],
                                                # 'Population' : concat['population_race'][0],
                                                'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'],
                                                'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'],
                                                'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'],
                                                'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'],
                                                'Recommendation' : summary,
                                                # 'Sample Size' : concat['sample_size'][0]
                                            }}
                                            if len(row['Genes'].strip().split(',')) > 1:
                                                for g in row['Genes'].strip().split(','):
                                                    L.append({
                                                        'Title' : concat['title'][0],
                                                        'Authors' : concat['authors'][0] if 'authors' in list(concat.columns) else '',
                                                        'Publisher Name' : concat['publisher_name'][0] if 'publisher_name' in list(concat.columns) else '',
                                                        'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'],
                                                        # 'Population' : concat['population_race'][0],
                                                        'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'],
                                                        'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'],
                                                        'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'],
                                                        'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'],
                                                        'Recommendation' : summary,
                                                        # 'Sample Size' : concat['sample_size'][0],
                                                        'Genes' : g.strip().upper().replace('Unknown', ''),
                                                        'SNPs' : row['SNPs'].replace('Unknown', ''),
                                                        "Diseases" : ''.join(list(row['Diseases'].title() if row['Diseases'] not in ['T2D', 'T2DM', 'NAFLD', 'CVD'] else row['Diseases'])).replace('Unknown', '')
                                                    })
                                            else:
                                                L.append(row)
                                        except KeyError:
                                            row = literal_eval(repair_json(output_list[0]['result'].split('\n')[i]))
                                            row = {**row, **{
                                                'Title' : concat['title'][0],
                                                'Authors' : concat['authors'][0] if 'authors' in list(concat.columns) else '',
                                                'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'],
                                                'Publisher Name' : concat['publisher_name'][0] if 'publisher_name' in list(concat.columns) else '',
                                                # 'Population' : concat['population_race'][0],
                                                'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'],
                                                'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'],
                                                'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'],
                                                'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'],
                                                'Recommendation' : summary,
                                                # 'Sample Size' : concat['sample_size'][0]
                                            }}
                                            if len(row['Genes'].strip().split(',')) > 1:
                                                for g in row['Genes'].strip().split(','):
                                                    L.append({
                                                        'Title' : concat['title'][0],
                                                        'Authors' : concat['authors'][0] if 'authors' in list(concat.columns) else '',
                                                        'Publisher Name' : concat['publisher_name'][0] if 'publisher_name' in list(concat.columns) else '',
                                                        'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'],
                                                        'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'],
                                                        'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'],
                                                        'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'],
                                                        'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'],
                                                        'Recommendation' : summary,
                                                        'Genes' : g.strip().upper().replace('Unknown', ''),
                                                        'SNPs' : row['SNPs'].replace('Unknown', ''),
                                                        "Diseases" : ''.join(list(row['Diseases'].title() if row['Diseases'] not in ['T2D', 'T2DM', 'NAFLD', 'CVD'] else row['Diseases'])).replace('Unknown', '')
                                                    })
                                            else:
                                                L.append(row)
                                        except SyntaxError:
                                            row = literal_eval(repair_json(output_list[0]['result'].split('\n')[i]))
                                            row = {**row, **{
                                                'Title' : concat['title'][0],
                                                'Authors' : concat['authors'][0] if 'authors' in list(concat.columns) else '',
                                                'Publisher Name' : concat['publisher_name'][0] if 'publisher_name' in list(concat.columns) else '',
                                                'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'],
                                                'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'],
                                                'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'],
                                                'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'],
                                                'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'],
                                                'Recommendation' : summary,
                                                # 'Population' : concat['population_race'][0],
                                                # 'Sample Size' : concat['sample_size'][0]
                                            }}
                                            if not row['SNPs'].startswith("rs"):
                                                row.update({
                                                    'SNPs' : "-"
                                                })
                                            else:
                                                L.append(row)
                                        except ValueError:
                                            if type(output_list[0]['result'].split('\n')[i]) is dict:
                                                row = repair_json(output_list[0]['result'].split('\n')[i])
                                                row = {**row, **{
                                                    'Title' : concat['title'][0],
                                                    'Authors' : concat['authors'][0] if 'authors' in list(concat.columns) else '',
                                                    'Publisher Name' : concat['publisher_name'][0] if 'publisher_name' in list(concat.columns) else '',
                                                    'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'],
                                                    'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'],
                                                    'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'],
                                                    'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'],
                                                    'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'],
                                                    'Recommendation' : summary,
                                                }}
                                                if not row['SNPs'].startswith("rs"):
                                                    row.update({
                                                        'SNPs' : "-"
                                                    })
                                                else:
                                                    L.append(row)
                                # 2
                                for i in range(len(output_list[1]['result'].split('\n'))):
                                    if output_list[1]['result'].split('\n')[i] != "":
                                        try:
                                            row = literal_eval(repair_json(output_list[1]['result'].split('\n')[i]))[0]
                                            st.write(row)
                                            row = {**row, **{
                                                'Title' : concat['title'][0],
                                                'Authors' : concat['authors'][0] if 'authors' in list(concat.columns) else '',
                                                'Publisher Name' : concat['publisher_name'][0] if 'publisher_name' in list(concat.columns) else '',
                                                'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'],
                                                'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'],
                                                'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'],
                                                'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'],
                                                'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'],
                                                'Recommendation' : summary,
                                            }}
                                            if row['SNPs'] != "Not available":
                                                row.update({
                                                    'SNPs' : "Not available"
                                                })
                                            if len(row['Genes'].strip().split(',')) > 1:
                                                for g in row['Genes'].strip().split(','):
                                                    L.append({
                                                        'Title' : concat['title'][0],
                                                        'Authors' : concat['authors'][0] if 'authors' in list(concat.columns) else '',
                                                        'Publisher Name' : concat['publisher_name'][0] if 'publisher_name' in list(concat.columns) else '',
                                                        'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'],
                                                        'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'],
                                                        'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'],
                                                        'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'],
                                                        'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'],
                                                        'Recommendation' : summary,
                                                        'Genes' : g.strip().upper().replace('Unknown', ''),
                                                        "SNPs" : "Not available",
                                                        "Diseases" : ''.join(list(row['Diseases'].title() if row['Diseases'] not in ['T2D', 'T2DM', 'NAFLD', 'CVD'] else row['Diseases'])).replace('Unknown', '')
                                                    })
                                            else:
                                                L.append(row)
                                        except KeyError:
                                            row = literal_eval(repair_json(output_list[1]['result'].split('\n')[i]))
                                            row = {**row, **{
                                                'Title' : concat['title'][0],
                                                'Authors' : concat['authors'][0] if 'authors' in list(concat.columns) else '',
                                                'Publisher Name' : concat['publisher_name'][0] if 'publisher_name' in list(concat.columns) else '',
                                                'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'],
                                                'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'],
                                                'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'],
                                                'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'],
                                                'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'],
                                                'Recommendation' : summary,
                                            }}
                                            if 'SNPs' in list(row.keys()):
                                                if row['SNPs'] != "Not available":
                                                    row.update({
                                                        'SNPs' : "Not available"
                                                    })
                                            else:
                                                row.update({
                                                    'SNPs' : "Not available"
                                                })
                                            if 'Genes' in list(row.keys()):
                                                if len(row['Genes'].strip().split(',')) > 1:
                                                    for g in row['Genes'].strip().split(','):
                                                        L.append({
                                                            'Title' : concat['title'][0],
                                                            'Authors' : concat['authors'][0] if 'authors' in list(concat.columns) else '',
                                                            'Publisher Name' : concat['publisher_name'][0] if 'publisher_name' in list(concat.columns) else '',
                                                            'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'],
                                                            'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'],
                                                            'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'],
                                                            'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'],
                                                            'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'],
                                                            'Recommendation' : summary,
                                                            'Genes' : g.strip().upper().replace('Unknown', ''),
                                                            "SNPs" : "Not available",
                                                            "Diseases" : ''.join(list(row['Diseases'].title() if row['Diseases'] not in ['T2D', 'T2DM', 'NAFLD', 'CVD'] else row['Diseases'])).replace('Unknown', '')
                                                        })
                                                else:
                                                    L.append(row)
                                        except SyntaxError:
                                            row = literal_eval(repair_json(output_list[1]['result'].split('\n')[i]))
                                            row = {**row, **{
                                                'Title' : concat['title'][0],
                                                'Authors' : concat['authors'][0] if 'authors' in list(concat.columns) else '',
                                                'Publisher Name' : concat['publisher_name'][0] if 'publisher_name' in list(concat.columns) else '',
                                                'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'],
                                                'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'],
                                                'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'],
                                                'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'],
                                                'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'],
                                                'Recommendation' : summary,
                                            }}
                                            if not row['SNPs'].startswith("rs"):
                                                row.update({
                                                    'SNPs' : "-"
                                                })
                                            else:
                                                L.append(row)
                                        except ValueError:
                                            if type(output_list[1]['result'].split('\n')[i]) is dict:
                                                row = output_list[1]['result'].split('\n')[i]
                                                row = {**row, **{
                                                    'Title' : concat['title'][0],
                                                    'Authors' : concat['authors'][0] if 'authors' in list(concat.columns) else '',
                                                    'Publisher Name' : concat['publisher_name'][0] if 'publisher_name' in list(concat.columns) else '',
                                                    'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'],
                                                    'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'],
                                                    'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'],
                                                    'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'],
                                                    'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'],
                                                    'Recommendation' : summary,
                                                }}
                                                if not row['SNPs'].startswith("rs"):
                                                    row.update({
                                                        'SNPs' : "-"
                                                    })
                                                else:
                                                    L.append(row)
                                # 3
                                for i in range(len(output_list[2]['result'].split('\n'))):
                                    if output_list[2]['result'].split('\n')[i] != "":
                                        try:
                                            row = literal_eval(repair_json(output_list[2]['result'].split('\n')[i]))[0]
                                            row = {**row, **{
                                                'Title' : concat['title'][0],
                                                'Authors' : concat['authors'][0] if 'authors' in list(concat.columns) else '',
                                                'Publisher Name' : concat['publisher_name'][0] if 'publisher_name' in list(concat.columns) else '',
                                                'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'],
                                                'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'],
                                                'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'],
                                                'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'],
                                                'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'],
                                                'Recommendation' : summary,
                                            }}
                                            if not row['SNPs'].startswith("rs"):
                                                row.update({
                                                    'SNPs' : "-"
                                                })
                                            else:
                                                L.append(row)
                                        except KeyError:
                                            row = literal_eval(repair_json(output_list[2]['result'].split('\n')[i]))
                                            row = {**row, **{
                                                'Title' : concat['title'][0],
                                                'Authors' : concat['authors'][0] if 'authors' in list(concat.columns) else '',
                                                'Publisher Name' : concat['publisher_name'][0] if 'publisher_name' in list(concat.columns) else '',
                                                'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'],
                                                'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'],
                                                'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'],
                                                'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'],
                                                'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'],
                                                'Recommendation' : summary,
                                            }}
                                            if not row['SNPs'].startswith("rs"):
                                                row.update({
                                                    'SNPs' : "-"
                                                })
                                            else:
                                                L.append(row)
                                        except SyntaxError:
                                            row = literal_eval(repair_json(output_list[2]['result'].split('\n')[i]))
                                            row = {**row, **{
                                                'Title' : concat['title'][0],
                                                'Authors' : concat['authors'][0] if 'authors' in list(concat.columns) else '',
                                                'Publisher Name' : concat['publisher_name'][0] if 'publisher_name' in list(concat.columns) else '',
                                                'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'],
                                                'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'],
                                                'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'],
                                                'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'],
                                                'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'],
                                                'Recommendation' : summary,
                                            }}
                                            if not row['SNPs'].startswith("rs"):
                                                row.update({
                                                    'SNPs' : "-"
                                                })
                                            else:
                                                L.append(row)
                                        except ValueError:
                                            if type(output_list[2]['result'].split('\n')[i]) is dict:
                                                row = output_list[2]['result'].split('\n')[i]
                                                row = {**row, **{
                                                    'Title' : concat['title'][0],
                                                    'Authors' : concat['authors'][0] if 'authors' in list(concat.columns) else '',
                                                    'Publisher Name' : concat['publisher_name'][0] if 'publisher_name' in list(concat.columns) else '',
                                                    'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'],
                                                    'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'],
                                                    'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'],
                                                    'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'],
                                                    'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'],
                                                    'Recommendation' : summary,
                                                }}
                                                if not row['SNPs'].startswith("rs"):
                                                    row.update({
                                                        'SNPs' : "-"
                                                    })
                                                else:
                                                    L.append(row)
                                st.write("✅ Table Extraction Done ...")
                                status.update(label="Gene and SNPs successfully collected.")
                                # Normalize leftover placeholder values in each collected row
                                L = [{key: ('' if value == 'Unknown' else value) for key, value in d.items()} for d in L]
                                L = [{key: ('Not Available' if value == '' else value) for key, value in d.items()} for d in L]
                                csv = pd.DataFrame(L)
                                st.dataframe(csv)

                                generated_key = ''.join(random.choice(string.ascii_letters + string.digits) for i in range(16))
                                # if st.button("Download Excel File", key=generated_key):
                                #     excel_link = create_excel_download_link(csv, uploaded_file.name.replace('.pdf', ''))
                                #     st.markdown(excel_link, unsafe_allow_html=True)
                                with pd.ExcelWriter(buffer, engine='xlsxwriter') as writer:
                                    # Write each dataframe to a different worksheet
                                    csv.to_excel(writer, sheet_name='Result')
                                # time_now = datetime.now()
                                # current_time = time_now.strftime("%H:%M:%S")
                                csv = convert_df(csv)
                                st.download_button(
                                    label="Save Result",
                                    data=buffer,
                                    file_name=f'{uploaded_file.name}'.replace('.pdf', '') + '.xlsx',
                                    mime='application/vnd.ms-excel',
                                    key=generated_key
                                )
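    # The vertical branch mirrors the horizontal one, but first rotates every
    # page 90 degrees so that vertically-oriented tables are presented
    # horizontally to the layout model before partitioning.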
    if on_v:
        if parseButtonV:
            with st.status("Extraction in progress ...", expanded=True) as status:
                st.write("Getting Result ...")
                csv = pd.DataFrame()
                for uploaded_file in stqdm(uploaded_files):
                    L = []
                    with NamedTemporaryFile(dir='.', suffix=".pdf") as pdf:
                        pdf.write(uploaded_file.getbuffer())
                        # Open the PDF file in read-binary mode
                        with open(pdf.name, 'rb') as pdf_file:
                            # Create a PDF reader object
                            pdf_reader = PyPDF2.PdfReader(pdf_file)
                            # Create a PDF writer object to write the rotated pages to a new PDF
                            pdf_writer = PyPDF2.PdfWriter()
                            # Iterate through each page in the original PDF
                            for page_num in range(len(pdf_reader.pages)):
                                # Get the page object
                                page = pdf_reader.pages[page_num]
                                # Rotate the page 90 degrees clockwise (use -90 for counterclockwise)
                                page.rotate(90)
                                # Add the rotated page to the PDF writer
                                pdf_writer.add_page(page)
                            with NamedTemporaryFile(dir='.', suffix=".pdf") as rotated_pdf:
                                pdf_writer.write(rotated_pdf.name)

                                # Entity Extraction
                                st.write("✅ Extracting Entities ...")
                                bytes_data = uploaded_file.read()
                                journal = Journal(uploaded_file.name, bytes_data)
                                images = pdf2image.convert_from_bytes(journal.bytes)
                                extracted_text = ""
                                for image in images[:-1]:
                                    text = pytesseract.image_to_string(image)
                                    text = clean_text(text)
                                    extracted_text += text + " "
                                text = replace_quotes(extracted_text)
                                text_chunk = split_text(text, chunk_size_v)
                                chunkdf = []
                                for i, chunk in enumerate(text_chunk):
                                    inp = chunk
                                    # Assuming tablex_chain.run(inp)[0] returns a dictionary
                                    original_dict = tablex_chain.run(inp)[0]
                                    # Convert the dictionary to a JSON string
                                    json_str = json.dumps(original_dict)
                                    # Replace single quotes with double quotes in the JSON string
                                    json_str_fixed = json_str.replace("'", '"')
                                    # Use literal_eval to safely evaluate the JSON string as a Python dictionary
                                    fixed_dict = literal_eval(json_str_fixed)
                                    # Create a DataFrame from the fixed dictionary
                                    df = pd.DataFrame(fixed_dict, index=[0]).fillna('')
                                    # df = pd.DataFrame(literal_eval(str(json.dumps(tablex_chain.run(inp)[0])).replace("\'", '\"')), index=[0]).fillna('')
                                    chunkdf.append(df)
                                concat = pd.concat(chunkdf, axis=0).reset_index().drop('index', axis=1).fillna('')
                                st.write("✅ Entities Extraction Done ..")
                                time.sleep(0.1)
                                st.write("✅ Generating Summary ...")
                                summary = get_summ(pdf.name)
                                st.write("✅ Generating Summary Done ..")
                                time.sleep(0.1)
                                st.write("✅ Table Extraction in progress ...")

                                # Table Extraction
                                output_list = []
                                elements = partition_pdf(filename=rotated_pdf.name, strategy=strategy, infer_table_structure=True, model_name=model_name)
                                with NamedTemporaryFile(dir=".", suffix=".json") as f:
                                    elements_to_json(elements, filename=f"{f.name.split('/')[-1]}")
                                    json_file_path = os.path.abspath(f.name)  # Get the absolute file path
                                    with open(json_file_path, "r", encoding="utf-8") as jsonfile:
                                        data = json.load(jsonfile)
                                    extracted_elements = []
                                    for entry in data:
                                        if entry["type"] == "Table":
                                            extracted_elements.append(entry["metadata"]["text_as_html"])
                                    with NamedTemporaryFile(dir='.', suffix='.txt') as txt_file:
                                        text_file_path = os.path.abspath(txt_file.name)
                                        with open(text_file_path, "w", encoding="utf-8") as txtfile:
                                            for element in extracted_elements:
                                                txtfile.write(element + "\n\n")
                                        loader = TextLoader(text_file_path)
                                        documents = loader.load()
                                        # split it into chunks
                                        text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=200, separator="\n")
                                        docs = text_splitter.split_documents(documents)
                                        embeddings = OpenAIEmbeddings()
                                        db = Chroma.from_documents(docs, embeddings)
                                        llm_table = ChatOpenAI(model_name="gpt-3.5-turbo-16k", temperature=0)
                                        qa_chain = RetrievalQA.from_chain_type(llm_table, retriever=db.as_retriever())
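                                        # Note: this branch runs the table QA step on gpt-3.5-turbo-16k,
                                        # while the horizontal branch uses gpt-4-0125-preview.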
                                        # List of questions
                                        questions = [
                                            """Mention all genes / locus name with respective rsID / SNP and potential diseases in a curly brackets like this:
                                            Example 1 : {"Genes" : "FTO", "SNPs" : "rs9939609", "Diseases" : "Obesity"}
                                            """,
                                            """Mention all genes / locus name with respective potential diseases in a curly brackets like this:
                                            Example 2 : {"Genes" : "FTO", "SNPs" : "" (if not available), "Diseases" : "Obesity"}
                                            """,
                                            """Mention all rsIDs / SNPs / Variant with respective potential diseases / traits in a curly brackets like this:
                                            Example 3 : {"Genes" : "", "SNPs" : "rs9939609", "Diseases" : "Obesity"}
                                            """
                                        ]
                                        try:
                                            for query in questions:
                                                response = qa_chain({"query" : query})
                                                output_list.append(response)
                                        except Exception as e:
                                            pass
                                        db.delete_collection()
                                        # 1
                                        for i in range(len(output_list[0]['result'].split('\n'))):
                                            if output_list[0]['result'].split('\n')[i] != "":
                                                try:
                                                    row = literal_eval(output_list[0]['result'].split('\n')[i])[0]
                                                    row = {**row, **{
                                                        'Title' : concat['title'][0],
                                                        'Authors' : concat['authors'][0] if 'authors' in list(concat.columns) else '',
                                                        'Publisher Name' : concat['publisher_name'][0] if 'publisher_name' in list(concat.columns) else '',
                                                        'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'],
                                                        'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'],
                                                        'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'],
                                                        'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'],
                                                        'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'],
                                                    }}
                                                    if len(row['Genes'].strip().split(',')) > 1:
                                                        for g in row['Genes'].strip().split(','):
                                                            L.append({
                                                                'Genes' : g.strip().upper(),
                                                                'SNPs' : row['SNPs'],
                                                                "Diseases" : ''.join(list(row['Diseases'].title() if row['Diseases'] not in ['T2D', 'T2DM', 'NAFLD', 'CVD'] else row['Diseases'])).replace('Unknown', ''),
                                                                'Title' : concat['title'][0],
                                                                'Authors' : concat['authors'][0] if 'authors' in list(concat.columns) else '',
                                                                'Publisher Name' : concat['publisher_name'][0] if 'publisher_name' in list(concat.columns) else '',
                                                                'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'],
                                                                'Population' : ' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title(),
                                                                'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'],
                                                                'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'],
                                                                'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'],
                                                                'Recommendation' : summary,
                                                            })
                                                    else:
                                                        L.append(row)
                                                except KeyError:
                                                    row = literal_eval(output_list[0]['result'].split('\n')[i])
                                                    row = {**row, **{
                                                        'Title' : concat['title'][0],
                                                        'Authors' : concat['authors'][0] if 'authors' in list(concat.columns) else '',
                                                        'Publisher Name' : concat['publisher_name'][0] if 'publisher_name' in list(concat.columns) else '',
                                                        'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'],
                                                        'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'],
                                                        'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'],
                                                        'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'],
                                                        'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'],
                                                        'Recommendation' : summary,
                                                    }}
                                                    if len(row['Genes'].strip().split(',')) > 1:
                                                        for g in row['Genes'].strip().split(','):
                                                            L.append({
                                                                'Genes' : g.strip().upper(),
                                                                'SNPs' : row['SNPs'],
                                                                "Diseases" : ''.join(list(row['Diseases'].title() if row['Diseases'] not in ['T2D', 'T2DM', 'NAFLD', 'CVD'] else row['Diseases'])).replace('Unknown', ''),
                                                                'Title' : concat['title'][0],
                                                                'Authors' : concat['authors'][0] if 'authors' in list(concat.columns) else '',
                                                                'Publisher Name' : concat['publisher_name'][0] if 'publisher_name' in list(concat.columns) else '',
                                                                'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'],
                                                                'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'],
                                                                'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'],
                                                                'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'],
                                                                'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'],
                                                                'Recommendation' : summary,
                                                            })
                                                    else:
                                                        L.append(row)
                                                except ValueError:
                                                    if type(output_list[0]['result'].split('\n')[i]) is dict:
                                                        row = output_list[0]['result'].split('\n')[i]
                                                        row = {**row, **{
                                                            'Title' : concat['title'][0],
                                                            'Authors' : concat['authors'][0] if 'authors' in list(concat.columns) else '',
                                                            'Publisher Name' : concat['publisher_name'][0] if 'publisher_name' in list(concat.columns) else '',
                                                            'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'],
                                                            'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'],
                                                            'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'],
                                                            'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'],
                                                            'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'],
                                                            'Recommendation' : summary,
                                                        }}
                                                        if not row['SNPs'].startswith("rs"):
                                                            row.update({
                                                                'SNPs' : "-"
                                                            })
                                                        else:
                                                            L.append(row)
                                                except SyntaxError:
                                                    row = literal_eval("""{}""".format(output_list[0]['result'].split('\n')[i]))
                                                    row = {**row, **{
                                                        'Title' : concat['title'][0],
                                                        'Authors' : concat['authors'][0] if 'authors' in list(concat.columns) else '',
                                                        'Publisher Name' : concat['publisher_name'][0] if 'publisher_name' in list(concat.columns) else '',
                                                        'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'],
                                                        'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'],
                                                        'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'],
                                                        'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'],
                                                        'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'],
                                                        'Recommendation' : summary,
                                                    }}
                                                    if not row['SNPs'].startswith("rs"):
                                                        row.update({
                                                            'SNPs' : "-"
                                                        })
                                                    else:
                                                        L.append(row)
| # 2 | |
| for i in range(len(output_list[1]['result'].split('\n'))): | |
| if output_list[1]['result'].split('\n')[i] != "": | |
| try: | |
| row = literal_eval(output_list[1]['result'].split('\n')[i])[0] | |
| row = {**row, **{ | |
| 'Title' : concat['title'][0], | |
| 'Authors' : concat['authors'][0] if 'authors' in list(concat.columns) else '', | |
| 'Publisher Name' : concat['publisher_name'][0] if 'publisher_name' in list(concat.columns) else '', | |
| 'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'], | |
| 'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'], | |
| 'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'], | |
| 'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'], | |
| 'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'], | |
| 'Recommendation' : summary, | |
| }} | |
| if row['SNPs'] != "Not available": | |
| row.update({ | |
| 'SNPs' : "Not available" | |
| }) | |
| if len(row['Genes'].strip().split(',')) > 1: | |
| for g in row['Genes'].strip().split(','): | |
| L.append({ | |
| 'Genes' : g.strip().upper(), | |
| "SNPs" : "Not available", | |
| "Diseases" : ''.join(list(row['Diseases'].title() if row['Diseases'] not in ['T2D', 'T2DM', 'NAFLD', 'CVD'] else row['Diseases'])).replace('Unknown', ''), | |
| 'Title' : concat['title'][0], | |
| 'Authors' : concat['authors'][0] if 'authors' in list(concat.columns) else '', | |
| 'Publisher Name' : concat['publisher_name'][0] if 'publisher_name' in list(concat.columns) else '', | |
| 'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else concat.assign(year_of_publication='')['year_of_publication'], | |
| 'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else concat.assign(population_race='')['population_race'], | |
| 'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else concat.assign(sample_size='')['sample_size'], | |
| 'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else concat.assign(study_methodology='')['study_methodology'], | |
| 'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else concat.assign(study_level='')['study_level'], | |
| 'Recommendation' : summary, | |
| }) | |
| else: | |
| L.append(row) | |
| except KeyError: | |
| row = literal_eval(output_list[1]['result'].split('\n')[i]) | |
| row = {**row, **{ | |
| 'Title' : concat['title'][0], | |
| 'Authors' : concat['authors'][0] if 'authors' in concat.columns else '', | |
| 'Publisher Name' : concat['publisher_name'][0] if 'publisher_name' in concat.columns else '', | |
| 'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else '', | |
| 'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else '', | |
| 'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else '', | |
| 'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else '', | |
| 'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else '', | |
| 'Recommendation' : summary, | |
| }} | |
| if row['SNPs'] != "Not available": | |
| row.update({ | |
| 'SNPs' : "Not available" | |
| }) | |
| if len(row['Genes'].strip().split(',')) > 1: | |
| for g in row['Genes'].strip().split(','): | |
| L.append({ | |
| 'Genes' : g.strip().upper(), | |
| "SNPs" : "Not available", | |
| "Diseases" : ''.join(list(row['Diseases'].title() if row['Diseases'] not in ['T2D', 'T2DM', 'NAFLD', 'CVD'] else row['Diseases'])).replace('Unknown', ''), | |
| 'Title' : concat['title'][0], | |
| 'Authors' : concat['authors'][0] if 'authors' in concat.columns else '', | |
| 'Publisher Name' : concat['publisher_name'][0] if 'publisher_name' in concat.columns else '', | |
| 'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else '', | |
| 'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else '', | |
| 'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else '', | |
| 'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else '', | |
| 'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else '', | |
| 'Recommendation' : summary, | |
| }) | |
| else: | |
| L.append(row) | |
| except ValueError: | |
| # The line is not a valid Python literal; repair it as JSON and parse it instead (assumes json_repair can recover the record) | |
| row = json.loads(repair_json(output_list[1]['result'].split('\n')[i])) | |
| if isinstance(row, dict): | |
| row = {**row, **{ | |
| 'Title' : concat['title'][0], | |
| 'Authors' : concat['authors'][0] if 'authors' in concat.columns else '', | |
| 'Publisher Name' : concat['publisher_name'][0] if 'publisher_name' in concat.columns else '', | |
| 'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else '', | |
| 'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else '', | |
| 'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else '', | |
| 'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else '', | |
| 'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else '', | |
| 'Recommendation' : summary, | |
| } | |
| } | |
| if not str(row.get('SNPs', '')).startswith("rs"): | |
| row.update({'SNPs' : "-"}) | |
| L.append(row) # keep the row either way; "-" marks a missing rsID | |
| except SyntaxError: | |
| row = literal_eval("""{}""".format(output_list[2]['result'].split('\n')[i])) | |
| row = {**row, **{ | |
| 'Title' : concat['title'][0], | |
| 'Authors' : concat['authors'][0] if 'authors' in concat.columns else '', | |
| 'Publisher Name' : concat['publisher_name'][0] if 'publisher_name' in concat.columns else '', | |
| 'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else '', | |
| 'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else '', | |
| 'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else '', | |
| 'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else '', | |
| 'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else '', | |
| 'Recommendation' : summary, | |
| } | |
| } | |
| if not str(row.get('SNPs', '')).startswith("rs"): | |
| row.update({'SNPs' : "-"}) | |
| L.append(row) # keep the row either way; "-" marks a missing rsID | |
| # Pass 3: parse rows from the third extraction result with the same handling as above | |
| for i in range(len(output_list[2]['result'].split('\n'))): | |
| if output_list[2]['result'].split('\n')[i] != "": | |
| try: | |
| row = literal_eval(output_list[2]['result'].split('\n')[i])[0] | |
| row = {**row, **{ | |
| 'Title' : concat['title'][0], | |
| 'Authors' : concat['authors'][0] if 'authors' in concat.columns else '', | |
| 'Publisher Name' : concat['publisher_name'][0] if 'publisher_name' in concat.columns else '', | |
| 'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else '', | |
| 'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else '', | |
| 'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else '', | |
| 'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else '', | |
| 'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else '', | |
| 'Recommendation' : summary, | |
| } | |
| } | |
| if not str(row.get('SNPs', '')).startswith("rs"): | |
| row.update({'SNPs' : "-"}) | |
| L.append(row) # keep the row either way; "-" marks a missing rsID | |
| except KeyError: | |
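| # literal_eval returned a single dict (so indexing [0] raised KeyError); parse the line as one record | |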
| row = literal_eval(output_list[2]['result'].split('\n')[i]) | |
| row = {**row, **{ | |
| 'Title' : concat['title'][0], | |
| 'Authors' : concat['authors'][0] if 'authors' in concat.columns else '', | |
| 'Publisher Name' : concat['publisher_name'][0] if 'publisher_name' in concat.columns else '', | |
| 'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else '', | |
| 'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else '', | |
| 'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else '', | |
| 'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else '', | |
| 'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else '', | |
| 'Recommendation' : summary, | |
| } | |
| } | |
| if not str(row.get('SNPs', '')).startswith("rs"): | |
| row.update({'SNPs' : "-"}) | |
| L.append(row) # keep the row either way; "-" marks a missing rsID | |
| except ValueError: | |
| # The line is not a valid Python literal; repair it as JSON and parse it instead (assumes json_repair can recover the record) | |
| row = json.loads(repair_json(output_list[2]['result'].split('\n')[i])) | |
| if isinstance(row, dict): | |
| row = {**row, **{ | |
| 'Title' : concat['title'][0], | |
| 'Authors' : concat['authors'][0] if 'authors' in concat.columns else '', | |
| 'Publisher Name' : concat['publisher_name'][0] if 'publisher_name' in concat.columns else '', | |
| 'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else '', | |
| 'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else '', | |
| 'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else '', | |
| 'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else '', | |
| 'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else '', | |
| 'Recommendation' : summary, | |
| } | |
| } | |
| if not str(row.get('SNPs', '')).startswith("rs"): | |
| row.update({'SNPs' : "-"}) | |
| L.append(row) # keep the row either way; "-" marks a missing rsID | |
| except SyntaxError: | |
| row = literal_eval("""{}""".format(output_list[2]['result'].split('\n')[i])) | |
| row = {**row, **{ | |
| 'Title' : concat['title'][0], | |
| 'Authors' : concat['authors'][0] if 'authors' in concat.columns else '', | |
| 'Publisher Name' : concat['publisher_name'][0] if 'publisher_name' in concat.columns else '', | |
| 'Publication Year' : get_valid_year(' '.join(concat['year_of_publication'].values.tolist())) if 'year_of_publication' in concat.columns else '', | |
| 'Population' : upper_abbreviation(' '.join(concat['population_race'].values.tolist()).replace('Unknown', '').title()) if 'population_race' in concat.columns else '', | |
| 'Sample Size' : sample_size_postproc(' '.join(concat['sample_size'].values.tolist()).replace('Unknown', '').title()) if 'sample_size' in concat.columns else '', | |
| 'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).replace('Unknown', '').title()) if 'study_methodology' in concat.columns else '', | |
| 'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).replace('Unknown', '').title()) if 'study_level' in concat.columns else '', | |
| 'Recommendation' : summary, | |
| } | |
| } | |
| if not str(row.get('SNPs', '')).startswith("rs"): | |
| row.update({'SNPs' : "-"}) | |
| L.append(row) # keep the row either way; "-" marks a missing rsID | |
| st.write("β Table Extraction Done") | |
| status.update(label="Gene and SNPs succesfully collected.") | |
| # Final cleanup: strip 'Unknown' placeholders, then fill empty fields with 'Not Available' | |
| L = [{key: value.replace('Unknown', '') if isinstance(value, str) else value for key, value in d.items()} for d in L] | |
| L = [{key: ('Not Available' if value == '' else value) for key, value in d.items()} for d in L] | |
| csv = pd.DataFrame(L) | |
| st.dataframe(csv) | |
| with pd.ExcelWriter(buffer, engine='xlsxwriter') as writer: | |
| # Write the result dataframe to a single worksheet; the context manager closes the writer | |
| csv.to_excel(writer, sheet_name='Result') | |
| st.download_button( | |
| label="Save Result", | |
| data=buffer, | |
| file_name=uploaded_file.name.replace('.pdf', '.xlsx'), | |
| mime='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet' | |
| ) | |
| if on_t: | |
| if parseButtonT: | |
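| # Text-only pipeline: OCR the PDF, run the text extraction chain over chunks, and emit one row per gene/SNP | |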
| with st.status("Extraction in progress ...", expanded=True) as status: | |
| st.write("Getting Result ...") | |
| csv = pd.DataFrame() | |
| for uploaded_file in stqdm(uploaded_files): | |
| L = [] | |
| with NamedTemporaryFile(dir='.', suffix=".pdf") as pdf: | |
| pdf.write(uploaded_file.getbuffer()) | |
| pdf.flush() # ensure all bytes are on disk before get_summ reads the file by name | |
| # Entity Extraction | |
| st.write("β Extracting Entities ...") | |
| bytes_data = uploaded_file.read() | |
| journal = Journal(uploaded_file.name, bytes_data) | |
| images = pdf2image.convert_from_bytes(journal.bytes) | |
| extracted_text = "" | |
| for image in images[:-1]: | |
| text = pytesseract.image_to_string(image) | |
| text = clean_text(text) | |
| extracted_text += text + " " | |
| text = replace_quotes(extracted_text) | |
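| # Split the cleaned text into chunks sized for the extraction chain's context window | |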
| text_chunk = split_text(text, chunk_size_t) | |
| chunkdf = [] | |
| for i, chunk in enumerate(text_chunk): | |
| inp = chunk | |
| # create_extraction_chain returns a list of extracted records; take the first and build a one-row frame | |
| df = pd.DataFrame(textex_chain.run(inp)[0], index=[0]).fillna('') | |
| chunkdf.append(df) | |
| concat = pd.concat(chunkdf, axis=0).reset_index(drop=True).fillna('') | |
| st.write("β Entities Extraction Done ..") | |
| time.sleep(0.1) | |
| st.write("β Generating Summary ...") | |
| if 'SNPs' in list(concat.columns): | |
| concat['SNPs'] = concat['SNPs'].apply(lambda x: x if x.startswith('rs') else '') | |
| else: | |
| concat['SNPs'] = '' | |
| for col in list(concat.columns): | |
| concat[col] = concat[col].apply(lambda x: x if x not in ['N/A', 'not mentioned', 'Not mentioned', 'Unknown'] else '') | |
| summary = get_summ(pdf.name) | |
| time.sleep(0.1) | |
| st.write("β Generating Summary Done...") | |
| for i in range(len(concat)): | |
| if concat['SNPs'][i] == '' and 'genes_locus' in concat.columns: | |
| for g in concat['genes_locus'][i].split(','): | |
| L.append({ | |
| 'Title' : concat['title'][0], | |
| 'Author' : concat['authors'][0] if 'authors' in list(concat.columns) else '', | |
| 'Publisher Name' : concat['publisher'][0] if 'publisher' in list(concat.columns) else '', | |
| 'Publication Year' : get_valid_year(' '.join(concat['publication_year'].values.tolist())), | |
| 'Genes' : g.upper(), | |
| 'Population' : upper_abbreviation(' '.join(np.unique(concat['population_race'].values.tolist())).title()), | |
| 'Diseases' : upper_abbreviation(' '.join(concat['diseases'].values.tolist()).title()), | |
| 'Sample Size' : sample_size_postproc(upper_abbreviation(' '.join(concat['sample_size'].values.tolist()).title())), | |
| 'SNPs' : concat['SNPs'][i], | |
| 'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).title()), | |
| 'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).title()), | |
| 'Recommendation' : summary, | |
| }) | |
| else: # one or more SNPs present; emit one row per SNP | |
| for s in concat['SNPs'][i].split(','): | |
| try: | |
| L.append({ | |
| 'Title' : concat['title'][0], | |
| 'Author' : concat['authors'][0] if 'authors' in list(concat.columns) else '', | |
| 'Publisher Name' : concat['publisher'][0] if 'publisher' in list(concat.columns) else '', | |
| 'Publication Year' : get_valid_year(' '.join(concat['publication_year'].values.tolist())), | |
| 'Genes' : get_geneName(s.strip()).upper(), | |
| 'Population' : upper_abbreviation(' '.join(np.unique(concat['population_race'].values.tolist())).title()), | |
| 'Diseases' : upper_abbreviation(' '.join(concat['diseases'].values.tolist()).title()), | |
| 'Sample Size' : sample_size_postproc(upper_abbreviation(' '.join(concat['sample_size'].values.tolist()).title())), | |
| 'SNPs' : s, | |
| 'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).title()), | |
| 'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).title()), | |
| 'Recommendation' : summary, | |
| }) | |
| except Exception: # gene-name lookup failed; keep the row with an empty Genes field | |
| L.append({ | |
| 'Title' : concat['title'][0], | |
| 'Author' : concat['authors'][0] if 'authors' in list(concat.columns) else '', | |
| 'Publisher Name' : concat['publisher'][0] if 'publisher' in list(concat.columns) else '', | |
| 'Publication Year' : get_valid_year(' '.join(concat['publication_year'].values.tolist())), | |
| 'Genes' : '', | |
| 'Population' : upper_abbreviation(' '.join(np.unique(concat['population_race'].values.tolist())).title()), | |
| 'Diseases' : upper_abbreviation(' '.join(concat['diseases'].values.tolist()).title()), | |
| 'Sample Size' : sample_size_postproc(upper_abbreviation(' '.join(concat['sample_size'].values.tolist()).title())), | |
| 'SNPs' : s, | |
| 'Study Methodology' : upper_abbreviation(' '.join(concat['study_methodology'].values.tolist()).title()), | |
| 'Study Level' : upper_abbreviation(' '.join(concat['study_level'].values.tolist()).title()), | |
| 'Recommendation' : summary, | |
| }) | |
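| # Merge this file's rows into the running result, de-duplicating on gene symbol | |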
| csv = pd.concat([csv, pd.DataFrame(L)], ignore_index=True).drop_duplicates(subset='Genes') | |
| status.update(label="Gene and SNPs succesfully collected.") | |
| st.dataframe(csv) | |
| with pd.ExcelWriter(buffer, engine='xlsxwriter') as writer: | |
| # Write the result dataframe to a single worksheet; the context manager closes the writer | |
| csv.to_excel(writer, sheet_name='Result') | |
| st.download_button( | |
| label="Save Result", | |
| data=buffer, | |
| file_name=uploaded_file.name.replace('.pdf', '.xlsx'), | |
| mime='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet' | |
| ) | |