Skip to content
This repository was archived by the owner on Jun 11, 2025. It is now read-only.
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 109 additions & 0 deletions chunking.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import tiktoken
from typing import List
from langchain_text_splitters import RecursiveCharacterTextSplitter

default_encoder = "cl100k_base"

encoding = tiktoken.get_encoding(default_encoder)


text_splitter = RecursiveCharacterTextSplitter(
chunk_size=512,
chunk_overlap=20,
length_function=len,
is_separator_regex=False,
)


markdown_splitter_list = []


def num_tokens_from_string(string: str, encoding_name: str) -> int:
    """Count the tokens produced by encoding *string* with the named tiktoken encoding.

    Args:
        string: the text to tokenize.
        encoding_name: a tiktoken encoding name, e.g. ``"cl100k_base"``.

    Returns:
        The number of tokens in the encoded text.
    """
    return len(tiktoken.get_encoding(encoding_name).encode(string))


def split_strings_by_logic(
    list_of_string: List[str],
    max_tokens: int,
    list_of_splitters: List,
    final_splitter,
) -> List[str]:
    """Split each markdown string into chunks of at most ``max_tokens`` tokens.

    Strings that already fit are passed through unchanged. Over-long strings
    are split with each object in ``list_of_splitters`` (anything exposing
    ``split_text``); pieces that fit are kept. If no splitter produces a
    small-enough piece for a given string, that string is split with
    ``final_splitter`` instead of being silently dropped.

    Args:
        list_of_string: the markdown strings to split.
        max_tokens: maximum allowed token count per chunk.
        list_of_splitters: splitter objects with a ``split_text`` method,
            tried in order for logic-aware splitting.
        final_splitter: fallback splitter object with a ``split_text`` method.

    Returns:
        The list of resulting chunks.
    """
    result = []

    for markdown in list_of_string:
        # Strings that already fit need no splitting.
        if num_tokens_from_string(markdown, default_encoder) <= max_tokens:
            result.append(markdown)
            continue

        # Split with every logical splitter and flatten the pieces.
        pieces = []
        for splitter in list_of_splitters:
            pieces.extend(splitter.split_text(markdown))

        # Keep only pieces that respect the token budget.
        short_pieces = [
            piece
            for piece in pieces
            if num_tokens_from_string(piece, default_encoder) <= max_tokens
        ]

        if short_pieces:
            result.extend(short_pieces)
        else:
            # No logical splitter produced a small-enough piece for THIS
            # string (checked per string, not against the accumulated
            # result) — fall back to the character-based final splitter.
            result.extend(final_splitter.split_text(markdown))

    return result


def add_token_length_to_document(document):
    """Annotate *document* with its token count under the default encoder.

    Stores the count in ``document.metadata["token_count"]`` (mutating the
    document in place) and returns the same document for convenience.
    """
    token_count = num_tokens_from_string(document.page_content, default_encoder)
    document.metadata["token_count"] = token_count
    return document


def split_documents_by_chunker(
    documents: List, max_tokens: int, final_splitter
) -> List:
    """Ensure every document fits within ``max_tokens`` tokens.

    Each document is annotated with its token count; documents over the
    limit are re-split with ``final_splitter`` and re-annotated, then merged
    with the documents that already fit.

    Args:
        documents: list of documents (objects with ``page_content`` and
            ``metadata`` attributes).
        max_tokens: maximum allowed token count per document.
        final_splitter: splitter object with a ``split_documents`` method.

    Returns:
        A list of documents, each with ``metadata["token_count"]`` set and
        no document exceeding ``max_tokens`` (subject to the splitter's
        chunking granularity).
    """
    document_list = [add_token_length_to_document(doc) for doc in documents]

    print("total document count:", len(document_list))

    list_of_too_long_docs = [
        doc for doc in document_list if doc.metadata["token_count"] > max_tokens
    ]

    list_of_short_docs = [
        doc for doc in document_list if doc.metadata["token_count"] <= max_tokens
    ]

    print("too long document count:", len(list_of_too_long_docs))

    if list_of_too_long_docs:
        shortened_documents = final_splitter.split_documents(list_of_too_long_docs)
        # Re-annotate the new, shorter chunks with their token counts.
        list_of_short_docs.extend(
            add_token_length_to_document(doc) for doc in shortened_documents
        )

    return list_of_short_docs
154 changes: 130 additions & 24 deletions core_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@
from joblib import Memory
from langchain_community.document_loaders import AsyncHtmlLoader, DataFrameLoader
from langchain_community.document_transformers import BeautifulSoupTransformer

from generate_vectorstore import vectorstore
from chunking import split_documents_by_chunker, text_splitter
from opensearchpy import helpers
import time
import functools
import traceback


