Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 92 additions & 0 deletions models/article.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
from pydantic import BaseModel, field_validator
from datetime import datetime
from typing import List, Optional
import hashlib


class NormalizedArticle(BaseModel):
    """
    Unified article schema used across ingestion, retrieval, and agent pipeline.

    This ensures:
    - deterministic IDs (no duplication in Pinecone)
    - cross-source deduplication via content_hash
    - standardized structure for all ingestion sources
    """

    id: str
    title: str
    source: str
    source_type: str  # 'rss' | 'scraper' | future: 'reddit', 'nvd'
    credibility_tier: int  # 1 (high), 2 (medium), 3 (low)
    published_at: datetime
    content: str
    tags: List[str]
    content_hash: str
    cvss_score: Optional[float] = None

    # -----------------------------
    # VALIDATORS
    # -----------------------------
    @field_validator("credibility_tier")
    @classmethod
    def validate_tier(cls, v):
        """Reject any credibility tier other than 1, 2, or 3.

        Raises:
            ValueError: if ``v`` is not one of the three allowed tiers.
        """
        if v not in (1, 2, 3):
            raise ValueError("credibility_tier must be 1, 2, or 3")
        return v

    # -----------------------------
    # STATIC HELPERS
    # -----------------------------
    @staticmethod
    def generate_id(source_url: str) -> str:
        """Return the MD5 hex digest of ``source_url``.

        The same URL always produces the same ID.
        """
        return hashlib.md5(source_url.encode()).hexdigest()

    @staticmethod
    def compute_content_hash(title: str, content: str) -> str:
        """Return the SHA-256 hex digest of ``title`` followed by ``content``.

        NOTE(review): the two strings are concatenated without a separator,
        so ("ab", "c") and ("a", "bc") produce the same hash. Left unchanged
        here so previously computed hashes stay comparable.
        """
        combined = (title + content).encode()
        return hashlib.sha256(combined).hexdigest()

    # -----------------------------
    # FACTORY METHODS
    # -----------------------------
    @classmethod
    def from_scraper_dict(cls, data: dict):
        """Create an article from a scraper output dict.

        Reads the keys ``url``, ``title``, ``source``, ``published_at``,
        ``content`` and ``tags`` from ``data``. ``source_type`` is set to
        ``"scraper"`` and ``credibility_tier`` to 2.

        Raises:
            ValueError: if ``data`` has no non-empty ``url``; without one
                there is nothing to derive the deterministic ID from.
        """
        source_url = data.get("url")
        if not source_url:
            # Previously a missing URL crashed with an AttributeError on
            # None.encode(), and an empty URL gave every such article the
            # same ID.
            raise ValueError("scraper data must include a non-empty 'url'")

        title = data.get("title", "")
        content = data.get("content", "")

        return cls(
            id=cls.generate_id(source_url),
            title=title,
            source=data.get("source", "unknown"),
            source_type="scraper",
            credibility_tier=2,
            published_at=data.get("published_at"),
            content=content,
            tags=data.get("tags", []),
            content_hash=cls.compute_content_hash(title, content),
            cvss_score=None,
        )

    @classmethod
    def from_rss_entry(cls, entry):
        """Create an article from an RSS feed entry object.

        Reads ``entry.link``, ``entry.title`` and ``entry.published_parsed``
        (required) plus ``entry.source`` and ``entry.summary`` (optional,
        defaulting to ``"unknown"`` and ``""``). ``source_type`` is set to
        ``"rss"`` and ``credibility_tier`` to 1.
        """
        source_url = entry.link
        summary = getattr(entry, "summary", "")

        return cls(
            id=cls.generate_id(source_url),
            title=entry.title,
            source=getattr(entry, "source", "unknown"),
            source_type="rss",
            credibility_tier=1,
            # The first six fields of published_parsed are passed to
            # datetime() as year, month, day, hour, minute, second; the
            # result is a naive datetime.
            # NOTE(review): presumably this is a UTC time tuple from the
            # feed parser - confirm.
            published_at=datetime(*entry.published_parsed[:6]),
            content=summary,
            tags=[],
            content_hash=cls.compute_content_hash(entry.title, summary),
            cvss_score=None,
        )
82 changes: 82 additions & 0 deletions tests/test_article_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import pytest
from datetime import datetime
from models.article import NormalizedArticle


def test_generate_id_consistency():
    """Generating an ID twice from the same URL must give equal results."""
    url = "https://example.com/article"
    first, second = (NormalizedArticle.generate_id(url) for _ in range(2))
    assert first == second


def test_content_hash_consistency():
    """Hashing the same title/content pair twice must give equal results."""
    pair = ("Test Title", "Test Content")
    first_hash = NormalizedArticle.compute_content_hash(*pair)
    second_hash = NormalizedArticle.compute_content_hash(*pair)
    assert first_hash == second_hash


def test_valid_article_creation():
    """A fully specified, valid set of fields builds an article."""
    fields = {
        "id": "123",
        "title": "Test",
        "source": "bleepingcomputer",
        "source_type": "rss",
        "credibility_tier": 1,
        "published_at": datetime.now(),
        "content": "Some content",
        "tags": [],
        "content_hash": "abc",
        "cvss_score": None,
    }
    article = NormalizedArticle(**fields)
    assert article.title == "Test"


