-
-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathdb_setup.py
More file actions
475 lines (429 loc) · 16.1 KB
/
Copy pathdb_setup.py
File metadata and controls
475 lines (429 loc) · 16.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
import os
import re
import psycopg2
from urllib.parse import urlparse
from dotenv import load_dotenv
# Load environment variables at import time, before get_db_params() below
# reads them with os.getenv(). load_dotenv() comes from python-dotenv, which
# reads a .env file if one is found.
load_dotenv()
# Parse database connection parameters from environment variables
def get_db_params():
    """Build the psycopg2 connection keyword arguments from the environment.

    ``SUPABASE_URL`` selects one of three modes:

    1. A PostgreSQL connection string
       (``postgresql://user:password@host[:port]/database[?options]``; the
       ``postgres://`` scheme is accepted too). The port defaults to 5432
       and anything from ``?`` onwards is ignored.
    2. An ``http(s)://`` URL or a bare ``host[:port]`` string. Only host and
       port come from the URL; database, user and password come from
       ``SUPABASE_DB``, ``DB_USER`` / ``SUPABASE_KEY`` and
       ``SUPABASE_PASSWORD`` (each defaulting to ``'postgres'``).
    3. Unset or empty: ``SUPABASE_HOST`` / ``SUPABASE_PORT`` plus the same
       database / user / password variables (local development defaults).

    Returns:
        dict with the keys ``host``, ``port`` (int), ``database``, ``user``
        and ``password``.

    Raises:
        ValueError: if ``SUPABASE_URL`` is a PostgreSQL connection string
            that cannot be parsed, or a port value is not an integer.
    """
    db_url = os.getenv('SUPABASE_URL')

    if db_url and db_url.startswith(('postgresql://', 'postgres://')):
        pattern = (
            r'postgres(?:ql)?://'
            r'([^:]+):(.+?)@'   # user ":" password (password may hold '@' or '/')
            r'([^:/@]+)'        # host
            r'(?::(\d+))?'      # optional ":port"
            r'/([^?]+)'         # database name, stopping before any "?options"
        )
        match = re.match(pattern, db_url)
        if match is None:
            # Fail with a clear message instead of falling through to the
            # host:port branch below, which would die on int('//user...').
            # The URL itself is not echoed because it contains the password.
            raise ValueError(
                "SUPABASE_URL looks like a PostgreSQL connection string but "
                "could not be parsed; expected "
                "postgresql://user:password@host[:port]/database"
            )
        user, password, host, port, database = match.groups()
        # NOTE(review): credentials are passed on exactly as written;
        # percent-encoded characters (e.g. %40) are not decoded.
        return {
            'host': host,
            'port': int(port) if port else 5432,  # default PostgreSQL port
            'database': database,
            'user': user,
            'password': password,
        }

    if db_url:
        # Backward compatibility: a plain URL or host[:port] string.
        if db_url.startswith(('http://', 'https://')):
            parsed_url = urlparse(db_url)
            host = parsed_url.hostname
            port = parsed_url.port or 5432  # default PostgreSQL port
        else:
            parts = db_url.split(':')
            host = parts[0]
            port = int(parts[1]) if len(parts) > 1 else 5432
        return {
            'host': host,
            'port': port,
            'database': os.getenv('SUPABASE_DB', 'postgres'),
            # Use DB_USER if provided, otherwise fall back to SUPABASE_KEY.
            'user': os.getenv('DB_USER') or os.getenv('SUPABASE_KEY', 'postgres'),
            'password': os.getenv('SUPABASE_PASSWORD', 'postgres'),
        }

    # No URL at all: assemble the parameters from individual variables
    # (local development).
    return {
        'host': os.getenv('SUPABASE_HOST', '192.168.70.90'),
        'port': int(os.getenv('SUPABASE_PORT', '54322')),
        'database': os.getenv('SUPABASE_DB', 'postgres'),
        'user': os.getenv('DB_USER') or os.getenv('SUPABASE_KEY', 'postgres'),
        'password': os.getenv('SUPABASE_PASSWORD', 'postgres'),
    }
