Files
aza/APP/spotify-recorder/audio.py

203 lines
6.5 KiB
Python
Raw Normal View History

2026-03-25 14:14:07 +01:00
import pyaudiowpatch as pyaudio
import numpy as np
import threading
import lameenc
from mutagen.mp3 import MP3
from mutagen.id3 import ID3, TIT2, TPE1, TALB, TRCK
class AudioRecorder:
    """Record system audio via WASAPI loopback and encode it to MP3.

    Audio is captured through a PyAudio stream callback, buffered in memory
    as raw chunks and later encoded with ``lameenc``.  The stream is opened
    as 16-bit integer PCM first; if the device rejects that, a 32-bit float
    stream with the device's full channel count is used instead.
    """

    def __init__(self):
        self._pa = None                 # lazily created pyaudio.PyAudio instance
        self._stream = None             # currently open input stream, if any
        self._frames = []               # raw chunks delivered by the callback
        self._lock = threading.Lock()   # guards self._frames
        self._recording = False
        self._sample_rate = 44100
        self._channels = 2
        self._chunk_size = 512
        self._use_float = False         # True when the stream delivers float32
        self._current_level = 0.0

    def _init_pyaudio(self):
        """Create the PyAudio instance on first use."""
        if self._pa is None:
            self._pa = pyaudio.PyAudio()

    def _find_loopback_device(self):
        """Return the loopback device info for the default output, or None.

        None is returned when WASAPI is unavailable, when the default output
        device cannot be queried, or when no loopback device name contains
        the default output device's name.
        """
        self._init_pyaudio()
        try:
            wasapi_info = self._pa.get_host_api_info_by_type(pyaudio.paWASAPI)
            # Looking up the default output can fail as well (e.g. no active
            # output device); treat that like "no loopback device".
            default_output = self._pa.get_device_info_by_index(
                wasapi_info["defaultOutputDevice"]
            )
        except OSError:
            return None

        output_name = default_output["name"]
        for index in range(self._pa.get_device_count()):
            dev = self._pa.get_device_info_by_index(index)
            if dev.get("isLoopbackDevice", False) and output_name in dev["name"]:
                return dev
        return None

    def _open_stream(self, loopback, sample_format, callback):
        """Open a callback-driven input stream on *loopback*.

        Uses the current ``_channels``, ``_sample_rate`` and ``_chunk_size``.
        """
        return self._pa.open(
            format=sample_format,
            channels=self._channels,
            rate=self._sample_rate,
            input=True,
            input_device_index=loopback["index"],
            frames_per_buffer=self._chunk_size,
            stream_callback=callback,
        )

    def start(self):
        """Start capturing from the default output's loopback device.

        Raises:
            RuntimeError: if no loopback device could be found.
            OSError: if the stream cannot be opened in either sample format.
        """
        loopback = self._find_loopback_device()
        if not loopback:
            raise RuntimeError(
                "Kein Loopback-Geraet gefunden.\n"
                "Stelle sicher, dass ein Audioausgang aktiv ist."
            )

        # A stream left over from an earlier start() would keep feeding the
        # same buffer; close it before opening a new one.
        if self._stream is not None:
            self.stop()

        self._sample_rate = int(loopback["defaultSampleRate"])
        self._channels = min(loopback["maxInputChannels"], 2)
        self._frames = []
        self._use_float = False
        self._current_level = 0.0
        self._recording = True

        def callback(in_data, frame_count, time_info, status):
            # Invoked by PyAudio for every captured chunk.
            if self._recording:
                with self._lock:
                    self._frames.append(in_data)
                self._compute_level(in_data)
            else:
                self._current_level = 0.0
            return (None, pyaudio.paContinue)

        try:
            try:
                self._stream = self._open_stream(
                    loopback, pyaudio.paInt16, callback
                )
            except OSError:
                # Fall back to float32 using the device's full channel count.
                self._use_float = True
                self._channels = loopback["maxInputChannels"]
                self._stream = self._open_stream(
                    loopback, pyaudio.paFloat32, callback
                )
        except Exception:
            # No stream was opened, so do not claim to be recording.
            self._recording = False
            raise

    def _compute_level(self, data):
        """Compute the RMS level of a single audio chunk (0.0 to 1.0)."""
        use_float = self._use_float
        try:
            if use_float:
                samples = np.frombuffer(data, dtype=np.float32)
            else:
                samples = np.frombuffer(data, dtype=np.int16).astype(np.float64)
            if samples.size == 0:
                # The mean of an empty array is NaN; report silence instead.
                self._current_level = 0.0
                return
            rms = float(np.sqrt(np.mean(samples ** 2)))
            scaled = rms * 4.0 if use_float else rms / 8000.0
            self._current_level = min(scaled, 1.0)
        except Exception:
            # Deliberately broad: this runs inside the audio callback, and a
            # malformed chunk must not interrupt capturing.
            self._current_level = 0.0

    def get_level(self):
        """Return the most recently computed audio level (0.0 to 1.0)."""
        return self._current_level

    def stop(self):
        """Stop recording and close the stream, if one is open."""
        self._recording = False
        self._current_level = 0.0
        stream, self._stream = self._stream, None
        if stream is None:
            return
        try:
            stream.stop_stream()
        except OSError:
            pass
        # Attempt to close even when stopping failed so the stream is released.
        try:
            stream.close()
        except OSError:
            pass

    def pause(self):
        """Stop buffering incoming chunks; the stream itself stays open."""
        self._recording = False

    def resume(self):
        """Resume buffering incoming chunks."""
        self._recording = True

    @property
    def is_recording(self):
        """Whether incoming chunks are currently being buffered."""
        return self._recording

    def harvest_frames(self):
        """Return all buffered chunks as a list and clear the buffer."""
        with self._lock:
            frames = self._frames[:]
            self._frames.clear()
        return frames

    def _prepare_pcm(self, raw_data):
        """Convert raw captured audio to 16-bit PCM with at most two channels.

        Returns:
            A ``(pcm_bytes, channel_count)`` tuple.  Channels beyond the
            first two are dropped; float32 samples are scaled and clipped
            to the int16 range.
        """
        use_float = self._use_float
        channels = self._channels
        if not use_float and channels <= 2:
            # Already 16-bit PCM with a usable channel count.
            return raw_data, channels

        dtype = np.float32 if use_float else np.int16
        samples = np.frombuffer(raw_data, dtype=dtype)
        if channels > 2:
            samples = samples.reshape(-1, channels)[:, :2].flatten()
        if use_float:
            samples = np.clip(samples * 32767, -32768, 32767).astype(np.int16)
        # Report the real channel count so mono float input is not
        # misinterpreted as interleaved stereo by the encoder.
        return samples.tobytes(), min(channels, 2)

    def save_mp3(self, frames, filepath, title="", artist="", bitrate=320,
                 album="", track_num=0):
        """Encode captured chunks to an MP3 file and add ID3 tags.

        Args:
            frames: raw chunks as returned by ``harvest_frames``.
            filepath: destination path of the MP3 file.
            title, artist, album: optional tag values; empty values are skipped.
            bitrate: MP3 bit rate passed to the encoder.
            track_num: optional track number; 0 means no track tag.

        Returns:
            True if a file was written, False if there were no frames or
            less than one second of audio.
        """
        if not frames:
            return False

        raw_pcm = b"".join(frames)
        # One second of audio in the captured format (float32 = 4 bytes,
        # int16 = 2 bytes per sample).
        bytes_per_sample = 4 if self._use_float else 2
        min_bytes = self._sample_rate * self._channels * bytes_per_sample
        if len(raw_pcm) < min_bytes:
            return False

        pcm_data, channels = self._prepare_pcm(raw_pcm)

        encoder = lameenc.Encoder()
        encoder.set_bit_rate(bitrate)
        encoder.set_in_sample_rate(self._sample_rate)
        encoder.set_channels(channels)
        encoder.set_quality(2)
        mp3_data = encoder.encode(pcm_data)
        mp3_data += encoder.flush()

        with open(filepath, "wb") as f:
            f.write(mp3_data)

        # Tagging is best effort: the audio file is already on disk, so a
        # tagging failure must not turn a successful save into an error.
        try:
            audio = MP3(filepath)
            if audio.tags is None:
                audio.add_tags()
            tag_values = (
                (TIT2, title),
                (TPE1, artist),
                (TALB, album),
                (TRCK, str(track_num) if track_num else ""),
            )
            for frame_cls, value in tag_values:
                if value:
                    audio.tags.add(frame_cls(encoding=3, text=value))
            audio.save()
        except Exception:
            pass
        return True

    def cleanup(self):
        """Stop recording and release the PyAudio instance."""
        self.stop()
        if self._pa:
            self._pa.terminate()
            self._pa = None