Spaces:
Sleeping
Sleeping
| from fastapi import ( | |
| FastAPI, | |
| HTTPException, | |
| Request, | |
| Form, | |
| Depends, | |
| status, | |
| APIRouter, | |
| Header, | |
| ) | |
| from fastapi.responses import ( | |
| HTMLResponse, | |
| RedirectResponse, | |
| JSONResponse, | |
| StreamingResponse, | |
| ) | |
| from fastapi.templating import Jinja2Templates | |
| from fastapi.staticfiles import StaticFiles | |
| from fastapi.security import HTTPBasic | |
| from datetime import datetime, timedelta | |
| import random | |
| import folium | |
| import uuid as uuid_module | |
| from folium.plugins import MarkerCluster | |
| from typing import Optional | |
| from sqlalchemy import create_engine | |
| from sqlalchemy.orm import sessionmaker, Session | |
| from models import Base, User, StatusRecord, SystemSetting, Device, StatusRecordBatch | |
| import io | |
| import csv | |
| from typing import Dict | |
# Database setup: a SQLite file in the working directory, accessed via SQLAlchemy.
SQLALCHEMY_DATABASE_URL = "sqlite:///./database.db"

# check_same_thread=False is forwarded to the SQLite driver so a connection
# may be used from a thread other than the one that opened it.
engine = create_engine(
    SQLALCHEMY_DATABASE_URL,
    connect_args={"check_same_thread": False},
)

# Session factory bound to the engine; flushing and committing are explicit.
SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False)

# Emit CREATE TABLE for the models registered on Base.
Base.metadata.create_all(bind=engine)
# Create default admin user and system settings
def create_default_data():
    """Seed the database with an ``admin`` user and one ``SystemSetting`` row.

    Each record is only added when no matching row exists yet.  Any error is
    rolled back and printed rather than raised, so a seeding failure does not
    stop the module from loading.
    """
    session = SessionLocal()
    try:
        existing_admin = session.query(User).filter(User.username == "admin").first()
        if not existing_admin:
            # NOTE(review): the default credentials are admin/admin and the
            # password is stored as given -- change them after first start.
            session.add(
                User(
                    username="admin",
                    email="[email protected]",
                    password="admin",
                    is_admin=True,
                    is_active=True,
                )
            )

        # A SystemSetting built without arguments relies on the model's own
        # column defaults.
        if not session.query(SystemSetting).first():
            session.add(SystemSetting())

        session.commit()
    except Exception as exc:
        session.rollback()
        print(f"Error creating default data: {str(exc)}")
    finally:
        session.close()


# Seed once at import time.
create_default_data()
# Application object, template renderer (reads from ./templates) and the
# HTTP Basic security scheme object.
app = FastAPI()
templates = Jinja2Templates(directory="templates")
security = HTTPBasic()

# Create APIRouters for grouping: one for /admin pages, one for the /api
# endpoints.
api_router = APIRouter(prefix="/api", tags=["api"])
admin_router = APIRouter(prefix="/admin", tags=["admin"])
# Dependency to get the database session
def get_db():
    """Yield a fresh database session and always close it afterwards.

    Written as a generator so it can be used with ``Depends``: the code after
    ``yield`` runs once the consumer is finished with the session, including
    when the consumer raised.
    """
    session = SessionLocal()
    try:
        yield session
    finally:
        session.close()
def get_current_user(request: Request, db: Session = Depends(get_db)) -> Optional[User]:
    """Return the user named by the ``username`` cookie, if any.

    Returns ``None`` when the cookie is missing or empty, or when no user
    with that username exists.
    """
    cookie_username = request.cookies.get("username")
    if not cookie_username:
        return None
    return db.query(User).filter(User.username == cookie_username).first()
