import logging
import threading

import lameenc
import numpy as np
import pyaudiowpatch as pyaudio
from mutagen.id3 import ID3, TIT2, TPE1, TALB, TRCK
from mutagen.mp3 import MP3

_log = logging.getLogger(__name__)


class AudioRecorder:
    """Records system audio via WASAPI loopback and encodes to MP3.

    Typical use: ``start()`` opens a callback-driven input stream on the
    loopback device, ``harvest_frames()`` drains the captured chunks,
    ``save_mp3()`` encodes them, and ``cleanup()`` releases PortAudio.
    """

    def __init__(self):
        self._pa = None                 # PyAudio instance, created lazily
        self._stream = None             # currently open input stream, if any
        self._frames = []               # raw chunks appended by the callback
        self._lock = threading.Lock()   # guards self._frames
        self._recording = False
        self._sample_rate = 44100
        self._channels = 2
        self._chunk_size = 512
        self._use_float = False         # True when capturing float32 samples
        self._current_level = 0.0

    def _init_pyaudio(self) -> None:
        """Create the PyAudio instance on first use."""
        if self._pa is None:
            self._pa = pyaudio.PyAudio()

    def _find_loopback_device(self):
        """Return the loopback device info matching the default output.

        Returns:
            The device-info mapping of the first device flagged
            ``isLoopbackDevice`` whose name contains the default WASAPI
            output device's name, or ``None`` when WASAPI, the default
            output device, or a matching loopback device is unavailable.
        """
        self._init_pyaudio()
        try:
            wasapi_info = self._pa.get_host_api_info_by_type(pyaudio.paWASAPI)
            # Looked up inside the same ``try`` so that a missing default
            # output is reported as "no device" instead of raising.
            default_output = self._pa.get_device_info_by_index(
                wasapi_info["defaultOutputDevice"]
            )
        except OSError:
            return None

        for index in range(self._pa.get_device_count()):
            dev = self._pa.get_device_info_by_index(index)
            if (dev.get("isLoopbackDevice", False)
                    and default_output["name"] in dev["name"]):
                return dev
        return None

    def start(self) -> None:
        """Open the loopback stream and begin collecting audio chunks.

        The stream is first opened as 16-bit integer with at most two
        channels; if that raises ``OSError`` it is retried as float32 with
        all of the device's input channels. Any previously buffered frames
        are discarded.

        Raises:
            RuntimeError: If no loopback device could be found.
            OSError: If the stream cannot be opened in either format.
        """
        loopback = self._find_loopback_device()
        if not loopback:
            raise RuntimeError(
                "Kein Loopback-Geraet gefunden.\n"
                "Stelle sicher, dass ein Audioausgang aktiv ist."
            )

        # A second start() without stop() would otherwise leak the old
        # stream, whose callback would keep appending to the same buffer.
        if self._stream is not None:
            self.stop()

        self._sample_rate = int(loopback["defaultSampleRate"])
        self._channels = min(loopback["maxInputChannels"], 2)
        with self._lock:
            self._frames = []
        self._use_float = False
        self._current_level = 0.0
        # Set before opening so the very first callback is already recorded.
        self._recording = True

        def callback(in_data, frame_count, time_info, status):
            if self._recording:
                with self._lock:
                    self._frames.append(in_data)
                # Level metering does not touch the buffer, so it runs
                # outside the lock.
                self._compute_level(in_data)
            else:
                self._current_level = 0.0
            return (None, pyaudio.paContinue)

        stream_args = dict(
            rate=self._sample_rate,
            input=True,
            input_device_index=loopback["index"],
            frames_per_buffer=self._chunk_size,
            stream_callback=callback,
        )
        try:
            try:
                self._stream = self._pa.open(
                    format=pyaudio.paInt16,
                    channels=self._channels,
                    **stream_args,
                )
            except OSError:
                # Fall back to the device's native layout as float32.
                self._use_float = True
                self._channels = loopback["maxInputChannels"]
                self._stream = self._pa.open(
                    format=pyaudio.paFloat32,
                    channels=self._channels,
                    **stream_args,
                )
        except Exception:
            # No stream was opened: do not report an active recording.
            self._recording = False
            raise

    def _compute_level(self, data) -> None:
        """Update the current level from one audio chunk.

        Stores a value in the range 0.0 to 1.0 derived from the chunk's RMS.
        Empty chunks, non-finite results and any processing error yield 0.0.
        """
        try:
            if self._use_float:
                samples = np.frombuffer(data, dtype=np.float32)
            else:
                samples = np.frombuffer(data, dtype=np.int16).astype(np.float64)

            # np.mean of an empty array is NaN (plus a RuntimeWarning).
            if samples.size == 0:
                self._current_level = 0.0
                return

            rms = float(np.sqrt(np.mean(samples ** 2)))
            # Scaling factors are display gains taken over unchanged
            # (x4 for float input, /8000 for int16 input).
            level = rms * 4.0 if self._use_float else rms / 8000.0
            # min(nan, 1.0) would return NaN, so filter non-finite values.
            self._current_level = min(level, 1.0) if np.isfinite(level) else 0.0
        except Exception:
            # Called from the audio callback: never let metering raise.
            self._current_level = 0.0

    def get_level(self) -> float:
        """Return the most recently computed audio level (0.0 to 1.0)."""
        return self._current_level

    def stop(self) -> None:
        """Stop recording and close the stream, if one is open."""
        self._recording = False
        self._current_level = 0.0
        stream, self._stream = self._stream, None
        if stream is None:
            return
        # Stop and close are attempted independently so that a failing
        # stop_stream() cannot prevent the stream from being closed.
        try:
            stream.stop_stream()
        except OSError:
            pass
        try:
            stream.close()
        except OSError:
            pass

    def pause(self) -> None:
        """Stop buffering incoming chunks; the stream itself stays open."""
        self._recording = False

    def resume(self) -> None:
        """Resume buffering incoming chunks."""
        self._recording = True

    @property
    def is_recording(self) -> bool:
        """Whether incoming chunks are currently being buffered."""
        return self._recording

    def harvest_frames(self) -> list:
        """Take all recorded frames and clear the buffer."""
        with self._lock:
            frames = self._frames[:]
            self._frames.clear()
        return frames

    def _prepare_pcm(self, raw_data):
        """Convert raw captured audio to 16-bit PCM with at most 2 channels.

        Float32 input is scaled to int16; input with more than two channels
        is reduced to its first two channels.

        Returns:
            A ``(pcm_bytes, channel_count)`` tuple, where ``channel_count``
            is the number of interleaved channels actually present in
            ``pcm_bytes``.
        """
        # Mono input stays mono; reporting 2 channels for it would make the
        # encoder misinterpret the sample layout.
        out_channels = min(self._channels, 2)

        if self._use_float:
            float_samples = np.frombuffer(raw_data, dtype=np.float32)
            if self._channels > 2:
                float_samples = (
                    float_samples.reshape(-1, self._channels)[:, :2].flatten()
                )
            int_samples = np.clip(
                float_samples * 32767, -32768, 32767
            ).astype(np.int16)
            return int_samples.tobytes(), out_channels

        if self._channels > 2:
            samples = np.frombuffer(raw_data, dtype=np.int16)
            samples = samples.reshape(-1, self._channels)[:, :2].flatten()
            return samples.tobytes(), out_channels
        return raw_data, out_channels

    def save_mp3(self, frames, filepath, title="", artist="",
                 bitrate=320, album="", track_num=0):
        """Encode PCM frames to MP3 and add ID3 tags.

        Args:
            frames: Raw chunks as returned by ``harvest_frames()``.
            filepath: Destination path of the MP3 file.
            title, artist, album: Optional ID3 text values; empty values
                are not written.
            bitrate: MP3 bit rate passed to the encoder.
            track_num: Optional track number; 0 means "do not write".

        Returns:
            ``True`` if a file was written, ``False`` if there was no audio
            or less than one second of it. Tagging is best-effort: a tagging
            failure is logged and does not change the return value.
        """
        if not frames:
            return False

        raw_pcm = b"".join(frames)
        # One second of audio in the *captured* format: float32 samples are
        # 4 bytes wide, int16 samples 2 bytes.
        bytes_per_sample = 4 if self._use_float else 2
        min_bytes = self._sample_rate * self._channels * bytes_per_sample
        if len(raw_pcm) < min_bytes:
            return False

        pcm_data, channels = self._prepare_pcm(raw_pcm)

        encoder = lameenc.Encoder()
        encoder.set_bit_rate(bitrate)
        encoder.set_in_sample_rate(self._sample_rate)
        encoder.set_channels(channels)
        encoder.set_quality(2)

        mp3_data = encoder.encode(pcm_data)
        mp3_data += encoder.flush()

        with open(filepath, "wb") as f:
            f.write(mp3_data)

        try:
            audio = MP3(filepath)
            if audio.tags is None:
                audio.add_tags()
            if title:
                audio.tags.add(TIT2(encoding=3, text=title))
            if artist:
                audio.tags.add(TPE1(encoding=3, text=artist))
            if album:
                audio.tags.add(TALB(encoding=3, text=album))
            if track_num:
                audio.tags.add(TRCK(encoding=3, text=str(track_num)))
            audio.save()
        except Exception:
            # The audio itself is already on disk; missing tags must not
            # turn a successful recording into a failure.
            _log.warning("Could not write ID3 tags to %s", filepath,
                         exc_info=True)

        return True

    def cleanup(self) -> None:
        """Stop any active stream and terminate the PyAudio instance."""
        self.stop()
        if self._pa:
            self._pa.terminate()
            self._pa = None