-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathaudio_capture.py
More file actions
155 lines (132 loc) · 4.98 KB
/
Copy pathaudio_capture.py
File metadata and controls
155 lines (132 loc) · 4.98 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
"""
CS2 Real-time Voice Translator
Audio device enumeration and capture thread
Uses PyAudioWPatch for WASAPI loopback support — this lets us capture
audio from output devices (speakers/headphones), not just microphones.
"""
import io
import queue
import threading
import wave
from collections.abc import Callable

import numpy as np
import pyaudiowpatch as pyaudio
# Output format shared by build_wav() and _resample_mono(): the WAV header is
# written with these values, and captured audio is converted to them.
# Sample rate (Hz) of the produced WAV buffers.
SAMPLE_RATE = 16000
# Channel count of the produced WAV buffers (mono).
CHANNELS = 1
# Number of frames requested from the input stream per read.
CHUNK = 1024
# PortAudio sample format used when opening the input stream (16-bit ints).
FORMAT = pyaudio.paInt16
SAMPLE_WIDTH = 2 # 16-bit = 2 bytes
def list_audio_devices() -> list[dict]:
    """Return available input devices and WASAPI loopback devices.

    Each entry is a dict with the keys ``index`` (PortAudio device index),
    ``name``, ``loopback`` (True for WASAPI loopback endpoints), ``channels``
    (the device's ``maxInputChannels``) and ``default_rate`` (the device's
    default sample rate truncated to an int).

    Regular input devices come first, followed by loopback devices, whose
    names get a " [Loopback]" suffix appended.
    """
    def describe(info: dict, name: str, loopback: bool) -> dict:
        # Single place that defines the shape of a returned entry.
        return {
            "index": info["index"],
            "name": name,
            "loopback": loopback,
            "channels": info["maxInputChannels"],
            "default_rate": int(info["defaultSampleRate"]),
        }

    devices = []
    p = pyaudio.PyAudio()
    try:
        # Regular input devices (microphones, virtual cables)
        for i in range(p.get_device_count()):
            info = p.get_device_info_by_index(i)
            # PyAudioWPatch also registers loopback endpoints as input
            # devices; they are reported by the second pass below, so skip
            # them here to avoid listing the same device twice.
            if info.get("isLoopbackDevice", False):
                continue
            if info["maxInputChannels"] > 0:
                entry = describe(info, info["name"], False)
                # Keep the enumeration index, exactly as the caller passed it.
                entry["index"] = i
                devices.append(entry)

        # WASAPI loopback devices (capture what speakers output)
        try:
            for loopback in p.get_loopback_device_info_generator():
                devices.append(
                    describe(loopback, f"{loopback['name']} [Loopback]", True)
                )
        except Exception:
            # Deliberate best-effort: WASAPI loopback is not available on
            # this system, so only the regular input devices are returned.
            pass
    finally:
        p.terminate()
    return devices
def build_wav(frames: list[bytes]) -> io.BytesIO:
    """Package raw PCM chunks as an in-memory WAV file.

    The header is written with the module's CHANNELS, SAMPLE_WIDTH and
    SAMPLE_RATE values. The returned buffer is rewound to offset 0 and is
    given a ``name`` attribute of "audio.wav".
    """
    wav_buffer = io.BytesIO()
    pcm = b"".join(frames)
    with wave.open(wav_buffer, "wb") as writer:
        writer.setframerate(SAMPLE_RATE)
        writer.setsampwidth(SAMPLE_WIDTH)
        writer.setnchannels(CHANNELS)
        writer.writeframes(pcm)
    wav_buffer.name = "audio.wav"
    wav_buffer.seek(0)
    return wav_buffer