def login_required(request: Request, db: Session = Depends(get_db)):
    """Resolve the logged-in user from the ``username`` cookie.

    Returns:
        The matching ``User`` when the cookie names an existing, active
        account; otherwise a 302 ``RedirectResponse`` to ``/login``.  Callers
        must check the return type with ``isinstance`` before using it.

    NOTE(review): the cookie holds the bare username, so it can be forged by
    any client.  A signed session token would be needed to make this check
    trustworthy.
    """
    username = request.cookies.get("username")
    user = (
        db.query(User).filter(User.username == username).first()
        if username
        else None
    )

    # login() only admits accounts with is_active set; apply the same rule
    # here so that deactivating a user also invalidates cookies issued
    # before the deactivation.
    if user is None or not user.is_active:
        return RedirectResponse(url="/login", status_code=status.HTTP_302_FOUND)
    return user
# Device authentication function
def authenticate_device(
    device_id: str, device_password: str, db: Session = Depends(get_db)
):
    """Look up a device by id and verify its password.

    Args:
        device_id: Identifier matched against ``Device.device_id``.
        device_password: Password supplied by the caller.
        db: Database session used for the lookup.

    Returns:
        The matching ``Device`` row.

    Raises:
        HTTPException: 401 when the device does not exist or the password
            does not match.
    """
    import hmac

    device = db.query(Device).filter(Device.device_id == device_id).first()
    stored_password = device.password if device else None

    # compare_digest runs in time independent of where the inputs differ, so
    # response timing does not leak how much of a guessed password was right.
    # Encoding to bytes allows non-ASCII passwords; the isinstance guards make
    # a missing/non-string stored password a plain mismatch, not a TypeError.
    credentials_ok = (
        isinstance(stored_password, str)
        and isinstance(device_password, str)
        and hmac.compare_digest(
            stored_password.encode("utf-8"), device_password.encode("utf-8")
        )
    )
    if not credentials_ok:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid device credentials",
        )
    return device
async def login_page(request: Request):
    """Render the ``login.html`` template with no error message."""
    context = {"request": request}
    return templates.TemplateResponse("login.html", context)
async def login(
    request: Request,
    username: str = Form(...),
    password: str = Form(...),
    db: Session = Depends(get_db),
):
    """Check submitted credentials and start a cookie-based session.

    On success the user's ``last_login`` is updated, a ``username`` cookie is
    set and the client is redirected to ``/``.  Otherwise the login page is
    rendered again with an error message.
    """
    # The three filters are ANDed: name and password must match and the
    # account must be active.
    matching_user = (
        db.query(User)
        .filter(User.username == username)
        .filter(User.password == password)
        .filter(User.is_active == True)  # noqa: E712 - SQL expression, not a Python bool test
        .first()
    )

    if not matching_user:
        return templates.TemplateResponse(
            "login.html",
            {"request": request, "error": "Invalid credentials or inactive account"},
        )

    matching_user.last_login = datetime.now()
    db.commit()

    redirect = RedirectResponse(url="/", status_code=status.HTTP_302_FOUND)
    redirect.set_cookie(key="username", value=username, httponly=True)
    return redirect
async def logout():
    """Clear the ``username`` cookie and redirect to the login page."""
    redirect = RedirectResponse(url="/login", status_code=status.HTTP_302_FOUND)
    redirect.delete_cookie("username")
    return redirect
async def register_page(request: Request):
    """Render the ``register.html`` template."""
    context = {"request": request}
    return templates.TemplateResponse("register.html", context)
async def register(
    username: str = Form(...),
    email: str = Form(...),
    password: str = Form(...),
    db: Session = Depends(get_db),
):
    """Create a user account from the submitted form.

    Redirects to ``/register?error=1`` when the username is already taken,
    otherwise stores the new user and redirects to ``/login``.

    NOTE(review): the password is stored exactly as submitted.
    """
    name_taken = db.query(User).filter(User.username == username).first()
    if name_taken:
        return RedirectResponse(
            url="/register?error=1", status_code=status.HTTP_302_FOUND
        )

    db.add(User(username=username, email=email, password=password))
    db.commit()
    return RedirectResponse(url="/login", status_code=status.HTTP_302_FOUND)