# Connection keyword arguments, resolved once when this module is imported.
# setup_database() unpacks them into psycopg2.connect().
db_params = get_db_params()
# Ordered SQL scripts that setup_database() sends to the server one entry at a
# time (several entries contain more than one SQL statement). Every entry uses
# IF NOT EXISTS / OR REPLACE / DROP ... IF EXISTS.
# setup_database() matches on the literal text "CREATE EXTENSION",
# "CREATE TABLE IF NOT EXISTS crawl_pages" and "embedding vector(1536)" when
# pgvector is missing, so keep those spellings stable.
setup_statements = [
# Enable the pgvector extension (provides the "vector" column type used below).
"CREATE EXTENSION IF NOT EXISTS vector;",
# Table of crawled sites; crawl_pages and crawl_jobs reference it by id.
"""
CREATE TABLE IF NOT EXISTS crawl_sites (
id SERIAL PRIMARY KEY,
name VARCHAR(255) NOT NULL,
url TEXT NOT NULL,
description TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
""",
# Table of crawled pages with a 1536-dimension embedding per row. Rows are
# deleted with their site, and with their parent row via the self-referencing
# parent_id. The dimension must agree with VECTOR(1536) in
# match_page_embeddings below.
# NOTE(review): is_chunk / chunk_index / parent_id presumably link chunk rows
# to the page they were split from - confirm against the code that inserts.
"""
CREATE TABLE IF NOT EXISTS crawl_pages (
id SERIAL PRIMARY KEY,
site_id INTEGER REFERENCES crawl_sites(id) ON DELETE CASCADE,
url TEXT NOT NULL,
title TEXT,
content TEXT,
summary TEXT,
embedding vector(1536),
metadata JSONB,
content_hash TEXT,
is_chunk BOOLEAN DEFAULT FALSE,
chunk_index INTEGER,
parent_id INTEGER REFERENCES crawl_pages(id) ON DELETE CASCADE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
""",
# user_preferences table for the enhanced memory system; confidence is
# constrained to the range 0..1.
# NOTE(review): no uniqueness constraint on (user_id, preference_type,
# preference_value) is declared here, although update_user_preference below
# looks rows up by exactly that triple.
"""
CREATE TABLE IF NOT EXISTS user_preferences (
id SERIAL PRIMARY KEY,
user_id TEXT NOT NULL,
preference_type TEXT NOT NULL,
preference_value TEXT NOT NULL,
context TEXT,
confidence FLOAT CHECK (confidence >= 0 AND confidence <= 1),
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
last_used TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
source_session TEXT,
is_active BOOLEAN DEFAULT TRUE,
metadata JSONB
);
""",
# Single-column indexes on user_preferences (user_id, preference_type,
# last_used, confidence).
"""
CREATE INDEX IF NOT EXISTS idx_user_preferences_user_id ON user_preferences(user_id);
CREATE INDEX IF NOT EXISTS idx_user_preferences_type ON user_preferences(preference_type);
CREATE INDEX IF NOT EXISTS idx_user_preferences_last_used ON user_preferences(last_used);
CREATE INDEX IF NOT EXISTS idx_user_preferences_confidence ON user_preferences(confidence);
""",
# merge_preference_contexts(old, new): returns whichever argument is not NULL,
# or both joined with ' | ' when neither is NULL.
"""
CREATE OR REPLACE FUNCTION merge_preference_contexts(old_context TEXT, new_context TEXT)
RETURNS TEXT
LANGUAGE plpgsql
AS $$
BEGIN
IF old_context IS NULL THEN
RETURN new_context;
ELSIF new_context IS NULL THEN
RETURN old_context;
ELSE
RETURN old_context || ' | ' || new_context;
END IF;
END;
$$;
""",
# update_user_preference(...): update-or-insert keyed on
# (user_id, preference_type, preference_value). An existing row keeps the
# larger confidence, gets its context merged via merge_preference_contexts,
# has p_metadata merged into its metadata, and has last_used / updated_at
# refreshed; p_source_session is only stored when a new row is inserted.
"""
CREATE OR REPLACE FUNCTION update_user_preference(
p_user_id TEXT,
p_preference_type TEXT,
p_preference_value TEXT,
p_context TEXT,
p_confidence FLOAT,
p_source_session TEXT,
p_metadata JSONB
)
RETURNS user_preferences
LANGUAGE plpgsql
AS $$
DECLARE
v_preference user_preferences;
BEGIN
-- Try to update existing preference
UPDATE user_preferences
SET
confidence = GREATEST(confidence, p_confidence),
context = merge_preference_contexts(context, p_context),
last_used = CURRENT_TIMESTAMP,
updated_at = CURRENT_TIMESTAMP,
metadata = COALESCE(metadata, '{}'::jsonb) || COALESCE(p_metadata, '{}'::jsonb)
WHERE
user_id = p_user_id
AND preference_type = p_preference_type
AND preference_value = p_preference_value
RETURNING * INTO v_preference;
-- If no row was updated, insert new preference
IF v_preference IS NULL THEN
INSERT INTO user_preferences (
user_id,
preference_type,
preference_value,
context,
confidence,
source_session,
metadata
)
VALUES (
p_user_id,
p_preference_type,
p_preference_value,
p_context,
p_confidence,
p_source_session,
p_metadata
)
RETURNING * INTO v_preference;
END IF;
RETURN v_preference;
END;
$$;
""",
# get_user_preferences(user, min_confidence=0.0, active_only=TRUE): returns the
# user's rows with confidence >= min_confidence, optionally restricted to
# is_active rows, ordered by confidence then last_used (both descending).
"""
CREATE OR REPLACE FUNCTION get_user_preferences(
p_user_id TEXT,
p_min_confidence FLOAT DEFAULT 0.0,
p_active_only BOOLEAN DEFAULT TRUE
)
RETURNS TABLE (
id INTEGER,
user_id TEXT,
preference_type TEXT,
preference_value TEXT,
context TEXT,
confidence FLOAT,
created_at TIMESTAMP WITH TIME ZONE,
updated_at TIMESTAMP WITH TIME ZONE,
last_used TIMESTAMP WITH TIME ZONE,
source_session TEXT,
is_active BOOLEAN,
metadata JSONB
)
LANGUAGE plpgsql
AS $$
BEGIN
RETURN QUERY
SELECT
up.id,
up.user_id,
up.preference_type,
up.preference_value,
up.context,
up.confidence,
up.created_at,
up.updated_at,
up.last_used,
up.source_session,
up.is_active,
up.metadata
FROM user_preferences up
WHERE
up.user_id = p_user_id
AND up.confidence >= p_min_confidence
AND (NOT p_active_only OR up.is_active = TRUE)
ORDER BY up.confidence DESC, up.last_used DESC;
END;
$$;
""",
# match_page_embeddings(query_embedding, match_threshold, match_count):
# similarity search over crawl_pages. similarity is computed as
# 1 - (embedding <=> query_embedding), where <=> is pgvector's cosine-distance
# operator; rows above match_threshold are returned closest-first, at most
# match_count of them.
"""
CREATE OR REPLACE FUNCTION match_page_embeddings(
query_embedding VECTOR(1536),
match_threshold FLOAT,
match_count INT
)
RETURNS TABLE (
id INTEGER,
site_id INTEGER,
url TEXT,
title TEXT,
content TEXT,
summary TEXT,
similarity FLOAT
)
LANGUAGE plpgsql
AS $$
BEGIN
RETURN QUERY
SELECT
crawl_pages.id,
crawl_pages.site_id,
crawl_pages.url,
crawl_pages.title,
crawl_pages.content,
crawl_pages.summary,
1 - (crawl_pages.embedding <=> query_embedding) AS similarity
FROM crawl_pages
WHERE 1 - (crawl_pages.embedding <=> query_embedding) > match_threshold
ORDER BY crawl_pages.embedding <=> query_embedding
LIMIT match_count;
END;
$$;
""",
# IVFFlat index (cosine operator class, 100 lists) on the embedding column for
# faster vector searches.
"""
CREATE INDEX IF NOT EXISTS crawl_pages_embedding_idx ON crawl_pages
USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 100);
""",
# Plain (non-unique) lookup indexes on crawl_pages.
"""
CREATE INDEX IF NOT EXISTS idx_pages_site_id ON crawl_pages(site_id);
CREATE INDEX IF NOT EXISTS idx_pages_url ON crawl_pages(url);
CREATE INDEX IF NOT EXISTS idx_pages_parent_id ON crawl_pages(parent_id);
CREATE INDEX IF NOT EXISTS idx_pages_is_chunk ON crawl_pages(is_chunk);
CREATE INDEX IF NOT EXISTS idx_pages_content_hash ON crawl_pages(content_hash);
""",
# Crawl job tracking table for durable crawl status, plus its indexes.
# NOTE(review): crawl_jobs has an updated_at column but, unlike crawl_sites,
# crawl_pages and user_preferences, no updated_at trigger is created for it
# in this list - confirm whether that is intentional.
"""
CREATE TABLE IF NOT EXISTS crawl_jobs (
id SERIAL PRIMARY KEY,
site_id INTEGER REFERENCES crawl_sites(id) ON DELETE CASCADE,
url TEXT NOT NULL,
status VARCHAR(32) NOT NULL DEFAULT 'queued',
options JSONB,
crawl4ai_task_id TEXT,
pages_found INTEGER DEFAULT 0,
pages_crawled INTEGER DEFAULT 0,
chunks_created INTEGER DEFAULT 0,
error TEXT,
started_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
finished_at TIMESTAMP WITH TIME ZONE,
updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_crawl_jobs_site_id ON crawl_jobs(site_id);
CREATE INDEX IF NOT EXISTS idx_crawl_jobs_status ON crawl_jobs(status);
CREATE INDEX IF NOT EXISTS idx_crawl_jobs_updated_at ON crawl_jobs(updated_at DESC);
""",
# Trigger function that stamps NEW.updated_at with CURRENT_TIMESTAMP; shared
# by the three triggers below.
"""
CREATE OR REPLACE FUNCTION update_updated_at_column()
RETURNS TRIGGER AS $$
BEGIN
NEW.updated_at = CURRENT_TIMESTAMP;
RETURN NEW;
END;
$$ language 'plpgsql'
""",
# BEFORE UPDATE triggers that keep updated_at current. Each is dropped first
# and then recreated. This one covers crawl_sites.
"""
DROP TRIGGER IF EXISTS update_sites_updated_at ON crawl_sites;
CREATE TRIGGER update_sites_updated_at
BEFORE UPDATE ON crawl_sites
FOR EACH ROW
EXECUTE FUNCTION update_updated_at_column()
""",
# Same trigger for crawl_pages.
"""
DROP TRIGGER IF EXISTS update_pages_updated_at ON crawl_pages;
CREATE TRIGGER update_pages_updated_at
BEFORE UPDATE ON crawl_pages
FOR EACH ROW
EXECUTE FUNCTION update_updated_at_column()
""",
# Same trigger for user_preferences.
"""
DROP TRIGGER IF EXISTS update_preferences_updated_at ON user_preferences;
CREATE TRIGGER update_preferences_updated_at
BEFORE UPDATE ON user_preferences
FOR EACH ROW
EXECUTE FUNCTION update_updated_at_column()
"""
]
def _run_statements(cur, statements):
    """Execute SQL scripts one by one, reporting failures without aborting.

    Returns the number of scripts that raised a database error.
    """
    failures = 0
    for statement in statements:
        try:
            cur.execute(statement)
            print(f"Executed: {statement[:60]}...")
        except psycopg2.Error as e:
            failures += 1
            print(f"Error executing statement: {e}")
            print(f"Statement: {statement}")
    return failures


