08b6543b79
- 添加视频处理性能计时和统计功能 - 实现帧处理时间监控和慢帧警告 - 添加音频文件静音修剪功能 - 优化Windows平台音频播放实现 - 调整默认日志输出频率减少冗余信息 - 修复MediaPipe GPU委托在Windows上的兼容性问题
228 lines
6.3 KiB
Python
228 lines
6.3 KiB
Python
# app/audio/generate.py
|
|
from __future__ import annotations
|
|
|
|
import platform
|
|
import shutil
|
|
import subprocess
|
|
import wave
|
|
from pathlib import Path
|
|
|
|
from loguru import logger
|
|
|
|
|
|
def generate_rep_audio_files(
    *,
    max_count: int,
    rate: int,
    output_dir: Path,
    overwrite: bool = False,
    trim_leading_silence: bool = True,
    trim_silence_threshold: int = 500,
    trim_silence_padding_ms: int = 20,
) -> None:
    """Make sure a spoken-number audio file exists for every count 0..max_count.

    Files are written to ``output_dir`` (created if necessary) and are named
    after the count, for example::

        <output_dir>/0.aiff      # macOS
        <output_dir>/0.wav       # Windows / Linux
        ...
        <output_dir>/<max_count>.aiff  or  <max_count>.wav

    Only counts whose file is missing are generated, unless ``overwrite`` is
    true, in which case every count is regenerated. Calling this once at
    service start-up is sufficient.

    Args:
        max_count: Highest count (inclusive) that needs an audio file.
        rate: Speech rate handed to the platform speech backend.
        output_dir: Directory that holds the generated files.
        overwrite: Regenerate files even when they already exist.
        trim_leading_silence: When true and the platform format is WAV, trim
            leading silence from every file in the 0..max_count range, not
            only from the files generated by this call.
        trim_silence_threshold: Loudness threshold passed to the trimmer.
        trim_silence_padding_ms: Milliseconds of leading silence passed to
            the trimmer as padding to keep.
    """
    output_dir.mkdir(parents=True, exist_ok=True)

    system = platform.system().lower()
    on_macos = system == "darwin"
    suffix = ".aiff" if on_macos else ".wav"

    every_count = range(0, max_count + 1)
    pending: list[int] = []
    for number in every_count:
        if overwrite or not _audio_path(output_dir, number, suffix=suffix).exists():
            pending.append(number)

    if pending:
        logger.info(
            "Preparing rep audio files, system={}, count={}, output_dir={}",
            system,
            len(pending),
            output_dir,
        )

        # macOS uses the `say` command; every other platform uses pyttsx3.
        synthesize = _generate_with_macos_say if on_macos else _generate_with_pyttsx3
        synthesize(counts=pending, output_dir=output_dir, rate=rate)

        logger.info("Rep audio files prepared: {}", output_dir)
    else:
        logger.info("Rep audio files already prepared: {}", output_dir)

    # Trimming is only attempted for WAV output, so AIFF files are left alone.
    if trim_leading_silence and suffix == ".wav":
        _trim_leading_silence_files(
            counts=list(every_count),
            output_dir=output_dir,
            suffix=suffix,
            threshold=trim_silence_threshold,
            padding_ms=trim_silence_padding_ms,
        )
