Spaces:
Sleeping
Sleeping
| from fastapi import FastAPI, File, UploadFile, Request, HTTPException, Form, Depends, status | |
| from fastapi.responses import HTMLResponse, FileResponse, RedirectResponse | |
| from fastapi.staticfiles import StaticFiles | |
| from fastapi.templating import Jinja2Templates | |
| from fastapi.security import HTTPBasic, HTTPBasicCredentials | |
| import shutil | |
| import os | |
| import uuid | |
| import base64 | |
| from pathlib import Path | |
| import uvicorn | |
| from typing import List, Optional | |
| import secrets | |
| from starlette.middleware.sessions import SessionMiddleware | |
| from fastapi.security import OAuth2PasswordRequestForm | |
| from fastapi.responses import JSONResponse | |
| import json | |
| import io | |
| from huggingface_hub import HfApi, HfFolder, create_repo | |
| from huggingface_hub.utils import RepositoryNotFoundError | |
| from huggingface_hub.hf_api import RepoFile | |
| import tempfile | |
| # Create FastAPI app | |
| app = FastAPI(title="Image Uploader") | |
| # Add session middleware | |
| app.add_middleware( | |
| SessionMiddleware, | |
| secret_key="YOUR_SECRET_KEY_CHANGE_THIS_IN_PRODUCTION" | |
| ) | |
| # Create uploads directory if it doesn't exist | |
| UPLOAD_DIR = Path("static/uploads") | |
| UPLOAD_DIR.mkdir(parents=True, exist_ok=True) | |
| # Create metadata directory for storing hashtags | |
| METADATA_DIR = Path("static/metadata") | |
| METADATA_DIR.mkdir(parents=True, exist_ok=True) | |
| METADATA_FILE = METADATA_DIR / "image_metadata.json" | |
| # Initialize metadata file if it doesn't exist | |
| if not METADATA_FILE.exists(): | |
| with open(METADATA_FILE, "w") as f: | |
| json.dump({}, f) | |
| # Mount static directory | |
| app.mount("/static", StaticFiles(directory="static"), name="static") | |
| # Set up Jinja2 templates | |
| templates = Jinja2Templates(directory="templates") | |
| # Set up security | |
| security = HTTPBasic() | |
| # Hardcoded credentials (in a real app, use proper hashed passwords in a database) | |
| USERNAME = "detomo" | |
| PASSWORD = "itweek2025" | |
| # Hugging Face Dataset configuration | |
| HF_USERNAME = os.environ.get("HF_USERNAME", "") # Set this in Hugging Face Space settings | |
| HF_TOKEN = os.environ.get("HF_TOKEN", "") # Set this in Hugging Face Space settings | |
| DATASET_REPO = os.environ.get("HF_DATASET_REPO", "image-uploader-data") | |
| IMAGES_PATH = "images" | |
| METADATA_PATH = "metadata" | |
| # Set HF cache directory to a writable location | |
| # This is necessary for Hugging Face Spaces which has permission issues with the default cache location | |
| os.environ["HF_HOME"] = os.path.join(tempfile.gettempdir(), "huggingface") | |
| os.environ["HUGGINGFACE_HUB_CACHE"] = os.path.join(tempfile.gettempdir(), "huggingface", "hub") | |
| os.makedirs(os.environ["HF_HOME"], exist_ok=True) | |
| os.makedirs(os.environ["HUGGINGFACE_HUB_CACHE"], exist_ok=True) | |
| # Initialize HfApi | |
| hf_api = HfApi(token=HF_TOKEN) | |
| # Create or ensure repository exists | |
| def ensure_repo_exists(): | |
| try: | |
| # Check if repo exists | |
| hf_api.repo_info(repo_id=f"{HF_USERNAME}/{DATASET_REPO}", repo_type="dataset") | |
| print(f"Repository {HF_USERNAME}/{DATASET_REPO} exists") | |
| except RepositoryNotFoundError: | |
| # Create repo if it doesn't exist | |
| try: | |
| print(f"Creating repository {HF_USERNAME}/{DATASET_REPO}") | |
| create_repo(f"{HF_USERNAME}/{DATASET_REPO}", repo_type="dataset", token=HF_TOKEN) | |
| # Initialize metadata | |
| metadata = json.dumps({}) | |
| hf_api.upload_file( | |
| path_or_fileobj=io.BytesIO(metadata.encode()), | |
| path_in_repo=f"{METADATA_PATH}/image_metadata.json", | |
| repo_id=f"{HF_USERNAME}/{DATASET_REPO}", | |
| repo_type="dataset", | |
| token=HF_TOKEN | |
| ) | |
| print(f"Repository created and initialized") | |
| except Exception as e: | |
| print(f"Error creating repository: {e}") | |
| except Exception as e: | |
| print(f"Error checking repository: {e}") | |
| # Initialize repository if in production | |
| if os.environ.get("ENV", "development") == "production": | |
| print("Running in production mode, checking Hugging Face repository...") | |
| if HF_USERNAME and HF_TOKEN: | |
| print(f"Using Hugging Face credentials for user: {HF_USERNAME}") | |
| try: | |
| ensure_repo_exists() | |
| except Exception as e: | |
| print(f"Error ensuring repository exists: {e}") | |
| else: | |
| print("Warning: HF_USERNAME or HF_TOKEN not set. Running without Hugging Face integration.") | |
| def get_file_extension(filename: str) -> str: | |
| """Get the file extension from a filename.""" | |
| return os.path.splitext(filename)[1].lower() | |
| def is_valid_image(extension: str) -> bool: | |
| """Check if the file extension is a valid image type.""" | |
| return extension in ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp'] | |
| def authenticate(request: Request): | |
| """Check if user is authenticated.""" | |
| is_authenticated = request.session.get("authenticated", False) | |
| return is_authenticated | |
| def verify_auth(request: Request): | |
| """Verify authentication.""" | |
| if not authenticate(request): | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail="Not authenticated", | |
| headers={"WWW-Authenticate": "Basic"}, | |
| ) | |
| return True | |
| def get_image_metadata(): | |
| """Get all image metadata including hashtags.""" | |
| # In production, get metadata from Hugging Face | |
| if os.environ.get("ENV", "development") == "production" and HF_USERNAME and HF_TOKEN: | |
| try: | |
| print(f"Fetching metadata from Hugging Face repository {HF_USERNAME}/{DATASET_REPO}") | |
| metadata_file = hf_api.hf_hub_download( | |
| repo_id=f"{HF_USERNAME}/{DATASET_REPO}", | |
| filename=f"{METADATA_PATH}/image_metadata.json", | |
| repo_type="dataset", | |
| token=HF_TOKEN, | |
| local_dir=os.path.join(tempfile.gettempdir(), "hf_downloads") | |
| ) | |
| print(f"Metadata downloaded to {metadata_file}") | |
| with open(metadata_file, "r") as f: | |
| return json.load(f) | |
| except Exception as e: | |
| print(f"Error fetching metadata from Hugging Face: {e}") | |
| # Return empty dict if failed | |
| return {} | |
| # Local development fallback | |
| elif METADATA_FILE.exists(): | |
| with open(METADATA_FILE, "r") as f: | |
| return json.load(f) | |
| return {} | |
| def save_image_metadata(metadata): | |
| """Save image metadata to the JSON file.""" | |
| # In production, save to Hugging Face | |
| if os.environ.get("ENV", "development") == "production" and HF_USERNAME and HF_TOKEN: | |
| try: | |
| print(f"Saving metadata to Hugging Face repository {HF_USERNAME}/{DATASET_REPO}") | |
| metadata_str = json.dumps(metadata) | |
| hf_api.upload_file( | |
| path_or_fileobj=io.BytesIO(metadata_str.encode()), | |
| path_in_repo=f"{METADATA_PATH}/image_metadata.json", | |
| repo_id=f"{HF_USERNAME}/{DATASET_REPO}", | |
| repo_type="dataset", | |
| token=HF_TOKEN | |
| ) | |
| print(f"Metadata saved successfully") | |
| except Exception as e: | |
| print(f"Error saving metadata to Hugging Face: {e}") | |
| # Still save locally as fallback | |
| with open(METADATA_FILE, "w") as f: | |
| json.dump(metadata, f) | |
| else: | |
| # Local development or fallback | |
| with open(METADATA_FILE, "w") as f: | |
| json.dump(metadata, f) | |
| def add_hashtags_to_image(filename, hashtags, original_filename=None): | |
| """Add hashtags to an image.""" | |
| metadata = get_image_metadata() | |
| # If file exists in metadata, update its hashtags, otherwise create new entry | |
| if filename in metadata: | |
| metadata[filename]["hashtags"] = hashtags | |
| if original_filename: | |
| metadata[filename]["original_filename"] = original_filename | |
| else: | |
| metadata_entry = {"hashtags": hashtags, "is_new": True} | |
| if original_filename: | |
| metadata_entry["original_filename"] = original_filename | |
| metadata[filename] = metadata_entry | |
| save_image_metadata(metadata) | |
| def mark_image_as_viewed(filename): | |
| """Mark an image as viewed (not new)""" | |
| metadata = get_image_metadata() | |
| if filename in metadata: | |
| metadata[filename]["is_new"] = False | |
| save_image_metadata(metadata) | |
| def upload_to_hf(file_content, filename): | |
| """Upload a file to Hugging Face Dataset Repository.""" | |
| if os.environ.get("ENV", "development") == "production" and HF_USERNAME and HF_TOKEN: | |
| try: | |
| print(f"Uploading file {filename} to Hugging Face repository {HF_USERNAME}/{DATASET_REPO}") | |
| hf_api.upload_file( | |
| path_or_fileobj=io.BytesIO(file_content), | |
| path_in_repo=f"{IMAGES_PATH}/{filename}", | |
| repo_id=f"{HF_USERNAME}/{DATASET_REPO}", | |
| repo_type="dataset", | |
| token=HF_TOKEN | |
| ) | |
| print(f"File {filename} uploaded successfully") | |
| return True | |
| except Exception as e: | |
| print(f"Error uploading to Hugging Face: {e}") | |
| return False | |
| return False | |
| def delete_from_hf(filename): | |
| """Delete a file from Hugging Face Dataset Repository.""" | |
| if os.environ.get("ENV", "development") == "production" and HF_USERNAME and HF_TOKEN: | |
| try: | |
| print(f"Deleting file {filename} from Hugging Face repository {HF_USERNAME}/{DATASET_REPO}") | |
| hf_api.delete_file( | |
| path_in_repo=f"{IMAGES_PATH}/{filename}", | |
| repo_id=f"{HF_USERNAME}/{DATASET_REPO}", | |
| repo_type="dataset", | |
| token=HF_TOKEN | |
| ) | |
| print(f"File {filename} deleted successfully") | |
| return True | |
| except Exception as e: | |
| print(f"Error deleting from Hugging Face: {e}") | |
| return False | |
| return False | |
| def get_hf_image_url(filename): | |
| """Get the URL for an image in the Hugging Face repo.""" | |
| if HF_USERNAME: | |
| return f"https://huggingface.co/datasets/{HF_USERNAME}/{DATASET_REPO}/resolve/main/{IMAGES_PATH}/{filename}" | |
| return None | |
| def list_hf_images(): | |
| """List all images in the Hugging Face repo.""" | |
| if os.environ.get("ENV", "development") == "production" and HF_USERNAME and HF_TOKEN: | |
| try: | |
| print(f"Listing files from Hugging Face repository {HF_USERNAME}/{DATASET_REPO}") | |
| files = hf_api.list_repo_files( | |
| repo_id=f"{HF_USERNAME}/{DATASET_REPO}", | |
| repo_type="dataset", | |
| token=HF_TOKEN | |
| ) | |
| # Filter only image files in the images directory | |
| image_files = [f for f in files if f.startswith(f"{IMAGES_PATH}/")] | |
| image_basenames = [os.path.basename(f) for f in image_files] | |
| print(f"Found {len(image_basenames)} images") | |
| return image_basenames | |
| except Exception as e: | |
| print(f"Error listing files from Hugging Face: {e}") | |
| return [] | |
| return [] | |
| async def login_page(request: Request): | |
| """Render the login page.""" | |
| # If already authenticated, redirect to home | |
| if authenticate(request): | |
| return RedirectResponse(url="/", status_code=status.HTTP_302_FOUND) | |
| return templates.TemplateResponse( | |
| "login.html", | |
| {"request": request} | |
| ) | |
| async def login(request: Request, form_data: OAuth2PasswordRequestForm = Depends()): | |
| """Handle login form submission.""" | |
| if form_data.username == USERNAME and form_data.password == PASSWORD: | |
| request.session["authenticated"] = True | |
| return RedirectResponse(url="/", status_code=status.HTTP_302_FOUND) | |
| else: | |
| return templates.TemplateResponse( | |
| "login.html", | |
| {"request": request, "error": "Invalid username or password"} | |
| ) | |
| async def logout(request: Request): | |
| """Handle logout.""" | |
| request.session.pop("authenticated", None) | |
| return RedirectResponse(url="/login", status_code=status.HTTP_302_FOUND) | |
| async def home(request: Request, search: Optional[str] = None, tag: Optional[str] = None): | |
| """Render the home page with authentication check and optional search/filter.""" | |
| # Check if user is authenticated | |
| if not authenticate(request): | |
| return RedirectResponse(url="/login", status_code=status.HTTP_302_FOUND) | |
| # Get all uploaded images and their metadata | |
| uploaded_images = [] | |
| metadata = get_image_metadata() | |
| # In production environment, get images from Hugging Face | |
| if os.environ.get("ENV", "development") == "production" and HF_USERNAME: | |
| hf_images = list_hf_images() | |
| for file_name in hf_images: | |
| # Get hashtags from metadata if available | |
| hashtags = [] | |
| is_new = False | |
| original_filename = file_name | |
| if file_name in metadata: | |
| hashtags = metadata[file_name].get("hashtags", []) | |
| is_new = metadata[file_name].get("is_new", False) | |
| original_filename = metadata[file_name].get("original_filename", file_name) | |
| # If searching/filtering, check if this image should be included | |
| if search and search.lower() not in original_filename.lower() and not any(search.lower() in tag.lower() for tag in hashtags): | |
| continue | |
| if tag and tag not in hashtags: | |
| continue | |
| image_url = get_hf_image_url(file_name) | |
| uploaded_images.append({ | |
| "name": file_name, | |
| "url": image_url, | |
| "embed_url": image_url, | |
| "hashtags": hashtags, | |
| "is_new": is_new, | |
| "original_filename": original_filename | |
| }) | |
| # Local development fallback | |
| elif UPLOAD_DIR.exists(): | |
| for file in UPLOAD_DIR.iterdir(): | |
| if is_valid_image(get_file_extension(file.name)): | |
| image_url = f"/static/uploads/{file.name}" | |
| # Get hashtags from metadata if available | |
| hashtags = [] | |
| is_new = False | |
| original_filename = file.name | |
| if file.name in metadata: | |
| hashtags = metadata[file.name].get("hashtags", []) | |
| is_new = metadata[file.name].get("is_new", False) | |
| original_filename = metadata[file.name].get("original_filename", file.name) | |
| # If searching/filtering, check if this image should be included | |
| if search and search.lower() not in original_filename.lower() and not any(search.lower() in tag.lower() for tag in hashtags): | |
| continue | |
| if tag and tag not in hashtags: | |
| continue | |
| uploaded_images.append({ | |
| "name": file.name, | |
| "url": image_url, | |
| "embed_url": f"{request.base_url}static/uploads/{file.name}", | |
| "hashtags": hashtags, | |
| "is_new": is_new, | |
| "original_filename": original_filename | |
| }) | |
| # Get all unique hashtags for the filter dropdown | |
| all_hashtags = set() | |
| for img_data in metadata.values(): | |
| if "hashtags" in img_data: | |
| all_hashtags.update(img_data["hashtags"]) | |
| return templates.TemplateResponse( | |
| "index.html", | |
| { | |
| "request": request, | |
| "uploaded_images": uploaded_images, | |
| "all_hashtags": sorted(list(all_hashtags)), | |
| "current_search": search, | |
| "current_tag": tag | |
| } | |
| ) | |
| async def upload_image( | |
| request: Request, | |
| files: List[UploadFile] = File(...), | |
| hashtags: str = Form("") | |
| ): | |
| """Handle multiple image uploads with hashtags.""" | |
| # Check if user is authenticated | |
| if not authenticate(request): | |
| return JSONResponse( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| content={"detail": "Not authenticated"} | |
| ) | |
| # Process hashtags into a list | |
| hashtag_list = [] | |
| if hashtags: | |
| # Split by spaces or commas and remove empty strings/whitespace | |
| hashtag_list = [tag.strip() for tag in hashtags.replace(',', ' ').split() if tag.strip()] | |
| results = [] | |
| duplicates = [] | |
| # First, check for duplicate filenames | |
| metadata = get_image_metadata() | |
| all_files = {} | |
| # In production, check for duplicates in Hugging Face repo | |
| if os.environ.get("ENV", "development") == "production" and HF_USERNAME: | |
| hf_images = list_hf_images() | |
| for filename in hf_images: | |
| # Get original filename from metadata if available | |
| original_name = filename | |
| if filename in metadata and "original_filename" in metadata[filename]: | |
| original_name = metadata[filename]["original_filename"] | |
| all_files[original_name.lower()] = filename | |
| # Local development fallback | |
| elif UPLOAD_DIR.exists(): | |
| for file in UPLOAD_DIR.iterdir(): | |
| if is_valid_image(get_file_extension(file.name)): | |
| # Get original filename from metadata if available | |
| original_name = file.name | |
| if file.name in metadata and "original_filename" in metadata[file.name]: | |
| original_name = metadata[file.name]["original_filename"] | |
| all_files[original_name.lower()] = file.name | |
| # Check for duplicates in current upload batch | |
| for file in files: | |
| file_lower = file.filename.lower() | |
| if file_lower in all_files: | |
| # Found a duplicate | |
| duplicates.append({ | |
| "new_file": file.filename, | |
| "existing_file": all_files[file_lower], | |
| "original_name": file.filename | |
| }) | |
| # If we found duplicates, return them to the frontend for confirmation | |
| if duplicates: | |
| return { | |
| "success": False, | |
| "duplicates": duplicates, | |
| "message": "Duplicate filenames detected", | |
| "action_required": "confirm_replace" | |
| } | |
| # No duplicates, proceed with upload | |
| for file in files: | |
| # Check if the file is an image | |
| extension = get_file_extension(file.filename) | |
| if not is_valid_image(extension): | |
| continue # Skip non-image files | |
| # Preserve original filename in metadata but make it safe for filesystem | |
| original_filename = file.filename | |
| # Generate a unique filename to prevent overwrites | |
| unique_filename = f"{uuid.uuid4()}{extension}" | |
| # Read file content for upload | |
| file.file.seek(0) | |
| file_content = await file.read() | |
| # Save file based on environment | |
| if os.environ.get("ENV", "development") == "production" and HF_USERNAME and HF_TOKEN: | |
| # Upload to Hugging Face | |
| upload_success = upload_to_hf(file_content, unique_filename) | |
| if not upload_success: | |
| # Fallback to local storage if HF upload fails | |
| file_path = UPLOAD_DIR / unique_filename | |
| with file_path.open("wb") as buffer: | |
| buffer.write(file_content) | |
| else: | |
| # Local development storage | |
| file_path = UPLOAD_DIR / unique_filename | |
| with file_path.open("wb") as buffer: | |
| buffer.write(file_content) | |
| # Save hashtags and original filename | |
| add_hashtags_to_image(unique_filename, hashtag_list, original_filename) | |
| # For base64 encoding | |
| base64_encoded = base64.b64encode(file_content).decode("utf-8") | |
| # Determine MIME type | |
| mime_type = { | |
| '.jpg': 'image/jpeg', | |
| '.jpeg': 'image/jpeg', | |
| '.png': 'image/png', | |
| '.gif': 'image/gif', | |
| '.bmp': 'image/bmp', | |
| '.webp': 'image/webp' | |
| }.get(extension, 'application/octet-stream') | |
| # Get appropriate image URL based on environment | |
| if os.environ.get("ENV", "development") == "production" and HF_USERNAME: | |
| image_url = get_hf_image_url(unique_filename) | |
| full_url = image_url | |
| else: | |
| image_url = f"/static/uploads/{unique_filename}" | |
| full_url = f"{request.base_url}static/uploads/{unique_filename}" | |
| results.append({ | |
| "success": True, | |
| "file_name": unique_filename, | |
| "original_filename": original_filename, | |
| "file_url": image_url, | |
| "full_url": full_url, | |
| "embed_html": f'<img src="{full_url}" alt="{original_filename}" />', | |
| "base64_data": f"data:{mime_type};base64,{base64_encoded[:20]}...{base64_encoded[-20:]}", | |
| "base64_embed": f'<img src="data:{mime_type};base64,{base64_encoded}" alt="{original_filename}" />', | |
| "hashtags": hashtag_list | |
| }) | |
| if len(results) == 1: | |
| return results[0] | |
| else: | |
| return {"success": True, "uploaded_count": len(results), "files": results} | |
| async def upload_with_replace( | |
| request: Request, | |
| files: List[UploadFile] = File(...), | |
| hashtags: str = Form(""), | |
| replace_files: str = Form("") | |
| ): | |
| """Handle upload with replacement of duplicate files.""" | |
| # Check if user is authenticated | |
| if not authenticate(request): | |
| return JSONResponse( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| content={"detail": "Not authenticated"} | |
| ) | |
| # Process hashtags into a list | |
| hashtag_list = [] | |
| if hashtags: | |
| # Split by spaces or commas and remove empty strings/whitespace | |
| hashtag_list = [tag.strip() for tag in hashtags.replace(',', ' ').split() if tag.strip()] | |
| # Parse the replacement files JSON | |
| files_to_replace = [] | |
| if replace_files: | |
| try: | |
| files_to_replace = json.loads(replace_files) | |
| except json.JSONDecodeError: | |
| files_to_replace = [] | |
| # Create a map of original names to replacement decisions | |
| replace_map = {item["original_name"].lower(): item["existing_file"] for item in files_to_replace} | |
| results = [] | |
| for file in files: | |
| # Check if the file is an image | |
| extension = get_file_extension(file.filename) | |
| if not is_valid_image(extension): | |
| continue # Skip non-image files | |
| # Preserve original filename in metadata | |
| original_filename = file.filename | |
| file_lower = original_filename.lower() | |
| # Read file content | |
| file.file.seek(0) | |
| file_content = await file.read() | |
| # Check if this file should replace an existing one | |
| if file_lower in replace_map: | |
| # Delete the old file | |
| old_filename = replace_map[file_lower] | |
| if os.environ.get("ENV", "development") == "production" and HF_USERNAME and HF_TOKEN: | |
| # Delete from Hugging Face | |
| delete_success = delete_from_hf(old_filename) | |
| if not delete_success: | |
| raise HTTPException(status_code=404, detail="Image not found or could not be deleted") | |
| else: | |
| # Delete from local storage | |
| old_file = UPLOAD_DIR / old_filename | |
| if old_file.exists(): | |
| os.remove(old_file) | |
| # Remove from metadata | |
| metadata = get_image_metadata() | |
| if old_filename in metadata: | |
| del metadata[old_filename] | |
| save_image_metadata(metadata) | |
| # Generate a unique filename to prevent overwrites | |
| unique_filename = f"{uuid.uuid4()}{extension}" | |
| # Save file based on environment | |
| if os.environ.get("ENV", "development") == "production" and HF_USERNAME and HF_TOKEN: | |
| # Upload to Hugging Face | |
| upload_success = upload_to_hf(file_content, unique_filename) | |
| if not upload_success: | |
| # Fallback to local storage if HF upload fails | |
| file_path = UPLOAD_DIR / unique_filename | |
| with file_path.open("wb") as buffer: | |
| buffer.write(file_content) | |
| else: | |
| # Local development storage | |
| file_path = UPLOAD_DIR / unique_filename | |
| with file_path.open("wb") as buffer: | |
| buffer.write(file_content) | |
| # Save hashtags and original filename | |
| add_hashtags_to_image(unique_filename, hashtag_list, original_filename) | |
| # For base64 encoding | |
| base64_encoded = base64.b64encode(file_content).decode("utf-8") | |
| # Determine MIME type | |
| mime_type = { | |
| '.jpg': 'image/jpeg', | |
| '.jpeg': 'image/jpeg', | |
| '.png': 'image/png', | |
| '.gif': 'image/gif', | |
| '.bmp': 'image/bmp', | |
| '.webp': 'image/webp' | |
| }.get(extension, 'application/octet-stream') | |
| # Get appropriate image URL based on environment | |
| if os.environ.get("ENV", "development") == "production" and HF_USERNAME: | |
| image_url = get_hf_image_url(unique_filename) | |
| full_url = image_url | |
| else: | |
| image_url = f"/static/uploads/{unique_filename}" | |
| full_url = f"{request.base_url}static/uploads/{unique_filename}" | |
| results.append({ | |
| "success": True, | |
| "file_name": unique_filename, | |
| "original_filename": original_filename, | |
| "file_url": image_url, | |
| "full_url": full_url, | |
| "embed_html": f'<img src="{full_url}" alt="{original_filename}" />', | |
| "base64_data": f"data:{mime_type};base64,{base64_encoded[:20]}...{base64_encoded[-20:]}", | |
| "base64_embed": f'<img src="data:{mime_type};base64,{base64_encoded}" alt="{original_filename}" />', | |
| "hashtags": hashtag_list | |
| }) | |
| if len(results) == 1: | |
| return results[0] | |
| else: | |
| return {"success": True, "uploaded_count": len(results), "files": results} | |
| async def view_image(request: Request, file_name: str): | |
| """View a specific image with authentication check.""" | |
| # Check if user is authenticated | |
| if not authenticate(request): | |
| return RedirectResponse(url="/login", status_code=status.HTTP_302_FOUND) | |
| # Mark image as viewed (not new) | |
| mark_image_as_viewed(file_name) | |
| # Determine image URL based on environment | |
| if os.environ.get("ENV", "development") == "production" and HF_USERNAME: | |
| image_url = get_hf_image_url(file_name) | |
| embed_url = image_url | |
| # Check if file exists in Hugging Face | |
| if not image_url or file_name not in list_hf_images(): | |
| raise HTTPException(status_code=404, detail="Image not found") | |
| else: | |
| # Check local file | |
| file_path = UPLOAD_DIR / file_name | |
| if not file_path.exists(): | |
| raise HTTPException(status_code=404, detail="Image not found") | |
| image_url = f"/static/uploads/{file_name}" | |
| embed_url = f"{request.base_url}static/uploads/{file_name}" | |
| # Get metadata | |
| metadata = get_image_metadata() | |
| hashtags = [] | |
| original_filename = file_name | |
| if file_name in metadata: | |
| hashtags = metadata[file_name].get("hashtags", []) | |
| original_filename = metadata[file_name].get("original_filename", file_name) | |
| return templates.TemplateResponse( | |
| "view.html", | |
| { | |
| "request": request, | |
| "image_url": image_url, | |
| "file_name": file_name, | |
| "original_filename": original_filename, | |
| "embed_url": embed_url, | |
| "hashtags": hashtags | |
| } | |
| ) | |
| async def update_hashtags(request: Request, file_name: str, hashtags: str = Form("")): | |
| """Update hashtags for an image.""" | |
| # Check if user is authenticated | |
| if not authenticate(request): | |
| return JSONResponse( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| content={"detail": "Not authenticated"} | |
| ) | |
| # Check if file exists | |
| if os.environ.get("ENV", "development") == "production" and HF_USERNAME: | |
| if file_name not in list_hf_images(): | |
| raise HTTPException(status_code=404, detail="Image not found") | |
| else: | |
| file_path = UPLOAD_DIR / file_name | |
| if not file_path.exists(): | |
| raise HTTPException(status_code=404, detail="Image not found") | |
| # Process hashtags | |
| hashtag_list = [] | |
| if hashtags: | |
| hashtag_list = [tag.strip() for tag in hashtags.replace(',', ' ').split() if tag.strip()] | |
| # Update hashtags in metadata | |
| add_hashtags_to_image(file_name, hashtag_list) | |
| # Redirect back to the image view | |
| return RedirectResponse(url=f"/view/{file_name}", status_code=status.HTTP_303_SEE_OTHER) | |
| async def delete_image(request: Request, file_name: str): | |
| """Delete an image with authentication check.""" | |
| # Check if user is authenticated | |
| if not authenticate(request): | |
| return JSONResponse( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| content={"detail": "Not authenticated"} | |
| ) | |
| # Delete file based on environment | |
| if os.environ.get("ENV", "development") == "production" and HF_USERNAME and HF_TOKEN: | |
| # Delete from Hugging Face | |
| delete_success = delete_from_hf(file_name) | |
| if not delete_success: | |
| raise HTTPException(status_code=404, detail="Image not found or could not be deleted") | |
| else: | |
| # Delete from local storage | |
| file_path = UPLOAD_DIR / file_name | |
| if not file_path.exists(): | |
| raise HTTPException(status_code=404, detail="Image not found") | |
| os.remove(file_path) | |
| # Remove from metadata | |
| metadata = get_image_metadata() | |
| if file_name in metadata: | |
| del metadata[file_name] | |
| save_image_metadata(metadata) | |
| return {"success": True, "message": f"Image {file_name} has been deleted"} | |
| # Health check endpoint for Hugging Face Spaces | |
| async def health_check(): | |
| return {"status": "ok"} | |
| if __name__ == "__main__": | |
| # For local development | |
| uvicorn.run("app:app", host="127.0.0.1", port=8000, reload=True) | |
| # For production/Hugging Face (uncomment when deploying) | |
| # uvicorn.run("app:app", host="0.0.0.0", port=7860) |