-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
132 lines (106 loc) · 4.26 KB
/
Copy pathmain.py
File metadata and controls
132 lines (106 loc) · 4.26 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
"""Run alert worker and expose monitoring API endpoints."""
from __future__ import annotations
import json
import logging
import os
import signal
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from typing import Any
from urllib.parse import parse_qs, urlparse
from alerts.alert_manager import load_active_alerts, start_alert_manager
from storage.db_client import DBClient
logging.basicConfig(
level=os.getenv("LOG_LEVEL", "INFO"),
format="%(asctime)s %(levelname)s [%(name)s] %(message)s",
)
logger = logging.getLogger(__name__)
def normalize_alert_rows(rows: list[dict[str, Any]]) -> list[dict[str, Any]]:
normalized: list[dict[str, Any]] = []
for row in rows:
if row.get("_field") not in (None, "message"):
continue
raw_message = str(row.get("_value") or row.get("message") or "")
try:
payload = json.loads(raw_message)
except json.JSONDecodeError:
payload = {}
if isinstance(payload, dict) and {"severity", "device", "cpu", "time"} <= set(payload):
normalized.append(
{
"severity": str(payload["severity"]),
"device": str(payload["device"]),
"cpu": float(payload["cpu"]),
"time": str(payload["time"]),
}
)
continue
normalized.append(
{
"time": str(row.get("_time", "")),
"host": str(row.get("host", "unknown")),
"severity": str(row.get("severity", "info")),
"message": raw_message,
}
)
return normalized
def build_handler(db: DBClient) -> type[BaseHTTPRequestHandler]:
class AlertAPIHandler(BaseHTTPRequestHandler):
def _send_json(self, status_code: int, payload: dict[str, Any]) -> None:
body = json.dumps(payload).encode("utf-8")
self.send_response(status_code)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def do_GET(self) -> None: # noqa: N802
parsed = urlparse(self.path)
if parsed.path == "/health":
self._send_json(200, {"status": "ok", "influx": db.is_healthy()})
return
if parsed.path == "/api/alerts":
params = parse_qs(parsed.query)
host = params.get("host", [None])[0]
try:
limit = int(params.get("limit", ["20"])[0])
except ValueError:
self._send_json(400, {"error": "limit must be integer"})
return
file_alerts = load_active_alerts()
if host:
file_alerts = [a for a in file_alerts if a.get("device") == host]
file_alerts = file_alerts[-max(1, min(limit, 200)) :]
rows = db.get_recent_alerts(host=host, limit=max(1, min(limit, 200)))
items = [*file_alerts, *normalize_alert_rows(rows)]
self._send_json(
200,
{
"count": len(items),
"items": items,
},
)
return
self._send_json(404, {"error": "not found"})
def log_message(self, fmt: str, *args: Any) -> None:
logger.info("HTTP %s", fmt % args)
return AlertAPIHandler
def main() -> int:
host = os.getenv("API_HOST", "0.0.0.0")
port = int(os.getenv("API_PORT", "8000"))
with DBClient.from_env() as db:
_, stop_event = start_alert_manager(db)
server = ThreadingHTTPServer((host, port), build_handler(db))
logger.info("API serving at http://%s:%d", host, port)
def _shutdown(*_: Any) -> None:
logger.info("Shutting down...")
stop_event.set()
server.shutdown()
signal.signal(signal.SIGINT, _shutdown)
signal.signal(signal.SIGTERM, _shutdown)
try:
server.serve_forever()
finally:
stop_event.set()
server.server_close()
return 0
if __name__ == "__main__":
raise SystemExit(main())