|
|
|
|
|
|
def _generate_with_macos_say(
    *,
    counts: list[int],
    output_dir: Path,
    rate: int,
) -> None:
    """Generate one AIFF file per count with the macOS ``say`` command.

    Args:
        counts: Numbers to speak; each is written to ``<count>.aiff``.
        output_dir: Directory that receives the files.
        rate: Speech rate passed to ``say -r``.

    Raises:
        RuntimeError: When not running on macOS, when ``say`` is not on the
            PATH, or when ``say`` exits with a non-zero status (the message
            carries its stderr output, or the exit status if stderr is empty).
    """
    running_on_macos = platform.system().lower() == "darwin"
    if not running_on_macos:
        raise RuntimeError("say command is only available on macOS")

    if shutil.which("say") is None:
        raise RuntimeError("macOS say command not found")

    for number in counts:
        target = _audio_path(output_dir, number, suffix=".aiff")
        command = [
            "say",
            "-r",
            str(rate),
            "--file-format=AIFF",
            "-o",
            str(target),
            str(number),
        ]

        try:
            subprocess.run(
                command,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.PIPE,
                text=True,
                check=True,
            )
        except subprocess.CalledProcessError as exc:
            detail = exc.stderr.strip()
            if not detail:
                detail = f"exit status {exc.returncode}"
            raise RuntimeError(f"Failed to generate {target}: {detail}") from exc
|
|
|
|
|
|
def _generate_with_pyttsx3(
    *,
    counts: list[int],
    output_dir: Path,
    rate: int,
) -> None:
    """Generate one WAV file per count with pyttsx3 (Windows / Linux path).

    Args:
        counts: Numbers to speak; each is written to ``<count>.wav``.
        output_dir: Directory that receives the files.
        rate: Value assigned to the engine's ``rate`` property.

    Raises:
        RuntimeError: When importing pyttsx3 fails for any reason.
    """
    # Imported lazily so the module can be loaded where pyttsx3 is absent.
    try:
        import pyttsx3
    except Exception as exc:
        raise RuntimeError(f"pyttsx3 unavailable: {exc}") from exc

    tts = pyttsx3.init()
    for prop_name, prop_value in (("rate", rate), ("volume", 1.0)):
        tts.setProperty(prop_name, prop_value)

    jobs = [
        (str(number), str(_audio_path(output_dir, number, suffix=".wav")))
        for number in counts
    ]
    # Queue every utterance first, then process the whole queue in one run.
    for spoken_text, destination in jobs:
        tts.save_to_file(spoken_text, destination)

    tts.runAndWait()
|
|
|
|
|
|
def _audio_path(output_dir: Path, count: int, *, suffix: str) -> Path:
    """Return the path of the audio file for ``count`` inside ``output_dir``.

    The file name is the count immediately followed by ``suffix``
    (for example ``12.wav``).
    """
    file_name = f"{count}{suffix}"
    return output_dir / file_name
|
|
|
|
|
|
def _trim_leading_silence_files(
    *,
    counts: list[int],
    output_dir: Path,
    suffix: str,
    threshold: int,
    padding_ms: int,
) -> None:
    """Trim leading silence from the audio file of every given count.

    Counts whose file does not exist are skipped. When finished, one summary
    line is logged with the number of files that were shortened and the total
    number of milliseconds removed.

    Args:
        counts: Counts whose files should be processed.
        output_dir: Directory containing the files.
        suffix: File suffix used to build each file name.
        threshold: Loudness threshold forwarded to the per-file trimmer.
        padding_ms: Padding in milliseconds forwarded to the per-file trimmer.
    """
    removed_durations: list[float] = []

    for number in counts:
        target = _audio_path(output_dir, number, suffix=suffix)
        if target.exists():
            cut_ms = _trim_leading_silence(target, threshold=threshold, padding_ms=padding_ms)
            # A result of zero means the file was left as it was.
            if cut_ms > 0:
                removed_durations.append(cut_ms)

    logger.info(
        "Rep audio leading silence trim complete: files_trimmed={}, total_removed_ms={:.1f}, threshold={}, padding_ms={}",
        len(removed_durations),
        sum(removed_durations, 0.0),
        threshold,
        padding_ms,
    )
