-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlineinpipe
More file actions
executable file
·146 lines (119 loc) · 4.97 KB
/
lineinpipe
File metadata and controls
executable file
·146 lines (119 loc) · 4.97 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
#!/usr/bin/env python3
"""lineinpipe — stream a configured ALSA capture input into lox-audioserver.
Usage: lineinpipe <input_id>
Reads inputs.<input_id> from speaker_config.json, opens a TCP connection to
lox's lineIn TCP-ingest (127.0.0.1:7080), sends the lox input id as a
newline-terminated handshake, then relays raw S16_LE PCM captured via arecord
from the `input_<id>` ALSA PCM.
The ALSA `plug` layer (see generate_alsa_config.py) resamples/remixes the
card's native format to lox's canonical 44100 Hz / 2ch, so we always capture
at those values and lox needs no per-zone sample-rate config — only
`inputs.lineIn.source.id` = the input's lox_input_id.
Exits non-zero on any error or stream end so systemd (Restart=always)
reconnects. Installed to /usr/local/bin/lineinpipe; run per input via the
lineinpipe@<input_id>.service template.
"""
import json
import os
import socket
import subprocess
import sys
import threading
import urllib.request
from typing import NoReturn
# Capture format: lox's lineIn TCP-ingest defaults to raw S16_LE, 44100 Hz, 2ch.
LOX_RATE = 44100
LOX_CHANNELS = 2

# Location of the speaker configuration; override with $SPEAKER_CONFIG.
CONFIG = os.getenv(
    "SPEAKER_CONFIG",
    "/home/tobias/multiroom-tooling/speaker_config.json",
)

# Endpoint of the lox lineIn TCP-ingest that receives the PCM stream.
LOX_HOST = os.getenv("LOX_HOST", "127.0.0.1")
LOX_PORT = int(os.getenv("LOX_PORT", "7080"))

# lox only reports a line-in as "connected" while status heartbeats keep
# arriving (15s staleness window); without them Loxone shows the input as
# OFFLINE even though audio is flowing. Heartbeats are POSTed to the
# admin/api on port 7090 once every HEARTBEAT_SECS seconds.
LOX_API_HOST = os.getenv("LOX_API_HOST", "127.0.0.1")
LOX_API_PORT = int(os.getenv("LOX_API_PORT", "7090"))
HEARTBEAT_SECS = int(os.getenv("LINEIN_HEARTBEAT_SECS", "5"))
def fail(msg: str) -> NoReturn:
    """Report a fatal error and terminate the process.

    Writes ``lineinpipe: <msg>`` to stderr, then exits with status 1.

    Args:
        msg: Human-readable description of what went wrong.

    Raises:
        SystemExit: Always, with exit code 1.
    """
    print(f"lineinpipe: {msg}", file=sys.stderr)
    sys.exit(1)
