-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapp_v2.py
More file actions
352 lines (294 loc) · 15 KB
/
app_v2.py
File metadata and controls
352 lines (294 loc) · 15 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
"""
AI Real Estate Assistant - RAG Implementation (V2)
This file implements the Version 2 of the AI Real Estate Assistant, which uses
a Retrieval-Augmented Generation (RAG) approach with vector databases and
ConversationalRetrievalChain from LangChain.
Unlike V1 which uses a pandas dataframe agent, this version:
1. Uses vector databases for efficient information retrieval
2. Supports multiple LLM models (OpenAI GPT, Llama)
3. Implements streaming responses for better user experience
4. Provides source references for transparency
"""
import os # For path operations and environment variables
import utils # Custom utilities for the application
import requests # For HTTP requests to load external data
import traceback # For detailed error tracking
import validators # For validating URLs
import streamlit as st # Web UI framework
from streaming import StreamHandler # Custom handler for streaming responses
from common.cfg import * # Import configuration variables
from langchain.memory import ConversationBufferMemory # For storing conversation context
from langchain.chains import ConversationalRetrievalChain # Main RAG chain
import pandas as pd # For data manipulation
from langchain_core.documents.base import Document # LangChain document structure
from langchain_community.document_loaders.csv_loader import CSVLoader # For loading local CSV files
from langchain_community.document_loaders.dataframe import DataFrameLoader # For loading pandas DataFrames
from langchain.text_splitter import RecursiveCharacterTextSplitter # For chunking documents
from langchain_community.vectorstores import DocArrayInMemorySearch # In-memory vector store
# Configure the Streamlit page: wide layout with a chat icon in the browser tab.
# NOTE: st.set_page_config must be the first Streamlit call in the script.
st.set_page_config(page_title="🦾 AI Real Estate Assistant", page_icon='💬', layout='wide')
st.header('Chat with Real Estate AI Assistant') # Main page heading
st.write('[](https://github.com/AleksNeStu/ai-real-estate-assistant)') # Badge/link to the project repository
class ChatbotWeb:
    """RAG-based real estate chatbot (V2) built on Streamlit and LangChain.

    Responsibilities:
      1. Load and process CSV data sets from user-supplied URLs.
      2. Build an in-memory vector store for semantic retrieval.
      3. Configure the LLM and embedding models (delegated to ``utils``).
      4. Drive the conversational Streamlit UI.
      5. Display source references (citations) for transparency.
    """

    def __init__(self):
        """Initialize session state, the LLM, and the embedding model.

        Model selection/configuration is delegated to the project's
        ``utils`` helpers (OpenAI GPT or Llama, per user choice).
        """
        utils.sync_st_session()  # Keep Streamlit session state consistent across reruns
        self.llm = utils.configure_llm()  # Language model used by the QA chain
        self.embedding_model = utils.configure_embedding_model()  # Embeddings for vector search

    def scrape_website(self, url):
        """Fetch page content through the r.jina.ai proxy service.

        The proxy avoids CORS issues and returns a readable text rendering
        of the target page.

        Parameters:
            url (str): URL to scrape content from.

        Returns:
            str | None: The scraped content, or ``None`` on any failure.
        """
        try:
            base_url = "https://r.jina.ai/"  # Proxy service to avoid CORS issues
            final_url = base_url + url
            headers = {
                'User-Agent': 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:88.0) Gecko/20100101 Firefox/88.0'
            }
            # FIX: a timeout prevents the Streamlit worker from hanging
            # indefinitely on an unresponsive host (original had none).
            response = requests.get(final_url, headers=headers, timeout=30)
            # FIX: treat HTTP errors (404/500/...) as failures instead of
            # silently returning the error page body as "content".
            response.raise_for_status()
            return response.text
        except Exception:
            traceback.print_exc()  # Log details; caller receives None
            return None

    def load_docs_from_csv_local(self, path):
        """Load a local CSV file into LangChain ``Document`` objects.

        Each CSV row becomes one ``Document`` via ``CSVLoader``.

        Parameters:
            path (str): Path to the local CSV file.

        Returns:
            list[Document] | None: Loaded documents, or ``None`` on failure.
        """
        try:
            loader = CSVLoader(path)
            return loader.load()
        except Exception:
            traceback.print_exc()
            return None

    def load_docs_from_csv_web(self, url):
        """Load a web-hosted CSV into LangChain ``Document`` objects.

        Downloads the CSV as a pandas DataFrame, then converts each row
        to a ``Document`` via ``DataFrameLoader``.

        Parameters:
            url (str): URL of the CSV file.

        Returns:
            list[Document] | None: Loaded documents, or ``None`` on failure.
        """
        try:
            df = pd.read_csv(url)
            loader = DataFrameLoader(data_frame=df)
            return loader.load()
        except Exception:
            traceback.print_exc()
            return None

    def load_data_from_csv_web(self, url):
        """Load a web-hosted CSV and return it as one string for embedding.

        The rows are rendered as a list-of-dicts string so the structure
        (column names next to values) survives chunking and embedding.

        Parameters:
            url (str): URL of the CSV file.

        Returns:
            str | None: String representation of the rows, or ``None`` on failure.
        """
        try:
            df = pd.read_csv(url)
            # records orientation keeps each row self-describing: [{col: val, ...}, ...]
            return str(df.to_dict(orient='records'))
        except Exception:
            traceback.print_exc()
            return None

    @st.cache_resource(show_spinner='Analyzing csv data set', ttl=3600)
    def setup_vectordb(_self, websites):
        """Build an in-memory vector database from CSV URLs.

        Steps: download each CSV as text, wrap it in a ``Document`` tagged
        with its source URL, split into overlapping chunks, and embed them
        into a ``DocArrayInMemorySearch`` store.

        Cached by Streamlit (``cache_resource``) for one hour; ``_self``
        (leading underscore) tells Streamlit not to hash the instance.

        Parameters:
            websites (list[str]): URLs of CSV files.

        Returns:
            DocArrayInMemorySearch: Vector store over the chunked documents.
        """
        docs = []
        for url in websites:
            content = _self.load_data_from_csv_web(url)
            if content is None:
                # FIX: the loader returns None on download/parse failure;
                # the original passed that None straight into Document,
                # crashing the whole pipeline. Skip the bad source instead.
                continue
            docs.append(Document(
                page_content=content,
                metadata={"source": url}  # Track source URL for citations
            ))
        # Overlapping chunks preserve context across chunk boundaries.
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=1000,
            chunk_overlap=200
        )
        splits = text_splitter.split_documents(docs)
        # Embed with the model configured in __init__.
        vectordb = DocArrayInMemorySearch.from_documents(splits, _self.embedding_model)
        return vectordb

    def setup_qa_chain(self, vectordb):
        """Assemble the ConversationalRetrievalChain for Q&A.

        Combines an MMR retriever over ``vectordb``, a buffer memory for
        chat history, and the configured LLM.

        Parameters:
            vectordb: Vector store to retrieve context documents from.

        Returns:
            ConversationalRetrievalChain: Ready-to-invoke QA chain.
        """
        # MMR (Maximum Marginal Relevance) trades pure similarity for
        # diversity among the retrieved chunks.
        retriever = vectordb.as_retriever(
            search_type='mmr',
            search_kwargs={
                'k': 2,        # Return the 2 most relevant documents
                'fetch_k': 4   # Consider 4 candidates before MMR selection
            }
        )
        # Memory keys must match what ConversationalRetrievalChain expects.
        memory = ConversationBufferMemory(
            memory_key='chat_history',
            output_key='answer',
            return_messages=True
        )
        qa_chain = ConversationalRetrievalChain.from_llm(
            llm=self.llm,
            retriever=retriever,
            memory=memory,
            return_source_documents=True,  # Needed for the citation popovers in main()
            verbose=False
        )
        return qa_chain

    @utils.enable_chat_history  # Persist chat messages across Streamlit reruns
    def main(self):
        """Run the Streamlit application loop.

        Renders the sidebar URL controls, builds the vector DB and QA
        chain from the selected sources, and processes chat queries with
        streamed answers plus source citations.
        """
        csv_url = 'CSV Data Set URL'  # Label used in UI messages
        # Initialize session state on first run.
        if "websites" not in st.session_state:
            st.session_state["websites"] = []  # URLs the user has added
            # Default URLs come from project configuration.
            st.session_state["value_urls"] = GIT_DATA_SET_URLS_STR.split('\n')

        # Pre-fill the input with the first default URL, if any.
        url_val = ''
        value_urls = st.session_state.get("value_urls", [])
        if value_urls:
            url_val = value_urls[0]

        web_url = st.sidebar.text_area(
            label=f'Enter {csv_url}s',
            placeholder="https://",
            value=url_val
        )

        # Validate and add the entered URL.
        if st.sidebar.button(":heavy_plus_sign: Add Website"):
            valid_url = web_url.startswith('http') and validators.url(web_url)
            if not valid_url:
                st.sidebar.error(f"Invalid URL! Please check {csv_url} that you have entered.", icon="⚠️")
            else:
                st.session_state["websites"].append(web_url)

        # Reset the source list.
        if st.sidebar.button("Clear", type="primary"):
            st.session_state["websites"] = []

        # De-duplicate sources (set round-trip).
        websites = list(set(st.session_state["websites"]))

        if not websites:
            # Nothing to search over yet — stop the script until a URL is added.
            st.error(f"Please enter {csv_url} to continue!")
            st.stop()
        else:
            st.sidebar.info("CSV Data Sets - \n - {}".format('\n - '.join(websites)))

        vectordb = self.setup_vectordb(websites)   # Cached vector DB over the sources
        qa_chain = self.setup_qa_chain(vectordb)   # Conversational retrieval chain

        user_query = st.chat_input(placeholder="Ask me question about real estate properties!")

        if websites and user_query:
            utils.display_msg(user_query, 'user')
            with st.chat_message("assistant"):
                # Stream tokens into the UI as the LLM generates them.
                st_cb = StreamHandler(st.empty())
                result = qa_chain.invoke(
                    {"question": user_query},
                    {"callbacks": [st_cb]}
                )
                response = result["answer"]
                st.session_state.messages.append({"role": "assistant", "content": response})
                utils.print_qa(ChatbotWeb, user_query, response)  # Log the Q&A

                # Show each retrieved chunk as a clickable citation.
                for idx, doc in enumerate(result['source_documents'], 1):
                    url = os.path.basename(doc.metadata['source'])
                    ref_title = f":blue[Reference {idx}: *{url}*]"
                    with st.popover(ref_title):
                        st.caption(doc.page_content)
# Script entry point: build the chatbot and hand control to Streamlit.
if __name__ == "__main__":
    ChatbotWeb().main()