Skip to content

Commit e74e80e

Browse files
committed
Add confidence-scored DNS monitor with NS overrides, DKIM selectors, and RDAP-first expiry
1 parent 320ceb4 commit e74e80e

5 files changed

Lines changed: 370 additions & 1 deletion

File tree

.gitignore

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
# Ignore outputs and temp files
22
*.log
33
*.csv
4-
*.json
54
*.out
65

6+
# Output JSON (keep config JSON tracked)
7+
output*.json
8+
report*.json
9+
710
# Python cache
811
__pycache__/
912
*.pyc

README.md

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,27 @@ A DNS reconnaissance and mail-security toolkit for rapid defensive triage workfl
2020
- DNS posture checks across NS, MX, TXT, CAA, DMARC, and SPF
2121
- Bulk lookup workflows for subdomains and registration checks
2222
- Cloudflare detection with resolver and DoH fallback
23+
- `domain-security-monitor.py` for confidence-scored domain monitoring with structured JSON output
24+
25+
### New monitor quick start
26+
27+
```bash
28+
python3 ./domain-security-monitor.py --domain example.com --output json
29+
python3 ./domain-security-monitor.py --input-file domains.txt --output json
30+
```
31+
32+
Configuration files:
33+
34+
- `config/expected_ns.json`: default nameservers plus per-domain overrides
35+
- `config/dkim_selectors.json`: per-domain authoritative DKIM selector hints
36+
37+
The monitor includes:
38+
39+
- per-domain expected nameserver compliance
40+
- DKIM selector-aware checks with confidence labels
41+
- RDAP-first expiry lookup with WHOIS fallback
42+
- retry and backoff on DNS and HTTP operations
43+
- status and confidence metadata on each signal
2344

2445
## Detection notes and limitations
2546

config/dkim_selectors.json

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
{
2+
"fiaformulae.com": ["k1", "k2"],
3+
"telenet.be": ["selector1", "selector2"],
4+
"virginmedia.co.uk": ["selector1", "selector2"],
5+
"sunrise.ch": ["selector1", "selector2"],
6+
"upc.sk": ["selector1", "selector2"]
7+
}

config/expected_ns.json

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
{
2+
"default": [
3+
"ns1.upc.biz",
4+
"ns2.upc.biz",
5+
"ns3.upc.biz"
6+
],
7+
"domain_overrides": {
8+
"fiaformulae.com": [
9+
"ns-1258.awsdns-29.org",
10+
"ns-161.awsdns-20.com",
11+
"ns-1922.awsdns-48.co.uk",
12+
"ns-580.awsdns-08.net"
13+
]
14+
}
15+
}

domain-security-monitor.py

