505 lines
16 KiB
Python
505 lines
16 KiB
Python
|
|
# -*- coding: utf-8 -*-
|
|||
|
|
"""
|
|||
|
|
AudioRecorder – Aufnahme direkt als M4A (AAC via ffmpeg-Pipe).
|
|||
|
|
Kein WAV-Zwischenschritt. Fallback auf WAV nur wenn ffmpeg fehlt.
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
import os
|
|||
|
|
import shutil
|
|||
|
|
import subprocess
|
|||
|
|
import tempfile
|
|||
|
|
import wave
|
|||
|
|
from datetime import datetime
|
|||
|
|
from typing import List, Optional
|
|||
|
|
|
|||
|
|
import numpy as np
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
import sounddevice as sd
|
|||
|
|
except Exception:
|
|||
|
|
sd = None
|
|||
|
|
|
|||
|
|
# Default upper bound, in seconds, for a single chunk produced by
# split_audio_into_chunks().
CHUNK_MAX_SECONDS = 600
|
|||
|
|
|
|||
|
|
# Name of the sub-folder (below "KG_Diktat_Ablage") that
# get_audio_backup_dir() returns and creates.
_AUDIO_BACKUP_SUBDIR = "Audio_Backup"
|
|||
|
|
|
|||
|
|
|
|||
|
|
def get_audio_backup_dir() -> str:
    """Return the audio backup folder, creating it if it does not exist yet.

    The folder is ``KG_Diktat_Ablage/<_AUDIO_BACKUP_SUBDIR>`` below
    ``~/Documents`` when that directory exists, and directly below the user's
    home directory otherwise.
    """
    home = os.path.expanduser("~")
    documents = os.path.join(home, "Documents")
    # Fall back to the home directory itself when there is no Documents folder.
    base = documents if os.path.isdir(documents) else home
    target = os.path.join(base, "KG_Diktat_Ablage", _AUDIO_BACKUP_SUBDIR)
    os.makedirs(target, exist_ok=True)
    return target
|
|||
|
|
|
|||
|
|
|
|||
|
|
def persist_audio_safe(temp_path: str) -> str:
    """Copy an audio file into the backup folder and return the new path.

    The copy is named ``aufnahme_<YYYYmmdd_HHMMSS><ext>``, where ``<ext>`` is
    the extension of ``temp_path`` (``.m4a`` when it has none). Because the
    timestamp only has one-second resolution, a numeric suffix (``_1``,
    ``_2``, ...) is appended when a file with that name already exists, so an
    earlier backup is never overwritten.

    Args:
        temp_path: Path of the audio file to copy.

    Returns:
        Path of the copy inside the backup folder.

    Raises:
        OSError: If the source cannot be read or the copy cannot be written
            (propagated from ``shutil.copy2``).
    """
    backup_dir = get_audio_backup_dir()
    ext = os.path.splitext(temp_path)[1] or ".m4a"
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")

    safe_path = os.path.join(backup_dir, f"aufnahme_{ts}{ext}")
    # Two recordings persisted within the same second would otherwise map to
    # the same name and the second copy would silently replace the first.
    counter = 1
    while os.path.exists(safe_path):
        safe_path = os.path.join(backup_dir, f"aufnahme_{ts}_{counter}{ext}")
        counter += 1

    shutil.copy2(temp_path, safe_path)
    return safe_path
|
|||
|
|
|
|||
|
|
|
|||
|
|
def cleanup_old_audio_backups(max_age_days: int = 30):
    """Delete files in the audio backup folder older than ``max_age_days``.

    Age is judged by the file's modification time. Only regular files directly
    inside the backup folder are considered. The operation is best effort:
    any error while listing or deleting is ignored.

    NOTE(review): no transcription status is checked here; every sufficiently
    old file is removed.
    """
    folder = get_audio_backup_dir()
    oldest_allowed = datetime.now().timestamp() - max_age_days * 86400
    try:
        for entry in os.listdir(folder):
            candidate = os.path.join(folder, entry)
            if not os.path.isfile(candidate):
                continue
            if not os.path.getmtime(candidate) < oldest_allowed:
                continue
            try:
                os.remove(candidate)
            except Exception:
                # A file that cannot be deleted must not stop the sweep.
                pass
    except Exception:
        pass
|
|||
|
|
|
|||
|
|
|
|||
|
|
# subprocess.CREATE_NO_WINDOW when the running Python provides it, otherwise 0.
# Passed as ``creationflags`` to the ffmpeg subprocess calls below.
_NO_WINDOW = getattr(subprocess, "CREATE_NO_WINDOW", 0)
|
|||
|
|
|
|||
|
|
# Settings path (German UI text) embedded in the microphone error messages.
_WINDOWS_SOUND_SETTINGS = "Einstellungen > System > Sound > Eingabe"
|
|||
|
|
|
|||
|
|
# Holds the most recent check_microphone() result under the key "result";
# emptied by invalidate_mic_cache().
_mic_check_cache: dict = {}
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _fail(msg: str, dev_name=None, dev_index=None) -> dict:
    """Build a failed microphone-check result.

    Returns a dict with ``ok`` set to ``False`` plus the given device name,
    device index and message under ``device_name``, ``device_index`` and
    ``message``.
    """
    return dict(
        ok=False,
        device_name=dev_name,
        device_index=dev_index,
        message=msg,
    )
|
|||
|
|
|
|||
|
|
|
|||
|
|
def check_microphone(force: bool = False) -> dict:
    """Check whether a usable microphone is available.

    The result is stored in the module-level cache and returned from there on
    later calls unless ``force`` is true.

    Returns:
        dict with the keys ``ok`` (bool), ``device_name`` (str or None),
        ``device_index`` (int or None) and ``message`` (German, user-facing
        text).
    """
    cached = _mic_check_cache.get("result")
    if cached and not force:
        return _mic_check_cache["result"]

    def _remember(outcome):
        # Store the outcome (success or failure) before handing it back.
        _mic_check_cache["result"] = outcome
        return outcome

    if sd is None:
        message = (
            "Audio-Modul nicht verfügbar.\n\n"
            "Das Paket 'sounddevice' konnte nicht geladen werden.\n"
            "Aufnahme und Diktat sind nicht möglich."
        )
        return _remember(_fail(message))

    # --- Step 1: ask for the default input device ---
    name = None
    index = None
    try:
        default_info = sd.query_devices(kind="input")
        name = default_info["name"]
        index = sd.default.device[0]
    except Exception:
        pass

    # --- Step 2: fallback - scan all devices for the first one with inputs ---
    if name is None:
        try:
            for position, device in enumerate(sd.query_devices()):
                try:
                    if not device["max_input_channels"] > 0:
                        continue
                    name = device["name"]
                    index = position
                    break
                except (KeyError, TypeError, IndexError):
                    # Malformed device entry: skip it and keep scanning.
                    continue
        except Exception:
            pass

    if name is None:
        message = (
            "Kein Mikrofon gefunden.\n\n"
            "Bitte schliessen Sie ein Mikrofon an oder\n"
            "aktivieren Sie es in den Windows-Einstellungen:\n\n"
            f" {_WINDOWS_SOUND_SETTINGS}"
        )
        return _remember(_fail(message))

    # --- Step 3: verify the device reports at least one input channel ---
    try:
        if index is not None:
            details = sd.query_devices(index)
        else:
            details = sd.query_devices(kind="input")
        input_channels = details["max_input_channels"]
    except Exception:
        input_channels = 0

    if input_channels < 1:
        message = (
            f"Gerät '{name}' hat keine Eingangskanäle.\n\n"
            "Bitte ein anderes Mikrofon auswählen:\n\n"
            f" {_WINDOWS_SOUND_SETTINGS}"
        )
        return _remember(_fail(message, name, index))

    # --- Step 4: briefly open (and immediately close) an input stream ---
    try:
        probe = sd.InputStream(
            device=index,
            samplerate=16000,
            channels=1,
            dtype="float32",
            blocksize=1024,
        )
        probe.close()
    except Exception as exc:
        detail = str(exc)
        message = (
            f"Mikrofon '{name}' konnte nicht geöffnet werden.\n\n"
            "Mögliche Ursachen:\n"
            " - Mikrofon ist von einer anderen App belegt\n"
            " - Zugriff in Windows-Datenschutz blockiert\n"
            " - Gerät ist deaktiviert oder getrennt\n\n"
            f"Windows-Einstellungen:\n {_WINDOWS_SOUND_SETTINGS}\n\n"
            f"(Technisch: {detail[:120]})"
        )
        return _remember(_fail(message, name, index))

    return _remember({
        "ok": True,
        "device_name": name,
        "device_index": index,
        "message": f"Mikrofon bereit: {name}",
    })
|
|||
|
|
|
|||
|
|
|
|||
|
|
def invalidate_mic_cache():
    """Empty the microphone check cache (e.g. after the input device changed).

    The next ``check_microphone()`` call then probes the hardware again
    instead of returning the stored result.
    """
    _mic_check_cache.clear()
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _find_ffmpeg() -> Optional[str]:
    """Locate an ffmpeg executable.

    Looks on ``PATH`` first, then for ``ffmpeg.exe`` next to this file and in
    an ``_internal`` sub-folder next to this file.

    Returns:
        Path of the first match, or ``None`` when nothing was found.
    """
    on_path = shutil.which("ffmpeg")
    if on_path:
        return on_path

    here = os.path.dirname(os.path.abspath(__file__))
    bundled = (
        os.path.join(here, "ffmpeg.exe"),
        os.path.join(here, "_internal", "ffmpeg.exe"),
    )
    return next((path for path in bundled if os.path.isfile(path)), None)
|
|||
|
|
|
|||
|
|
|
|||
|
|
class AudioRecorder:
    """Record audio and stream it straight into ffmpeg (M4A/AAC).

    When ffmpeg is available, the audio is encoded to M4A while recording:
    no intermediate WAV file, and the result is small right away.
    When ffmpeg is missing or cannot be started, the recorder buffers the
    frames in memory and writes a WAV file (16-bit PCM) on stop.
    """

    def __init__(self, samplerate=16000, channels=1):
        """Create an idle recorder.

        Args:
            samplerate: Sample rate in Hz used for the input stream and output.
            channels: Number of input channels to record.
        """
        self.samplerate = samplerate
        self.channels = channels
        self._stream = None
        self._ffmpeg_proc: Optional[subprocess.Popen] = None
        self._output_path: Optional[str] = None
        self._recording = False
        self._wav_fallback = False
        self._frames: list = []
        # Set by start(); initialised here so the attribute always exists.
        self._device_index = None

    def _discard_output_file(self):
        """Delete the current temp output file (if any) and forget its path."""
        path, self._output_path = self._output_path, None
        if path:
            try:
                os.remove(path)
            except OSError:
                pass

    def _abort_start(self):
        """Release everything start() acquired before it failed."""
        self._recording = False

        stream, self._stream = self._stream, None
        if stream is not None:
            try:
                stream.close()
            except Exception:
                pass

        proc, self._ffmpeg_proc = self._ffmpeg_proc, None
        if proc is not None:
            try:
                if proc.stdin:
                    proc.stdin.close()
            except Exception:
                pass
            try:
                # Kill before deleting the file: ffmpeg still holds it open.
                proc.kill()
                proc.wait(timeout=5)
            except Exception:
                pass

        self._discard_output_file()
        self._frames = []

    def start(self):
        """Start recording from the microphone found by check_microphone().

        Raises:
            RuntimeError: If no usable microphone is available, or if the
                input stream cannot be opened for a device-related reason.
            Exception: Any other error from opening the input stream is
                re-raised unchanged.

        On failure no ffmpeg process, temp file or stream is left behind.
        """
        mic = check_microphone()
        if not mic["ok"]:
            raise RuntimeError(mic["message"])

        self._recording = True
        self._wav_fallback = False
        self._frames = []
        self._ffmpeg_proc = None
        self._output_path = None
        self._device_index = mic.get("device_index")

        ffmpeg = _find_ffmpeg()
        if ffmpeg:
            fd, self._output_path = tempfile.mkstemp(suffix=".m4a", prefix="kg_rec_")
            os.close(fd)
            try:
                self._ffmpeg_proc = subprocess.Popen(
                    [ffmpeg, "-y",
                     "-f", "s16le", "-ar", str(self.samplerate),
                     "-ac", str(self.channels), "-i", "pipe:0",
                     "-c:a", "aac", "-b:a", "64k",
                     "-movflags", "+faststart",
                     self._output_path],
                    stdin=subprocess.PIPE,
                    stdout=subprocess.DEVNULL,
                    stderr=subprocess.DEVNULL,
                    creationflags=_NO_WINDOW,
                )
            except Exception:
                # ffmpeg could not be launched: drop the unused temp file and
                # record to memory instead.
                self._ffmpeg_proc = None
                self._wav_fallback = True
                self._discard_output_file()
        else:
            self._wav_fallback = True

        def callback(indata, frames, time_info, status):
            # Signature is dictated by sounddevice; only ``indata`` is used.
            if not self._recording:
                return
            pcm = (np.clip(indata, -1.0, 1.0) * 32767.0).astype(np.int16)
            if self._ffmpeg_proc and self._ffmpeg_proc.stdin:
                try:
                    self._ffmpeg_proc.stdin.write(pcm.tobytes())
                except Exception:
                    # Never let a pipe error escape into the audio callback.
                    pass
            else:
                self._frames.append(indata.copy())

        try:
            self._stream = sd.InputStream(
                device=self._device_index,
                samplerate=self.samplerate,
                channels=self.channels,
                callback=callback,
                dtype="float32",
                blocksize=0,
            )
            self._stream.start()
        except Exception as e:
            # Without this cleanup the spawned ffmpeg process, its pipe and
            # the temp file would outlive the failed start.
            self._abort_start()
            invalidate_mic_cache()
            err = str(e)
            if "device" in err.lower() or "portaudio" in err.lower() or "-1" in err:
                raise RuntimeError(
                    "Mikrofon konnte nicht geöffnet werden.\n\n"
                    "Bitte prüfen Sie:\n"
                    " - Ist ein Mikrofon angeschlossen?\n"
                    " - Ist es in Windows aktiviert?\n\n"
                    f"Windows: {_WINDOWS_SOUND_SETTINGS}\n\n"
                    f"(Technisch: {err[:120]})"
                ) from None
            raise

    def stop_and_save(self) -> str:
        """Stop recording and return the path of the finished audio file.

        Returns:
            Path of the M4A file written by ffmpeg, or of a WAV file when the
            recorder ran in fallback mode.

        Raises:
            RuntimeError: If the recorder was not started, or if no audio data
                is available for the WAV fallback.
        """
        if not self._stream:
            raise RuntimeError("Recorder wurde nicht gestartet.")

        self._recording = False
        self._stream.stop()
        self._stream.close()
        self._stream = None

        proc = self._ffmpeg_proc
        if proc and proc.stdin:
            # Closing stdin signals end of input so ffmpeg can finalise the file.
            try:
                proc.stdin.close()
            except Exception:
                pass
            try:
                proc.wait(timeout=30)
            except Exception:
                try:
                    proc.kill()
                    # Reap the killed process so it does not linger.
                    proc.wait(timeout=5)
                except Exception:
                    pass

            self._ffmpeg_proc = None
            if (self._output_path
                    and os.path.isfile(self._output_path)
                    and os.path.getsize(self._output_path) > 0):
                return self._output_path

            # ffmpeg produced nothing usable: remove the empty temp file and
            # fall through to the WAV path.
            self._discard_output_file()
            self._wav_fallback = True

        if self._wav_fallback or not self._output_path:
            return self._save_wav_fallback()

        return self._output_path

    def stop_and_save_wav(self) -> str:
        """Legacy alias for stop_and_save()."""
        return self.stop_and_save()

    def _save_wav_fallback(self) -> str:
        """Write the buffered frames to a temp WAV file and return its path.

        Raises:
            RuntimeError: If no frames were buffered.
        """
        if not self._frames:
            raise RuntimeError("Keine Audio-Daten aufgenommen (leer).")

        audio = np.concatenate(self._frames, axis=0)
        audio = np.clip(audio, -1.0, 1.0)
        pcm16 = (audio * 32767.0).astype(np.int16)

        fd, path = tempfile.mkstemp(suffix=".wav", prefix="kg_rec_")
        os.close(fd)
        with wave.open(path, "wb") as wf:
            wf.setnchannels(self.channels)
            wf.setsampwidth(2)
            wf.setframerate(self.samplerate)
            wf.writeframes(pcm16.tobytes())
        return path
|
|||
|
|
|
|||
|
|
|
|||
|
|
# ── Chunking ──────────────────────────────────────────────────────────
|
|||
|
|
|
|||
|
|
def split_audio_into_chunks(audio_path: str, max_seconds: int = CHUNK_MAX_SECONDS) -> List[str]:
    """Split an audio file into pieces of at most ``max_seconds`` seconds.

    Files whose extension is ``.m4a`` (case-insensitive) are handled by the
    M4A splitter; every other file is handed to the WAV splitter.

    Returns:
        The list of paths returned by the selected splitter.
    """
    suffix = os.path.splitext(audio_path)[1].lower()
    splitter = _split_m4a if suffix == ".m4a" else _split_wav
    return splitter(audio_path, max_seconds)
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _split_m4a(m4a_path: str, max_seconds: int) -> List[str]:
    """Split an M4A file into chunks of at most ``max_seconds`` using ffmpeg.

    The duration is read from the ``Duration:`` line ffmpeg prints on stderr.
    Chunks are cut with stream copy (no re-encoding) into temp files.

    The function is best effort. ``[m4a_path]`` is returned unchanged when
    ffmpeg is missing, the duration cannot be determined, the file is short
    enough, or no chunk could be produced. If cutting fails part-way, the
    chunks produced so far are returned.

    Args:
        m4a_path: Path of the source file.
        max_seconds: Maximum chunk length in seconds; must be positive.

    Returns:
        Paths of the chunk files in order, or ``[m4a_path]``.

    Raises:
        ValueError: If ``max_seconds`` is not positive (the cutting loop
            could otherwise never advance).
    """
    if max_seconds <= 0:
        raise ValueError("max_seconds must be positive")

    ffmpeg = _find_ffmpeg()
    if not ffmpeg:
        return [m4a_path]

    try:
        probe = subprocess.run(
            [ffmpeg, "-i", m4a_path, "-f", "null", "-"],
            capture_output=True, timeout=30, creationflags=_NO_WINDOW,
        )
        duration_s = None
        for line in (probe.stderr or b"").decode("utf-8", errors="replace").splitlines():
            if "Duration:" in line:
                # e.g. "  Duration: 00:12:34.56, start: ..."
                parts = line.split("Duration:")[1].split(",")[0].strip()
                h, m, s = parts.split(":")
                duration_s = int(h) * 3600 + int(m) * 60 + float(s)
                break
        if duration_s is None or duration_s <= max_seconds:
            return [m4a_path]
    except Exception:
        return [m4a_path]

    chunks: List[str] = []
    offset = 0.0
    idx = 0
    while offset < duration_s:
        fd, chunk_path = tempfile.mkstemp(suffix=f"_chunk{idx}.m4a", prefix="kg_rec_")
        os.close(fd)
        try:
            result = subprocess.run(
                [ffmpeg, "-y", "-ss", str(offset), "-i", m4a_path,
                 "-t", str(max_seconds), "-c", "copy", chunk_path],
                capture_output=True, timeout=120, creationflags=_NO_WINDOW,
            )
            chunk_ok = (
                result.returncode == 0
                and os.path.isfile(chunk_path)
                and os.path.getsize(chunk_path) > 0
            )
        except (subprocess.SubprocessError, OSError):
            # A timeout or launch failure counts as a failed chunk instead of
            # crashing and leaving the temp file behind.
            chunk_ok = False

        if not chunk_ok:
            try:
                os.remove(chunk_path)
            except OSError:
                pass
            break

        chunks.append(chunk_path)
        offset += max_seconds
        idx += 1

    return chunks if chunks else [m4a_path]
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _split_wav(wav_path: str, max_seconds: int) -> List[str]:
    """Split a WAV file into chunks of at most ``max_seconds`` seconds.

    Each chunk is written to its own temp file with the same channel count,
    sample width and frame rate as the source.

    Args:
        wav_path: Path of the source WAV file.
        max_seconds: Maximum chunk length in seconds; must be positive.

    Returns:
        ``[wav_path]`` when the file is not longer than ``max_seconds``,
        otherwise the paths of the chunk files in order.

    Raises:
        ValueError: If ``max_seconds`` is not positive.
        wave.Error, OSError: If the source cannot be read or a chunk cannot be
            written; chunk files created so far are removed first.
    """
    if max_seconds <= 0:
        raise ValueError("max_seconds must be positive")

    created: List[str] = []
    try:
        with wave.open(wav_path, "rb") as wf:
            n_channels = wf.getnchannels()
            sampwidth = wf.getsampwidth()
            framerate = wf.getframerate()
            n_frames = wf.getnframes()

            duration_s = n_frames / framerate
            if duration_s <= max_seconds:
                return [wav_path]

            # At least one frame per chunk, otherwise the loop below could
            # never make progress for very small max_seconds values.
            chunk_frames = max(1, int(max_seconds * framerate))

            frames_remaining = n_frames
            idx = 0
            while frames_remaining > 0:
                read_count = min(chunk_frames, frames_remaining)
                data = wf.readframes(read_count)
                if not data:
                    # Header promised more frames than the file contains.
                    break

                fd, chunk_path = tempfile.mkstemp(suffix=f"_chunk{idx}.wav", prefix="kg_rec_")
                os.close(fd)
                created.append(chunk_path)
                with wave.open(chunk_path, "wb") as cf:
                    cf.setnchannels(n_channels)
                    cf.setsampwidth(sampwidth)
                    cf.setframerate(framerate)
                    cf.writeframes(data)

                frames_remaining -= read_count
                idx += 1
    except Exception:
        # Do not leave partial chunk files behind when splitting fails.
        for leftover in created:
            try:
                os.remove(leftover)
            except OSError:
                pass
        raise

    return created
|
|||
|
|
|
|||
|
|
|
|||
|
|
# Alias: the WAV-specific name resolves to the format-agnostic splitter
# (presumably kept for older callers - confirm before removing).
split_wav_into_chunks = split_audio_into_chunks
|
|||
|
|
|
|||
|
|
|
|||
|
|
def test_audio_device(duration_sec: float = 1.5) -> dict:
    """Quick microphone test: record briefly and check for a signal.

    Records ``duration_sec`` seconds of mono audio at 16 kHz from the default
    input device and evaluates peak and RMS level.

    Returns:
        dict with the keys ``ok`` (bool), ``device`` (str or None) and
        ``message`` (str).
    """
    def _outcome(ok, device, message):
        return {"ok": ok, "device": device, "message": message}

    if sd is None:
        return _outcome(
            False,
            None,
            "Python-Paket 'sounddevice' ist nicht verfügbar.\n"
            "Audio-Aufnahme nicht möglich.",
        )

    try:
        dev_info = sd.query_devices(kind="input")
        device_name = dev_info.get("name", "Unbekanntes Gerät")
    except Exception:
        return _outcome(
            False,
            None,
            "Kein Eingabegerät (Mikrofon) gefunden.\n"
            "Bitte Mikrofon anschliessen und erneut versuchen.",
        )

    try:
        audio = sd.rec(
            int(duration_sec * 16000),
            samplerate=16000,
            channels=1,
            dtype="float32",
            blocking=True,
        )
    except Exception as exc:
        return _outcome(False, device_name, f"Aufnahmetest fehlgeschlagen:\n{exc}")

    if audio is None or len(audio) == 0:
        return _outcome(
            False,
            device_name,
            "Keine Audio-Daten empfangen.\n"
            "Bitte Mikrofon-Zugriff in den Windows-Einstellungen prüfen.",
        )

    peak = float(np.max(np.abs(audio)))
    rms = float(np.sqrt(np.mean(audio ** 2)))

    # A peak below 0.001 is reported as "no signal".
    if peak < 0.001:
        return _outcome(
            False,
            device_name,
            f"Gerät: {device_name}\n\n"
            f"Kein Signal erkannt (Peak={peak:.4f}).\n"
            "Mikrofon ist möglicherweise stummgeschaltet oder defekt.",
        )

    # RMS is scaled by 1000 and capped at 100 for display.
    level_pct = min(100, int(rms * 1000))
    return _outcome(
        True,
        device_name,
        f"Gerät: {device_name}\n\n"
        "Audio-Signal erkannt.\n"
        f"Pegel: {level_pct}% (Peak={peak:.3f}, RMS={rms:.4f})\n\n"
        "Mikrofon funktioniert.",
    )
|