"""Concatenate the face crops of two videos side-by-side into a single video."""

import argparse
import copy
import os
import random

import numpy as np
import torch
import torchvision
from decord import VideoReader
from einops import rearrange
from torchvision.transforms import InterpolationMode


def load_video_using_decord(video_path, bbox_path, max_num_frames=81, target_fps=24):
    """Load a random clip with decord and build face masks from the matching bbox file."""
    reader = VideoReader(video_path)
    video_length = len(reader)

    # The clip length must satisfy T = 4k + 1, capped at max_num_frames.
    num_frames = (video_length - 1) // 4 * 4 + 1 if video_length < max_num_frames else max_num_frames
    # random.randint is inclusive on both ends, so the last valid window
    # starts at video_length - num_frames.
    start_idx = random.randint(0, video_length - num_frames) if video_length > num_frames else 0

    batch_idx = np.arange(start_idx, start_idx + num_frames)

    raw_frames = reader.get_batch(batch_idx).asnumpy()
    frames = torch.from_numpy(raw_frames).permute(0, 3, 1, 2).contiguous()  # (T, C, H, W)
    del raw_frames
    frames = frames / 255.0

    h, w = frames.shape[-2:]
    face_mask_start, face_mask_end, face_center, bboxs, bbox_infos, face_mask_global = read_face_bbox(
        bbox_path,
        h,
        w,
        video_length,
        start_idx,
        start_idx + num_frames,
        expand_ratio_w=random.uniform(0.3, 0.5),
        expand_ratio_h=random.uniform(0.2, 0.4),
    )

    print(f"Original video size: {frames.shape[-2]}x{frames.shape[-1]}")

    if hasattr(reader, 'get_avg_fps'):
        original_fps = reader.get_avg_fps()
        if abs(original_fps - target_fps) > 0.1:
            print(f"Original fps: {original_fps}, target fps: {target_fps}")

    frames = rearrange(frames, "T C H W -> C T H W")

    del reader

    face_masks = {
        'start': face_mask_start,
        'end': face_mask_end,
        'global': face_mask_global
    }
    return frames, face_masks, start_idx, num_frames, bboxs, bbox_infos
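
# Example (illustrative sketch; "demo.mp4" / "demo.npy" are hypothetical
# placeholder paths, not files shipped with this repo):
#
#     frames, face_masks, start, n, bboxs, infos = load_video_using_decord(
#         "demo.mp4", "demo.npy", max_num_frames=81)
#     # frames: (C, T, H, W) float tensor in [0, 1], with T = 4k + 1 <= 81.
#     # face_masks['start'|'end'|'global']: (H, W) masks, or None when no
#     # bbox file was given.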


def read_face_bbox(
    bboxs_path,
    h,
    w,
    video_length=None,
    start_idx=None,
    end_idx=None,
    bbox_type="xywh",
    expand_ratio_w=0.3,
    expand_ratio_h=0.2
):
    """Build start/end/global face masks and normalized bbox info from a .npy bbox track."""
    face_mask_start = None
    face_mask_end = None
    face_mask_global = None
    face_center = None
    bboxs = None
    bbox_infos = None
    if bboxs_path is not None:
        bboxs = np.load(bboxs_path)

        if start_idx is not None and end_idx is not None:
            if len(bboxs) == 1:
                bbox_start_idx = 0
                bbox_end_idx = 0
            else:
                # Map frame indices onto the (possibly subsampled) bbox track.
                # end_idx is exclusive, so the last frame in the window is end_idx - 1.
                bbox_start_idx = int(start_idx * (len(bboxs) - 1) / (video_length - 1)) if video_length > 1 else 0
                bbox_end_idx = int((end_idx - 1) * (len(bboxs) - 1) / (video_length - 1)) if video_length > 1 else 0
                bbox_start_idx = min(bbox_start_idx, len(bboxs) - 1)
                bbox_end_idx = min(bbox_end_idx, len(bboxs) - 1)

            start_bbox = bboxs[bbox_start_idx]
            end_bbox = bboxs[bbox_end_idx]

            if bbox_type == "xywh":
                start_x_min, start_y_min = start_bbox[0], start_bbox[1]
                start_x_max = start_bbox[2] + start_bbox[0]
                start_y_max = start_bbox[3] + start_bbox[1]
            elif bbox_type == "xxyy":
                start_x_min, start_y_min = start_bbox[0], start_bbox[1]
                start_x_max, start_y_max = start_bbox[2], start_bbox[3]
            else:
                raise ValueError(f"Unsupported bbox_type: {bbox_type}")

            start_width = start_x_max - start_x_min
            start_height = start_y_max - start_y_min
            start_center_x = (start_x_min + start_x_max) / 2
            start_center_y = (start_y_min + start_y_max) / 2

            # Expand the box around its center, then clamp to the image bounds.
            expanded_width = start_width * (1 + 2 * expand_ratio_w)
            expanded_height = start_height * (1 + 2 * expand_ratio_h)
            start_x_min = max(0, start_center_x - expanded_width / 2)
            start_x_max = min(w, start_center_x + expanded_width / 2)
            start_y_min = max(0, start_center_y - expanded_height / 2)
            start_y_max = min(h, start_center_y + expanded_height / 2)

            start_ori_mask = torch.zeros((h, w))
            start_ori_mask[int(start_y_min):int(start_y_max), int(start_x_min):int(start_x_max)] = 1
            start_face_center = [(start_x_min + start_x_max) / 2, (start_y_min + start_y_max) / 2]

            start_bbox_info = {
                'center': [start_face_center[0] / w, start_face_center[1] / h],
                'width': (start_x_max - start_x_min) / w,
                'height': (start_y_max - start_y_min) / h,
                'bbox': [start_x_min / w, start_y_min / h, start_x_max / w, start_y_max / h]
            }

            face_mask_start = crop_and_resize(start_ori_mask, face_center=start_face_center, normalize=False, interpolation=InterpolationMode.NEAREST).squeeze()
            del start_ori_mask

            if bbox_type == "xywh":
                end_x_min, end_y_min = end_bbox[0], end_bbox[1]
                end_x_max = end_bbox[2] + end_bbox[0]
                end_y_max = end_bbox[3] + end_bbox[1]
            elif bbox_type == "xxyy":
                end_x_min, end_y_min = end_bbox[0], end_bbox[1]
                end_x_max, end_y_max = end_bbox[2], end_bbox[3]

            end_width = end_x_max - end_x_min
            end_height = end_y_max - end_y_min
            end_center_x = (end_x_min + end_x_max) / 2
            end_center_y = (end_y_min + end_y_max) / 2

            expanded_width = end_width * (1 + 2 * expand_ratio_w)
            expanded_height = end_height * (1 + 2 * expand_ratio_h)
            end_x_min = max(0, end_center_x - expanded_width / 2)
            end_x_max = min(w, end_center_x + expanded_width / 2)
            end_y_min = max(0, end_center_y - expanded_height / 2)
            end_y_max = min(h, end_center_y + expanded_height / 2)

            end_ori_mask = torch.zeros((h, w))
            end_ori_mask[int(end_y_min):int(end_y_max), int(end_x_min):int(end_x_max)] = 1
            end_face_center = [(end_x_min + end_x_max) / 2, (end_y_min + end_y_max) / 2]
            end_bbox_info = {
                'center': [end_face_center[0] / w, end_face_center[1] / h],
                'width': (end_x_max - end_x_min) / w,
                'height': (end_y_max - end_y_min) / h,
                'bbox': [end_x_min / w, end_y_min / h, end_x_max / w, end_y_max / h]
            }

            face_mask_end = crop_and_resize(end_ori_mask, face_center=end_face_center, normalize=False, interpolation=InterpolationMode.NEAREST).squeeze()
            del end_ori_mask

            face_center = start_face_center

            # Global mask: the union extent of every bbox in the track (no expansion).
            relevant_start_idx = 0
            relevant_end_idx = len(bboxs) - 1
            relevant_bboxs = bboxs[relevant_start_idx:relevant_end_idx + 1]

            global_x_min = relevant_bboxs[:, 0].min()
            global_y_min = relevant_bboxs[:, 1].min()
            if bbox_type == "xywh":
                global_x_max = (relevant_bboxs[:, 2] + relevant_bboxs[:, 0]).max()
                global_y_max = (relevant_bboxs[:, 3] + relevant_bboxs[:, 1]).max()
            elif bbox_type == "xxyy":
                global_x_max = relevant_bboxs[:, 2].max()
                global_y_max = relevant_bboxs[:, 3].max()

            global_width = global_x_max - global_x_min
            global_height = global_y_max - global_y_min
            global_center_x = (global_x_min + global_x_max) / 2
            global_center_y = (global_y_min + global_y_max) / 2

            global_x_min = max(0, global_center_x - global_width / 2)
            global_x_max = min(w, global_center_x + global_width / 2)
            global_y_min = max(0, global_center_y - global_height / 2)
            global_y_max = min(h, global_center_y + global_height / 2)

            global_face_center = [(global_x_min + global_x_max) / 2, (global_y_min + global_y_max) / 2]
            global_bbox_info = {
                'center': [global_face_center[0] / w, global_face_center[1] / h],
                'width': (global_x_max - global_x_min) / w,
                'height': (global_y_max - global_y_min) / h,
                'bbox': [global_x_min / w, global_y_min / h, global_x_max / w, global_y_max / h]
            }

            global_ori_mask = torch.zeros((h, w))
            global_ori_mask[int(global_y_min):int(global_y_max), int(global_x_min):int(global_x_max)] = 1
            face_mask_global = crop_and_resize(global_ori_mask, face_center=global_face_center, normalize=False, interpolation=InterpolationMode.NEAREST).squeeze()
            del global_ori_mask

            bbox_infos = {
                'start': start_bbox_info,
                'end': end_bbox_info,
                'global': global_bbox_info
            }
        else:
            bbox_infos = None

    # face_mask_global is initialized to None above, so this return is safe
    # even when no bbox window was processed.
    return face_mask_start, face_mask_end, face_center, bboxs, bbox_infos, face_mask_global
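
# Worked example of the center expansion above (illustrative numbers): a
# 100x80 box with expand_ratio_w=0.3 and expand_ratio_h=0.2 grows to
# 100 * (1 + 2 * 0.3) = 160 wide and 80 * (1 + 2 * 0.2) = 112 tall, i.e. each
# edge is padded by expand_ratio * side before clamping to the image bounds.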


def crop_and_resize(
    image,
    face_center=None,
    normalize=True,
    interpolation=InterpolationMode.BICUBIC,
    height=None,
    width=None
):
    """Cover-resize `image` to (height, width), cropping around `face_center` if given."""
    if not isinstance(image, torch.Tensor):
        image = torchvision.transforms.functional.to_tensor(image)

    ori_width, ori_height = image.shape[-1], image.shape[-2]
    if image.ndim != 4:
        image = image.view(1, -1, ori_height, ori_width)

    if height is None:
        height = ori_height
    if width is None:
        width = ori_width

    # "Cover" scaling: the resized image is at least as large as the target
    # in both dimensions, so the crop below never needs padding.
    scale = max(width / ori_width, height / ori_height)
    image = torchvision.transforms.functional.resize(
        image,
        (round(ori_height * scale), round(ori_width * scale)),
        interpolation=interpolation
    )
    if face_center is not None:
        cx, cy = face_center[0] * scale, face_center[1] * scale
        image = torchvision.transforms.functional.crop(
            image,
            top=min(max(0, round(cy - height / 2)), image.shape[-2] - height),
            left=min(max(0, round(cx - width / 2)), image.shape[-1] - width),
            height=height,
            width=width
        )
    else:
        image = torchvision.transforms.functional.center_crop(image, (height, width))

    if normalize:
        if image.shape[1] > 3:
            # A (C, T, H, W) video: normalize frame by frame.
            image = image.permute(1, 0, 2, 3)
            for t in range(image.shape[0]):
                image[t] = torchvision.transforms.functional.normalize(
                    image[t],
                    mean=[0.5, 0.5, 0.5],
                    std=[0.5, 0.5, 0.5],
                )
            image = image.permute(1, 0, 2, 3)
        else:
            image = torchvision.transforms.functional.normalize(
                image,
                mean=[0.5, 0.5, 0.5],
                std=[0.5, 0.5, 0.5],
            )

    return image
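
# Example of the "cover" behavior (illustrative numbers): resizing a 720x1280
# frame to height=480, width=416 uses scale = max(416/1280, 480/720) = 2/3,
# yielding a 480x853 intermediate, from which a 480x416 window is cropped
# around face_center (or the image center when face_center is None).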


def debug_save_frame(video_tensor, output_path, frame_idx=0):
    """Debug helper: save a single frame as an image to inspect the crop."""
    import matplotlib.pyplot as plt

    frame = video_tensor[:, frame_idx, :, :].permute(1, 2, 0).cpu().numpy()

    # Map normalized values from [-1, 1] back to [0, 255].
    frame = (frame + 1) * 127.5
    frame = np.clip(frame, 0, 255).astype(np.uint8)

    plt.imsave(output_path, frame)
    print(f"Debug frame saved to: {output_path}")


def save_video_tensor_to_file(video_tensor, output_path, fps=24):
    """
    Save a video tensor to a file with ffmpeg, going through PNG frames on disk.

    Args:
        video_tensor: video tensor of shape (C, T, H, W), values in [-1, 1]
        output_path: output video path
        fps: frame rate
    """
    import subprocess
    import tempfile
    import matplotlib.pyplot as plt

    if video_tensor.is_cuda:
        video_tensor = video_tensor.cpu()

    # (C, T, H, W) -> (T, H, W, C)
    video_np = video_tensor.permute(1, 2, 3, 0).numpy()

    # Map normalized values from [-1, 1] back to [0, 255].
    video_np = (video_np + 1) * 127.5
    video_np = np.clip(video_np, 0, 255).astype(np.uint8)

    num_frames, height, width, channels = video_np.shape

    print(f"Saving video: size ({width}x{height}), frames ({num_frames}), fps ({fps})")

    temp_dir = tempfile.mkdtemp(prefix="video_frames_")

    try:
        # Write each frame to disk as a numbered PNG.
        frame_paths = []
        for i in range(num_frames):
            frame = video_np[i]
            frame_path = os.path.join(temp_dir, f"frame_{i:06d}.png")
            plt.imsave(frame_path, frame)
            frame_paths.append(frame_path)

        print(f"Saved {num_frames} frames to temporary directory: {temp_dir}")

        ffmpeg_cmd = [
            'ffmpeg',
            '-y',
            '-framerate', str(fps),
            '-i', os.path.join(temp_dir, 'frame_%06d.png'),
            '-c:v', 'libx264',
            '-preset', 'medium',
            '-crf', '23',
            '-pix_fmt', 'yuv420p',
            '-movflags', '+faststart',
            output_path
        ]

        print(f"Running ffmpeg command: {' '.join(ffmpeg_cmd)}")

        result = subprocess.run(
            ffmpeg_cmd,
            capture_output=True,
            text=True,
            check=True
        )

        print(f"Video saved to: {output_path}")

        if result.stdout:
            print("ffmpeg output:", result.stdout)

    except subprocess.CalledProcessError as e:
        print(f"ffmpeg failed: {e}")
        print(f"stderr: {e.stderr}")
        raise
    except Exception as e:
        print(f"Error while saving video: {e}")
        raise
    finally:
        try:
            import shutil
            shutil.rmtree(temp_dir)
            print(f"Cleaned up temporary directory: {temp_dir}")
        except Exception as e:
            print(f"Error while cleaning up temporary files: {e}")


def save_video_tensor_to_file_efficient(video_tensor, output_path, fps=24):
    """
    Save a video tensor to a file by piping raw frames into ffmpeg
    (faster; no temporary files on disk).

    Args:
        video_tensor: video tensor of shape (C, T, H, W), values in [-1, 1]
        output_path: output video path
        fps: frame rate
    """
    import subprocess

    if video_tensor.is_cuda:
        video_tensor = video_tensor.cpu()

    # (C, T, H, W) -> (T, H, W, C)
    video_np = video_tensor.permute(1, 2, 3, 0).numpy()

    # Map normalized values from [-1, 1] back to [0, 255].
    video_np = (video_np + 1) * 127.5
    video_np = np.clip(video_np, 0, 255).astype(np.uint8)

    num_frames, height, width, channels = video_np.shape

    print(f"Saving video (pipe mode): size ({width}x{height}), frames ({num_frames}), fps ({fps})")

    ffmpeg_cmd = [
        'ffmpeg',
        '-y',
        '-f', 'rawvideo',
        '-vcodec', 'rawvideo',
        '-s', f'{width}x{height}',
        '-pix_fmt', 'rgb24',
        '-r', str(fps),
        '-i', '-',
        '-c:v', 'libx264',
        '-preset', 'medium',
        '-crf', '23',
        '-pix_fmt', 'yuv420p',
        '-movflags', '+faststart',
        output_path
    ]

    print(f"Running ffmpeg command: {' '.join(ffmpeg_cmd)}")

    try:
        process = subprocess.Popen(
            ffmpeg_cmd,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE
        )

        # Stream each frame as raw RGB bytes into ffmpeg's stdin.
        for i in range(num_frames):
            frame = video_np[i]
            frame_bytes = frame.tobytes()
            process.stdin.write(frame_bytes)

        stdout, stderr = process.communicate()

        if process.returncode == 0:
            print(f"Video saved to: {output_path}")
        else:
            print(f"ffmpeg failed with return code: {process.returncode}")
            print(f"stderr: {stderr.decode()}")
            raise subprocess.CalledProcessError(process.returncode, ffmpeg_cmd)

    except Exception as e:
        print(f"Error while saving video: {e}")
        raise
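
# Rough sizing of the pipe above: each rgb24 frame is H * W * 3 bytes, so an
# 81-frame 480x832 clip streams 81 * 480 * 832 * 3 ≈ 97 MB of raw bytes into
# ffmpeg's stdin without writing any temporary frames to disk.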


def concatenate_faces(
    video_path_1,
    video_path_2,
    bbox_path_1,
    bbox_path_2,
    output_path,
    fps=24,
    target_width=832,
    target_height=480,
    use_efficient_save=True
):
    """
    Concatenate the face crops of two videos side-by-side into a two-person video.

    Args:
        video_path_1: path to the first video
        video_path_2: path to the second video
        bbox_path_1: bbox path for the first video
        bbox_path_2: bbox path for the second video
        output_path: output video path
        fps: output frame rate
        target_width: target video width
        target_height: target video height
        use_efficient_save: pipe frames straight into ffmpeg instead of
            writing PNG frames to disk first
    """
    frames_1, face_masks_1, start_idx_1, num_frames_1, bboxs_1, bbox_infos_1 = load_video_using_decord(
        video_path_1,
        bbox_path_1,
        target_fps=fps
    )

    frames_2, face_masks_2, start_idx_2, num_frames_2, bboxs_2, bbox_infos_2 = load_video_using_decord(
        video_path_2,
        bbox_path_2,
        target_fps=fps
    )

    # Compare the source frame rates of the two videos.
    reader1 = VideoReader(video_path_1)
    reader2 = VideoReader(video_path_2)

    fps1 = reader1.get_avg_fps() if hasattr(reader1, 'get_avg_fps') else None
    fps2 = reader2.get_avg_fps() if hasattr(reader2, 'get_avg_fps') else None

    if fps1 is not None and fps2 is not None:
        if abs(fps1 - fps2) > 0.1:
            print(f"Warning: the two videos have different fps - video 1: {fps1}, video 2: {fps2}")
        else:
            print(f"Both videos have the same fps: {fps1}")

    del reader1, reader2

    # Trim both clips to the shorter one so they can be joined frame by frame.
    min_frames = min(frames_1.shape[1], frames_2.shape[1])
    print(f"Video 1 frames: {frames_1.shape[1]}, video 2 frames: {frames_2.shape[1]}, using: {min_frames}")
    frames_1 = frames_1[:, :min_frames, :, :]
    frames_2 = frames_2[:, :min_frames, :, :]

    # Each person occupies one horizontal half of the target frame.
    person_width = target_width // 2
    person_height = target_height

    print(f"Target size: {target_height}x{target_width}")
    print(f"Per person: {person_height}x{person_width}")

    processed_frames_1 = crop_and_resize(
        frames_1,
        face_center=None,
        normalize=True,
        height=person_height,
        width=person_width
    )

    processed_frames_2 = crop_and_resize(
        frames_2,
        face_center=None,
        normalize=True,
        height=person_height,
        width=person_width
    )

    # Place person 1 in the left half and person 2 in the right half.
    concatenated_frames = torch.zeros(frames_1.shape[0], min_frames, target_height, target_width)
    concatenated_frames[:, :, :, :person_width] = processed_frames_1
    concatenated_frames[:, :, :, person_width:] = processed_frames_2

    print("Concatenation info:")
    print(f"  target size: {target_height}x{target_width}")
    print(f"  video 1 processed size: {processed_frames_1.shape[-2]}x{processed_frames_1.shape[-1]}")
    print(f"  video 2 processed size: {processed_frames_2.shape[-2]}x{processed_frames_2.shape[-1]}")
    print(f"  concatenated shape: {concatenated_frames.shape}")

    # Concatenate the face masks the same way as the frames. The masks are
    # binary, so resize them with NEAREST (as read_face_bbox does) to keep
    # them binary.
    concatenated_face_masks = {}

    if face_masks_1['global'] is not None and face_masks_2['global'] is not None:
        mask1_processed = crop_and_resize(
            face_masks_1['global'].unsqueeze(0).unsqueeze(0),
            face_center=None,
            normalize=False,
            interpolation=InterpolationMode.NEAREST,
            height=person_height,
            width=person_width
        ).squeeze()

        mask2_processed = crop_and_resize(
            face_masks_2['global'].unsqueeze(0).unsqueeze(0),
            face_center=None,
            normalize=False,
            interpolation=InterpolationMode.NEAREST,
            height=person_height,
            width=person_width
        ).squeeze()

        concatenated_global_mask = torch.zeros(target_height, target_width)
        concatenated_global_mask[:, :person_width] = mask1_processed
        concatenated_global_mask[:, person_width:] = mask2_processed

        concatenated_face_masks['global'] = concatenated_global_mask
        concatenated_face_masks['person1'] = concatenated_global_mask[:, :person_width]
        concatenated_face_masks['person2'] = concatenated_global_mask[:, person_width:]

    if face_masks_1['start'] is not None and face_masks_2['start'] is not None:
        start1_processed = crop_and_resize(
            face_masks_1['start'].unsqueeze(0).unsqueeze(0),
            face_center=None,
            normalize=False,
            interpolation=InterpolationMode.NEAREST,
            height=person_height,
            width=person_width
        ).squeeze()

        start2_processed = crop_and_resize(
            face_masks_2['start'].unsqueeze(0).unsqueeze(0),
            face_center=None,
            normalize=False,
            interpolation=InterpolationMode.NEAREST,
            height=person_height,
            width=person_width
        ).squeeze()

        concatenated_start_mask = torch.zeros(target_height, target_width)
        concatenated_start_mask[:, :person_width] = start1_processed
        concatenated_start_mask[:, person_width:] = start2_processed
        concatenated_face_masks['start'] = concatenated_start_mask

    if face_masks_1['end'] is not None and face_masks_2['end'] is not None:
        end1_processed = crop_and_resize(
            face_masks_1['end'].unsqueeze(0).unsqueeze(0),
            face_center=None,
            normalize=False,
            interpolation=InterpolationMode.NEAREST,
            height=person_height,
            width=person_width
        ).squeeze()

        end2_processed = crop_and_resize(
            face_masks_2['end'].unsqueeze(0).unsqueeze(0),
            face_center=None,
            normalize=False,
            interpolation=InterpolationMode.NEAREST,
            height=person_height,
            width=person_width
        ).squeeze()

        concatenated_end_mask = torch.zeros(target_height, target_width)
        concatenated_end_mask[:, :person_width] = end1_processed
        concatenated_end_mask[:, person_width:] = end2_processed
        concatenated_face_masks['end'] = concatenated_end_mask

    output_dir = os.path.dirname(output_path)
    if output_dir and not os.path.exists(output_dir):
        os.makedirs(output_dir)
        print(f"Created output directory: {output_dir}")

    # Dump a few single frames for visual inspection of the crops.
    debug_dir = os.path.join(os.path.dirname(output_path), "debug")
    if not os.path.exists(debug_dir):
        os.makedirs(debug_dir)

    debug_save_frame(processed_frames_1, os.path.join(debug_dir, "person1_frame0.png"), 0)
    debug_save_frame(processed_frames_2, os.path.join(debug_dir, "person2_frame0.png"), 0)
    debug_save_frame(concatenated_frames, os.path.join(debug_dir, "concatenated_frame0.png"), 0)

    print(f"Saving video at fps: {fps}")
    if use_efficient_save:
        save_video_tensor_to_file_efficient(concatenated_frames, output_path, fps)
    else:
        save_video_tensor_to_file(concatenated_frames, output_path, fps)

    result = {
        'concatenated_frames': concatenated_frames,
        'concatenated_face_masks': concatenated_face_masks,
        'person1_data': {
            'frames': processed_frames_1,
            'face_masks': face_masks_1,
            'start_idx': start_idx_1,
            'num_frames': num_frames_1,
            'bboxs': bboxs_1,
            'bbox_infos': bbox_infos_1
        },
        'person2_data': {
            'frames': processed_frames_2,
            'face_masks': face_masks_2,
            'start_idx': start_idx_2,
            'num_frames': num_frames_2,
            'bboxs': bboxs_2,
            'bbox_infos': bbox_infos_2
        }
    }

    # Remap the per-person bbox info into the coordinate frame of the
    # concatenated video: person 1 keeps the left half, person 2 is shifted
    # into the right half. Use deepcopy so the nested 'center'/'bbox' lists
    # of the original bbox_infos are not mutated in place.
    concatenated_bbox_infos = {}
    if bbox_infos_1 is not None and bbox_infos_2 is not None:
        person1_bbox_infos = {}
        for key in ['start', 'end', 'global']:
            if key in bbox_infos_1:
                bbox_info = copy.deepcopy(bbox_infos_1[key])

                # x coordinates shrink by person_width / target_width.
                relative_width = person_width / target_width
                bbox_info['center'][0] = bbox_info['center'][0] * relative_width
                bbox_info['bbox'][0] = bbox_info['bbox'][0] * relative_width
                bbox_info['bbox'][2] = bbox_info['bbox'][2] * relative_width
                person1_bbox_infos[key] = bbox_info

        person2_bbox_infos = {}
        for key in ['start', 'end', 'global']:
            if key in bbox_infos_2:
                bbox_info = copy.deepcopy(bbox_infos_2[key])

                # Same shrink, plus an offset into the right half.
                relative_offset = person_width / target_width
                relative_width = person_width / target_width
                bbox_info['center'][0] = relative_offset + bbox_info['center'][0] * relative_width
                bbox_info['bbox'][0] = relative_offset + bbox_info['bbox'][0] * relative_width
                bbox_info['bbox'][2] = relative_offset + bbox_info['bbox'][2] * relative_width
                person2_bbox_infos[key] = bbox_info

        concatenated_bbox_infos = {
            'person1': person1_bbox_infos,
            'person2': person2_bbox_infos
        }

    result['concatenated_bbox_infos'] = concatenated_bbox_infos

    return result
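
# Worked remap example (illustrative numbers): each half spans a normalized
# width of person_width / target_width = 0.5, so a person-2 face center at
# x = 0.4 in its own crop lands at 0.5 + 0.4 * 0.5 = 0.7 in the full frame.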


def random_concat_test(jsonl_path, num_pairs=100, save_dir="./temp/concat_test/videos",
                       base_dir="./data"):
    """
    Randomly sample video pairs and run the concatenation as a smoke test.

    Args:
        jsonl_path: path to the jsonl metadata file
        num_pairs: number of video pairs to test
        save_dir: directory for the concatenated outputs
        base_dir: dataset base directory
    """
    import json
    from pathlib import Path

    Path(save_dir).mkdir(parents=True, exist_ok=True)

    print(f"Reading jsonl file: {jsonl_path}")
    with open(jsonl_path, 'r') as f:
        lines = f.readlines()

    videos = []
    for line in lines:
        try:
            video_info = json.loads(line.strip())
            videos.append(video_info)
        except json.JSONDecodeError as e:
            print(f"Error parsing jsonl line: {e}")
            continue

    print(f"Read {len(videos)} videos in total")

    if len(videos) < 2:
        print("Not enough videos for a concatenation test")
        return

    selected_pairs = []
    for _ in range(num_pairs):
        # random.sample draws two distinct videos for each pair.
        pair = random.sample(videos, 2)
        selected_pairs.append(pair)

    print(f"Randomly selected {len(selected_pairs)} video pairs")

    success_count = 0
    failed_pairs = []

    for i, (video1, video2) in enumerate(selected_pairs):
        try:
            print(f"\nProcessing pair {i+1}/{len(selected_pairs)}:")
            print(f"  video 1: {video1['video']}")
            print(f"  video 2: {video2['video']}")

            video_path_1 = os.path.join(base_dir, video1['video'])
            video_path_2 = os.path.join(base_dir, video2['video'])
            bbox_path_1 = os.path.join(base_dir, video1['bboxs'])
            bbox_path_2 = os.path.join(base_dir, video2['bboxs'])

            if not all(os.path.exists(path) for path in [video_path_1, video_path_2, bbox_path_1, bbox_path_2]):
                print("  missing files, skipping this pair")
                failed_pairs.append((video1, video2, "missing files"))
                continue

            video1_name = os.path.splitext(os.path.basename(video1['video']))[0]
            video2_name = os.path.splitext(os.path.basename(video2['video']))[0]
            output_name = f"{video1_name}_{video2_name}.mp4"
            output_path = os.path.join(save_dir, output_name)

            result = concatenate_faces(
                video_path_1,
                video_path_2,
                bbox_path_1,
                bbox_path_2,
                output_path,
                fps=16,
                target_width=832,
                target_height=480,
                use_efficient_save=True
            )

            print(f"  concatenation succeeded: {output_name}")
            success_count += 1

        except Exception as e:
            print(f"  concatenation failed: {str(e)}")
            failed_pairs.append((video1, video2, str(e)))

    print("\n=== Concatenation test results ===")
    print(f"Total pairs tested: {len(selected_pairs)}")
    print(f"Succeeded: {success_count}")
    print(f"Failed: {len(failed_pairs)}")
    print(f"Success rate: {success_count/len(selected_pairs)*100:.2f}%")

    if failed_pairs:
        print("\nFailed pairs:")
        for i, (video1, video2, error) in enumerate(failed_pairs[:10]):
            print(f"  {i+1}. {os.path.basename(video1['video'])} + {os.path.basename(video2['video'])}: {error}")
        if len(failed_pairs) > 10:
            print(f"  ... {len(failed_pairs) - 10} more failed pairs")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Concatenate the face crops of two videos into a two-person video")
    parser.add_argument("--video_path_1", type=str, default="./data/test_data/images_w_bbox/1.mp4", help="path to the first video")
    parser.add_argument("--video_path_2", type=str, default="./data/test_data/images_w_bbox/5.mp4", help="path to the second video")
    parser.add_argument("--bbox_path_1", type=str, default="./data/test_data/images_w_bbox/1.npy", help="bbox path for the first video")
    parser.add_argument("--bbox_path_2", type=str, default="./data/test_data/images_w_bbox/5.npy", help="bbox path for the second video")
    parser.add_argument("--output_path", type=str, default="./temp/concat_test/1-5.mp4", help="output video path")
    parser.add_argument("--fps", type=int, default=24, help="output frame rate")
    parser.add_argument("--target_width", type=int, default=832, help="output video width")
    parser.add_argument("--target_height", type=int, default=480, help="output video height")
    parser.add_argument("--use_efficient_save", action="store_true", help="use the faster ffmpeg pipe saver")
    parser.add_argument("--random_test", action="store_true", help="run the random concatenation test")
    parser.add_argument("--jsonl_path", type=str, default="./metadata_wan_fps24.jsonl", help="path to the jsonl file")
    parser.add_argument("--num_pairs", type=int, default=100, help="number of random pairs to test")
    parser.add_argument("--save_dir", type=str, default="./temp/concat_test/videos", help="directory for concatenated outputs")
    args = parser.parse_args()

    if args.random_test:
        random_concat_test(
            jsonl_path=args.jsonl_path,
            num_pairs=args.num_pairs,
            save_dir=args.save_dir
        )
    else:
        try:
            result = concatenate_faces(
                args.video_path_1,
                args.video_path_2,
                args.bbox_path_1,
                args.bbox_path_2,
                args.output_path,
                args.fps,
                args.target_width,
                args.target_height,
                args.use_efficient_save
            )
            print("Video concatenation finished!")
            print(f"Concatenated video shape: {result['concatenated_frames'].shape}")
            print(f"Person 1 data contains: {list(result['person1_data'].keys())}")
            print(f"Person 2 data contains: {list(result['person2_data'].keys())}")
        except Exception as e:
            print(f"Video concatenation failed: {str(e)}")
            import traceback
            traceback.print_exc()