File size: 6,831 Bytes
493df70 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 |
import os
import base64
import mimetypes
from PIL import Image
import io
from transformers.video_utils import VideoMetadata
def encode_pil_to_jpeg_data_url(pil_image):
from io import BytesIO
buf = BytesIO()
pil_image.save(buf, format="JPEG")
b64 = base64.b64encode(buf.getvalue()).decode("utf-8")
return f"data:image/jpeg;base64,{b64}"
def sample_video_frames_to_data_urls(video_path_local, fps=1, nframe=0, nframe_max=-1):
"""
Sample frames from a video and return base64-encoded data URLs along with metadata.
Args:
video_path_local: Path to the video file
fps: Target frames per second for sampling (if > 0, uses fps-based sampling)
nframe: Number of frames to sample (used if fps <= 0)
nframe_max: Maximum number of frames to sample
Returns:
tuple: (frame_data_urls, metadata)
- frame_data_urls: List of base64-encoded frame images
- metadata: VideoMetadata dataclass containing info about the sampled frames:
- total_num_frames: Number of sampled frames
- fps: Effective frame rate of the sampled frames
- duration: Duration covered by the sampled frames (in seconds)
- video_backend: Backend used for video processing ('decord')
"""
import numpy as np
from PIL import Image
import decord
vid = decord.VideoReader(video_path_local)
total_frames = len(vid)
video_fps = vid.get_avg_fps()
total_duration = total_frames / max(1e-6, video_fps)
if fps > 0:
required_frames = int(total_duration * fps)
desired_frames = max(1, required_frames)
if nframe_max > 0 and desired_frames > nframe_max:
desired_frames = nframe_max
if desired_frames >= total_frames:
indices = list(range(total_frames))
elif desired_frames == 1:
indices = [0] # Always use first frame for single frame sampling
else:
# Generate evenly spaced indices and ensure uniqueness
raw_indices = np.linspace(0, total_frames - 1, desired_frames)
indices = list(np.unique(np.round(raw_indices).astype(int)))
else:
desired_frames = max(1, int(nframe) if nframe and nframe > 0 else 8)
if nframe_max > 0 and desired_frames > nframe_max:
desired_frames = nframe_max
if desired_frames >= total_frames:
indices = list(range(total_frames))
elif desired_frames == 1:
indices = [0] # Always use first frame for single frame sampling
else:
# Generate evenly spaced indices and ensure uniqueness
raw_indices = np.linspace(0, total_frames - 1, desired_frames)
indices = list(np.unique(np.round(raw_indices).astype(int)))
images = [Image.fromarray(vid[i].asnumpy()) for i in indices]
frame_urls = [encode_pil_to_jpeg_data_url(im) for im in images]
# Calculate timestamps for each sampled frame
timestamps = [float(idx) / video_fps for idx in indices]
# Calculate metadata for the sampled frames
sampled_num_frames = len(indices)
# Duration is the time span from first to last frame
if len(timestamps) > 1:
sampled_duration = timestamps[-1] - timestamps[0]
sampled_fps = (sampled_num_frames - 1) / sampled_duration if sampled_duration > 0 else 1.0
else:
# Single frame case
sampled_duration = None
sampled_fps = None
metadata = VideoMetadata(
total_num_frames=sampled_num_frames,
fps=sampled_fps,
duration=sampled_duration,
video_backend=None,
)
return frame_urls, metadata
def maybe_path_or_url_to_data_urls(path_or_url, fps=1, nframe=0, nframe_max=-1):
"""
Convert a path or URL to data URLs, handling videos, images, and remote files.
Args:
path_or_url: Path or URL to the media file
fps: Target frames per second for video sampling (if > 0, uses fps-based sampling)
nframe: Number of frames to sample from video (used if fps <= 0)
nframe_max: Maximum number of frames to sample
Returns:
tuple: (data_urls, metadata)
- data_urls: List of base64-encoded data URLs
- metadata: VideoMetadata dataclass with video metadata or None for images
"""
val = str(path_or_url or "")
low = val.lower()
# Handle data URLs
if low.startswith("data:"):
if low.startswith("data:video/mp4"):
header, _, b64part = val.partition(",")
if not b64part:
return [val], None
import tempfile
tmp = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False)
try:
tmp.write(base64.b64decode(b64part))
tmp.flush(); tmp.close()
return sample_video_frames_to_data_urls(tmp.name, fps=fps, nframe=nframe, nframe_max=nframe_max)
finally:
try:
os.unlink(tmp.name)
except Exception:
pass
return [val], None
# Remote URL
if low.startswith("http://") or low.startswith("https://"):
if low.endswith(".mp4"):
try:
import tempfile, urllib.request
with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmpf:
urllib.request.urlretrieve(val, tmpf.name)
local_path = tmpf.name
result = sample_video_frames_to_data_urls(local_path, fps=fps, nframe=nframe, nframe_max=nframe_max)
try:
os.unlink(local_path)
except Exception:
pass
return result
except Exception:
return [val], None
return [val], None
# Local path
if os.path.exists(val):
mime, _ = mimetypes.guess_type(val)
if mime and mime.startswith("image/"):
with open(val, "rb") as f:
b64 = base64.b64encode(f.read()).decode("utf-8")
return [f"data:{mime};base64,{b64}"], None
if mime == "video/mp4" or (mime is None and val.endswith(".mp4")):
return sample_video_frames_to_data_urls(val, fps=fps, nframe=nframe, nframe_max=nframe_max)
# Fallback: treat as binary image
with open(val, "rb") as f:
b64 = base64.b64encode(f.read()).decode("utf-8")
return [f"data:image/jpeg;base64,{b64}"], None
return [val], None
def pil_image_from_base64(b64_str: str) -> Image.Image:
# Handle data URLs like "data:image/png;base64,...."
if b64_str.startswith('data:'):
b64_str = b64_str.split(',', 1)[1]
img_bytes = base64.b64decode(b64_str)
return Image.open(io.BytesIO(img_bytes))
|