Skip to content
This repository was archived by the owner on Jun 11, 2025. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 119 additions & 0 deletions caddy_scraper/opensearch_document_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import asyncio

import glob
import json
import os
from typing import List, Callable, Any

from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.document_loaders import DataFrameLoader
from langchain.embeddings import BedrockEmbeddings
from opensearchpy import OpenSearch, helpers
import pandas as pd


class OpenSearchDocumentManager:
    """Creates an OpenSearch k-NN index and bulk-uploads embedded document chunks.

    Documents are read from the JSON files produced by the scraper, split into
    overlapping chunks, embedded via Bedrock, and indexed for hybrid
    (lexical + vector) search.
    """

    def __init__(
        self,
        client: OpenSearch,
        index_name: str = "caddy-hybrid-search-index",
        embedding_model_id: str = "cohere.embed-english-v3",
        region_name: str = "eu-west-3",
    ):
        """Initialise the manager.

        Args:
            client (OpenSearch): configured OpenSearch client.
            index_name (str): name of the index to manage.
            embedding_model_id (str): Bedrock embedding model identifier.
            region_name (str): AWS region hosting the Bedrock model.
        """
        self.client = client
        self.index_name = index_name
        self.embedding_model = BedrockEmbeddings(
            model_id=embedding_model_id, region_name=region_name
        )

    def create_index(self, vector_dimension: int = 1024):
        """Create the hybrid-search index, replacing any existing index of the same name.

        WARNING: destructive — an existing index (and all of its documents) is
        deleted before the new one is created, so the mapping below is always
        applied fresh.

        Args:
            vector_dimension (int): dimensionality of the embedding vectors
                (1024 for cohere.embed-english-v3).
        """
        index_body = {
            "settings": {"index": {"knn": True}},
            "mappings": {
                "properties": {
                    "text": {"type": "text"},
                    "source": {"type": "keyword"},
                    "domain": {"type": "keyword"},
                    "text_vector": {
                        "type": "knn_vector",
                        "dimension": vector_dimension,
                        "method": {
                            "engine": "faiss",
                            "space_type": "l2",
                            "name": "hnsw",
                            "parameters": {},
                        },
                    },
                }
            },
        }
        # Delete index if it exists
        if self.client.indices.exists(index=self.index_name):
            self.client.indices.delete(index=self.index_name)
        # Create new index
        self.client.indices.create(index=self.index_name, body=index_body)

    async def async_bulk_upload(
        self,
        file_path: str,
        domain: str = "citizen-advice",
        batch_size: int = 100,
    ):
        """Embed and bulk-upload every ``scrape_result_*.json`` file under ``file_path``.

        Args:
            file_path (str): directory containing scraper output files.
            domain (str): value stored in each document's ``domain`` field.
            batch_size (int): number of chunks per embedding request; batches
                are embedded concurrently (at most 10 in flight at once).
        """
        # The splitter is loop-invariant, so build it once rather than per file.
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=2048,
            chunk_overlap=100,
            length_function=len,
        )
        json_files = glob.glob(os.path.join(file_path, "scrape_result_*.json"))
        for file in json_files:
            with open(file) as f:
                df = pd.DataFrame(json.load(f))

            loader = DataFrameLoader(df, page_content_column="markdown")
            docs = loader.load()

            # Citizen Advice specific logic to remove low quality docs:
            # drop very short pages and Welsh-language ("cymraeg") duplicates.
            docs = [d for d in docs if d.metadata["markdown_length"] > 1000]
            docs = [d for d in docs if "cymraeg" not in d.metadata["source"]]

            docs = text_splitter.split_documents(docs)

            # Embed in fixed-size batches so the concurrency limit actually
            # applies — a single aembed_documents call over all chunks cannot
            # be parallelised by the semaphore.
            batches = [
                docs[i : i + batch_size] for i in range(0, len(docs), batch_size)
            ]
            embedding_batches = await self._gather_with_concurrency(
                10,
                *[
                    self.embedding_model.aembed_documents(
                        [d.page_content for d in batch]
                    )
                    for batch in batches
                ],
            )
            # Flatten back to one vector per chunk, preserving document order.
            embeddings = [vec for batch in embedding_batches for vec in batch]
            success, failed = helpers.bulk(
                self.client,
                self._generate_bulk_actions(docs, embeddings, domain),
            )
            print(f"File {file} - Uploaded: {success}, Failed: {failed}")

    async def _gather_with_concurrency(
        self, concurrency: int, *coroutines: Any
    ) -> List[Any]:
        """Run a number of async coroutines with a concurrency limit.

        Args:
            concurrency (int): max number of coroutines awaited at once.
            coroutines: awaitable objects to run.

        Returns:
            List[Any]: results in the order the coroutines were supplied.
        """
        semaphore = asyncio.Semaphore(concurrency)

        async def run_limited(coroutine):
            async with semaphore:
                return await coroutine

        return await asyncio.gather(*(run_limited(c) for c in coroutines))

    def _generate_bulk_actions(self, documents, embeddings, domain="citizen-advice"):
        """Yield OpenSearch bulk-index actions pairing each chunk with its vector.

        Args:
            documents: chunked langchain documents.
            embeddings: one embedding vector per document, in the same order.
            domain (str): value stored in the ``domain`` field.

        Yields:
            dict: a bulk ``index`` action.
        """
        for i, (doc, vector) in enumerate(zip(documents, embeddings)):
            # Derive _id from the source URL plus chunk position: a bare
            # enumerate index restarts at 0 for every file, so documents from
            # later files would silently overwrite earlier uploads. This id is
            # also stable, making re-uploads idempotent.
            yield {
                "_index": self.index_name,
                "_id": f"{doc.metadata['source']}#{i}",
                "_source": {
                    "text": doc.page_content,
                    "text_vector": vector,
                    "source": doc.metadata["source"],
                    "domain": domain,
                },
            }
