ml-sharp / model_utils.py
ronedgecomb's picture
Initial commit
a1b6914 verified
"""SHARP inference + optional CUDA video rendering utilities.
Design goals:
- Reuse SHARP's own predict/render pipeline (no subprocess calls).
- Be robust on Hugging Face Spaces + ZeroGPU.
- Cache model weights and predictor construction across requests.
Public API (used by the Gradio app):
- TrajectoryType
- predict_and_maybe_render_gpu(...)
"""
from __future__ import annotations
import os
import threading
import time
import uuid
from contextlib import contextmanager
from dataclasses import dataclass
from pathlib import Path
from typing import Final, Literal
import torch
try:
import spaces
except Exception: # pragma: no cover
spaces = None # type: ignore[assignment]
try:
# Prefer HF cache / Hub downloads (works with Spaces `preload_from_hub`).
from huggingface_hub import hf_hub_download, try_to_load_from_cache
except Exception: # pragma: no cover
hf_hub_download = None # type: ignore[assignment]
try_to_load_from_cache = None # type: ignore[assignment]
from sharp.cli.predict import DEFAULT_MODEL_URL, predict_image
from sharp.cli.render import render_gaussians as sharp_render_gaussians
from sharp.models import PredictorParams, create_predictor
from sharp.utils import camera, io
from sharp.utils.gaussians import Gaussians3D, SceneMetaData, save_ply
from sharp.utils.gsplat import GSplatRenderer
# Names of the camera trajectories accepted by the rendering helpers in this
# module; the chosen value is forwarded as ``type=`` to
# ``camera.TrajectoryParams``.
TrajectoryType = Literal["swipe", "shake", "rotate", "rotate_forward"]
# -----------------------------------------------------------------------------
# Helpers
# -----------------------------------------------------------------------------
def _now_ms() -> int:
    """Return the current wall-clock time as whole milliseconds since the epoch."""
    seconds_since_epoch = time.time()
    return int(seconds_since_epoch * 1000)
def _ensure_dir(path: Path) -> Path:
    """Create ``path`` (and any missing parents) as a directory and return it.

    An already-existing directory is left untouched, so the call is safe to
    repeat.
    """
    path.mkdir(exist_ok=True, parents=True)
    return path
def _make_even(x: int) -> int:
    """Return ``x`` if it is even, otherwise the next integer above it."""
    # ``x % 2`` is 1 for every odd int (including negatives) and 0 otherwise.
    return x + (x % 2)
def _select_device(preference: str = "auto") -> torch.device:
    """Resolve a device preference to a concrete ``torch.device``.

    Args:
        preference: One of ``"auto"``, ``"cpu"``, ``"cuda"`` or ``"mps"``.
            ``"auto"`` tries CUDA first, then MPS. A specific accelerator that
            is not available falls back to CPU instead of raising.

    Raises:
        ValueError: If ``preference`` is not one of the accepted values.
    """
    if preference not in {"auto", "cpu", "cuda", "mps"}:
        raise ValueError("device preference must be one of: auto|cpu|cuda|mps")

    cpu = torch.device("cpu")
    if preference == "cpu":
        return cpu

    # Only probe the backends that the preference allows, CUDA before MPS.
    wants_cuda = preference in {"auto", "cuda"}
    wants_mps = preference in {"auto", "mps"}
    if wants_cuda and torch.cuda.is_available():
        return torch.device("cuda")
    if wants_mps and torch.backends.mps.is_available():
        return torch.device("mps")
    return cpu
