-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrun.py
More file actions
488 lines (430 loc) · 18 KB
/
run.py
File metadata and controls
488 lines (430 loc) · 18 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
#!/usr/bin/env python3
import os, sys, json, time, pathlib, importlib.util
from typing import Dict, Any, List
import threading
from run_state import save_alarms_cache, load_ui_cache
sys.path.insert(0, os.path.dirname(__file__) or "/")
try:
import run_state
except ModuleNotFoundError:
p = pathlib.Path(__file__).with_name("run_state.py")
if not p.exists():
raise
spec = importlib.util.spec_from_file_location("run_state", str(p))
run_state = importlib.util.module_from_spec(spec)
sys.modules["run_state"] = run_state
spec.loader.exec_module(run_state)
from run_state import CACHE_LOCK, ACTIONS_CACHE, ORG_NAMES, save_ui_cache
API_BASE = os.environ.get("API_BASE_URL", "").rstrip("/")
TOKEN = os.environ.get("TOKEN", "")
HEADER_NAME = os.environ.get("HEADER_NAME", "Personal-Access-Token")
VERIFY_TLS = (os.environ.get("VERIFY_TLS", "true").lower() != "false")
ORG_IDS = [int(x) for x in os.environ.get("ORG_IDS", "").split(",") if x.strip().isdigit()]
PRIMARY = int(os.environ.get("PRIMARY_ORG_ID", "0") or 0)
INTERVAL = int(os.environ.get("POLL_INTERVAL", os.environ.get("POLL_INTERVAL_SEC", "30")))
MQTT_HOST = os.environ.get("MQTT_HOST", "")
MQTT_PORT = int(os.environ.get("MQTT_PORT", "1883"))
MQTT_USER = os.environ.get("MQTT_USERNAME", "")
MQTT_PASS = os.environ.get("MQTT_PASSWORD", "")
DISCOVERY = os.environ.get("DISCOVERY_PREFIX", "homeassistant")
DEVICE_NAME = os.environ.get("DEVICE_NAME", "GroupAlarm Bridge")
DEVICE_ID = os.environ.get("DEVICE_ID", "groupalarm_bridge_1")
ALARM_POLL_SEC = int(os.environ.get("ALARM_POLL_SEC", "5"))
LAST_ALARM_IDS = {}
ALARM_RESET_SECONDS = int(os.environ.get("ALARM_RESET_SECONDS", "5"))
_last_alarm_published: dict[int, dict] = {} # {org_id: {"id": int, "ts": epoch}}
mqtt = None
have_mqtt = False
import requests
def fetch_latest_alarms_for_org(org_id: int) -> list[dict]:
try:
data = http("GET", f"/alarms", params={"organization": org_id})
alarms = (data or {}).get("alarms") or []
return alarms[:5]
except Exception as e:
print(json.dumps({"alarms_error": str(e), "org": org_id}), flush=True)
return []
def refresh_alarms_every_5s():
while True:
try:
live_ids, names_all, avatars_all = current_org_ids_and_names()
target_orgs = live_ids or (ORG_IDS or ([PRIMARY] if PRIMARY else []))
alarms_by_org = {}
for oid in target_orgs:
alarms = fetch_latest_alarms_for_org(oid)
alarms_by_org[oid] = alarms
save_ui_cache(alarms_by_org=alarms_by_org)
except Exception as e:
print(json.dumps({"alarms_loop_error": str(e)}), flush=True)
time.sleep(5)
def http(method: str, path: str, **kw):
assert API_BASE, "API_BASE_URL missing"
headers = kw.pop("headers", {})
headers[HEADER_NAME] = TOKEN
kw["headers"] = headers
kw.setdefault("timeout", 15)
kw.setdefault("verify", VERIFY_TLS)
r = requests.request(method.upper(), f"{API_BASE}{path}", **kw)
r.raise_for_status()
if r.headers.get("content-type", "").startswith("application/json"):
return r.json()
return r.text
ICON_MAP = {
"local_fire_department": "mdi:fire-alert",
"medical_information": "mdi:medical-bag",
"notifications": "mdi:bell",
"engineering": "mdi:hard-hat",
"security": "mdi:shield",
"bolt": "mdi:flash",
"groups": "mdi:account-group",
"sos": "mdi:sos",
"zoom_in_map": "mdi:map-marker-outline",
"whatshot": "mdi:fire",
}
def quick_actions_for_org(org_id: int) -> List[dict]:
try:
data = http("GET", f"/organization/{org_id}/quick-actions")
if isinstance(data, list):
out = []
for qa in data:
if not isinstance(qa, dict):
continue
out.append({
"id": qa.get("id"),
"organization_id": qa.get("organization_id"),
"name": qa.get("name"),
"color": qa.get("color"),
"icon": qa.get("icon"),
"one_click": qa.get("one_click"),
"resource": qa.get("resource"),
"resource_id": qa.get("resource_id"),
"category": qa.get("category"),
})
return out
except Exception as e:
print(json.dumps({"org": org_id, "quick_actions_error": str(e)}), flush=True)
return []
def current_org_ids_and_names() -> tuple[list[int], dict[int, str], dict[int, str]]:
try:
orgs = http("GET", "/organizations")
except Exception as e:
print(json.dumps({"organizations_error": str(e)}), flush=True)
return [], {}, {}
ids: list[int] = []
names: dict[int, str] = {}
avatars: dict[int, str] = {}
for o in orgs or []:
if not isinstance(o, dict) or "id" not in o:
continue
oid = int(o["id"])
ids.append(oid)
names[oid] = o.get("name", f"Org {oid}")
avatars[oid] = o.get("avatarURL") or ""
return ids, names, avatars
def on_mqtt_message(client, userdata, msg):
try:
topic = msg.topic
payload = msg.payload.decode("utf-8", "ignore")
prefix = f"{DISCOVERY}/{DEVICE_ID}/org/"
if not topic.startswith(prefix):
return
parts = topic[len(prefix):].split("/")
if len(parts) != 4 or parts[1] != "action" or parts[3] != "set":
return
org_id = int(parts[0]); qa_id = int(parts[2])
if payload != "PRESS":
return
actions = quick_actions_for_org(org_id)
qa = next((x for x in actions if x.get("id") == qa_id), None)
if not qa:
print(json.dumps({"press_error":"qa_not_found","org":org_id,"qa_id":qa_id}), flush=True)
return
res_type = qa.get("resource"); res_id = qa.get("resource_id")
if res_type == "resource-template":
body = {"organizationID": org_id, "alarmResourceTemplateID": res_id}
try:
resp = http("POST", "/alarm", json=body)
print(json.dumps({"triggered":"alarm_from_template","org":org_id,"qa_id":qa_id,"resp":resp}), flush=True)
except Exception as e:
print(json.dumps({"trigger_error":"alarm_from_template","org":org_id,"qa_id":qa_id,"err":str(e)}), flush=True)
elif res_type == "tag":
body = {"organizationID": org_id}
try:
resp = http("POST", f"/tags/{res_id}/trigger", json=body)
print(json.dumps({"triggered":"tag","org":org_id,"qa_id":qa_id,"resp":resp}), flush=True)
except Exception as e:
print(json.dumps({"trigger_error":"tag","org":org_id,"qa_id":qa_id,"err":str(e)}), flush=True)
else:
print(json.dumps({"press_ignored":"unsupported_resource","org":org_id,"qa_id":qa_id,"resource":res_type}), flush=True)
except Exception as e:
print(json.dumps({"mqtt_on_message_err":str(e)}), flush=True)
def try_setup_mqtt():
global mqtt, have_mqtt
if not MQTT_HOST:
return
try:
import paho.mqtt.client as paho
mqtt = paho.Client(client_id=f"{DEVICE_ID}", protocol=paho.MQTTv311)
if MQTT_USER or MQTT_PASS:
mqtt.username_pw_set(MQTT_USER, MQTT_PASS)
mqtt.on_message = on_mqtt_message
mqtt.connect(MQTT_HOST, MQTT_PORT, keepalive=60)
mqtt.loop_start()
have_mqtt = True
run_state.mqtt_client = mqtt
print(json.dumps({"mqtt":"connected","host":MQTT_HOST,"port":MQTT_PORT}), flush=True)
except Exception as e:
print(json.dumps({"mqtt_error":str(e)}), flush=True)
def mqtt_publish(topic, payload, retain=True):
if not have_mqtt: return
if isinstance(payload, (dict, list)):
payload = json.dumps(payload, ensure_ascii=False)
mqtt.publish(topic, payload, qos=0, retain=retain)
def discovery_alarm_binary_sensor(org_id: int, org_name: str):
if not have_mqtt: return
uniq = f"{DEVICE_ID}_org{org_id}_alarm"
topic_cfg = f"{DISCOVERY}/binary_sensor/{DEVICE_ID}/org{org_id}_alarm/config"
state_topic = f"{DISCOVERY}/{DEVICE_ID}/org/{org_id}/alarm"
payload = {
"name": f"{org_name} – Alarm",
"unique_id": uniq,
"device": {
"identifiers": [DEVICE_ID],
"name": DEVICE_NAME,
"manufacturer": "GroupAlarm",
"model": "REST Bridge",
},
"state_topic": state_topic,
"value_template": "{{ 'ON' if value_json.active else 'OFF' }}",
"json_attributes_topic": state_topic,
"icon": "mdi:alarm-light",
}
mqtt_publish(topic_cfg, payload, retain=True)
def publish_alarm_state(org_id: int, alarm: dict | None, org_name: str):
"""
Bei neuem Alarm: active:true + reichlich Attribute veröffentlichen.
Nach Ablauf: active:false.
"""
state_topic = f"{DISCOVERY}/{DEVICE_ID}/org/{org_id}/alarm"
if alarm is None:
mqtt_publish(state_topic, {"active": False, "ts": int(time.time())}, retain=False)
return
ev = alarm.get("event") or {}
sev = ev.get("severity") or {}
opt = alarm.get("optionalContent") or {}
attrs = {
"active": True,
"id": alarm.get("id"),
"message": alarm.get("message"),
"startDate": alarm.get("startDate"),
"organizationID": alarm.get("organizationID"),
"event": {
"id": ev.get("id"),
"name": ev.get("name"),
"severity": {
"name": sev.get("name"),
"level": sev.get("level"),
"color": sev.get("color"),
"icon": sev.get("icon"),
},
},
"address": opt.get("address"),
"latitude": opt.get("latitude"),
"longitude": opt.get("longitude"),
"creatorName": alarm.get("creatorName"),
"creatorID": alarm.get("creatorID"),
}
mqtt_publish(state_topic, attrs, retain=False)
def discovery_quick_action_button(org_id: int, org_name: str, qa: dict):
if not have_mqtt:
return
qa_id = qa.get("id")
qa_name = qa.get("name", f"QAction {qa_id}")
icon = ICON_MAP.get(qa.get("icon") or "", "mdi:gesture-tap-button")
uniq = f"{DEVICE_ID}_org{org_id}_qa_{qa_id}"
cfg_topic = f"{DISCOVERY}/button/{DEVICE_ID}/org{org_id}_qa{qa_id}/config"
cmd_topic = f"{DISCOVERY}/{DEVICE_ID}/org/{org_id}/action/{qa_id}/set"
mqtt_publish(cfg_topic, {
"name": f"{org_name} – {qa_name}",
"unique_id": uniq,
"device": {
"identifiers": [DEVICE_ID],
"name": DEVICE_NAME,
"manufacturer": "GroupAlarm",
"model": "REST Bridge",
},
"icon": icon,
"entity_category": "config",
"command_topic": cmd_topic,
"payload_press": "PRESS",
}, retain=True)
def discovery_quick_actions_for_org(org_id: int, org_name: str, actions: List[dict]):
for qa in actions:
discovery_quick_action_button(org_id, org_name, qa)
def mqtt_cleanup_org(org_id: int, actions: list[dict] | None = None):
"""Leert retained Configs, damit HA alte Button-Entities löscht."""
if not have_mqtt:
return
if actions is None:
actions = ACTIONS_CACHE.get(org_id, [])
for qa in actions:
qa_id = qa.get("id")
cfg_topic = f"{DISCOVERY}/button/{DEVICE_ID}/org{org_id}_qa{qa_id}/config"
mqtt.publish(cfg_topic, b"", qos=0, retain=True)
st = f"{DISCOVERY}/{DEVICE_ID}/org/{org_id}/state"
mqtt.publish(st, b"", qos=0, retain=False)
def publish_quick_actions(org_id: int, actions: List[dict]):
"""Nur Log + optional ein Topic mit der Rohliste (kannst du für Debug nutzen)."""
print(json.dumps({"org": org_id, "quick_actions": actions}, ensure_ascii=False), flush=True)
if have_mqtt:
topic = f"{DISCOVERY}/{DEVICE_ID}/org/{org_id}/quick_actions"
mqtt.publish(topic, json.dumps(actions, ensure_ascii=False), qos=0, retain=True)
def refresh_all_quick_actions_and_discovery():
live_ids, names_all, avatars_all = current_org_ids_and_names()
target_orgs = live_ids or (ORG_IDS or ([PRIMARY] if PRIMARY else []))
ui_actions: Dict[int, List[dict]] = {}
seen: set[int] = set()
for org in target_orgs:
seen.add(org)
actions = quick_actions_for_org(org)
publish_quick_actions(org, actions)
discovery_quick_actions_for_org(org, names_all.get(org, f"Org {org}"), actions)
with CACHE_LOCK:
ORG_NAMES[org] = names_all.get(org, f"Org {org}")
ACTIONS_CACHE[org] = actions
ui_actions[org] = actions
if live_ids:
with CACHE_LOCK:
stale = [oid for oid in list(ACTIONS_CACHE.keys()) if oid not in seen]
for oid in stale:
try:
mqtt_cleanup_org(oid, ACTIONS_CACHE.get(oid))
except Exception as e:
print(json.dumps({"cleanup_error": str(e), "org": oid}), flush=True)
with CACHE_LOCK:
ACTIONS_CACHE.pop(oid, None)
ORG_NAMES.pop(oid, None)
try:
names_for_ui = {org: ORG_NAMES.get(org, f"Org {org}") for org in target_orgs}
save_ui_cache(
names=names_for_ui,
actions_by_org=ui_actions,
org_order=target_orgs,
avatars={org: avatars_all.get(org, "") for org in target_orgs}
)
print(json.dumps(
{"cache_file": "updated", "orgs": target_orgs,
"counts": {str(k): len(v) for k, v in ui_actions.items()}},
ensure_ascii=False
), flush=True)
except Exception as e:
print(json.dumps({"cache_file_error": str(e)}), flush=True)
def simplify_alarm(a: dict) -> dict:
ev = a.get("event") or {}
opt = a.get("optionalContent") or {}
sev = (ev.get("severity") or {})
return {
"id": a.get("id"),
"message": a.get("message"),
"startDate": a.get("startDate"),
"organizationID": a.get("organizationID"),
"event": {
"id": ev.get("id"),
"name": ev.get("name"),
"startDate": ev.get("startDate"),
"severity": {
"level": (sev.get("level")),
"name": (sev.get("name")),
"color": (sev.get("color")),
"icon": (sev.get("icon")),
}
},
"optionalContent": {
"address": opt.get("address"),
"latitude": opt.get("latitude"),
"longitude": opt.get("longitude"),
}
}
def fetch_org_alarms(org_id: int) -> list[dict]:
try:
data = http("GET", f"/alarms?organization={org_id}")
return list(data.get("alarms", [])) if isinstance(data, dict) else []
except Exception as e:
print(json.dumps({"alarms_error": str(e), "org": org_id}), flush=True)
return []
def alarm_poller_once(target_orgs: list[int]):
ui = load_ui_cache() or {}
names = ui.get("names", {})
by_org: dict[int, list] = {}
now = int(time.time())
for org in target_orgs:
alarms = fetch_org_alarms(org)
by_org[org] = alarms
org_name = names.get(org, f"Org {org}")
discovery_alarm_binary_sensor(org, org_name)
latest_id = max((a.get("id", 0) for a in alarms), default=0)
last = _last_alarm_published.get(org, {})
last_id = int(last.get("id") or 0)
last_ts = int(last.get("ts") or 0)
if latest_id and latest_id != last_id:
alarm_obj = next((a for a in alarms if a.get("id") == latest_id), None)
publish_alarm_state(org, alarm_obj, org_name)
_last_alarm_published[org] = {"id": latest_id, "ts": now}
elif last_id and (now - last_ts >= ALARM_RESET_SECONDS):
publish_alarm_state(org, None, org_name) # OFF
_last_alarm_published[org] = {"id": last_id, "ts": now}
save_alarms_cache(by_org=by_org)
print(json.dumps({"alarms_cache": "updated",
"counts": {str(k): len(v) for k, v in by_org.items()}},
ensure_ascii=False), flush=True)
def refresh_alarms_for_all_orgs():
live_ids, names_all, avatars_all = current_org_ids_and_names()
target_orgs = live_ids or (ORG_IDS or ([PRIMARY] if PRIMARY else []))
alarms_by_org: dict[int, list[dict]] = {}
for oid in target_orgs:
alarms_by_org[oid] = fetch_org_alarms(oid)
try:
save_alarms_cache(by_org=alarms_by_org)
print(json.dumps({"alarms_cache": "updated",
"counts": {str(k): len(v) for k, v in alarms_by_org.items()}}, ensure_ascii=False), flush=True)
except Exception as e:
print(json.dumps({"alarms_cache_error": str(e)}), flush=True)
def discovery_for_org(org_id: int, org_name: str) -> None:
return
def ensure_discovery():
if not have_mqtt:
return
try:
orgs = http("GET", "/organizations")
names = {int(o["id"]): o.get("name", f"Org {o['id']}") for o in orgs if isinstance(o, dict) and "id" in o}
except Exception as e:
print(json.dumps({"discovery_orgs_error": str(e)}), flush=True)
names = {}
target_orgs = ORG_IDS or ([PRIMARY] if PRIMARY else [])
if not target_orgs:
target_orgs = list(names.keys())
for oid in target_orgs:
discovery_for_org(oid, names.get(oid, f"Org {oid}"))
def ensure_mqtt_subscribe():
if not have_mqtt:
return
topic = f"{DISCOVERY}/{DEVICE_ID}/org/+/action/+/set"
try:
mqtt.subscribe(topic, qos=0)
mqtt.on_message = on_mqtt_message
except Exception as e:
print(json.dumps({"mqtt_subscribe_error": str(e), "topic": topic}), flush=True)
if __name__ == "__main__":
try_setup_mqtt()
refresh_all_quick_actions_and_discovery()
i = 0
while True:
if i % 6 == 0:
refresh_all_quick_actions_and_discovery()
target_orgs = ORG_IDS or ([PRIMARY] if PRIMARY else [])
if not target_orgs:
ui = load_ui_cache() or {}
target_orgs = ui.get("order", []) or sorted((ui.get("names") or {}).keys())
alarm_poller_once(target_orgs)
i += 1
time.sleep(5)