# Admin routes
async def admin_page(request: Request, db: Session = Depends(get_db)):
    """Render the admin dashboard listing every user and device.

    Returns the login redirect when nobody is logged in.

    Raises:
        HTTPException: 403 when the logged-in user is not an admin.
    """
    viewer = login_required(request, db)
    # login_required hands back a redirect instead of raising; pass it on.
    if isinstance(viewer, RedirectResponse):
        return viewer
    if not viewer.is_admin:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="Not authorized"
        )

    context = {
        "request": request,
        "users": db.query(User).all(),
        "devices": db.query(Device).all(),
    }
    return templates.TemplateResponse("admin.html", context)
async def delete_user(request: Request, username: str, db: Session = Depends(get_db)):
    """Delete the user with the given username, then return to ``/admin``.

    An unknown username is ignored.  Returns the login redirect when nobody
    is logged in.

    Raises:
        HTTPException: 403 when the logged-in user is not an admin.
    """
    actor = login_required(request, db)
    if isinstance(actor, RedirectResponse):
        return actor
    if not actor.is_admin:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="Not authorized"
        )

    target = db.query(User).filter(User.username == username).first()
    if target:
        db.delete(target)
        db.commit()

    return RedirectResponse(url="/admin", status_code=status.HTTP_302_FOUND)
async def edit_user(
    request: Request,
    username: str,
    new_username: str = Form(...),
    email: str = Form(...),
    is_admin: bool = Form(False),
    is_active: bool = Form(False),
    db: Session = Depends(get_db),
):
    """Update a user's name, email and flags, then return to ``/admin``.

    An unknown ``username`` is ignored.  Returns the login redirect when
    nobody is logged in.

    Raises:
        HTTPException: 403 when the logged-in user is not an admin; 400 when
            ``new_username`` differs from ``username`` and is already taken.
    """
    actor = login_required(request, db)
    if isinstance(actor, RedirectResponse):
        return actor
    if not actor.is_admin:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="Not authorized"
        )

    back_to_admin = RedirectResponse(url="/admin", status_code=status.HTTP_302_FOUND)

    target = db.query(User).filter(User.username == username).first()
    if not target:
        return back_to_admin

    # Only look for a clash when the name actually changes.
    if new_username != username:
        clash = db.query(User).filter(User.username == new_username).first()
        if clash:
            raise HTTPException(status_code=400, detail="Username already exists")

    target.username = new_username
    target.email = email
    target.is_admin = is_admin
    target.is_active = is_active
    db.commit()
    return back_to_admin
async def add_device(
    request: Request,
    name: str = Form(...),
    description: str = Form(...),
    device_id: str = Form(...),
    password: str = Form(...),
    db: Session = Depends(get_db),
):
    """Register a new device from the submitted form, then return to ``/admin``.

    Returns the login redirect when nobody is logged in.

    Raises:
        HTTPException: 403 when the logged-in user is not an admin; 400 when
            a device with the same ``device_id`` already exists.
    """
    actor = login_required(request, db)
    if isinstance(actor, RedirectResponse):
        return actor
    if not actor.is_admin:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="Not authorized"
        )

    duplicate = db.query(Device).filter(Device.device_id == device_id).first()
    if duplicate:
        raise HTTPException(status_code=400, detail="Device ID already exists")

    db.add(
        Device(
            name=name,
            description=description,
            device_id=device_id,
            password=password,
        )
    )
    db.commit()
    return RedirectResponse(url="/admin", status_code=status.HTTP_302_FOUND)
async def edit_device(
    request: Request,
    device_id: str,
    name: str = Form(...),
    description: str = Form(...),
    new_device_id: str = Form(...),
    password: str = Form(...),
    db: Session = Depends(get_db),
):
    """Overwrite a device's fields with the form values, then return to ``/admin``.

    Returns the login redirect when nobody is logged in.

    Raises:
        HTTPException: 403 when the logged-in user is not an admin; 404 when
            ``device_id`` is unknown; 400 when ``new_device_id`` differs from
            ``device_id`` and is already in use.
    """
    actor = login_required(request, db)
    if isinstance(actor, RedirectResponse):
        return actor
    if not actor.is_admin:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="Not authorized"
        )

    target = db.query(Device).filter(Device.device_id == device_id).first()
    if not target:
        raise HTTPException(status_code=404, detail="Device not found")

    # Only look for a clash when the id actually changes.
    if new_device_id != device_id:
        clash = db.query(Device).filter(Device.device_id == new_device_id).first()
        if clash:
            raise HTTPException(
                status_code=400, detail="New Device ID already exists"
            )

    target.name = name
    target.description = description
    target.device_id = new_device_id
    target.password = password
    db.commit()
    return RedirectResponse(url="/admin", status_code=status.HTTP_302_FOUND)
