-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdb.py
More file actions
184 lines (162 loc) · 5.09 KB
/
db.py
File metadata and controls
184 lines (162 loc) · 5.09 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
import sqlite3
from typing import Any, Dict, List, Optional
from log import get_logger
# Module-level logger created via get_logger from the `log` module.
logger = get_logger(__name__)
# DDL statements that init_db() executes in order. Every statement uses
# CREATE TABLE IF NOT EXISTS, so tables that already exist are left untouched.
# Each table has an auto-incrementing integer `id` and a required integer `ts`
# column, which the query functions filter and sort on.
SCHEMA = [
# speedtests: one row per speed test (ping, jitter, download/upload, packet loss).
"""
CREATE TABLE IF NOT EXISTS speedtests (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ts INTEGER NOT NULL,
ping_ms REAL,
jitter_ms REAL,
download_mbps REAL,
upload_mbps REAL,
packet_loss REAL
);
""",
# pings: one row per ping run against a required `target`, with packet
# counts, loss percentage and min/avg/max/stddev timing columns.
"""
CREATE TABLE IF NOT EXISTS pings (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ts INTEGER NOT NULL,
target TEXT NOT NULL,
sent INTEGER,
received INTEGER,
loss_pct REAL,
min_ms REAL,
avg_ms REAL,
max_ms REAL,
stddev_ms REAL
);
""",
# dnschecks: one row per DNS lookup of a required `domain` through a
# required `resolver`, with a status string and the query time.
"""
CREATE TABLE IF NOT EXISTS dnschecks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ts INTEGER NOT NULL,
resolver TEXT NOT NULL,
domain TEXT NOT NULL,
status TEXT,
query_time_ms INTEGER
);
""",
]
def _connect(db_path: str):
    """Open and return a SQLite connection to ``db_path``.

    The connection is created with ``check_same_thread=False`` and uses
    ``sqlite3.Row`` as its row factory, so fetched rows can be accessed by
    column name and converted to dictionaries. The caller is responsible
    for closing the returned connection.
    """
    connection = sqlite3.connect(db_path, check_same_thread=False)
    # Row objects expose column names, which the query helpers rely on
    # when they turn result rows into dicts.
    connection.row_factory = sqlite3.Row
    logger.info(f"Connected to database at {db_path}")
    return connection
def init_db(db_path: str):
    """Create the tables listed in ``SCHEMA`` in the database at ``db_path``.

    Every DDL statement in ``SCHEMA`` is executed in order and the result is
    committed. The connection is always closed, even when a statement fails.

    Args:
        db_path: Path of the SQLite database file.

    Raises:
        sqlite3.Error: If connecting or executing a DDL statement fails.
    """
    logger.info(f"Initializing database at {db_path}")
    conn = _connect(db_path)
    try:
        cur = conn.cursor()
        for ddl in SCHEMA:
            cur.executescript(ddl)
        conn.commit()
        logger.info("Database initialized with required tables.")
    finally:
        # Release the connection even if a DDL statement raised.
        conn.close()
def insert_speedtest(db_path: str, data: Dict[str, Any]):
    """Insert one speed test result into the ``speedtests`` table.

    Args:
        db_path: Path of the SQLite database file.
        data: Mapping read with ``.get`` for the keys ``ts``, ``ping_ms``,
            ``jitter_ms``, ``download_mbps``, ``upload_mbps`` and
            ``packet_loss``. Absent keys are passed to SQLite as ``None``.

    Raises:
        sqlite3.Error: If the insert or commit fails (for example when
            ``ts`` is missing and the column's NOT NULL constraint rejects
            the row). The connection is closed before the error propagates.
    """
    conn = _connect(db_path)
    try:
        cur = conn.cursor()
        cur.execute(
            "INSERT INTO speedtests (ts, ping_ms, jitter_ms, download_mbps, upload_mbps, packet_loss) VALUES (?, ?, ?, ?, ?, ?)",
            (
                data.get("ts"),
                data.get("ping_ms"),
                data.get("jitter_ms"),
                data.get("download_mbps"),
                data.get("upload_mbps"),
                data.get("packet_loss"),
            ),
        )
        conn.commit()
        logger.info(f"Inserted speedtest data at ts={data.get('ts')}")
    finally:
        # Always release the connection; previously a failed insert leaked it.
        conn.close()
