Files
2026-03-25 14:14:07 +01:00

203 lines
6.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import logging
import threading

import lameenc
import numpy as np
import pyaudiowpatch as pyaudio
from mutagen.id3 import ID3, TIT2, TPE1, TALB, TRCK
from mutagen.mp3 import MP3
class AudioRecorder:
    """Record system audio via WASAPI loopback and encode it to MP3.

    Audio is delivered through a PyAudio stream callback and buffered as raw
    chunks. ``harvest_frames`` hands the buffered chunks to the caller, who
    can pass them to ``save_mp3``.
    """

    def __init__(self):
        self._pa = None
        self._stream = None
        self._frames = []
        self._lock = threading.Lock()  # guards self._frames
        self._recording = False
        self._sample_rate = 44100
        self._channels = 2
        self._chunk_size = 512
        self._use_float = False  # True when the stream delivers float32
        self._current_level = 0.0

    def _init_pyaudio(self):
        """Create the PyAudio instance on first use."""
        if self._pa is None:
            self._pa = pyaudio.PyAudio()

    def _find_loopback_device(self):
        """Return the loopback device of the default output, or None.

        A device matches when it is flagged ``isLoopbackDevice`` and its name
        contains the name of the WASAPI default output device. None is
        returned when WASAPI or the default output device is unavailable, or
        when no device matches.
        """
        self._init_pyaudio()
        try:
            wasapi_info = self._pa.get_host_api_info_by_type(pyaudio.paWASAPI)
            # Also guarded: with no active output the default index is
            # invalid and the lookup raises OSError.
            default_output = self._pa.get_device_info_by_index(
                wasapi_info["defaultOutputDevice"]
            )
        except OSError:
            return None
        for i in range(self._pa.get_device_count()):
            dev = self._pa.get_device_info_by_index(i)
            if (dev.get("isLoopbackDevice", False)
                    and default_output["name"] in dev["name"]):
                return dev
        return None

    def _open_stream(self, device, sample_format, callback):
        """Open an input stream on *device* with the current settings."""
        return self._pa.open(
            format=sample_format,
            channels=self._channels,
            rate=self._sample_rate,
            input=True,
            input_device_index=device["index"],
            frames_per_buffer=self._chunk_size,
            stream_callback=callback,
        )

    def start(self):
        """Start capturing from the default output's loopback device.

        Tries 16-bit integer capture with at most two channels first; if the
        device rejects that, falls back to float32 with all device channels.
        Any previously buffered frames are discarded.

        Raises:
            RuntimeError: if no loopback device is found.
            OSError: if the stream cannot be opened in either format.
        """
        loopback = self._find_loopback_device()
        if not loopback:
            raise RuntimeError(
                "Kein Loopback-Geraet gefunden.\n"
                "Stelle sicher, dass ein Audioausgang aktiv ist."
            )
        # Close a stream left over from an earlier start(); two live
        # callbacks would interleave their chunks in the same buffer.
        self.stop()

        self._sample_rate = int(loopback["defaultSampleRate"])
        self._channels = min(loopback["maxInputChannels"], 2)
        with self._lock:
            self._frames = []
        self._use_float = False
        self._current_level = 0.0
        # Set before opening so the first delivered chunks are not dropped.
        self._recording = True

        def callback(in_data, frame_count, time_info, status):
            if self._recording:
                with self._lock:
                    self._frames.append(in_data)
                self._compute_level(in_data)
            else:
                # Paused: keep the stream running but report silence.
                self._current_level = 0.0
            return (None, pyaudio.paContinue)

        try:
            try:
                self._stream = self._open_stream(
                    loopback, pyaudio.paInt16, callback
                )
            except OSError:
                self._use_float = True
                self._channels = loopback["maxInputChannels"]
                self._stream = self._open_stream(
                    loopback, pyaudio.paFloat32, callback
                )
        except Exception:
            # No stream was opened, so do not claim to be recording.
            self._recording = False
            raise

    def _compute_level(self, data):
        """Store the RMS level of one audio chunk, scaled to 0.0-1.0.

        Float samples are scaled by 4.0 and 16-bit samples by 1/8000, then
        capped at 1.0. Empty or undecodable chunks yield 0.0.
        """
        use_float = self._use_float
        try:
            if use_float:
                samples = np.frombuffer(data, dtype=np.float32)
            else:
                samples = np.frombuffer(data, dtype=np.int16).astype(np.float64)
            if samples.size == 0:
                # np.mean of an empty array is nan, which min() would keep.
                self._current_level = 0.0
                return
            rms = float(np.sqrt(np.mean(samples ** 2)))
            if np.isnan(rms):
                self._current_level = 0.0
                return
            level = rms * 4.0 if use_float else rms / 8000.0
            self._current_level = min(level, 1.0)
        except Exception:
            # Called from the stream callback: metering must never raise.
            self._current_level = 0.0

    def get_level(self):
        """Return the most recently computed audio level (0.0-1.0)."""
        return self._current_level

    def stop(self):
        """Stop recording and close the stream, if one is open."""
        self._recording = False
        self._current_level = 0.0
        stream, self._stream = self._stream, None
        if stream:
            try:
                stream.stop_stream()
            except OSError:
                pass
            # Close even when stopping failed, so the stream is not leaked.
            try:
                stream.close()
            except OSError:
                pass

    def pause(self):
        """Stop buffering audio while leaving the stream open."""
        self._recording = False

    def resume(self):
        """Resume buffering audio after ``pause``."""
        self._recording = True

    @property
    def is_recording(self):
        """Whether incoming audio is currently being buffered."""
        return self._recording

    def harvest_frames(self):
        """Return all buffered chunks and leave the buffer empty."""
        with self._lock:
            # Swap in a fresh list instead of copying and clearing.
            frames, self._frames = self._frames, []
        return frames

    def _prepare_pcm(self, raw_data):
        """Convert raw captured audio to 16-bit PCM with at most 2 channels.

        Returns:
            A ``(pcm_bytes, channels)`` tuple. Captures with more than two
            channels keep only the first two; mono stays mono.
        """
        if not self._use_float and self._channels <= 2:
            return raw_data, self._channels

        dtype = np.float32 if self._use_float else np.int16
        samples = np.frombuffer(raw_data, dtype=dtype)
        if self._channels > 2:
            samples = samples.reshape(-1, self._channels)[:, :2].flatten()
        if self._use_float:
            samples = np.clip(samples * 32767, -32768, 32767).astype(np.int16)
        # Report the real channel count: a mono float capture labelled as
        # stereo would be encoded at double speed.
        return samples.tobytes(), min(self._channels, 2)

    def save_mp3(self, frames, filepath, title="", artist="", bitrate=320,
                 album="", track_num=0):
        """Encode captured chunks to an MP3 file and add ID3 tags.

        Args:
            frames: raw chunks as returned by ``harvest_frames``.
            filepath: destination path of the MP3 file.
            title, artist, album: optional tag values; empty ones are skipped.
            bitrate: MP3 bit rate passed to the encoder.
            track_num: optional track number; 0 is skipped.

        Returns:
            True if the file was written, False if there was no audio or
            less than one second of it. Tagging is best-effort: a tagging
            failure is logged and still returns True.
        """
        if not frames:
            return False
        raw_pcm = b"".join(frames)
        # The buffer holds float32 (4 bytes) or int16 (2 bytes) samples.
        bytes_per_sample = 4 if self._use_float else 2
        min_bytes = self._sample_rate * self._channels * bytes_per_sample
        if len(raw_pcm) < min_bytes:
            return False

        pcm_data, channels = self._prepare_pcm(raw_pcm)
        encoder = lameenc.Encoder()
        encoder.set_bit_rate(bitrate)
        encoder.set_in_sample_rate(self._sample_rate)
        encoder.set_channels(channels)
        encoder.set_quality(2)
        mp3_data = encoder.encode(pcm_data)
        mp3_data += encoder.flush()
        with open(filepath, "wb") as f:
            f.write(mp3_data)

        try:
            audio = MP3(filepath)
            if audio.tags is None:
                audio.add_tags()
            if title:
                audio.tags.add(TIT2(encoding=3, text=title))
            if artist:
                audio.tags.add(TPE1(encoding=3, text=artist))
            if album:
                audio.tags.add(TALB(encoding=3, text=album))
            if track_num:
                audio.tags.add(TRCK(encoding=3, text=str(track_num)))
            audio.save()
        except Exception:
            # The audio itself is already on disk; only the tags are lost.
            logging.getLogger(__name__).warning(
                "Could not write ID3 tags to %s", filepath, exc_info=True
            )
        return True

    def cleanup(self):
        """Stop recording and release the PyAudio instance."""
        self.stop()
        if self._pa:
            self._pa.terminate()
            self._pa = None