async def delete_device(
    request: Request, device_id: str, db: Session = Depends(get_db)
):
    """Delete the device with the given id, then return to ``/admin``.

    An unknown ``device_id`` is ignored.  Returns the login redirect when
    nobody is logged in.

    Raises:
        HTTPException: 403 when the logged-in user is not an admin.
    """
    actor = login_required(request, db)
    if isinstance(actor, RedirectResponse):
        return actor
    if not actor.is_admin:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN, detail="Not authorized"
        )

    target = db.query(Device).filter(Device.device_id == device_id).first()
    if target:
        db.delete(target)
        db.commit()

    return RedirectResponse(url="/admin", status_code=status.HTTP_302_FOUND)
# API routes
def generate_data(
    device_id: str = Header(...),
    device_password: str = Header(...),
    db: Session = Depends(get_db),
):
    """Insert 100 randomly generated status records for the calling device.

    Each record gets a timestamp drawn from the days starting 2024-08-01, a
    position within +/-0.01 degrees of the base coordinates and a random
    ``connect_status`` of 0 or 1.

    Raises:
        HTTPException: 401 when device authentication fails.
    """
    authenticate_device(device_id, device_password, db)

    base_latitude, base_longitude = 35.6837, 139.6805
    window_start = datetime(2024, 8, 1)
    window_end = datetime(2024, 8, 7)
    window_days = (window_end - window_start).days

    demo_records = []
    for _ in range(100):
        # Draw order (day, second, latitude, longitude, status) is kept
        # fixed so a seeded generator yields the same records.
        day_offset = random.randint(0, window_days)
        second_offset = random.randint(0, 86400)
        moment = window_start + timedelta(days=day_offset, seconds=second_offset)
        latitude = base_latitude + random.uniform(-0.01, 0.01)
        longitude = base_longitude + random.uniform(-0.01, 0.01)
        connected = random.choice([0, 1])

        demo_records.append(
            StatusRecord(
                device_id=device_id,
                latitude=latitude,
                longitude=longitude,
                timestamp=moment,
                connect_status=connected,
            )
        )

    db.add_all(demo_records)
    db.commit()
    return {"message": "Demo data generated successfully"}
def delete_all_data(
    device_id: str = Header(...),
    device_password: str = Header(...),
    db: Session = Depends(get_db),
):
    """
    Delete every status record, regardless of which device owns it.

    Requires device authentication.

    Raises:
        HTTPException: 401 when authentication fails; 500 (after a rollback)
            when the delete or commit raises.
    """
    authenticate_device(device_id, device_password, db)

    try:
        db.query(StatusRecord).delete()
        db.commit()
    except Exception as exc:
        db.rollback()
        raise HTTPException(status_code=500, detail=f"An error occurred: {str(exc)}")

    return {"message": "All data deleted successfully"}
def delete_device_data(
    device_id: str,
    auth_device_id: str = Header(...),
    device_password: str = Header(...),
    db: Session = Depends(get_db),
):
    """
    Delete the status records belonging to ``device_id``.

    The caller authenticates as ``auth_device_id``, which may differ from the
    device whose data is removed.

    Raises:
        HTTPException: 401 when authentication fails; 500 (after a rollback)
            when the delete or commit raises.
    """
    authenticate_device(auth_device_id, device_password, db)

    try:
        device_rows = db.query(StatusRecord).filter(
            StatusRecord.device_id == device_id
        )
        removed = device_rows.delete()
        db.commit()
    except Exception as exc:
        db.rollback()
        raise HTTPException(status_code=500, detail=f"An error occurred: {str(exc)}")

    if removed == 0:
        return {"message": f"No data found for device ID: {device_id}"}
    return {"message": f"Data for device ID {device_id} deleted successfully"}
