Diggz10 committed on
Commit
4d2155b
·
verified ·
1 Parent(s): d74e228

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +180 -23
app.py CHANGED
@@ -1,8 +1,10 @@
1
- # app.py — Voice Clarity Booster with Presets, Dual-Stage (smart CPU/GPU guard),
2
- # A/B alternating, Delta (Original−Enhanced), Loudness Match, and safe fallbacks.
3
  #
4
- # Key change: On CPU or for long clips, SepFormer/Dual-Stage auto-falls back to MetricGAN+
5
- # instead of hanging. Metrics show the fallback reason.
 
 
6
 
7
  import os
8
  import tempfile
@@ -50,9 +52,9 @@ except Exception:
50
  # -----------------------------
51
  USE_GPU = torch.cuda.is_available()
52
  # On CPU, SepFormer is extremely slow; avoid for long clips (or disable).
53
- MAX_SEPFORMER_SEC_CPU = float(os.getenv("MAX_SEPFORMER_SEC_CPU", 12)) # hard limit for CPU
54
- MAX_SEPFORMER_SEC_GPU = float(os.getenv("MAX_SEPFORMER_SEC_GPU", 180)) # just in case
55
- ALLOW_SEPFORMER_CPU = os.getenv("ALLOW_SEPFORMER_CPU", "0") == "1" # override at your risk
56
 
57
  _DEVICE = "cuda" if USE_GPU else "cpu"
58
  _ENHANCER_METRICGAN: Optional[SpectralMaskEnhancement] = None
@@ -117,6 +119,12 @@ def _highpass(wav: torch.Tensor, sr: int, cutoff_hz: float) -> torch.Tensor:
117
  return torchaudio.functional.highpass_biquad(wav, sr, cutoff_hz)
118
 
119
 
 
 
 
 
 
 
120
  def _presence_boost(wav: torch.Tensor, sr: int, gain_db: float) -> torch.Tensor:
121
  if abs(gain_db) < 1e-6:
122
  return wav