# -----------------------------------------------------------------------------
# Prediction outputs
# -----------------------------------------------------------------------------
@dataclass(frozen=True, slots=True)
class PredictionOutputs:
"""Outputs of SHARP inference (plus derived metadata for rendering)."""
ply_path: Path
gaussians: Gaussians3D
metadata_for_render: SceneMetaData
input_resolution_hw: tuple[int, int]
focal_length_px: float
# -----------------------------------------------------------------------------
# Patch SHARP VideoWriter to properly close the optional depth writer
# -----------------------------------------------------------------------------
class _PatchedVideoWriter(io.VideoWriter):
    """``io.VideoWriter`` variant that also closes the optional depth writer.

    Closing the depth writer lets the depth video file be deleted safely once
    rendering has finished.
    """

    def __init__(
        self, output_path: Path, fps: float = 30.0, render_depth: bool = True
    ) -> None:
        """Create the writer and guarantee that ``depth_writer`` exists.

        Args:
            output_path: Destination of the color video.
            fps: Frame rate passed through to the base writer.
            render_depth: Passed through to the base writer.
        """
        super().__init__(output_path, fps=fps, render_depth=render_depth)
        # Ensure attribute exists for downstream code paths.
        if not hasattr(self, "depth_writer"):
            self.depth_writer = None  # type: ignore[attribute-defined-outside-init]

    def close(self) -> None:
        """Close the base writer, then the depth writer if there is one.

        The depth writer is closed even when the base ``close()`` raises, so a
        failure there cannot leave the depth file handle open. An error from
        the base ``close()`` still propagates to the caller.
        """
        try:
            super().close()
        finally:
            depth_writer = getattr(self, "depth_writer", None)
            if depth_writer is not None:
                try:
                    depth_writer.close()
                except Exception:
                    # Best effort: the base class may already have closed it.
                    pass
@contextmanager
def _patched_sharp_videowriter():
    """Swap `sharp.utils.io.VideoWriter` for `_PatchedVideoWriter` within the block.

    The attribute is replaced on the shared ``io`` module object, so code that
    looks up ``io.VideoWriter`` while the context is active gets the patched
    class. The previous class is put back on exit, including when the body
    raises.
    """
    previous_writer_cls = io.VideoWriter
    setattr(io, "VideoWriter", _PatchedVideoWriter)
    try:
        yield
    finally:
        setattr(io, "VideoWriter", previous_writer_cls)