|
|
|
|
|
|
def _trim_leading_silence(audio_file: Path, *, threshold: int, padding_ms: int) -> float:
    """Remove leading silence from a WAV file, rewriting the file in place.

    The audio is scanned from the start in windows of roughly 10 ms. Windows
    whose loudness (as reported by ``_pcm_rms``) does not exceed ``threshold``
    count as silence. Scanning stops at the first louder window, and all the
    silence before it, minus ``padding_ms`` of padding, is dropped.

    The file is left untouched and ``0.0`` is returned when:

    * it has no frames, or its header reports a non-positive frame size or
      frame rate;
    * no window exceeds ``threshold`` (there is nothing audible to trim up to,
      so cutting would only destroy the file);
    * the leading silence is not longer than the padding.

    Args:
        audio_file: Path of the WAV file to process.
        threshold: Loudness above which a window counts as sound.
        padding_ms: Milliseconds of leading silence to keep; negative values
            are treated as 0.

    Returns:
        The amount of audio removed, in milliseconds (``0.0`` if unchanged).
    """
    with wave.open(str(audio_file), "rb") as reader:
        params = reader.getparams()
        frames = reader.readframes(params.nframes)

    frame_size = params.sampwidth * params.nchannels
    # A frame rate of 0 would otherwise end in a division by zero, or in the
    # file being opened for writing before the wave writer rejects the rate.
    if params.nframes <= 0 or frame_size <= 0 or params.framerate <= 0:
        return 0.0

    # Analysis window: 1/100 of a second, but never less than one frame.
    chunk_frames = max(1, params.framerate // 100)
    chunk_size = chunk_frames * frame_size

    leading_frames = 0
    found_sound = False
    for offset in range(0, len(frames), chunk_size):
        chunk = frames[offset : offset + chunk_size]
        if _pcm_rms(chunk, params.sampwidth) > threshold:
            found_sound = True
            break
        # The last window may be shorter than chunk_size.
        leading_frames += len(chunk) // frame_size

    if not found_sound:
        # Every window is at or below the threshold. Treating the whole file
        # as "leading" silence would overwrite it with just the padding tail.
        return 0.0

    padding_frames = int(params.framerate * max(0, padding_ms) / 1000)
    remove_frames = max(0, leading_frames - padding_frames)
    if remove_frames <= 0:
        return 0.0

    start = min(len(frames), remove_frames * frame_size)
    trimmed_frames = frames[start:]
    if not trimmed_frames:
        return 0.0

    with wave.open(str(audio_file), "wb") as writer:
        writer.setparams(params)
        writer.writeframes(trimmed_frames)

    return remove_frames / params.framerate * 1000
|
|
|
|
|
|
def _pcm_rms(chunk: bytes, sample_width: int) -> float:
    """Return a loudness figure for a chunk of little-endian PCM bytes.

    * ``sample_width == 2``: RMS of the signed 16-bit samples.
    * ``sample_width`` of 3 or 4: RMS of the signed samples, scaled down to
      the 16-bit range so one threshold can be used for every width.
    * Any other width: the bytes are treated as unsigned 8-bit samples and the
      peak deviation from the 128 midpoint is returned (a peak, not an RMS).

    Trailing bytes that do not form a complete sample are ignored. An empty
    chunk, or one too short to hold a single sample, yields ``0.0``.

    Args:
        chunk: Raw PCM bytes.
        sample_width: Size of one sample in bytes.

    Returns:
        The loudness figure described above, as a float.
    """
    if not chunk:
        return 0.0

    if sample_width in (2, 3, 4):
        sample_count = len(chunk) // sample_width
        if sample_count == 0:
            return 0.0

        total = 0
        for start in range(0, sample_count * sample_width, sample_width):
            sample = int.from_bytes(chunk[start : start + sample_width], "little", signed=True)
            total += sample * sample

        rms = (total / sample_count) ** 0.5
        # 24- and 32-bit samples are 2**8 and 2**16 times larger than 16-bit
        # ones; for 16-bit input the divisor is 1 and the value is unchanged.
        return rms / (1 << (8 * (sample_width - 2)))

    peak = max(abs(byte - 128) for byte in chunk)
    return float(peak)
|