LOCATION = "./cachedir"
Expand Down Expand Up @@ -64,6 +67,47 @@ def wrapper_retry(*args, **kwargs):
return decorator_retry


def remove_duplicate_chunks(markdown_text):
    """Drop repeated chunks from a markdown string.

    Chunks are delimited by blank lines or by a newline immediately
    preceding a heading marker (``#``). Two chunks count as duplicates
    when they are equal after stripping surrounding whitespace; the first
    occurrence is kept verbatim and survivors are re-joined with blank
    lines.
    """
    seen = set()
    kept = []

    for chunk in re.split(r"\n\s*\n+|\n(?=#)", markdown_text):
        key = chunk.strip()
        # Skip empty chunks and any chunk already seen (ignoring edge whitespace).
        if key and key not in seen:
            seen.add(key)
            kept.append(chunk)

    return "\n\n".join(kept)


def remove_anchor_urls(urls):
    """
    Filter out anchor URLs (URLs containing a ``#`` fragment) from a list.

    Args:
        urls (list): A list of URLs (strings).

    Returns:
        list: A new list containing only the URLs without a fragment
        identifier, in their original order.
    """
    fragment = re.compile(r"#.*$")
    return [url for url in urls if not fragment.search(url)]


def crawl_url_batch(
url_list: List,
domain_description: str,
Expand Down Expand Up @@ -91,25 +135,27 @@ def crawl_url_batch(

main_section_html = ""

# Extract html content by div ids
if div_ids:
for div_id in div_ids:
selected_div_id_html = soup.find("div", id=div_id)
if selected_div_id_html:
main_section_html += str(selected_div_id_html)

# Extract html content by div classes
if div_classes:
for div_class in div_classes:
selected_div_classes_html = soup.find("div", class_=div_class)
if selected_div_classes_html:
main_section_html += str(selected_div_classes_html)

if not main_section_html:
main_section_html = str(soup)

# page content
current_page_markdown = html2text.html2text(str(main_section_html))
page_dict = {"source_url": current_url, "markdown": current_page_markdown}
scraped_pages.append(page_dict)
# Parse the combined HTML content again to extract paragraphs
if len(main_section_html) > 0:
main_section_soup = BeautifulSoup(main_section_html, "html.parser")
paragraphs_in_main = main_section_soup.find_all("p")
current_page_markdown = html2text.html2text(str(paragraphs_in_main))
page_dict = {"source_url": current_url, "markdown": current_page_markdown}
scraped_pages.append(page_dict)

document_df = pd.DataFrame(scraped_pages)

Expand All @@ -121,6 +167,12 @@ def crawl_url_batch(
unique_pages["scraped_at"] = pd.to_datetime("today")
unique_pages["updated_at"] = pd.to_datetime("today")

# TODO REMOVE this once debugging done
unique_pages.to_csv("unique_pages.csv")

    # apply remove_duplicate_chunks to each row of the markdown column
unique_pages["markdown"] = unique_pages["markdown"].apply(remove_duplicate_chunks)

dataframe_loader = DataFrameLoader(unique_pages, page_content_column="markdown")

docs_to_upload = dataframe_loader.load()
Expand All @@ -140,7 +192,9 @@ def get_sitemap(url):

response = requests.get(url) # nosec
response.raise_for_status() # Ensure we get a valid response or raise an HTTPError
response.encoding = response.apparent_encoding # Set the apparent encoding if not provided
response.encoding = (
response.apparent_encoding
) # Set the apparent encoding if not provided
xml = BeautifulSoup(response.content, "lxml-xml")
return xml

Expand Down Expand Up @@ -208,20 +262,59 @@ def generate_vectorstore():


@retry()
def add_document_list_to_vectorstore(
    document_list, vectorstore, batch_size=500, retry_count=3, maxiumum_token_length=512
):
    """Takes a list of documents, shortens over-long ones, and adds them all
    to the vectorstore in batches.

    Args:
        document_list (list): list of documents
        vectorstore (vectorstore): vectorstore to add the documents to
        batch_size (int, optional): the size of each batch to add the documents in
        retry_count (int, optional): the number of times to retry adding a
            batch in case of failure
        maxiumum_token_length (int, optional): token limit above which a
            document is re-chunked. NOTE(review): name is misspelled but kept
            for backward compatibility with existing keyword callers.

    Returns:
        added_docs (int): the number of documents added to the vectorstore
    """

    print("beginning to shorten long documents")
    print("initial document count:", len(document_list))

    document_list = split_documents_by_chunker(
        document_list, maxiumum_token_length, text_splitter
    )

    print("shortened document count:", len(document_list))

    added_docs = 0
    num_docs = len(document_list)
    # Ceiling division: no trailing empty batch when num_docs divides
    # evenly by batch_size, and zero batches for an empty list.
    num_batches = (num_docs + batch_size - 1) // batch_size

    print(f"Adding {num_docs} documents to the vectorstore in {num_batches} batches")

    for i in range(num_batches):
        print(f"Adding batch {i+1} of {num_batches}")
        batch = document_list[i * batch_size : (i + 1) * batch_size]

        for attempt in range(1, retry_count + 1):
            try:
                vectorstore.add_documents(batch, bulk_size=2000)
                added_docs += len(batch)
                print("added batch", i + 1)
                break
            except Exception:
                print(
                    f"Failed to add batch {i+1}. Retrying... ({attempt}/{retry_count})"
                )
                # print the full text of the exception
                print(traceback.format_exc())
                time.sleep(1)
        else:
            # All retries exhausted: make the dropped batch visible instead
            # of failing silently.
            print(f"Giving up on batch {i+1} after {retry_count} attempts")

    return added_docs


Expand Down Expand Up @@ -268,7 +361,6 @@ def sitemap_to_dataframe(xml, name=None, verbose=False):
return df



def get_all_urls(url, domains_to_exclude=None):
"""Return a dataframe containing all of the URLs from a site's XML sitemaps.

Expand Down Expand Up @@ -449,20 +541,26 @@ def scrape_url_list(base_url, url_list, authentication_cookie=None):
return unique_pages, links


def delete_duplicate_urls_from_store(vectorstore):
"""Looks for duplicate source urls in the Opensearch vectorstore, and removes them, keeping only the most recent based on metadata.time_scraped"""
def delete_duplicate_chunks_from_store(vectorstore):
"""Looks for duplicate source urls and text chunks in the Opensearch vectorstore, and removes them, keeping only the most recent based on metadata.time_scraped"""
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Am I right in thinking the process follows these steps:

  • scrape website
  • upload new scrape to vectorstore
  • check for matching chunks in vector store
  • delete old chunks if a new chunk has been found.

Are the errors you encounter when uploading documents to the vector store the main reason you don't: delete all entries in the vector store from the old scrape of the same website before uploading the new scrape?


index_name = vectorstore.index_name

# Step 1: Aggregate to find potential duplicates
agg_query = {
"size": 0,
"aggs": {
"duplicate_urls": {
"terms": {
"field": "metadata.source_url.keyword",
"min_doc_count": 2,
"size": 10000, # Adjust size based on expected number of unique URLs
"duplicate_chunks": {
"composite": {
"sources": [
{
"source_url": {
"terms": {"field": "metadata.source_url.keyword"}
}
},
{"text": {"terms": {"field": "text.keyword"}}},
],
"size": 10000, # Adjust size based on expected number of unique chunks
}
}
},
Expand All @@ -473,13 +571,21 @@ def delete_duplicate_urls_from_store(vectorstore):
# Step 2: For each duplicate, find the most recent document and prepare to delete the rest
delete_candidates = []

for bucket in agg_result["aggregations"]["duplicate_urls"]["buckets"]:
url = bucket["key"]
for bucket in agg_result["aggregations"]["duplicate_chunks"]["buckets"]:
source_url = bucket["key"]["source_url"]
text = bucket["key"]["text"]

# Find documents with this URL, sorted by time_scraped in descending order
# Find documents with this URL and text, sorted by time_scraped in descending order
search_query = {
"size": bucket["doc_count"],
"query": {"term": {"metadata.source_url.keyword": url}},
"query": {
"bool": {
"must": [
{"term": {"metadata.source_url.keyword": source_url}},
{"term": {"text.keyword": text}},
]
}
},
"sort": [
{"metadata.scraped_at": {"order": "desc"}} # Sort by time_scraped
],
Expand Down
11 changes: 7 additions & 4 deletions generate_vectorstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,13 @@


from langchain_community.embeddings import BedrockEmbeddings
from langchain_community.embeddings import HuggingFaceEmbeddings

from langchain_community.vectorstores import OpenSearchVectorSearch


from dotenv import load_dotenv

load_dotenv()
opensearch_https = os.environ.get("OPENSEARCH_HTTPS")
# embedding_model_name = os.environ.get("HF_EMBEDDING_MODEL")
bedrock_model_id = os.environ.get("BEDROCK_MODEL_ID")

region = "eu-west-3"
Expand All @@ -28,7 +26,12 @@
refreshable_credentials=credentials,
)

embedding_model = BedrockEmbeddings(model_id = bedrock_model_id, region_name=region)
embedding_model = BedrockEmbeddings(
model_id=bedrock_model_id,
region_name=region,
model_kwargs={"input_type": "search_document", "truncate": "END"},
)


def generate_rolling_vectorstore_name():
"""Generates a name name for the index based on todays day
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ python-dotenv
lxml
requests-aws4auth
pre-commit
tiktoken
Loading