async def upload_data_batch(
    records: StatusRecordBatch,
    device_id: str = Header(...),
    device_password: str = Header(...),
    db: Session = Depends(get_db),
):
    """
    Upload multiple status records in a single request.

    Requires device authentication and a unique UUID for each record.  The
    ``device_id`` header is used for every stored record.

    Each record is validated and committed on its own, so a malformed or
    duplicate record is reported as failed without discarding the records
    that were accepted before it.

    Returns:
        A 200 JSON response with the keys ``status``, ``message``,
        ``successful_uploads``, ``failed_uploads``, ``errors`` and
        ``failed_records`` (the UUIDs of the rejected records).

    Raises:
        HTTPException: 401 when device authentication fails.
    """
    # Imported locally: the module-level imports do not bring this name in.
    from sqlalchemy.exc import IntegrityError

    authenticate_device(device_id, device_password, db)

    successful_uploads = 0
    error_messages = []
    failed_records = []

    for record in records.records:
        # Identify a failed record by what the client sent; the parsed UUID
        # is not available when parsing itself is what failed.
        record_ref = str(record.uuid)
        try:
            # Validate UUID
            uuid_obj = uuid_module.UUID(record.uuid)
            record_ref = str(uuid_obj)
            # Validate timestamp
            timestamp_dt = datetime.strptime(record.timestamp, "%Y-%m-%d %H:%M:%S")

            db.add(
                StatusRecord(
                    uuid=record_ref,
                    device_id=device_id,
                    latitude=record.latitude,
                    longitude=record.longitude,
                    timestamp=timestamp_dt,
                    connect_status=record.connect_status,
                )
            )
            # Commit per record: constraint violations only surface when the
            # INSERT is sent, and rolling back then affects this record only.
            db.commit()
        except ValueError as ve:
            db.rollback()
            error_messages.append(f"Invalid data format: {str(ve)}")
        except IntegrityError:
            db.rollback()
            error_messages.append(f"Duplicate UUID: {record.uuid}")
        except Exception as e:
            db.rollback()
            error_messages.append(f"Error processing record: {str(e)}")
        else:
            successful_uploads += 1
            continue

        failed_records.append(record_ref)

    return JSONResponse(
        status_code=status.HTTP_200_OK,
        content={
            "status": "ok",
            "message": "Batch upload completed",
            "successful_uploads": successful_uploads,
            "failed_uploads": len(failed_records),
            "errors": error_messages,
            "failed_records": failed_records,
        },
    )
def health_check(
    device_id: str = Header(...),
    device_password: str = Header(...),
    db: Session = Depends(get_db),
):
    """
    Perform a health check on the API.

    Requires device authentication.
    Returns a 200 JSON response with ``{"status": "ok"}`` on success.
    Returns a 401 JSON response when authentication fails; any other
    HTTPException is re-raised unchanged.
    """
    try:
        authenticate_device(device_id, device_password, db)
    except HTTPException as exc:
        if exc.status_code != status.HTTP_401_UNAUTHORIZED:
            raise
        # Answer with JSON directly instead of letting the 401 propagate.
        return JSONResponse(
            status_code=status.HTTP_401_UNAUTHORIZED,
            content={"status": "error", "detail": "Unauthorized"},
        )

    return JSONResponse(status_code=status.HTTP_200_OK, content={"status": "ok"})
def get_config(
    device_id: str = Header(...),
    device_password: str = Header(...),
    db: Session = Depends(get_db),
):
    """
    Return the configuration values stored in the first SystemSetting row.

    Requires device authentication.

    Raises:
        HTTPException: 401 when authentication fails; 404 when no
            SystemSetting row exists.
    """
    authenticate_device(device_id, device_password, db)

    settings_row = db.query(SystemSetting).first()
    if not settings_row:
        raise HTTPException(status_code=404, detail="System settings not found")

    exposed_fields = (
        "check_connect_period",
        "data_sync_period",
        "get_config_period",
        "point_distance",
    )
    return {field: getattr(settings_row, field) for field in exposed_fields}