# -----------------------------------------------------------------------------
# Model wrapper
# -----------------------------------------------------------------------------
class ModelWrapper:
    """Cached SHARP model wrapper for Gradio/Spaces.

    The checkpoint state dict and the constructed predictor are loaded on first
    use and cached on the instance; access to both is guarded by a re-entrant
    lock.
    """

    def __init__(
        self,
        *,
        outputs_dir: str | Path = "outputs",
        checkpoint_url: str = DEFAULT_MODEL_URL,
        checkpoint_path: str | Path | None = None,
        device_preference: str = "auto",
        keep_model_on_device: bool | None = None,
        hf_repo_id: str | None = None,
        hf_filename: str | None = None,
        hf_revision: str | None = None,
    ) -> None:
        """Configure the wrapper; no weights are loaded here.

        Args:
            outputs_dir: Directory for generated files (created if missing).
            checkpoint_url: URL used as the last-resort checkpoint source.
            checkpoint_path: Local checkpoint file. When falsy, the environment
                variables ``SHARP_CHECKPOINT_PATH`` then ``SHARP_CHECKPOINT``
                are consulted.
            device_preference: Passed to ``_select_device`` at inference time.
            keep_model_on_device: Whether to leave the model on the accelerator
                between calls. When ``None``, it is enabled only if
                ``SHARP_KEEP_MODEL_ON_DEVICE`` is exactly ``"1"``.
            hf_repo_id: Hub repository; defaults to ``SHARP_HF_REPO_ID`` or
                ``"apple/Sharp"``.
            hf_filename: Checkpoint file name in the repository; defaults to
                ``SHARP_HF_FILENAME`` or ``"sharp_2572gikvuh.pt"``.
            hf_revision: Optional Hub revision; defaults to
                ``SHARP_HF_REVISION``.
        """
        self.outputs_dir = _ensure_dir(Path(outputs_dir))
        self.checkpoint_url = checkpoint_url

        env_ckpt = os.getenv("SHARP_CHECKPOINT_PATH") or os.getenv("SHARP_CHECKPOINT")
        if checkpoint_path:
            self.checkpoint_path = Path(checkpoint_path)
        elif env_ckpt:
            self.checkpoint_path = Path(env_ckpt)
        else:
            self.checkpoint_path = None

        # Optional Hugging Face Hub fallback (useful when direct CDN download fails).
        self.hf_repo_id = hf_repo_id or os.getenv("SHARP_HF_REPO_ID", "apple/Sharp")
        self.hf_filename = hf_filename or os.getenv(
            "SHARP_HF_FILENAME", "sharp_2572gikvuh.pt"
        )
        self.hf_revision = hf_revision or os.getenv("SHARP_HF_REVISION") or None

        self.device_preference = device_preference

        # For ZeroGPU, it's safer to not keep large tensors on CUDA across calls.
        if keep_model_on_device is None:
            self.keep_model_on_device = os.getenv("SHARP_KEEP_MODEL_ON_DEVICE") == "1"
        else:
            self.keep_model_on_device = keep_model_on_device

        self._lock = threading.RLock()
        self._predictor: torch.nn.Module | None = None
        self._predictor_device: torch.device | None = None
        self._state_dict: dict | None = None

    def has_cuda(self) -> bool:
        """Return whether PyTorch reports an available CUDA device."""
        return torch.cuda.is_available()

    # ------------------------------------------------------------------
    # Checkpoint loading
    # ------------------------------------------------------------------
    @staticmethod
    def _load_checkpoint_file(path: str | Path) -> dict:
        """Load a checkpoint file onto the CPU with ``weights_only=True``."""
        return torch.load(path, weights_only=True, map_location="cpu")

    def _find_in_hf_cache(self) -> str | None:
        """Return the path of the checkpoint in the local HF cache, if present.

        Falls back to a positional call when the installed ``huggingface_hub``
        rejects the keyword arguments with ``TypeError``.
        """
        try:
            cached = try_to_load_from_cache(
                repo_id=self.hf_repo_id,
                filename=self.hf_filename,
                revision=self.hf_revision,
                repo_type="model",
            )
        except TypeError:
            cached = try_to_load_from_cache(self.hf_repo_id, self.hf_filename)  # type: ignore[misc]
        if isinstance(cached, str) and Path(cached).exists():
            return cached
        return None

    def _checkpoint_failure_message(
        self,
        *,
        hf_cache_error: Exception | None,
        hf_error: Exception | None,
        url_error: Exception | None,
    ) -> str:
        """Build the multi-line error text used when every source has failed."""
        revision = self.hf_revision or "None"
        lines = [
            "Failed to load SHARP checkpoint.",
            "",
            "Tried (in order):",
            f" 1) HF cache (preload_from_hub): repo_id={self.hf_repo_id}, filename={self.hf_filename}, revision={revision}",
            f" 2) HF Hub download: repo_id={self.hf_repo_id}, filename={self.hf_filename}, revision={revision}",
            f" 3) URL (torch hub): {self.checkpoint_url}",
            "",
            "If network access is restricted, set a local checkpoint path:",
            " - SHARP_CHECKPOINT_PATH=/path/to/sharp_2572gikvuh.pt",
            "",
            "Original errors:",
        ]

        if try_to_load_from_cache is None:
            lines.append(" HF cache: huggingface_hub not installed")
        elif hf_cache_error is not None:
            lines.append(
                f" HF cache: {type(hf_cache_error).__name__}: {hf_cache_error}"
            )
        else:
            lines.append(" HF cache: (not found in cache)")

        if hf_hub_download is None:
            lines.append(" HF download: huggingface_hub not installed")
        else:
            lines.append(f" HF download: {type(hf_error).__name__}: {hf_error}")

        lines.append(f" URL: {type(url_error).__name__}: {url_error}")
        return "\n".join(lines)

    def _load_state_dict(self) -> dict:
        """Return the checkpoint state dict, loading and caching it on first use.

        Sources are tried in this order:

        1. The explicit local checkpoint path. If one is configured, no other
           source is attempted.
        2. The local Hugging Face cache (no network).
        3. ``hf_hub_download``, first in local-only mode when supported, then
           normally.
        4. ``torch.hub.load_state_dict_from_url`` with ``checkpoint_url``.

        Raises:
            RuntimeError: If the configured local path cannot be loaded, or if
                every other source fails.
        """
        with self._lock:
            if self._state_dict is not None:
                return self._state_dict

            # 1) Explicit local checkpoint path
            if self.checkpoint_path is not None:
                try:
                    self._state_dict = self._load_checkpoint_file(self.checkpoint_path)
                except Exception as e:
                    raise RuntimeError(
                        "Failed to load SHARP checkpoint from local path.\n\n"
                        f"Path:\n {self.checkpoint_path}\n\n"
                        f"Original error:\n {type(e).__name__}: {e}"
                    ) from e
                return self._state_dict

            # 2) HF cache (no-network): best match for Spaces `preload_from_hub`.
            hf_cache_error: Exception | None = None
            if try_to_load_from_cache is not None:
                try:
                    cached = self._find_in_hf_cache()
                    if cached is not None:
                        self._state_dict = self._load_checkpoint_file(cached)
                        return self._state_dict
                except Exception as e:
                    # Record both lookup and load failures, then continue with
                    # the remaining sources instead of aborting the chain.
                    hf_cache_error = e

            # 3) HF Hub download (reuse cache when available; may download otherwise).
            hf_error: Exception | None = None
            if hf_hub_download is not None:
                # Attempt "local only" mode if supported (avoids network).
                try:
                    import inspect

                    if "local_files_only" in inspect.signature(hf_hub_download).parameters:
                        ckpt_path = hf_hub_download(
                            repo_id=self.hf_repo_id,
                            filename=self.hf_filename,
                            revision=self.hf_revision,
                            local_files_only=True,
                        )
                        if Path(ckpt_path).exists():
                            self._state_dict = self._load_checkpoint_file(ckpt_path)
                            return self._state_dict
                except Exception:
                    # Deliberate best effort: any failure here simply means we
                    # fall through to the regular download below.
                    pass

                try:
                    ckpt_path = hf_hub_download(
                        repo_id=self.hf_repo_id,
                        filename=self.hf_filename,
                        revision=self.hf_revision,
                    )
                    self._state_dict = self._load_checkpoint_file(ckpt_path)
                    return self._state_dict
                except Exception as e:
                    hf_error = e

            # 4) Default upstream CDN (torch hub cache). Last resort.
            # NOTE(review): unlike the paths above, this call does not pass
            # weights_only=True; confirm whether the minimum supported torch
            # version accepts that argument before adding it.
            url_error: Exception | None = None
            try:
                self._state_dict = torch.hub.load_state_dict_from_url(
                    self.checkpoint_url,
                    progress=True,
                    map_location="cpu",
                )
                return self._state_dict
            except Exception as e:
                url_error = e

            # If we got here: all options failed.
            raise RuntimeError(
                self._checkpoint_failure_message(
                    hf_cache_error=hf_cache_error,
                    hf_error=hf_error,
                    url_error=url_error,
                )
            )

    # ------------------------------------------------------------------
    # Predictor lifecycle
    # ------------------------------------------------------------------
    def _get_predictor(self, device: torch.device) -> torch.nn.Module:
        """Return the cached predictor, building it if needed, placed on ``device``."""
        with self._lock:
            if self._predictor is None:
                state_dict = self._load_state_dict()
                predictor = create_predictor(PredictorParams())
                predictor.load_state_dict(state_dict)
                predictor.eval()
                self._predictor = predictor
                # The state dict was loaded with map_location="cpu".
                self._predictor_device = torch.device("cpu")

            assert self._predictor is not None
            assert self._predictor_device is not None
            if self._predictor_device != device:
                self._predictor.to(device)
                self._predictor_device = device

            return self._predictor

    def _maybe_move_model_back_to_cpu(self) -> None:
        """Move the predictor to the CPU unless ``keep_model_on_device`` is set.

        Also empties the CUDA cache when CUDA is available.
        """
        if self.keep_model_on_device:
            return
        with self._lock:
            if self._predictor is not None and self._predictor_device is not None:
                if self._predictor_device.type != "cpu":
                    self._predictor.to("cpu")
                    self._predictor_device = torch.device("cpu")
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

    def _make_output_stem(self, input_path: Path) -> str:
        """Build an output file stem from the input stem, a timestamp and a random suffix."""
        return f"{input_path.stem}-{_now_ms()}-{uuid.uuid4().hex[:8]}"

    # ------------------------------------------------------------------
    # Inference
    # ------------------------------------------------------------------
    def predict_to_ply(self, image_path: str | Path) -> PredictionOutputs:
        """Run SHARP inference and export a .ply file.

        Args:
            image_path: Path of the input image.

        Returns:
            The exported PLY path together with the Gaussians and the metadata
            needed for rendering.

        Raises:
            FileNotFoundError: If ``image_path`` does not exist.
        """
        image_path = Path(image_path)
        if not image_path.exists():
            raise FileNotFoundError(f"Image does not exist: {image_path}")

        device = _select_device(self.device_preference)
        try:
            predictor = self._get_predictor(device)

            image_np, _, f_px = io.load_rgb(image_path)
            height, width = image_np.shape[:2]

            with torch.no_grad():
                gaussians = predict_image(predictor, image_np, f_px, device)

            stem = self._make_output_stem(image_path)
            ply_path = self.outputs_dir / f"{stem}.ply"

            # save_ply expects (height, width).
            save_ply(gaussians, f_px, (height, width), ply_path)
        finally:
            # Runs on failure too, so an error during inference or export does
            # not leave the model on the accelerator.
            self._maybe_move_model_back_to_cpu()

        # SceneMetaData expects (width, height) for resolution.
        metadata_for_render = SceneMetaData(
            focal_length_px=float(f_px),
            resolution_px=(int(width), int(height)),
            color_space="linearRGB",
        )

        return PredictionOutputs(
            ply_path=ply_path,
            gaussians=gaussians,
            metadata_for_render=metadata_for_render,
            input_resolution_hw=(int(height), int(width)),
            focal_length_px=float(f_px),
        )

    # ------------------------------------------------------------------
    # Rendering
    # ------------------------------------------------------------------
    @staticmethod
    def _remove_depth_sidecar(output_path: Path) -> None:
        """Delete the ``.depth.mp4`` file written next to ``output_path``, if any.

        Filesystem errors are ignored because this is best-effort cleanup.
        """
        depth_path = output_path.with_suffix(".depth.mp4")
        try:
            depth_path.unlink(missing_ok=True)
        except OSError:
            pass

    def _render_video_impl(
        self,
        *,
        gaussians: Gaussians3D,
        metadata: SceneMetaData,
        output_path: Path,
        trajectory_type: TrajectoryType,
        num_frames: int,
        fps: int,
        output_long_side: int | None,
    ) -> Path:
        """Render ``gaussians`` along a camera trajectory into ``output_path``.

        The upstream SHARP render function is used when no custom resolution is
        requested and ``fps`` is 30. Otherwise an adapted loop renders at the
        requested resolution and frame rate. The writer is closed and the depth
        sidecar video removed in both cases, including when rendering fails.

        Args:
            gaussians: Scene to render.
            metadata: Source resolution ``(width, height)``, focal length and
                color space.
            output_path: Destination video file.
            trajectory_type: Camera trajectory name.
            num_frames: Number of trajectory steps (at least 2).
            fps: Output frame rate (at least 1).
            output_long_side: Target size of the longer image side in pixels,
                or ``None`` to keep the source resolution.

        Returns:
            ``output_path``.

        Raises:
            RuntimeError: If CUDA is not available.
            ValueError: If ``num_frames``, ``fps`` or ``output_long_side`` is
                out of range.
        """
        if not torch.cuda.is_available():
            raise RuntimeError("Rendering requires CUDA (gsplat).")
        if num_frames < 2:
            raise ValueError("num_frames must be >= 2")
        if fps < 1:
            raise ValueError("fps must be >= 1")
        if output_long_side is not None and output_long_side < 1:
            # A non-positive target would give a zero or negative scale factor.
            raise ValueError("output_long_side must be >= 1")

        traj_params = camera.TrajectoryParams(
            type=trajectory_type,
            num_steps=int(num_frames),
            num_repeats=1,
        )

        # Keep aligned with upstream CLI pipeline where possible.
        if output_long_side is None and int(fps) == 30:
            try:
                with _patched_sharp_videowriter():
                    sharp_render_gaussians(
                        gaussians=gaussians,
                        metadata=metadata,
                        params=traj_params,
                        output_path=output_path,
                    )
            finally:
                self._remove_depth_sidecar(output_path)
            return output_path

        # Adapted pipeline for custom output resolution / FPS.
        src_w, src_h = metadata.resolution_px
        src_f = float(metadata.focal_length_px)

        if output_long_side is None:
            out_w, out_h, out_f = src_w, src_h, src_f
        else:
            long_side = max(src_w, src_h)
            scale = float(output_long_side) / float(long_side)
            # Both dimensions are rounded up to an even number of pixels.
            out_w = _make_even(max(2, int(round(src_w * scale))))
            out_h = _make_even(max(2, int(round(src_h * scale))))
            out_f = src_f * scale

        device = torch.device("cuda")
        gaussians_cuda = gaussians.to(device)

        # Pinhole intrinsics with the principal point at the image center.
        intrinsics = torch.tensor(
            [
                [out_f, 0.0, (out_w - 1) / 2.0, 0.0],
                [0.0, out_f, (out_h - 1) / 2.0, 0.0],
                [0.0, 0.0, 1.0, 0.0],
                [0.0, 0.0, 0.0, 1.0],
            ],
            device=device,
            dtype=torch.float32,
        )

        cam_model = camera.create_camera_model(
            gaussians_cuda,
            intrinsics,
            resolution_px=(out_w, out_h),
            lookat_mode=traj_params.lookat_mode,
        )
        trajectory = camera.create_eye_trajectory(
            gaussians_cuda,
            traj_params,
            resolution_px=(out_w, out_h),
            f_px=out_f,
        )

        renderer = GSplatRenderer(color_space=metadata.color_space)

        # IMPORTANT: Keep render_depth=True (avoids upstream AttributeError).
        video_writer = _PatchedVideoWriter(output_path, fps=float(fps), render_depth=True)
        try:
            try:
                for eye_position in trajectory:
                    cam_info = cam_model.compute(eye_position)
                    rendering = renderer(
                        gaussians_cuda,
                        extrinsics=cam_info.extrinsics[None].to(device),
                        intrinsics=cam_info.intrinsics[None].to(device),
                        image_width=cam_info.width,
                        image_height=cam_info.height,
                    )
                    color = (rendering.color[0].permute(1, 2, 0) * 255.0).to(
                        dtype=torch.uint8
                    )
                    depth = rendering.depth[0]
                    video_writer.add_frame(color, depth)
            finally:
                # Close on failure too, so no file handle is left open.
                video_writer.close()
        finally:
            self._remove_depth_sidecar(output_path)

        return output_path

    def render_video(
        self,
        *,
        gaussians: Gaussians3D,
        metadata: SceneMetaData,
        output_stem: str,
        trajectory_type: TrajectoryType = "rotate_forward",
        num_frames: int = 60,
        fps: int = 30,
        output_long_side: int | None = None,
    ) -> Path:
        """Render a camera trajectory as an MP4 (CUDA-only).

        The video is written to ``<outputs_dir>/<output_stem>.mp4`` and that
        path is returned. The arguments are validated by
        ``_render_video_impl``.
        """
        output_path = self.outputs_dir / f"{output_stem}.mp4"
        return self._render_video_impl(
            gaussians=gaussians,
            metadata=metadata,
            output_path=output_path,
            trajectory_type=trajectory_type,
            num_frames=num_frames,
            fps=fps,
            output_long_side=output_long_side,
        )

    def predict_and_maybe_render(
        self,
        image_path: str | Path,
        *,
        trajectory_type: TrajectoryType,
        num_frames: int,
        fps: int,
        output_long_side: int | None,
        render_video: bool = True,
    ) -> tuple[Path | None, Path]:
        """One-shot helper for the UI: returns (video_path, ply_path).

        ``video_path`` is ``None`` when ``render_video`` is false or when CUDA
        is not available; the PLY file is produced in every case.
        """
        pred = self.predict_to_ply(image_path)

        if not render_video:
            return None, pred.ply_path
        if not torch.cuda.is_available():
            return None, pred.ply_path

        # Reuse the PLY file's stem so both outputs share a base name.
        output_stem = pred.ply_path.with_suffix("").name
        video_path = self.render_video(
            gaussians=pred.gaussians,
            metadata=pred.metadata_for_render,
            output_stem=output_stem,
            trajectory_type=trajectory_type,
            num_frames=num_frames,
            fps=fps,
            output_long_side=output_long_side,
        )
        return video_path, pred.ply_path
