203 lines
6.5 KiB
Python
203 lines
6.5 KiB
Python
import pyaudiowpatch as pyaudio
|
||
import numpy as np
|
||
import threading
|
||
import lameenc
|
||
from mutagen.mp3 import MP3
|
||
from mutagen.id3 import ID3, TIT2, TPE1, TALB, TRCK
|
||
|
||
|
||
class AudioRecorder:
|
||
"""Records system audio via WASAPI loopback and encodes to MP3."""
|
||
|
||
def __init__(self):
|
||
self._pa = None
|
||
self._stream = None
|
||
self._frames = []
|
||
self._lock = threading.Lock()
|
||
self._recording = False
|
||
self._sample_rate = 44100
|
||
self._channels = 2
|
||
self._chunk_size = 512
|
||
self._use_float = False
|
||
self._current_level = 0.0
|
||
|
||
def _init_pyaudio(self):
|
||
if self._pa is None:
|
||
self._pa = pyaudio.PyAudio()
|
||
|
||
def _find_loopback_device(self):
|
||
self._init_pyaudio()
|
||
try:
|
||
wasapi_info = self._pa.get_host_api_info_by_type(pyaudio.paWASAPI)
|
||
except OSError:
|
||
return None
|
||
|
||
default_output_idx = wasapi_info["defaultOutputDevice"]
|
||
default_output = self._pa.get_device_info_by_index(default_output_idx)
|
||
|
||
for i in range(self._pa.get_device_count()):
|
||
dev = self._pa.get_device_info_by_index(i)
|
||
if (dev.get("isLoopbackDevice", False)
|
||
and default_output["name"] in dev["name"]):
|
||
return dev
|
||
|
||
return None
|
||
|
||
def start(self):
|
||
loopback = self._find_loopback_device()
|
||
if not loopback:
|
||
raise RuntimeError(
|
||
"Kein Loopback-Geraet gefunden.\n"
|
||
"Stelle sicher, dass ein Audioausgang aktiv ist."
|
||
)
|
||
|
||
self._sample_rate = int(loopback["defaultSampleRate"])
|
||
self._channels = min(loopback["maxInputChannels"], 2)
|
||
self._frames = []
|
||
self._recording = True
|
||
self._use_float = False
|
||
self._current_level = 0.0
|
||
|
||
def callback(in_data, frame_count, time_info, status):
|
||
if self._recording:
|
||
with self._lock:
|
||
self._frames.append(in_data)
|
||
self._compute_level(in_data)
|
||
else:
|
||
self._current_level = 0.0
|
||
return (None, pyaudio.paContinue)
|
||
|
||
try:
|
||
self._stream = self._pa.open(
|
||
format=pyaudio.paInt16,
|
||
channels=self._channels,
|
||
rate=self._sample_rate,
|
||
input=True,
|
||
input_device_index=loopback["index"],
|
||
frames_per_buffer=self._chunk_size,
|
||
stream_callback=callback,
|
||
)
|
||
except OSError:
|
||
self._use_float = True
|
||
self._channels = loopback["maxInputChannels"]
|
||
self._stream = self._pa.open(
|
||
format=pyaudio.paFloat32,
|
||
channels=self._channels,
|
||
rate=self._sample_rate,
|
||
input=True,
|
||
input_device_index=loopback["index"],
|
||
frames_per_buffer=self._chunk_size,
|
||
stream_callback=callback,
|
||
)
|
||
|
||
def _compute_level(self, data):
|
||
"""Compute RMS level from a single audio chunk (0.0 – 1.0)."""
|
||
try:
|
||
if self._use_float:
|
||
samples = np.frombuffer(data, dtype=np.float32)
|
||
rms = float(np.sqrt(np.mean(samples ** 2)))
|
||
self._current_level = min(rms * 4.0, 1.0)
|
||
else:
|
||
samples = np.frombuffer(data, dtype=np.int16).astype(np.float64)
|
||
rms = float(np.sqrt(np.mean(samples ** 2)))
|
||
self._current_level = min(rms / 8000.0, 1.0)
|
||
except Exception:
|
||
self._current_level = 0.0
|
||
|
||
def get_level(self):
|
||
"""Current audio level (0.0 – 1.0), safe to call from any thread."""
|
||
return self._current_level
|
||
|
||
def stop(self):
|
||
self._recording = False
|
||
self._current_level = 0.0
|
||
if self._stream:
|
||
try:
|
||
self._stream.stop_stream()
|
||
self._stream.close()
|
||
except OSError:
|
||
pass
|
||
self._stream = None
|
||
|
||
def pause(self):
|
||
self._recording = False
|
||
|
||
def resume(self):
|
||
self._recording = True
|
||
|
||
@property
|
||
def is_recording(self):
|
||
return self._recording
|
||
|
||
def harvest_frames(self):
|
||
"""Take all recorded frames and clear the buffer."""
|
||
with self._lock:
|
||
frames = self._frames[:]
|
||
self._frames.clear()
|
||
return frames
|
||
|
||
def _prepare_pcm(self, raw_data):
|
||
"""Convert raw audio to 16-bit stereo PCM."""
|
||
if self._use_float:
|
||
float_samples = np.frombuffer(raw_data, dtype=np.float32)
|
||
if self._channels > 2:
|
||
float_samples = float_samples.reshape(-1, self._channels)[:, :2].flatten()
|
||
int_samples = np.clip(float_samples * 32767, -32768, 32767).astype(np.int16)
|
||
return int_samples.tobytes(), 2
|
||
else:
|
||
if self._channels > 2:
|
||
samples = np.frombuffer(raw_data, dtype=np.int16)
|
||
samples = samples.reshape(-1, self._channels)[:, :2].flatten()
|
||
return samples.tobytes(), 2
|
||
return raw_data, self._channels
|
||
|
||
def save_mp3(self, frames, filepath, title="", artist="", bitrate=320,
|
||
album="", track_num=0):
|
||
"""Encode PCM frames to MP3 and add ID3 tags."""
|
||
if not frames:
|
||
return False
|
||
|
||
raw_pcm = b"".join(frames)
|
||
|
||
min_bytes = self._sample_rate * self._channels * 2
|
||
if len(raw_pcm) < min_bytes:
|
||
return False
|
||
|
||
pcm_data, channels = self._prepare_pcm(raw_pcm)
|
||
|
||
encoder = lameenc.Encoder()
|
||
encoder.set_bit_rate(bitrate)
|
||
encoder.set_in_sample_rate(self._sample_rate)
|
||
encoder.set_channels(channels)
|
||
encoder.set_quality(2)
|
||
|
||
mp3_data = encoder.encode(pcm_data)
|
||
mp3_data += encoder.flush()
|
||
|
||
with open(filepath, "wb") as f:
|
||
f.write(mp3_data)
|
||
|
||
try:
|
||
audio = MP3(filepath)
|
||
if audio.tags is None:
|
||
audio.add_tags()
|
||
if title:
|
||
audio.tags.add(TIT2(encoding=3, text=title))
|
||
if artist:
|
||
audio.tags.add(TPE1(encoding=3, text=artist))
|
||
if album:
|
||
audio.tags.add(TALB(encoding=3, text=album))
|
||
if track_num:
|
||
audio.tags.add(TRCK(encoding=3, text=str(track_num)))
|
||
audio.save()
|
||
except Exception:
|
||
pass
|
||
|
||
return True
|
||
|
||
def cleanup(self):
|
||
self.stop()
|
||
if self._pa:
|
||
self._pa.terminate()
|
||
self._pa = None
|