51 changes: 51 additions & 0 deletions caddy_scraper/opensearch_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import asyncio

from opensearchpy import OpenSearch

from opensearch_document_manager import OpenSearchDocumentManager
from opensearch_query_engine import OpenSearchQueryEngine
from caddy_scraper import CaddyScraper

import os

# NOTE(security): these are local docker-compose example credentials only.
# They can be overridden via the environment so real secrets are never
# committed; the defaults preserve the previous behaviour exactly.
client = OpenSearch(
    hosts=[{"host": "localhost", "port": 9200}],
    http_auth=(
        os.environ.get("OPENSEARCH_USER", "admin"),
        os.environ.get("OPENSEARCH_PASSWORD", "Caddy_14211"),
    ),
    use_ssl=False,
    verify_certs=False,
)

async def upload_docs():
    """Create the hybrid-search index and bulk-upload the scraped documents into it."""
    manager = OpenSearchDocumentManager(client, index_name="caddy-hybrid-search-index")
    manager.create_index()
    await manager.async_bulk_upload(file_path="ca_test_scrape")


if __name__ == "__main__":
    # Step 1: scrape Citizens Advice pages listed in the sitemap.
    scraper = CaddyScraper(
        base_url="https://www.citizensadvice.org.uk/",
        sitemap_url="https://www.citizensadvice.org.uk/sitemap.xml",
        crawling_method="sitemap",
        output_dir="ca_test_scrape",
        div_ids=["main-content", "cads-main-content"],
        div_classes=["main-content", "cads-main-content"],
        scrape_depth=1,
    )
    scraper.run()

    # Step 2: embed and upload the scraped documents.
    asyncio.run(upload_docs())

    # Step 3: run an example hybrid query against the freshly built index.
    query_engine = OpenSearchQueryEngine(
        client,
        index_name="caddy-hybrid-search-index",
        lexical_weight=0.3,
        vector_weight=0.7,
    )
    results = query_engine.submit_hybrid_search(
        query_text="i am getting evicted for no reason what can I do",
        keywords="eviction",
        n_results=5,
    )
    print(results)



144 changes: 144 additions & 0 deletions caddy_scraper/opensearch_query_engine.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
from opensearchpy import OpenSearch
import requests
from langchain_community.embeddings import BedrockEmbeddings
from langchain.schema import Document
from typing import List