@@ -138,6 +146,20 @@ def _align_lengths(a: np.ndarray, b: np.ndarray) -> Tuple[np.ndarray, np.ndarray
138
  return a[:n], b[:n]
139
 
140
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
141
  def _loudness_match_to_ref(ref: np.ndarray, cand: np.ndarray, sr: int) -> Tuple[np.ndarray, str]:
142
  """Match cand loudness to ref (LUFS if available, else RMS)."""
143
  if len(ref) < sr // 10 or len(cand) < sr // 10:
@@ -180,6 +202,59 @@ def _make_ab_alternating(orig: np.ndarray, enh: np.ndarray, sr: int, seg_sec: fl
180
  return np.concatenate(out, axis=0).astype(np.float32)
181
 
182
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
183
  # -----------------------------
184
  # Model runners (with guards)
185
  # -----------------------------
@@ -250,6 +325,50 @@ def _run_dual_stage(path_16k: str, dur_sec: float) -> Tuple[Optional[torch.Tenso
250
  pass
251
 
252
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
253
  # -----------------------------
254
  # Core pipeline
255
  # -----------------------------
@@ -261,9 +380,9 @@ def _enhance_numpy_audio(
261
  lowcut_hz: float = 0.0,
262
  out_sr: Optional[int] = None,
263
  loudness_match: bool = True,
264
- ) -> Tuple[int, np.ndarray, np.ndarray, str]:
265
  """
266
- Returns: (sr_out, enhanced, delta, metrics_text)
267
  """
268
  sr_in, wav_np = audio
269
  wav_mono = _sanitize(_to_mono(wav_np))
@@ -271,7 +390,7 @@ def _enhance_numpy_audio(
271
  if wav_mono.size < 32:
272
  sr_out = sr_in if sr_in else 16000
273
  silence = np.zeros(int(sr_out * 1.0), dtype=np.float32)
274
- return sr_out, silence, silence, "Input too short; returned silence."
275
 
276
  dry_t = torch.from_numpy(wav_mono).unsqueeze(0) # [1, T @ sr_in]
277
  wav_16k = _resample_torch(dry_t, sr_in, 16000)
@@ -332,12 +451,9 @@ def _enhance_numpy_audio(
332
 
333
  enhanced = _sanitize(enhanced)
334
 
335
- # Delta
336
- delta = _sanitize(dry_out - enhanced)
337
-
338
  # Metrics
339
  eps = 1e-9
340
- rms_delta = np.sqrt(np.mean(delta**2) + eps)
341
  metrics = (
342
  f"Mode: {mode} | Dry/Wet: {dry_wet*100:.0f}% | Presence: {presence_db:+.1f} dB | "
343
  f"Low-cut: {lowcut_hz:.0f} Hz | Loudness match: {loud_text} | Device: {'GPU' if USE_GPU else 'CPU'} | "
@@ -345,9 +461,9 @@ def _enhance_numpy_audio(
345
  )
346
  if fallback_note:
347
  metrics += f"\n{fallback_note}"
348
- metrics += f"\nΔ RMS: {20*np.log10(rms_delta+eps):+.2f} dBFS"
349
 
350
- return sr_out, enhanced, delta, metrics
351
 
352
 
353
  # -----------------------------
@@ -433,13 +549,18 @@ def gradio_enhance(
433
  lowcut_hz: float,
434
  output_sr: str,
435
  loudness_match: bool,
 
 
 
436
  ):
437
  if audio is None:
438
  return None, None, None, "No audio provided."
439
  out_sr = None
440
  if output_sr in {"44100", "48000"}:
441
  out_sr = int(output_sr)
442
- sr_out, enhanced, delta, metrics = _enhance_numpy_audio(
 
 
443
  audio,
444
  mode=mode,
445
  dry_wet=dry_wet_pct / 100.0,
@@ -448,18 +569,44 @@ def gradio_enhance(
448
  out_sr=out_sr,
449
  loudness_match=bool(loudness_match),
450
  )
451
- # Build A/B alternating track
 
452
  sr_in, wav_np = audio
453
  orig_mono = _sanitize(_to_mono(wav_np))
454
  orig_at_out = _resample_torch(torch.from_numpy(orig_mono).unsqueeze(0), sr_in, sr_out).squeeze(0).numpy().astype(np.float32)
455
- orig_at_out, enhanced = _align_lengths(orig_at_out, enhanced)
456
- ab_alt = _make_ab_alternating(orig_at_out, enhanced, sr_out, seg_sec=2.0)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
457
  return (sr_out, enhanced), (sr_out, ab_alt), (sr_out, delta), metrics
458
 
459
 
460
  with gr.Blocks(theme=gr.themes.Soft()) as demo:
461
  gr.Markdown(
462
- f"## Voice Clarity Booster — Presets, A/B, Delta, Loudness Match \n"
463
  f"**Device:** {'GPU' if USE_GPU else 'CPU'} · "
464
  f"SepFormer limits — CPU≤{MAX_SEPFORMER_SEC_CPU:.0f}s, GPU≤{MAX_SEPFORMER_SEC_GPU:.0f}s"
465
  + ("" if USE_GPU or ALLOW_SEPFORMER_CPU else " · (SepFormer disabled on CPU)")
@@ -505,6 +652,16 @@ with gr.Blocks(theme=gr.themes.Soft()) as demo:
505
  label="Output Sample Rate",
506
  )
507
 
 
 
 
 
 
 
 
 
 
 
508
  preset.change(
509
  _apply_preset,
510
  inputs=[preset],
@@ -516,12 +673,12 @@ with gr.Blocks(theme=gr.themes.Soft()) as demo:
516
  with gr.Column(scale=1):
517
  out_audio = gr.Audio(type="numpy", label="Enhanced (autoplay)", autoplay=True)
518
  ab_audio = gr.Audio(type="numpy", label="A/B Alternating (2s O → 2s E)")
519
- delta_audio = gr.Audio(type="numpy", label="Delta: Original − Enhanced")
520
  metrics = gr.Markdown("")
521
 
522
  btn.click(
523
  gradio_enhance,
524
- inputs=[in_audio, mode, dry_wet, presence, lowcut, out_sr, loudmatch],
525
  outputs=[out_audio, ab_audio, delta_audio, metrics],
526
  )
527
 
 
1
+ # app.py — Voice Clarity Booster with Presets, CPU/GPU-smart Dual-Stage,
2
+ # A/B alternating, Loudness Match, and a *polished Delta* (noise-only) option.
3
  #
4
+ # New:
5
+ # - Delta Mode: Raw Difference | Spectral Residual (noise-only)
6
+ # - Delta Alignment (cross-correlation) to reduce phase/latency smear
7
+ # - Delta Gain (dB) + HPF/LPF + RMS leveling for listenable delta
8
 
9
  import os
10
  import tempfile
 
52
  # -----------------------------
53
  USE_GPU = torch.cuda.is_available()
54
  # On CPU, SepFormer is extremely slow; avoid for long clips (or disable).
55
+ MAX_SEPFORMER_SEC_CPU = float(os.getenv("MAX_SEPFORMER_SEC_CPU", 12))
56
+ MAX_SEPFORMER_SEC_GPU = float(os.getenv("MAX_SEPFORMER_SEC_GPU", 180))
57
+ ALLOW_SEPFORMER_CPU = os.getenv("ALLOW_SEPFORMER_CPU", "0") == "1"
58
 
59
  _DEVICE = "cuda" if USE_GPU else "cpu"
60
  _ENHANCER_METRICGAN: Optional[SpectralMaskEnhancement] = None
 
119
  return torchaudio.functional.highpass_biquad(wav, sr, cutoff_hz)
120
 
121
 
122
def _lowpass(wav: torch.Tensor, sr: int, cutoff_hz: float) -> torch.Tensor:
    """Low-pass `wav` with a biquad at `cutoff_hz`, or return it unchanged.

    The filter is bypassed (the input tensor is returned as-is) when:
      * `cutoff_hz` is None or not positive (filter disabled), or
      * `cutoff_hz` is at or above the Nyquist frequency `sr / 2`. Nothing
        above Nyquist exists to remove, and the biquad coefficient design is
        not valid there (it produces an unstable filter), so skipping is the
        only safe behaviour, e.g. for a 9 kHz cutoff on 16 kHz audio.

    Args:
        wav: Audio tensor passed straight to torchaudio's biquad filter.
        sr: Sample rate of `wav` in Hz.
        cutoff_hz: Low-pass corner frequency in Hz.

    Returns:
        The filtered tensor, or `wav` itself when the filter is bypassed.
    """
    if cutoff_hz is None or cutoff_hz <= 0:
        return wav
    if cutoff_hz >= sr / 2.0:
        # At/above Nyquist: a low-pass is a no-op at best, unstable at worst.
        return wav
    return torchaudio.functional.lowpass_biquad(wav, sr, cutoff_hz)
126
+
127
+
128
  def _presence_boost(wav: torch.Tensor, sr: int, gain_db: float) -> torch.Tensor:
129
  if abs(gain_db) < 1e-6:
130
  return wav
 
146
  return a[:n], b[:n]
147
 
148
 
149
def _rms(x: np.ndarray, eps: float = 1e-9) -> float:
    """Return the root-mean-square of `x` as a Python float.

    `eps` is added to the mean square before the square root, so the result
    is sqrt(mean(x**2) + eps).
    """
    mean_square = np.mean(np.square(x))
    return float(np.sqrt(mean_square + eps))
151
+
152
+
153
def _rms_target(x: np.ndarray, target_dbfs: float = -20.0) -> np.ndarray:
    """Scale `x` so its RMS is roughly `target_dbfs`, then hard-clip to [-1, 1].

    Args:
        x: Input samples.
        target_dbfs: Desired RMS level in dBFS (converted to a linear
            amplitude via 10 ** (dB / 20)).

    Returns:
        The rescaled, clipped signal as float32. The input is returned
        unscaled (but still clipped/cast) if its measured RMS is not positive.
    """
    goal_amp = 10.0 ** (target_dbfs / 20.0)
    level = _rms(x)
    scaled = x * (goal_amp / level) if level > 0 else x
    return np.clip(scaled, -1.0, 1.0).astype(np.float32)
161
+
162
+
163
  def _loudness_match_to_ref(ref: np.ndarray, cand: np.ndarray, sr: int) -> Tuple[np.ndarray, str]:
164
  """Match cand loudness to ref (LUFS if available, else RMS)."""
165
  if len(ref) < sr // 10 or len(cand) < sr // 10:
 
202
  return np.concatenate(out, axis=0).astype(np.float32)
203
 
204
 
205
+ # -----------------------------
206
+ # Alignment for delta (cross-correlation)
207
+ # -----------------------------
208
def _next_pow_two(n: int) -> int:
    """Return the smallest power of two that is >= n.

    Non-positive `n` yields 0.
    """
    if n <= 0:
        return 0
    # (n - 1).bit_length() is the exponent of the first power of two >= n.
    return 1 << (n - 1).bit_length()
215
+
216
+
217
def _align_by_xcorr(a: np.ndarray, b: np.ndarray, max_shift: int) -> Tuple[np.ndarray, np.ndarray, int]:
    """
    Align b to a using FFT cross-correlation, accepting only lags within ±max_shift.

    Returns (a_aligned, b_aligned, shift):
      * shift > 0: b lags a; b is advanced by `shift` samples.
      * shift < 0: a lags b; a is advanced by `-shift` samples.
      * shift == 0: no time shift; inputs are only trimmed to a common length.
    The advanced signal is zero-padded at its end and both outputs are trimmed
    to the same length.

    No shift is applied when either input is empty, when `max_shift` is not
    positive, or when the correlation has no positive peak inside the window
    (e.g. silence).
    """
    n = max(len(a), len(b))
    # Lags beyond ±(n - 1) have no overlap at all, so cap the search there.
    max_shift = min(int(max_shift), n - 1)
    if len(a) == 0 or len(b) == 0 or max_shift <= 0:
        a2, b2 = _align_lengths(a, b)
        return a2, b2, 0

    # Zero-pad both signals to the same length.
    a_pad = np.zeros(n, dtype=np.float32)
    a_pad[:len(a)] = a
    b_pad = np.zeros(n, dtype=np.float32)
    b_pad[:len(b)] = b

    # An FFT length >= 2n - 1 keeps the circular correlation free of wrap-around.
    N = _next_pow_two(2 * n - 1)
    A = np.fft.rfft(a_pad, N)
    B = np.fft.rfft(b_pad, N)
    # conj(A) * B  <->  corr[k] = sum_j a[j] * b[j + k], which peaks at k = +d
    # when b is a copy of a delayed by d samples (i.e. positive lag = b lags a).
    corr = np.fft.irfft(np.conj(A) * B, N)

    # Lags 0..max_shift sit at the start of corr; lags -max_shift..-1 wrap to
    # its end. Stitch them into one window ordered from -max_shift to +max_shift.
    window = np.concatenate((corr[N - max_shift:], corr[:max_shift + 1]))
    peak = int(np.argmax(window))
    # A non-positive peak means there is nothing meaningful to lock onto.
    lag = peak - max_shift if window[peak] > 0 else 0

    if lag == 0:
        a2, b2 = _align_lengths(a, b)
        return a2, b2, 0

    tail = np.zeros(abs(lag), dtype=np.float32)
    if lag > 0:
        # b lags behind a -> advance b
        b = np.concatenate((b[lag:], tail))
    else:
        # a lags behind b -> advance a
        a = np.concatenate((a[-lag:], tail))
    m = min(len(a), len(b))
    return a[:m], b[:m], lag
256
+
257
+
258
  # -----------------------------
259
  # Model runners (with guards)
260
  # -----------------------------
 
325
  pass
326
 
327
 
328
+ # -----------------------------
329
+ # Spectral residual delta (cleaner noise-only preview)
330
+ # -----------------------------
331
def _delta_spectral_residual(orig: np.ndarray, enh: np.ndarray, sr: int) -> np.ndarray:
    """
    Build a noise-focused residual via STFT magnitudes:
        R_mag = ReLU(|X| - |Y|)      (X = STFT(orig), Y = STFT(enh))
    The residual is smoothed over time, resynthesised with the phase of X,
    high-passed at 80 Hz, low-passed at 9 kHz (only when that is below
    Nyquist for `sr`) and RMS-levelled to -24 dBFS for listenability.

    Args:
        orig: Original mono signal (1-D).
        enh: Enhanced mono signal (1-D); trimmed or zero-extended to the
            length of `orig` if the two differ.
        sr: Sample rate of both signals in Hz.

    Returns:
        float32 residual with len(orig) samples (empty if `orig` is empty).
    """
    n_fft = 1024
    hop = 256
    lpf_hz = 9000.0

    out_len = len(orig)
    if out_len == 0:
        return np.zeros(0, dtype=np.float32)

    # torch.stft(center=True) reflect-pads n_fft // 2 samples on each side,
    # which needs more than n_fft // 2 input samples; zero-extend shorter
    # clips. Both signals are fitted to the same length so that X and Y have
    # the same number of frames.
    work_len = max(out_len, n_fft // 2 + 1)

    def _fit(sig: np.ndarray) -> torch.Tensor:
        t = torch.from_numpy(sig).to(torch.float32)[:work_len]
        return torch.nn.functional.pad(t, (0, work_len - t.numel()))

    x = _fit(orig)
    y = _fit(enh)
    win = torch.hann_window(n_fft)

    # STFTs
    X = torch.stft(x, n_fft=n_fft, hop_length=hop, window=win, return_complex=True, center=True)
    Y = torch.stft(y, n_fft=n_fft, hop_length=hop, window=win, return_complex=True, center=True)

    # Positive residual magnitudes: energy present in orig but not in enh
    R_mag = torch.relu(torch.abs(X) - torch.abs(Y))

    # Mild temporal smoothing (3-frame moving average across time)
    R_mag = torch.nn.functional.avg_pool1d(
        R_mag.unsqueeze(0), kernel_size=3, stride=1, padding=1
    ).squeeze(0)

    # Reconstruct residual with original phase; `length` trims back to the
    # caller's sample count (dropping any zero-extension added above).
    phase = torch.angle(X)
    R_complex = torch.polar(R_mag, phase)
    r = torch.istft(R_complex, n_fft=n_fft, hop_length=hop, window=win, length=out_len)

    # HPF/LPF + light RMS leveling for comfort
    r_t = r.unsqueeze(0)
    r_t = _highpass(r_t, sr, cutoff_hz=80.0)
    if lpf_hz < sr / 2.0:
        # A biquad low-pass at/above Nyquist is invalid, so skip it there.
        r_t = _lowpass(r_t, sr, cutoff_hz=lpf_hz)
    r_np = r_t.squeeze(0).numpy().astype(np.float32)
    r_np = _rms_target(r_np, target_dbfs=-24.0)
    return r_np
370
+
371
+
372
  # -----------------------------
373
  # Core pipeline
374
  # -----------------------------
 
380
  lowcut_hz: float = 0.0,
381
  out_sr: Optional[int] = None,
382
  loudness_match: bool = True,
383
+ ) -> Tuple[int, np.ndarray, str]:
384
  """
385
+ Returns: (sr_out, enhanced, metrics_text)
386
  """
387
  sr_in, wav_np = audio
388
  wav_mono = _sanitize(_to_mono(wav_np))
 
390
  if wav_mono.size < 32:
391
  sr_out = sr_in if sr_in else 16000
392
  silence = np.zeros(int(sr_out * 1.0), dtype=np.float32)
393
+ return sr_out, silence, "Input too short; returned silence."
394
 
395
  dry_t = torch.from_numpy(wav_mono).unsqueeze(0) # [1, T @ sr_in]
396
  wav_16k = _resample_torch(dry_t, sr_in, 16000)
 
451
 
452
  enhanced = _sanitize(enhanced)
453
 
 
 
 
454
  # Metrics
455
  eps = 1e-9
456
+ rms_delta_hint = np.sqrt(np.mean((dry_out - enhanced)**2) + eps)
457
  metrics = (
458
  f"Mode: {mode} | Dry/Wet: {dry_wet*100:.0f}% | Presence: {presence_db:+.1f} dB | "
459
  f"Low-cut: {lowcut_hz:.0f} Hz | Loudness match: {loud_text} | Device: {'GPU' if USE_GPU else 'CPU'} | "
 
461
  )
462
  if fallback_note:
463
  metrics += f"\n{fallback_note}"
464
+ metrics += f"\nΔ (raw) RMS: {20*np.log10(rms_delta_hint+eps):+.2f} dBFS"
465
 
466
+ return sr_out, enhanced, metrics
467
 
468
 
469
  # -----------------------------
 
549
  lowcut_hz: float,
550
  output_sr: str,
551
  loudness_match: bool,
552
+ delta_mode: str,
553
+ delta_align: bool,
554
+ delta_gain_db: float,
555
  ):
556
  if audio is None:
557
  return None, None, None, "No audio provided."
558
  out_sr = None
559
  if output_sr in {"44100", "48000"}:
560
  out_sr = int(output_sr)
561
+
562
+ # Enhance
563
+ sr_out, enhanced, metrics = _enhance_numpy_audio(
564
  audio,
565
  mode=mode,
566
  dry_wet=dry_wet_pct / 100.0,
 
569
  out_sr=out_sr,
570
  loudness_match=bool(loudness_match),
571
  )
572
+
573
+ # Build A/B and Delta (polished)
574
  sr_in, wav_np = audio
575
  orig_mono = _sanitize(_to_mono(wav_np))
576
  orig_at_out = _resample_torch(torch.from_numpy(orig_mono).unsqueeze(0), sr_in, sr_out).squeeze(0).numpy().astype(np.float32)
577
+
578
+ # Optional alignment to reduce phase/latency offsets
579
+ a_for_ab, b_for_ab = _align_lengths(orig_at_out, enhanced)
580
+ if delta_align:
581
+ max_shift = int(0.05 * sr_out) # up to 50 ms
582
+ a_for_ab, b_for_ab, lag = _align_by_xcorr(a_for_ab, b_for_ab, max_shift=max_shift)
583
+ metrics += f"\nDelta alignment: shift={lag} samples"
584
+
585
+ # A/B alternating
586
+ ab_alt = _make_ab_alternating(a_for_ab, b_for_ab, sr_out, seg_sec=2.0)
587
+
588
+ # Delta (noise-focused if selected)
589
+ if delta_mode.startswith("Spectral"):
590
+ delta = _delta_spectral_residual(a_for_ab, b_for_ab, sr_out)
591
+ else:
592
+ delta = a_for_ab - b_for_ab
593
+ # Gentle polish on raw difference
594
+ d_t = torch.from_numpy(delta).unsqueeze(0)
595
+ d_t = _highpass(d_t, sr_out, cutoff_hz=80.0)
596
+ d_t = _lowpass(d_t, sr_out, cutoff_hz=9000.0)
597
+ delta = d_t.squeeze(0).numpy().astype(np.float32)
598
+ delta = _rms_target(delta, target_dbfs=-24.0)
599
+
600
+ # Apply user delta gain
601
+ delta *= 10.0 ** (delta_gain_db / 20.0)
602
+ delta = np.clip(delta, -1.0, 1.0).astype(np.float32)
603
+
604
  return (sr_out, enhanced), (sr_out, ab_alt), (sr_out, delta), metrics
605
 
606
 
607
  with gr.Blocks(theme=gr.themes.Soft()) as demo:
608
  gr.Markdown(
609
+ f"## Voice Clarity Booster — Presets, A/B, *Polished Delta*, Loudness Match \n"
610
  f"**Device:** {'GPU' if USE_GPU else 'CPU'} · "
611
  f"SepFormer limits — CPU≤{MAX_SEPFORMER_SEC_CPU:.0f}s, GPU≤{MAX_SEPFORMER_SEC_GPU:.0f}s"
612
  + ("" if USE_GPU or ALLOW_SEPFORMER_CPU else " · (SepFormer disabled on CPU)")
 
652
  label="Output Sample Rate",
653
  )
654
 
655
+ # Delta controls
656
+ gr.Markdown("### Delta (what changed)")
657
+ delta_mode = gr.Dropdown(
658
+ choices=["Spectral Residual (noise-only)", "Raw Difference"],
659
+ value="Spectral Residual (noise-only)",
660
+ label="Delta Mode",
661
+ )
662
+ delta_align = gr.Checkbox(value=True, label="Align original & enhanced for delta (recommended)")
663
+ delta_gain = gr.Slider(minimum=-12, maximum=24, value=6, step=1, label="Delta Gain (dB)")
664
+
665
  preset.change(
666
  _apply_preset,
667
  inputs=[preset],
 
673
  with gr.Column(scale=1):
674
  out_audio = gr.Audio(type="numpy", label="Enhanced (autoplay)", autoplay=True)
675
  ab_audio = gr.Audio(type="numpy", label="A/B Alternating (2s O → 2s E)")
676
+ delta_audio = gr.Audio(type="numpy", label="Delta (polished)")
677
  metrics = gr.Markdown("")
678
 
679
  btn.click(
680
  gradio_enhance,
681
+ inputs=[in_audio, mode, dry_wet, presence, lowcut, out_sr, loudmatch, delta_mode, delta_align, delta_gain],
682
  outputs=[out_audio, ab_audio, delta_audio, metrics],
683
  )
684