def _resample_mono(data: bytes, src_channels: int, src_rate: int) -> bytes:
    """Convert 16-bit PCM audio to mono at SAMPLE_RATE.

    Args:
        data: Raw 16-bit PCM; consecutive samples are treated as the channels
            of one frame when ``src_channels`` is greater than 1.
        src_channels: Number of channels in ``data``.
        src_rate: Sample rate of ``data`` in Hz.

    Returns:
        Mono 16-bit PCM bytes. Channels are averaged; rate conversion picks
        evenly spaced source samples (no filtering or interpolation).
    """
    # Drop an incomplete trailing frame so a short or odd-sized buffer is
    # converted instead of raising ValueError in frombuffer()/reshape().
    frame_bytes = np.dtype(np.int16).itemsize * max(src_channels, 1)
    remainder = len(data) % frame_bytes
    if remainder:
        data = data[:len(data) - remainder]

    samples = np.frombuffer(data, dtype=np.int16)

    # Mix down to mono
    if src_channels > 1:
        samples = samples.reshape(-1, src_channels).mean(axis=1).astype(np.int16)

    # Resample if needed
    if src_rate != SAMPLE_RATE:
        num_samples = int(len(samples) * SAMPLE_RATE / src_rate)
        indices = np.linspace(0, len(samples) - 1, num_samples).astype(int)
        samples = samples[indices]

    return samples.tobytes()
class AudioCaptureThread(threading.Thread):
    """Daemon thread that captures audio and produces WAV BytesIO buffers.

    The thread opens one input stream on the selected device and reads it in
    CHUNK-frame blocks. Blocks are converted to mono SAMPLE_RATE PCM when the
    device format differs. Once roughly ``buffer_duration`` seconds of audio
    have been collected, they are wrapped into an in-memory WAV file and put
    on ``output_queue``. Blocks collected since the last emitted buffer are
    discarded when the thread stops.
    """

    def __init__(
        self,
        device_index: int | None,
        buffer_duration: float,
        output_queue: queue.Queue,
        stop_event: threading.Event,
        on_error: Callable[[str], object],
        loopback: bool = False,
        device_channels: int = CHANNELS,
        device_rate: int = SAMPLE_RATE,
    ):
        """Store the capture configuration; the stream is opened in run().

        Args:
            device_index: Device index passed to PyAudio as
                ``input_device_index``.
            buffer_duration: Seconds of audio to collect per emitted buffer.
            output_queue: Queue that receives the WAV buffers.
            stop_event: Set this event to end the capture loop.
            on_error: Called with a message string when the device cannot be
                opened or a read fails; its return value is ignored.
            loopback: Whether the device is a loopback device. Stored on the
                instance; run() opens loopback devices like any other input.
            device_channels: Channel count used to open the stream.
            device_rate: Sample rate (Hz) used to open the stream.
        """
        super().__init__(daemon=True)
        self.device_index = device_index
        self.buffer_duration = buffer_duration
        self.output_queue = output_queue
        self.stop_event = stop_event
        self.on_error = on_error
        self.loopback = loopback
        self.device_channels = device_channels
        self.device_rate = device_rate

    def run(self):
        """Capture audio until ``stop_event`` is set or the device fails.

        The stream and the PyAudio instance are always released, even when
        stopping the stream or the error callback raises.
        """
        p = pyaudio.PyAudio()
        try:
            try:
                # Loopback devices are already registered as input devices by
                # PyAudioWPatch — just open them normally with their native
                # settings.
                stream = p.open(
                    format=FORMAT,
                    channels=self.device_channels,
                    rate=self.device_rate,
                    input=True,
                    input_device_index=self.device_index,
                    frames_per_buffer=CHUNK,
                )
            except OSError as e:
                self.on_error(f"Could not open audio device: {e}")
                return

            try:
                self._capture_loop(stream)
            finally:
                # Nested so close() still runs if stop_stream() raises.
                try:
                    stream.stop_stream()
                finally:
                    stream.close()
        finally:
            p.terminate()

    def _capture_loop(self, stream) -> None:
        """Read from ``stream`` and emit WAV buffers until stopped or failed."""
        # Number of CHUNK-frame reads that make up one emitted buffer.
        chunks_needed = int(self.device_rate / CHUNK * self.buffer_duration)
        frames: list[bytes] = []
        needs_conversion = (
            self.device_channels > 1 or self.device_rate != SAMPLE_RATE
        )

        try:
            while not self.stop_event.is_set():
                data = stream.read(CHUNK, exception_on_overflow=False)
                if needs_conversion:
                    data = _resample_mono(
                        data, self.device_channels, self.device_rate
                    )
                frames.append(data)

                if len(frames) >= chunks_needed:
                    self.output_queue.put(build_wav(frames))
                    frames = []
        except OSError as e:
            # An OSError ends the capture; report it instead of raising.
            self.on_error(f"Audio read error: {e}")