def _vector_extension_available(cur):
    """Return whether the server lists 'vector' in pg_available_extensions.

    If the check itself fails, the error is printed and True is returned so
    that the normal (vector-enabled) setup is still attempted.
    """
    try:
        cur.execute("SELECT 1 FROM pg_available_extensions WHERE name = 'vector'")
        return cur.fetchone() is not None
    except psycopg2.Error as e:
        print(f"Error checking for vector extension: {e}")
        return True


def _statements_without_vector(statements):
    """Return a copy of *statements* adapted for a server without pgvector.

    The CREATE EXTENSION script is dropped and the crawl_pages embedding
    column is declared as TEXT instead of vector(1536). Other scripts are
    kept as they are, so ones that still need pgvector are attempted and
    reported as errors by the caller.
    """
    adapted = []
    for stmt in statements:
        if "CREATE EXTENSION" in stmt:
            continue
        if "CREATE TABLE IF NOT EXISTS crawl_pages" in stmt:
            stmt = re.sub(r'embedding vector\(1536\)', 'embedding TEXT', stmt)
        adapted.append(stmt)
    return adapted


def _create_page_indexes(cur):
    """Create the additional crawl_pages indexes; return 1 on failure, else 0.

    The first error stops the remaining index statements in this group.
    NOTE(review): the three non-unique indexes cover the same columns as the
    idx_pages_* indexes in setup_statements; only the unique URL index is
    new here. Kept as-is so the resulting schema does not change.
    """
    try:
        cur.execute("CREATE INDEX IF NOT EXISTS idx_crawl_pages_site_id ON crawl_pages(site_id);")
        cur.execute("CREATE INDEX IF NOT EXISTS idx_crawl_pages_parent_id ON crawl_pages(parent_id);")
        cur.execute("CREATE INDEX IF NOT EXISTS idx_crawl_pages_is_chunk ON crawl_pages(is_chunk);")
        # Unique index on the URL to prevent duplicates.
        cur.execute("CREATE UNIQUE INDEX IF NOT EXISTS idx_crawl_pages_url ON crawl_pages(url);")
        print("Created indexes for better performance.")
        return 0
    except psycopg2.Error as e:
        print(f"Error creating indexes: {e}")
        return 1


