-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapp.py
More file actions
189 lines (147 loc) · 4.95 KB
/
app.py
File metadata and controls
189 lines (147 loc) · 4.95 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
import asyncio
import os
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Any, Dict, List
import yaml
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from checks import run_dns_cmd, run_ping_cmd, run_speedtest_cmd
from db import (
init_db,
insert_dnscheck,
insert_ping,
insert_speedtest,
query_dns,
query_pings,
query_speedtests,
)
from log import get_logger
logger = get_logger(__name__)
BASE_DIR = Path(__file__).parent
DB_PATH = os.environ.get("NETDASH_DB", str(BASE_DIR / "netdash.sqlite3"))
CONFIG_PATH = os.environ.get("NETDASH_CONFIG", str(BASE_DIR / "config.yaml"))
STATE: Dict[str, Any] = {
"config": {},
"tasks": [],
}
def load_config() -> Dict[str, Any]:
with open(CONFIG_PATH, "r") as f:
cfg = yaml.safe_load(f) or {}
# Defaults
cfg.setdefault("speedtest", {"enabled": True, "interval_seconds": 1800})
cfg.setdefault(
"ping",
{
"enabled": True,
"interval_seconds": 60,
"targets": ["1.1.1.1", "1.0.0.1", "8.8.8.8", "8.8.4.4"],
"count": 5,
"interval": 0.2,
},
)
cfg.setdefault(
"dns",
{
"enabled": True,
"interval_seconds": 120,
"resolvers": ["1.1.1.1", "1.0.0.1", "8.8.8.8", "8.8.4.4"],
"domains": ["example.com"],
"timeout": 5,
},
)
cfg.setdefault("retention_days", 30)
logger.info(f"Configuration loaded: {cfg}")
return cfg
async def periodic(task_coro, period: int):
# Stagger first run a tiny bit to avoid thundering herd on startup
await asyncio.sleep(1)
while True:
try:
await task_coro()
except Exception as e:
logger.error(f"Task error: {e}")
await asyncio.sleep(period)
async def _run_speedtest():
data = await asyncio.to_thread(run_speedtest_cmd)
if data:
insert_speedtest(DB_PATH, data)
async def _run_ping():
cfg = STATE.get("config", {})
targets = cfg.get("ping", {}).get("targets", [])
count = int(cfg.get("ping", {}).get("count", 5))
interval = float(cfg.get("ping", {}).get("interval", 0.2))
for t in targets:
data = await asyncio.to_thread(run_ping_cmd, t, count, interval)
if data:
insert_ping(DB_PATH, data)
async def _run_dns():
cfg = STATE.get("config", {})
resolvers = cfg.get("dns", {}).get("resolvers", [])
domains = cfg.get("dns", {}).get("domains", [])
timeout = int(cfg.get("dns", {}).get("timeout", 5))
for r in resolvers:
for d in domains:
data = await asyncio.to_thread(run_dns_cmd, r, d, timeout)
if data:
insert_dnscheck(DB_PATH, data)
@asynccontextmanager
async def lifespan(app: FastAPI):
Path(DB_PATH).parent.mkdir(parents=True, exist_ok=True)
init_db(DB_PATH)
cfg = load_config()
STATE["config"] = cfg
tasks: List[asyncio.Task] = []
# Speedtest scheduler
if cfg.get("speedtest", {}).get("enabled"):
tasks.append(
asyncio.create_task(
periodic(_run_speedtest, cfg["speedtest"]["interval_seconds"])
)
)
logger.info("Speedtest task scheduled.")
# Ping scheduler
if cfg.get("ping", {}).get("enabled"):
tasks.append(
asyncio.create_task(periodic(_run_ping, cfg["ping"]["interval_seconds"]))
)
logger.info("Ping task scheduled.")
# DNS scheduler
if cfg.get("dns", {}).get("enabled"):
tasks.append(
asyncio.create_task(periodic(_run_dns, cfg["dns"]["interval_seconds"]))
)
logger.info("DNS check task scheduled.")
logger.info("Background tasks started.")
STATE["tasks"] = tasks
yield
for t in tasks:
t.cancel()
app = FastAPI(title="Network Uptime Dashboard", lifespan=lifespan)
app.mount("/static", StaticFiles(directory=BASE_DIR / "static"), name="static")
templates = Jinja2Templates(directory=str(BASE_DIR / "templates"))
@app.get("/", response_class=HTMLResponse)
async def index(request: Request):
return templates.TemplateResponse("index.html", {"request": request})
@app.get("/api/speedtests")
async def api_speedtests(since: int | None = None):
rows = query_speedtests(DB_PATH, since=since)
return JSONResponse(rows)
@app.get("/api/pings")
async def api_pings(since: int | None = None):
rows = query_pings(DB_PATH, since=since)
return JSONResponse(rows)
@app.get("/api/dns")
async def api_dns(since: int | None = None):
rows = query_dns(DB_PATH, since=since)
return JSONResponse(rows)
@app.get("/api/config")
async def api_config():
return JSONResponse(STATE["config"])
if __name__ == "__main__":
import uvicorn
uvicorn.run(
"app:app", host="0.0.0.0", port=int(os.environ.get("PORT", 8000)), reload=False
)