def test_invalid_credibility_tier():
    """A credibility tier of 5 must be rejected with a ValueError."""
    bad_fields = {
        "id": "123",
        "title": "Test",
        "source": "x",
        "source_type": "rss",
        "credibility_tier": 5,
        "published_at": datetime.now(),
        "content": "text",
        "tags": [],
        "content_hash": "abc",
    }
    with pytest.raises(ValueError):
        NormalizedArticle(**bad_fields)


def test_from_scraper_dict():
    """The scraper factory fills in id, content_hash and source_type."""
    scraped = dict(
        url="https://example.com",
        title="Sample",
        source="testsource",
        published_at=datetime.now(),
        content="content here",
        tags=["security"],
    )

    result = NormalizedArticle.from_scraper_dict(scraped)

    assert result.id is not None
    assert result.content_hash is not None
    assert result.source_type == "scraper"


class MockEntry:
    """Minimal stand-in for an RSS entry, exposing only the attributes set below."""

    def __init__(self):
        # Fixed sample values; published_parsed is a six-item
        # (year, month, day, hour, minute, second) tuple.
        vars(self).update(
            {
                "link": "https://rss.com/article",
                "title": "RSS Title",
                "summary": "RSS content",
                "published_parsed": (2024, 1, 1, 0, 0, 0),
            }
        )


def test_from_rss_entry():
    """The RSS factory tags the article as 'rss' and assigns an ID."""
    result = NormalizedArticle.from_rss_entry(MockEntry())

    assert result.source_type == "rss"
    assert result.id is not None
40 changes: 40 additions & 0 deletions tests/test_query_preprocessor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from utils.query_preprocessor import preprocess_query


def test_cve_variants():
    """Every listed spelling of the CVE ID is rewritten to CVE-2024-1234."""
    variants = (
        "cve 2024 1234",
        "CVE-2024-1234",
        "cve2024-1234",
        "CVE_2024_1234",
        "cve 2024-1234",
    )

    for raw_query in variants:
        processed = preprocess_query(raw_query)
        assert "CVE-2024-1234" in processed


def test_stopwords_removed():
    """The words 'latest', 'news' and 'update' are dropped from the query."""
    processed = preprocess_query("latest ransomware news update")

    assert "latest" not in processed
    assert "news" not in processed
    assert "update" not in processed


def test_lowercase_and_spacing():
    """A padded, mixed-case query reduces to just the canonical CVE ID."""
    padded_query = " Latest CVE 2024 1234 "
    processed = preprocess_query(padded_query)

    assert processed == "CVE-2024-1234"


def test_full_pipeline():
    """The pipeline keeps the CVE ID and 'ransomware' but drops 'latest'."""
    processed = preprocess_query("Latest CVE 2024 1234 ransomware news")

    assert "CVE-2024-1234" in processed
    assert "ransomware" in processed
    assert "latest" not in processed
1 change: 1 addition & 0 deletions utils/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Makes utils a Python package
56 changes: 56 additions & 0 deletions utils/query_preprocessor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import re

# Generic filler words that add noise to queries; matched against
# whitespace-separated tokens.
STOPWORDS = {
    "article",
    "blog",
    "current",
    "latest",
    "new",
    "news",
    "post",
    "recent",
    "report",
    "today",
    "update",
}


def normalize_query(text: str) -> str:
    """Return ``text`` lowercased, with whitespace runs collapsed and ends trimmed.

    Each run of whitespace becomes a single space, then leading and
    trailing whitespace is stripped.
    """
    lowered = text.lower()
    return re.sub(r"\s+", " ", lowered).strip()


def standardize_cve_patterns(text: str) -> str:
    """Rewrite CVE identifiers in ``text`` to the form ``CVE-<year>-<number>``.

    Matching is case-insensitive. A match is "cve", an optional single
    whitespace, underscore or hyphen, a four-digit year, another optional
    single separator, and a four-to-seven digit number. Examples handled:

    - cve 2024 1234
    - CVE-2024-1234
    - cve2024-1234
    - CVE_2024_1234

    Text that does not match is returned unchanged.
    """
    cve_regex = re.compile(r"cve[\s_-]?(\d{4})[\s_-]?(\d{4,7})", re.IGNORECASE)
    # \1 is the year group and \2 the number group.
    return cve_regex.sub(r"CVE-\1-\2", text)


def filter_stopwords(text: str) -> str:
    """Remove generic noise words from ``text``.

    ``text`` is split on whitespace and every token found in ``STOPWORDS``
    is dropped. The comparison is case-insensitive, so direct callers that
    pass un-normalized input ("Latest News") get the same filtering as
    lowercase input. Tokens that are kept retain their original casing.

    Returns the remaining tokens joined by single spaces, or an empty
    string if nothing remains.
    """
    kept = [word for word in text.split() if word.lower() not in STOPWORDS]
    return " ".join(kept)


def preprocess_query(text: str) -> str:
    """Run the full query preprocessing pipeline on ``text``.

    Applies, in order: ``normalize_query``, ``standardize_cve_patterns``
    and ``filter_stopwords``, feeding each stage's output to the next.
    """
    stages = (normalize_query, standardize_cve_patterns, filter_stopwords)
    for stage in stages:
        text = stage(text)
    return text