def _post_status(lox_id: str, state: str, device: str) -> None:
    """Send a single bridge-status heartbeat for *lox_id* to the lox API.

    The request is a JSON POST carrying ``state`` and ``device``. Delivery is
    best-effort: any failure is logged to stderr and otherwise ignored.
    """
    endpoint = (
        f"http://{LOX_API_HOST}:{LOX_API_PORT}"
        f"/api/linein/{lox_id}/bridge-status"
    )
    payload = json.dumps({"state": state, "device": device}).encode()
    request = urllib.request.Request(
        endpoint,
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    try:
        # The response body is irrelevant; just make sure it gets closed.
        with urllib.request.urlopen(request, timeout=5):
            pass
    except Exception as err:  # noqa: BLE001 — heartbeat is best-effort
        print(f"lineinpipe: heartbeat failed: {err}", file=sys.stderr)
def _heartbeat_loop(lox_id: str, device: str, stop: threading.Event) -> None:
    """Post 'running' heartbeats for *lox_id* until *stop* is set.

    The first heartbeat goes out right away; after each one the loop sleeps
    on *stop* for up to HEARTBEAT_SECS, so setting the event wakes it early.
    Meant to run in a daemon thread next to the audio relay.
    """
    while True:
        if stop.is_set():
            return
        _post_status(lox_id, "running", device)
        stop.wait(HEARTBEAT_SECS)
def main() -> None:
    """Relay one configured ALSA capture input to lox until the stream ends.

    Steps:
      1. Read ``inputs.<input_id>`` from the JSON file at CONFIG.
      2. Connect to lox at LOX_HOST:LOX_PORT and send the lox input id as a
         newline-terminated handshake.
      3. Start ``arecord`` on the ``input_<input_id>`` PCM and copy its raw
         S16_LE output to the socket, while a daemon thread posts status
         heartbeats.

    This function never returns normally: every path ends in ``fail`` so the
    process exits non-zero and the service manager can restart it.

    Raises:
        SystemExit: Always (exit code 1), via ``fail``.
    """
    if len(sys.argv) != 2:
        fail("usage: lineinpipe <input_id>")
    input_id = sys.argv[1]

    try:
        # JSON is UTF-8 by specification; do not depend on the locale.
        with open(CONFIG, encoding="utf-8") as f:
            cfg = json.load(f)
    except (OSError, ValueError) as e:
        fail(f"cannot read {CONFIG}: {e}")

    # A structurally wrong config (top level or "inputs" not an object) is
    # reported like a missing input instead of crashing with AttributeError.
    inputs = cfg.get("inputs") if isinstance(cfg, dict) else None
    inp = inputs.get(input_id) if isinstance(inputs, dict) else None
    if not inp:
        fail(f"input {input_id!r} not found in {CONFIG}")
    if not isinstance(inp, dict):
        fail(f"input {input_id!r} in {CONFIG} is not a JSON object")

    # The id may be written as a JSON number (or null); the handshake needs
    # text, so coerce it and fall back to the input id when it is absent.
    raw_lox_id = inp.get("lox_input_id")
    lox_id = input_id if raw_lox_id is None else str(raw_lox_id)
    card = inp.get("card", input_id)
    pcm = f"input_{input_id}"

    try:
        sock = socket.create_connection((LOX_HOST, LOX_PORT), timeout=10)
    except OSError as e:
        fail(f"cannot connect to lox at {LOX_HOST}:{LOX_PORT}: {e}")

    # Newline-terminated input id, then raw PCM.
    try:
        sock.sendall((lox_id + "\n").encode())
    except OSError as e:
        sock.close()
        fail(f"handshake with lox failed: {e}")

    try:
        arecord = subprocess.Popen(
            [
                "arecord", "-D", pcm,
                "-f", "S16_LE",
                "-c", str(LOX_CHANNELS),
                "-r", str(LOX_RATE),
                "-t", "raw", "-",
            ],
            stdout=subprocess.PIPE,
        )
    except OSError as e:
        # e.g. arecord is not installed or not executable.
        sock.close()
        fail(f"cannot start arecord: {e}")

    # Keep lox's "connected" status fresh while we stream.
    stop_heartbeat = threading.Event()
    hb = threading.Thread(
        target=_heartbeat_loop, args=(lox_id, card, stop_heartbeat), daemon=True
    )
    hb.start()

    try:
        assert arecord.stdout is not None  # guaranteed by stdout=PIPE
        read_chunk = arecord.stdout.read
        # An empty read means arecord exited / the capture device is gone.
        for chunk in iter(lambda: read_chunk(4096), b""):
            sock.sendall(chunk)
    except OSError as e:
        # Covers BrokenPipeError, ConnectionResetError and socket timeouts.
        print(f"lineinpipe: stream interrupted: {e}", file=sys.stderr)
    finally:
        # Stop heartbeating; lox flips the input offline once the last status
        # goes stale (~15s). We deliberately do NOT post a final status — lox's
        # `connected` flag tracks heartbeat freshness, not the state string, so
        # a parting POST would keep it "connected" for another 15s.
        stop_heartbeat.set()
        try:
            arecord.terminate()
            arecord.wait(timeout=2)
        except (subprocess.TimeoutExpired, OSError):
            # Did not stop in time (or signalling failed): force it. We do
            # not wait again so a hung capture cannot block our own exit.
            arecord.kill()
        sock.close()

    # Reaching here means the stream ended — exit non-zero so systemd restarts.
    fail("capture/relay stopped; exiting for restart")
if __name__ == "__main__":
main()