def show_map(
    request: Request,
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    db: Session = Depends(get_db),
):
    """Render a folium map of status records, optionally limited to a date range.

    Args:
        request: Incoming request; its cookie identifies the user.
        start_date: First day to include, ``YYYY-MM-DD``.
        end_date: Last day to include, ``YYYY-MM-DD``.  The whole day is
            covered.  The filter is applied only when both dates are given.
        db: Database session.

    Returns:
        The rendered ``map.html`` template, or the login redirect when nobody
        is logged in.

    Raises:
        HTTPException: 400 when either date is not in ``YYYY-MM-DD`` format.
    """
    current_user = login_required(request, db)
    if isinstance(current_user, RedirectResponse):
        return current_user

    query = db.query(StatusRecord)
    if start_date and end_date:
        try:
            start_datetime = datetime.strptime(start_date, "%Y-%m-%d")
            end_datetime = datetime.strptime(end_date, "%Y-%m-%d")
        except ValueError:
            raise HTTPException(
                status_code=400, detail="Dates must use the YYYY-MM-DD format"
            )
        # Both dates parse to midnight.  Use a half-open range ending at the
        # start of the following day so that records from any time on the
        # end date are included.
        query = query.filter(
            StatusRecord.timestamp >= start_datetime,
            StatusRecord.timestamp < end_datetime + timedelta(days=1),
        )

    status_data = [
        (s.latitude, s.longitude, s.connect_status, s.device_id) for s in query.all()
    ]

    m = folium.Map(location=[35.6837, 139.6805], zoom_start=12)
    marker_cluster = MarkerCluster().add_to(m)
    for lat, lon, connect_status, device_id in status_data:
        # Green marks connect_status == 1, red any other value.
        color = "green" if connect_status == 1 else "red"
        folium.CircleMarker(
            location=[lat, lon],
            radius=10,
            popup=f"{device_id}",
            color=color,
            fill=True,
            fill_opacity=0.6,
        ).add_to(marker_cluster)

    map_html = m._repr_html_()
    return templates.TemplateResponse(
        "map.html",
        {
            "request": request,
            "map_html": map_html,
            "start_date": start_date,
            "end_date": end_date,
            "current_user": current_user,
        },
    )
async def download_csv(request: Request, db: Session = Depends(get_db)):
    """Return every status record as a CSV file download.

    The CSV is built fully in memory and sent as an attachment named
    ``status_records.csv``.  Returns the login redirect when nobody is
    logged in.
    """
    viewer = login_required(request, db)
    if isinstance(viewer, RedirectResponse):
        return viewer

    rows = db.query(StatusRecord).all()

    buffer = io.StringIO()
    csv_writer = csv.writer(buffer)
    csv_writer.writerow(
        ["UUID", "Device ID", "Latitude", "Longitude", "Timestamp", "Connect Status"]
    )
    csv_writer.writerows(
        [
            row.uuid,
            row.device_id,
            row.latitude,
            row.longitude,
            row.timestamp,
            row.connect_status,
        ]
        for row in rows
    )

    # The one-element iterator sends the finished text as a single chunk.
    download = StreamingResponse(iter([buffer.getvalue()]), media_type="text/csv")
    download.headers["Content-Disposition"] = "attachment; filename=status_records.csv"
    return download
async def http_exception_handler(request: Request, exc: HTTPException):
    """Turn an HTTPException into a response.

    A 401 becomes a redirect to ``/login``; any other status renders
    ``error.html`` with the exception detail and the original status code.
    """
    if exc.status_code != status.HTTP_401_UNAUTHORIZED:
        context = {"request": request, "detail": exc.detail}
        return templates.TemplateResponse(
            "error.html", context, status_code=exc.status_code
        )
    return RedirectResponse(url="/login", status_code=status.HTTP_302_FOUND)
# Include the routers: attach the /admin and /api groups to the application.
app.include_router(admin_router)
app.include_router(api_router)


# When run as a script, serve the app with uvicorn on all interfaces,
# port 7860.
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=7860)