def _create_conversation_table(cur):
    """Create chat_conversations and its indexes; return 1 on failure, else 0."""
    try:
        cur.execute("""
CREATE TABLE IF NOT EXISTS chat_conversations (
id SERIAL PRIMARY KEY,
session_id VARCHAR(255) NOT NULL,
user_id VARCHAR(255),
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
role VARCHAR(50) NOT NULL,
content TEXT NOT NULL,
metadata JSONB
);
""")
        cur.execute("CREATE INDEX IF NOT EXISTS idx_chat_conversations_session_id ON chat_conversations(session_id);")
        cur.execute("CREATE INDEX IF NOT EXISTS idx_chat_conversations_user_id ON chat_conversations(user_id);")
        print("✓ Conversation history table set up successfully")
        return 0
    except psycopg2.Error as e:
        print(f"Error setting up conversation history table: {e}")
        return 1


def setup_database():
    """Set up the database with the required tables, functions and indexes.

    Connects with the module-level ``db_params``, runs every script in
    ``setup_statements`` (adapted when pgvector is not installed), then
    creates the extra crawl_pages indexes and the chat_conversations table.
    The last two steps do not need pgvector, so they run whether or not the
    extension is available.

    Failures are printed, never raised; the final message says whether
    anything failed. Returns None.
    """
    conn = None
    try:
        conn = psycopg2.connect(**db_params)
        # Autocommit keeps each script independent, so one failing script
        # does not leave the connection in an aborted transaction.
        conn.autocommit = True
        with conn.cursor() as cur:
            vector_available = _vector_extension_available(cur)
            if vector_available:
                statements = setup_statements
            else:
                print("WARNING: The 'vector' extension is not available in this PostgreSQL installation.")
                print("Vector search functionality will not work without it.")
                print("You may need to install the pgvector extension in your PostgreSQL instance.")
                print("For more information, visit: https://github.com/pgvector/pgvector")
                statements = _statements_without_vector(setup_statements)

            failures = _run_statements(cur, statements)
            failures += _create_page_indexes(cur)
            failures += _create_conversation_table(cur)

        if not vector_available:
            print("Database setup completed with limited functionality (no vector search).")
        elif failures:
            print(f"Database setup completed with {failures} error(s); see the messages above.")
        else:
            print("Database setup completed successfully.")
    except Exception as e:
        # Top-level boundary: report connection or unexpected errors
        # instead of raising out of the script.
        print(f"Error setting up database: {e}")
    finally:
        if conn is not None:
            conn.close()
# Run the setup only when this file is executed as a script, not on import.
if __name__ == "__main__":
    setup_database()