Lines changed: 323 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,323 @@
1+
#!/usr/bin/env python3
2+
"""Domain security monitor with confidence metadata.
3+
4+
Improvements included:
5+
- Per-domain expected nameserver overrides
6+
- DKIM selector-aware checks with confidence scoring
7+
- RDAP-first expiry lookup with WHOIS fallback
8+
- Retry/backoff for DNS and HTTP calls
9+
- Structured JSON output with status/confidence/source per signal
10+
"""
11+
12+
from __future__ import annotations
13+
14+
import argparse
15+
import json
16+
import random
17+
import re
18+
import socket
19+
import subprocess
20+
import time
21+
from dataclasses import dataclass
22+
from datetime import datetime, timezone
23+
from pathlib import Path
24+
from typing import Any
25+
from urllib.parse import quote
26+
from urllib.request import Request, urlopen
27+
28+
BASE_DIR = Path(__file__).resolve().parent
29+
DEFAULT_EXPECTED_NS_FILE = BASE_DIR / "config" / "expected_ns.json"
30+
DEFAULT_DKIM_SELECTORS_FILE = BASE_DIR / "config" / "dkim_selectors.json"
31+
32+
33+
@dataclass
34+
class Signal:
35+
status: str
36+
confidence: str
37+
data_source: str
38+
details: dict[str, Any]
39+
40+
41+
def now_utc() -> str:
42+
return datetime.now(timezone.utc).isoformat()
43+
44+
45+
def norm_ns(value: str) -> str:
46+
return value.strip().rstrip(".").lower()
47+
48+
49+
def run_with_retry(cmd: list[str], retries: int = 3, timeout: int = 5) -> tuple[int, str, str, str]:
50+
last_rc = 1
51+
last_out = ""
52+
last_err = ""
53+
source = "dns"
54+
55+
for i in range(retries):
56+
try:
57+
proc = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
58+
last_rc = proc.returncode
59+
last_out = proc.stdout.strip()
60+
last_err = proc.stderr.strip()
61+
if last_rc == 0:
62+
return last_rc, last_out, last_err, source
63+
except Exception as exc:
64+
last_err = str(exc)
65+
66+
if i < retries - 1:
67+
time.sleep((0.25 * (i + 1)) + random.random() * 0.2)
68+
69+
return last_rc, last_out, last_err, source
70+
71+
72+
def http_json_with_retry(url: str, retries: int = 3, timeout: int = 6) -> tuple[dict[str, Any] | None, str]:
73+
source = "rdap"
74+
last_err = ""
75+
for i in range(retries):
76+
try:
77+
req = Request(url, headers={"User-Agent": "dns-analysis-monitor/1.0"})
78+
with urlopen(req, timeout=timeout) as resp:
79+
payload = json.loads(resp.read().decode("utf-8", errors="ignore"))
80+
return payload, source
81+
except Exception as exc:
82+
last_err = str(exc)
83+
if i < retries - 1:
84+
time.sleep((0.25 * (i + 1)) + random.random() * 0.2)
85+
return None, f"{source}_error:{last_err[:120]}"
86+
87+
88+
def dig(record_type: str, name: str, retries: int = 3) -> tuple[list[str], str]:
89+
rc, out, err, source = run_with_retry(["dig", "+time=2", "+tries=1", "+short", record_type, name], retries=retries, timeout=6)
90+
if rc != 0 or not out:
91+
return [], f"{source}_error:{err[:120]}" if err else source
92+
return [line.strip() for line in out.splitlines() if line.strip()], source
93+
94+
95+
def resolve_ips(domain: str) -> tuple[list[str], str]:
96+
try:
97+
_, _, ips = socket.gethostbyname_ex(domain)
98+
return sorted(set(ips)), "dns"
99+
except Exception as exc:
100+
return [], f"dns_error:{str(exc)[:120]}"
101+
102+
103+
def load_json(path: Path, default: dict[str, Any]) -> dict[str, Any]:
104+
try:
105+
if path.exists():
106+
return json.loads(path.read_text(encoding="utf-8"))
107+
except Exception:
108+
pass
109+
return default
110+
111+
112+
def check_nameservers(domain: str, expected_cfg: dict[str, Any]) -> Signal:
113+
ns_records, src = dig("NS", domain)
114+
actual = sorted(set(norm_ns(x) for x in ns_records))
115+
116+
domain_overrides = {k.lower(): [norm_ns(v) for v in vals] for k, vals in (expected_cfg.get("domain_overrides") or {}).items()}
117+
default_ns = [norm_ns(x) for x in (expected_cfg.get("default") or [])]
118+
expected = sorted(set(domain_overrides.get(domain.lower(), default_ns)))
119+
120+
if not actual:
121+
return Signal("unknown", "low", src, {"actual": [], "expected": expected, "match": None})
122+
123+
if not expected:
124+
return Signal("unknown", "medium", src, {"actual": actual, "expected": [], "match": None})
125+
126+
match = actual == expected
127+
return Signal(
128+
"pass" if match else "fail",
129+
"high",
130+
src,
131+
{"actual": actual, "expected": expected, "match": match},
132+
)
133+
134+
135+
def check_spf(domain: str) -> Signal:
136+
txt, src = dig("TXT", domain)
137+
records = [x.replace('"', "") for x in txt]
138+
spf = [r for r in records if "v=spf1" in r.lower()]
139+
if not records:
140+
return Signal("unknown", "low", src, {"present": None, "record": None})
141+
if not spf:
142+
return Signal("fail", "high", src, {"present": False, "record": None})
143+
rec = spf[0]
144+
if "-all" in rec.lower():
145+
status = "pass"
146+
elif "~all" in rec.lower() or "?all" in rec.lower():
147+
status = "warn"
148+
else:
149+
status = "warn"
150+
return Signal(status, "high", src, {"present": True, "record": rec})
151+
152+
153+
def check_dmarc(domain: str) -> Signal:
154+
txt, src = dig("TXT", f"_dmarc.{domain}")
155+
records = [x.replace('"', "") for x in txt]
156+
dmarc = [r for r in records if "v=dmarc1" in r.lower()]
157+
if not dmarc:
158+
return Signal("fail", "high", src, {"present": False, "policy": "missing", "record": None})
159+
160+
rec = dmarc[0]
161+
m = re.search(r"\bp=([a-zA-Z]+)", rec, flags=re.I)
162+
policy = (m.group(1).lower() if m else "invalid")
163+
164+
if policy == "reject":
165+
status = "pass"
166+
elif policy == "quarantine":
167+
status = "warn"
168+
elif policy == "none":
169+
status = "warn"
170+
else:
171+
status = "fail"
172+
173+
return Signal(status, "high", src, {"present": True, "policy": policy, "record": rec})
174+
175+
176+
def check_dkim(domain: str, selectors_cfg: dict[str, Any]) -> Signal:
177+
base = ["selector1", "selector2", "default", "google", "k1", "k2", "dkim", "mail", "smtp", "s1", "s2"]
178+
extra = selectors_cfg.get(domain.lower(), []) if isinstance(selectors_cfg, dict) else []
179+
selectors = []
180+
seen = set()
181+
for s in [*extra, *base]:
182+
sl = str(s).strip().lower()
183+
if not sl or sl in seen:
184+
continue
185+
seen.add(sl)
186+
selectors.append(sl)
187+
188+
hits = []
189+
sources = set()
190+
for sel in selectors:
191+
txt, src = dig("TXT", f"{sel}._domainkey.{domain}", retries=2)
192+
sources.add(src)
193+
joined = " ".join(txt).lower()
194+
if "v=dkim1" in joined or " p=" in joined or "k=rsa" in joined:
195+
hits.append(sel)
196+
197+
if hits:
198+
confidence = "high" if any(s in selectors[: max(1, len(extra))] for s in hits) and extra else "medium"
199+
return Signal("pass", confidence, "+".join(sorted(sources)), {"selectors_checked": selectors, "selectors_found": sorted(set(hits))})
200+
201+
confidence = "medium" if extra else "low"
202+
return Signal("fail", confidence, "+".join(sorted(sources)), {"selectors_checked": selectors, "selectors_found": []})
203+
204+
205+
def parse_iso_date(date_str: str) -> datetime | None:
206+
try:
207+
d = datetime.fromisoformat(date_str.replace("Z", "+00:00"))
208+
if d.tzinfo is None:
209+
d = d.replace(tzinfo=timezone.utc)
210+
return d.astimezone(timezone.utc)
211+
except Exception:
212+
return None
213+
214+
215+
def check_expiry(domain: str) -> Signal:
216+
# RDAP-first
217+
payload, src = http_json_with_retry(f"https://rdap.org/domain/{quote(domain)}")
218+
if payload:
219+
for ev in payload.get("events", []):
220+
action = str(ev.get("eventAction", "")).lower()
221+
if action in {"expiration", "expiry", "expiration date"}:
222+
dt = parse_iso_date(str(ev.get("eventDate", "")))
223+
if dt:
224+
days = (dt - datetime.now(timezone.utc)).days
225+
if days < 0:
226+
status = "fail"
227+
elif days <= 30:
228+
status = "warn"
229+
else:
230+
status = "pass"
231+
return Signal(status, "high", "rdap", {"days": days, "expiry_utc": dt.isoformat()})
232+
233+
# WHOIS fallback
234+
rc, out, err, whois_src = run_with_retry(["whois", domain], retries=2, timeout=10)
235+
text = out or ""
236+
patterns = [
237+
r"Expiry Date:\s*(.+)",
238+
r"Registrar Registration Expiration Date:\s*(.+)",
239+
r"paid-till:\s*(.+)",
240+
r"expires:\s*(.+)",
241+
]
242+
candidate = None
243+
for p in patterns:
244+
m = re.search(p, text, flags=re.I)
245+
if m:
246+
candidate = m.group(1).strip().splitlines()[0].strip()
247+
break
248+
249+
if candidate:
250+
dt = parse_iso_date(candidate)
251+
if dt:
252+
days = (dt - datetime.now(timezone.utc)).days
253+
status = "fail" if days < 0 else ("warn" if days <= 30 else "pass")
254+
return Signal(status, "medium", "whois", {"days": days, "expiry_utc": dt.isoformat()})
255+
256+
return Signal("unknown", "low", whois_src if rc == 0 else src, {"days": None, "expiry_utc": None})
257+
258+
259+
def analyse_domain(domain: str, expected_cfg: dict[str, Any], dkim_cfg: dict[str, Any]) -> dict[str, Any]:
260+
ips, ip_src = resolve_ips(domain)
261+
262+
return {
263+
"domain": domain,
264+
"generated_at_utc": now_utc(),
265+
"signals": {
266+
"ip_resolution": Signal("pass" if ips else "unknown", "high" if ips else "low", ip_src, {"ips": ips}).__dict__,
267+
"nameservers": check_nameservers(domain, expected_cfg).__dict__,
268+
"spf": check_spf(domain).__dict__,
269+
"dmarc": check_dmarc(domain).__dict__,
270+
"dkim": check_dkim(domain, dkim_cfg).__dict__,
271+
"expiry": check_expiry(domain).__dict__,
272+
},
273+
}
274+
275+
276+
def parse_args() -> argparse.Namespace:
277+
p = argparse.ArgumentParser(description="DNS analysis monitor with confidence metadata")
278+
p.add_argument("--domain", help="Single domain to analyse")
279+
p.add_argument("--input-file", help="Batch file with one domain per line")
280+
p.add_argument("--expected-ns", default=str(DEFAULT_EXPECTED_NS_FILE), help="Expected nameserver policy JSON")
281+
p.add_argument("--dkim-selectors", default=str(DEFAULT_DKIM_SELECTORS_FILE), help="Per-domain DKIM selectors JSON")
282+
p.add_argument("--output", choices=["json"], default="json")
283+
return p.parse_args()
284+
285+
286+
def load_domains(args: argparse.Namespace) -> list[str]:
287+
items: list[str] = []
288+
if args.domain:
289+
items.append(args.domain.strip().lower())
290+
if args.input_file:
291+
for ln in Path(args.input_file).read_text(encoding="utf-8").splitlines():
292+
v = ln.strip().lower()
293+
if not v or v.startswith("#"):
294+
continue
295+
items.append(v)
296+
297+
dedup = []
298+
seen = set()
299+
for d in items:
300+
if d in seen:
301+
continue
302+
seen.add(d)
303+
dedup.append(d)
304+
return dedup
305+
306+
307+
def main() -> int:
308+
args = parse_args()
309+
domains = load_domains(args)
310+
if not domains:
311+
print(json.dumps({"error": "provide --domain or --input-file"}, indent=2))
312+
return 2
313+
314+
expected_cfg = load_json(Path(args.expected_ns), {"default": [], "domain_overrides": {}})
315+
dkim_cfg = load_json(Path(args.dkim_selectors), {})
316+
317+
results = [analyse_domain(d, expected_cfg, dkim_cfg) for d in domains]
318+
print(json.dumps({"count": len(results), "results": results}, indent=2))
319+
return 0
320+
321+
322+
if __name__ == "__main__":
323+
raise SystemExit(main())

0 commit comments

Comments
 (0)