def insert_ping(db_path: str, data: Dict[str, Any]):
    """Insert one ping measurement into the ``pings`` table.

    Args:
        db_path: Path of the SQLite database file.
        data: Mapping read with ``.get`` for the keys ``ts``, ``target``,
            ``sent``, ``received``, ``loss_pct``, ``min_ms``, ``avg_ms``,
            ``max_ms`` and ``stddev_ms``. Absent keys are passed to SQLite
            as ``None``.

    Raises:
        sqlite3.Error: If the insert or commit fails (for example when
            ``ts`` or ``target`` is missing and the NOT NULL constraint
            rejects the row). The connection is closed before the error
            propagates.
    """
    conn = _connect(db_path)
    try:
        cur = conn.cursor()
        cur.execute(
            """
INSERT INTO pings (ts, target, sent, received, loss_pct, min_ms, avg_ms, max_ms, stddev_ms)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
            (
                data.get("ts"),
                data.get("target"),
                data.get("sent"),
                data.get("received"),
                data.get("loss_pct"),
                data.get("min_ms"),
                data.get("avg_ms"),
                data.get("max_ms"),
                data.get("stddev_ms"),
            ),
        )
        conn.commit()
        logger.info(
            f"Inserted ping data for target={data.get('target')} at ts={data.get('ts')}"
        )
    finally:
        # Always release the connection; previously a failed insert leaked it.
        conn.close()
def insert_dnscheck(db_path: str, data: Dict[str, Any]):
    """Insert one DNS check result into the ``dnschecks`` table.

    Args:
        db_path: Path of the SQLite database file.
        data: Mapping read with ``.get`` for the keys ``ts``, ``resolver``,
            ``domain``, ``status`` and ``query_time_ms``. Absent keys are
            passed to SQLite as ``None``.

    Raises:
        sqlite3.Error: If the insert or commit fails (for example when
            ``ts``, ``resolver`` or ``domain`` is missing and the NOT NULL
            constraint rejects the row). The connection is closed before
            the error propagates.
    """
    conn = _connect(db_path)
    try:
        cur = conn.cursor()
        cur.execute(
            "INSERT INTO dnschecks (ts, resolver, domain, status, query_time_ms) VALUES (?, ?, ?, ?, ?)",
            (
                data.get("ts"),
                data.get("resolver"),
                data.get("domain"),
                data.get("status"),
                data.get("query_time_ms"),
            ),
        )
        conn.commit()
        logger.info(
            f"Inserted DNS check data for resolver={data.get('resolver')} and domain={data.get('domain')} at ts={data.get('ts')}"
        )
    finally:
        # Always release the connection; previously a failed insert leaked it.
        conn.close()
def _rows_to_dicts(rows) -> List[Dict[str, Any]]:
return [dict(r) for r in rows]
def query_speedtests(db_path: str, since: Optional[int] = None):
    """Return speed test rows as dicts, oldest first.

    Args:
        db_path: Path of the SQLite database file.
        since: Optional lower bound timestamp (unix seconds). When given,
            only rows with ``ts >= since`` are returned; when ``None``,
            all rows are returned.

    Returns:
        A list of dicts keyed by column name, ascending by ``ts`` for
        easier plotting.

    Raises:
        sqlite3.Error: If the query fails. The connection is closed before
            the error propagates.
    """
    conn = _connect(db_path)
    try:
        cur = conn.cursor()
        if since is not None:
            cur.execute(
                "SELECT * FROM speedtests WHERE ts >= ? ORDER BY ts DESC",
                (since,),
            )
        else:
            cur.execute("SELECT * FROM speedtests ORDER BY ts DESC")
        rows = cur.fetchall()
    finally:
        # Always release the connection; previously a failed query leaked it.
        conn.close()
    logger.info(f"Queried {len(rows)} speedtest records from database (since={since}).")
    # Rows are fetched newest-first and reversed here so the result is
    # ascending by time for charts.
    return _rows_to_dicts(rows)[::-1]
def query_pings(db_path: str, since: Optional[int] = None):
    """Return ping rows as dicts, oldest first.

    Args:
        db_path: Path of the SQLite database file.
        since: Optional lower bound for the ``ts`` column. When given, only
            rows with ``ts >= since`` are returned; when ``None``, all rows
            are returned.

    Returns:
        A list of dicts keyed by column name, ascending by ``ts``.

    Raises:
        sqlite3.Error: If the query fails. The connection is closed before
            the error propagates.
    """
    conn = _connect(db_path)
    try:
        cur = conn.cursor()
        if since is not None:
            cur.execute(
                "SELECT * FROM pings WHERE ts >= ? ORDER BY ts DESC",
                (since,),
            )
        else:
            cur.execute("SELECT * FROM pings ORDER BY ts DESC")
        rows = cur.fetchall()
    finally:
        # Always release the connection; previously a failed query leaked it.
        conn.close()
    logger.info(f"Queried {len(rows)} ping records from database (since={since}).")
    # Rows are fetched newest-first and reversed so the result is ascending.
    return _rows_to_dicts(rows)[::-1]
def query_dns(db_path: str, since: Optional[int] = None):
    """Return DNS check rows as dicts, oldest first.

    Args:
        db_path: Path of the SQLite database file.
        since: Optional lower bound for the ``ts`` column. When given, only
            rows with ``ts >= since`` are returned; when ``None``, all rows
            are returned.

    Returns:
        A list of dicts keyed by column name, ascending by ``ts``.

    Raises:
        sqlite3.Error: If the query fails. The connection is closed before
            the error propagates.
    """
    conn = _connect(db_path)
    try:
        cur = conn.cursor()
        if since is not None:
            cur.execute(
                "SELECT * FROM dnschecks WHERE ts >= ? ORDER BY ts DESC",
                (since,),
            )
        else:
            cur.execute("SELECT * FROM dnschecks ORDER BY ts DESC")
        rows = cur.fetchall()
    finally:
        # Always release the connection; previously a failed query leaked it.
        conn.close()
    logger.info(f"Queried {len(rows)} DNS check records from database (since={since}).")
    # Rows are fetched newest-first and reversed so the result is ascending.
    return _rows_to_dicts(rows)[::-1]