class OpenSearchQueryEngine:
    """Runs hybrid (lexical + vector) and pure vector searches against an OpenSearch index."""

    # Single source of truth for the server-side pipeline name, used both when
    # creating the pipeline and when routing searches through it.
    PIPELINE_NAME = "hybrid-search-pipeline"

    def __init__(
        self,
        client: OpenSearch,
        index_name: str = "caddy-hybrid-search-index",
        lexical_weight: float = 0.3,
        vector_weight: float = 0.7,
        embedding_model_id: str = "cohere.embed-english-v3",
        region_name: str = "eu-west-3",
    ):
        """Initialise the engine and (best-effort) install the score-normalisation pipeline.

        Args:
            client (OpenSearch): configured OpenSearch client.
            index_name (str): name of the index to query.
            lexical_weight (float): weight of the lexical score in hybrid search.
            vector_weight (float): weight of the vector score in hybrid search.
            embedding_model_id (str): Bedrock embedding model identifier.
            region_name (str): AWS region hosting the Bedrock model.
        """
        self.client = client
        self.index_name = index_name
        self.embedding_model = BedrockEmbeddings(
            model_id=embedding_model_id, region_name=region_name
        )
        # Create the search pipeline for normalization
        self._create_search_pipeline(lexical_weight, vector_weight)

    def _create_search_pipeline(self, lexical_weight: float, vector_weight: float):
        """Create (or overwrite) the search pipeline with a normalization processor.

        The pipeline min-max normalises the lexical and vector scores and
        combines them as a weighted arithmetic mean.

        Args:
            lexical_weight (float): weight given to the lexical (match) score.
            vector_weight (float): weight given to the vector (knn) score.
        """
        pipeline_config = {
            "description": "Post processor for hybrid search",
            "phase_results_processors": [
                {
                    "normalization-processor": {
                        "normalization": {"technique": "min_max"},
                        "combination": {
                            "technique": "arithmetic_mean",
                            "parameters": {
                                "weights": [lexical_weight, vector_weight]
                            },
                        },
                    }
                }
            ],
        }

        # Best-effort: a failure here should not prevent the engine from being
        # constructed — searches will simply run without score normalisation.
        try:
            self.client.transport.perform_request(
                "PUT",
                f"/_search/pipeline/{self.PIPELINE_NAME}",
                body=pipeline_config,
            )
        except Exception as e:
            print(f"Warning: Failed to create search pipeline: {e}")

    def submit_hybrid_search(
        self,
        query_text: str,
        keywords: str,
        n_neighbours: int = 2,
        n_results: int = 5,
    ):
        """Perform hybrid search combining lexical and vector search with normalization.

        Args:
            query_text (str): text embedded for the vector half of the query.
            keywords (str): keywords for the lexical half of the query.
            n_neighbours (int): number of nearest neighbours fetched by the knn
                sub-query. NOTE(review): with the default of 2 the vector side
                contributes at most 2 candidates towards ``n_results`` hits —
                confirm this asymmetry is intended.
            n_results (int): number of results to return.

        Returns:
            List[Document]: search hits converted to langchain documents.
        """
        query_vector = self.embedding_model.embed_query(query_text)

        search_query = {
            "size": n_results,
            "query": {
                "hybrid": {
                    "queries": [
                        {"match": {"text": {"query": keywords}}},
                        {
                            "knn": {
                                "text_vector": {
                                    "vector": query_vector,
                                    "k": n_neighbours,
                                }
                            }
                        },
                    ]
                }
            },
        }

        # Route through the pipeline so scores are normalised before combining.
        response = self.client.search(
            body=search_query,
            index=self.index_name,
            search_pipeline=self.PIPELINE_NAME,
        )
        return self.convert_opensearch_to_langchain(response)

    def submit_vector_search(self, query_text: str, n_results: int = 5):
        """Perform pure vector search without lexical matching.

        Args:
            query_text (str): the search query text, embedded via Bedrock.
            n_results (int): number of results to return (also used as knn ``k``).

        Returns:
            List[Document]: search hits converted to langchain documents.
        """
        query_vector = self.embedding_model.embed_query(query_text)

        search_query = {
            "size": n_results,
            "query": {
                "knn": {
                    "text_vector": {
                        "vector": query_vector,
                        "k": n_results,
                    }
                }
            },
        }

        response = self.client.search(body=search_query, index=self.index_name)
        return self.convert_opensearch_to_langchain(response)

    def convert_opensearch_to_langchain(self, opensearch_results) -> List[Document]:
        """Convert a raw OpenSearch search response into langchain Documents.

        Args:
            opensearch_results (dict): raw ``client.search`` response.

        Returns:
            List[Document]: one document per hit, carrying the hit score and
                source URL in metadata.
        """
        return [
            Document(
                page_content=hit["_source"]["text"],
                metadata={
                    "score": hit["_score"],
                    "source": hit["_source"]["source"],
                },
            )
            for hit in opensearch_results["hits"]["hits"]
        ]