-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
108 lines (80 loc) · 3.39 KB
/
Copy pathmain.py
File metadata and controls
108 lines (80 loc) · 3.39 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
import logging
from fastapi import FastAPI
import inngest
import inngest.fast_api
from inngest.experimental import ai
from dotenv import load_dotenv
import uuid
import os
import datetime
from data_loader import load_and_chunk_pdf, embed_texts
from vector_db import QdrantStorage
from custom_types import RAGChunksAndSrc,RAGQueryResult,RAGSearchResult,RAGUpsertResult
# Load variables from a local .env file before anything reads the environment.
load_dotenv()

# Inngest client shared by every function in this app.
# Dev mode (is_production=False) remains the default. Set
# RAG_APP_IS_PRODUCTION=true (e.g. in .env) when deploying. Per the Inngest
# SDK docs, production mode affects request-signature verification and the
# default Inngest server URLs.
inngest_client = inngest.Inngest(
    app_id="rag_app",
    logger=logging.getLogger("uvicorn"),
    is_production=os.getenv("RAG_APP_IS_PRODUCTION", "false").strip().lower()
    in {"1", "true", "yes"},
    # Pydantic serializer so steps can return the RAG* pydantic models.
    serializer=inngest.PydanticSerializer(),
)
@inngest_client.create_function(
    fn_id="rag_function",
    trigger=inngest.TriggerEvent(event="rag/ingest_pdf"),
)
async def rag_ingest_pdf(ctx: inngest.Context):
    """Ingest a PDF into the vector store as two durable Inngest steps.

    Event data:
        pdf_path: path of the PDF to load (required).
        source_id: identifier stored with every chunk; defaults to ``pdf_path``.

    Returns:
        ``RAGUpsertResult.model_dump()``, which carries the ``ingested`` chunk count.
    """

    def _load(ctx: inngest.Context) -> RAGChunksAndSrc:
        # Read the PDF named in the event and split it into text chunks.
        pdf_path = ctx.event.data["pdf_path"]
        source_id = ctx.event.data.get("source_id", pdf_path)
        chunks = load_and_chunk_pdf(pdf_path)
        return RAGChunksAndSrc(chunks=chunks, source_id=source_id)

    def _upsert(chunks_and_src: RAGChunksAndSrc) -> RAGUpsertResult:
        # Embed the chunks and store them, one point per chunk, in Qdrant.
        chunks = chunks_and_src.chunks
        source_id = chunks_and_src.source_id
        if not chunks:
            # Nothing was extracted: skip the embedding call and the upsert
            # instead of sending empty batches downstream.
            return RAGUpsertResult(ingested=0)
        vectors = embed_texts(chunks)
        # Deterministic ids derived from "<source_id>_<index>": re-ingesting the
        # same source yields the same ids (presumably overwriting the earlier
        # points, assuming QdrantStorage.upsert has upsert semantics).
        # NOTE(review): if a re-ingest yields fewer chunks, old points remain.
        ids = [
            str(uuid.uuid5(uuid.NAMESPACE_URL, f"{source_id}_{i}"))
            for i in range(len(chunks))
        ]
        payloads = [{"source": source_id, "text": chunk} for chunk in chunks]
        QdrantStorage().upsert(ids, vectors, payloads)
        return RAGUpsertResult(ingested=len(chunks))

    # Step ids are Inngest memoization keys; keep them stable across deploys.
    chunks_and_src = await ctx.step.run(
        "load_and_chunks", lambda: _load(ctx), output_type=RAGChunksAndSrc
    )
    ingested = await ctx.step.run(
        "upsert_chunks", lambda: _upsert(chunks_and_src), output_type=RAGUpsertResult
    )
    return ingested.model_dump()
@inngest_client.create_function(
    fn_id="rag_query_function",
    trigger=inngest.TriggerEvent(event="rag/query_pdf_ai"),
)
async def rag_query_pdf_ai(ctx: inngest.Context):
    """Answer a question using retrieval-augmented generation.

    Event data:
        question: the user's question (required).
        top_k: number of contexts to retrieve; defaults to 5 when missing or null.

    Returns:
        dict with ``answer`` (model text), ``sources`` (from the search result)
        and ``num_contexts`` (number of retrieved contexts).
    """

    def _search(question: str, top_k: int = 5) -> RAGSearchResult:
        # Embed the question and fetch the nearest stored chunks.
        query_vec = embed_texts([question])[0]
        store = QdrantStorage()
        found = store.search(query_vec, top_k)
        return RAGSearchResult(contexts=found["contexts"], sources=found["sources"])

    question = ctx.event.data["question"]
    raw_top_k = ctx.event.data.get("top_k")
    # An explicit null is treated like a missing key instead of crashing in int(None).
    top_k = 5 if raw_top_k is None else int(raw_top_k)

    found = await ctx.step.run(
        "embed-and-search",
        lambda: _search(question, top_k),
        output_type=RAGSearchResult,
    )

    context_block = "\n\n".join(f"- {c}" for c in found.contexts)
    user_content = (
        "Use the following retrieved contexts to answer the question.\n\n"
        f"Context: \n{context_block}\n\n"
        f"Question: {question}\n\n"
        "Answer concisely using the context above."
    )

    adapter = ai.openai.Adapter(
        auth_key=os.getenv("OPENAI_API_KEY"),
        # "gpt-4o-mini" with the letter o; the former "gpt-40-mini" (digit zero)
        # is not a valid OpenAI model id.
        model="gpt-4o-mini",
    )
    res = await ctx.step.ai.infer(
        "llm-answer",
        adapter=adapter,
        body={
            "max_tokens": 1024,
            "temperature": 0.2,
            "messages": [
                {
                    "role": "system",
                    "content": "You answer questions using only the provided context.",
                },
                {"role": "user", "content": user_content},
            ],
        },
    )
    # The Chat Completions schema allows message.content to be null (e.g. a
    # refusal), so guard before calling .strip().
    answer = (res["choices"][0]["message"]["content"] or "").strip()
    return {"answer": answer, "sources": found.sources, "num_contexts": len(found.contexts)}
# ASGI application served by uvicorn. The Inngest SDK attaches its HTTP
# endpoint to it and registers both RAG functions with the client.
app = FastAPI()
inngest.fast_api.serve(
    app,
    inngest_client,
    functions=[rag_ingest_pdf, rag_query_pdf_ai],
)