# -----------------------------------------------------------------------------
# ZeroGPU entrypoints
# -----------------------------------------------------------------------------
#
# IMPORTANT: Do NOT decorate bound instance methods with `@spaces.GPU` on ZeroGPU.
# The wrapper uses multiprocessing queues and pickles args/kwargs. If `self` is
# included, Python will try to pickle the whole instance. ModelWrapper contains
# a threading.RLock (not pickleable) and the model itself should not be pickled.
#
# Expose module-level functions that accept only pickleable arguments and
# create/cache the ModelWrapper inside the GPU worker process.
# Default output directory: an "outputs" folder next to this file. It is created
# when the module is imported.
DEFAULT_OUTPUTS_DIR: Final[Path] = _ensure_dir(Path(__file__).resolve().parent / "outputs")
# Process-wide ModelWrapper, created on first use by get_global_model().
_GLOBAL_MODEL: ModelWrapper | None = None
# Held while the shared ModelWrapper is being checked for and created.
_GLOBAL_MODEL_INIT_LOCK: Final[threading.Lock] = threading.Lock()
def get_global_model(*, outputs_dir: str | Path = DEFAULT_OUTPUTS_DIR) -> ModelWrapper:
    """Return the process-wide ``ModelWrapper``, creating it on first call.

    Args:
        outputs_dir: Output directory for the wrapper. It is only used by the
            call that actually creates the instance; later calls return the
            existing wrapper unchanged.
    """
    global _GLOBAL_MODEL
    with _GLOBAL_MODEL_INIT_LOCK:
        model = _GLOBAL_MODEL
        if model is None:
            model = ModelWrapper(outputs_dir=outputs_dir)
            _GLOBAL_MODEL = model
        return model
def predict_and_maybe_render(
    image_path: str | Path,
    *,
    trajectory_type: TrajectoryType,
    num_frames: int,
    fps: int,
    output_long_side: int | None,
    render_video: bool = True,
) -> tuple[Path | None, Path]:
    """Module-level entry point: predict and optionally render with the shared model.

    Every argument is forwarded unchanged to
    ``ModelWrapper.predict_and_maybe_render`` on the instance returned by
    ``get_global_model()``.

    Returns:
        ``(video_path, ply_path)``, exactly as returned by the wrapper method.
    """
    render_options = {
        "trajectory_type": trajectory_type,
        "num_frames": num_frames,
        "fps": fps,
        "output_long_side": output_long_side,
        "render_video": render_video,
    }
    shared_model = get_global_model()
    return shared_model.predict_and_maybe_render(image_path, **render_options)
# Export the GPU-wrapped callable. When the `spaces` package could not be
# imported, the plain function is exported under the same name instead.
predict_and_maybe_render_gpu = (
    spaces.GPU(duration=180)(predict_and_maybe_render)
    if spaces is not None
    else predict_and_maybe_render
)