Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions examples/weaviate-live-rag/.env.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Used by the TypeStream server (to embed docs) and the chatbot (to embed queries + answer).
# Both must use the same embedding model, which they do by default (text-embedding-3-small).
OPENAI_API_KEY=sk-...
80 changes: 80 additions & 0 deletions examples/weaviate-live-rag/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# Live RAG: Postgres → TypeStream → Weaviate

A self-contained demo of **continuously-fresh RAG**. A support chatbot answers from a
Weaviate index of help-center docs. The source of truth is Postgres — and the moment a
row changes, TypeStream re-embeds it and the bot's answer changes seconds later. No
reindex job, no consumer code, no Kafka to operate.

> Data is fictional ("Northwind"). Nothing here is real.

![The pipeline in the TypeStream UI](pipeline-graph.png)

## What's inside

```
Postgres(help_articles) --Debezium CDC--> Kafka --> TypeStream pipeline
(kafkaSource unwrapCdc -> embeddingGenerator -> kafkaSink: help_article_embeddings)
--> Weaviate sink connector --> Weaviate(HelpArticle, vectorizer:none)
^ nearVector
chatbot (Node+Express) ----------------/ embed question -> top-3 -> grounded answer
```

The chatbot queries Weaviate directly; TypeStream's only job is keeping Weaviate fresh.

## Run it

```bash
cp .env.example .env # add your OPENAI_API_KEY
docker compose up # wait for the `bootstrap` container to print "Demo is live."
```

Then open **http://localhost:8000**.

The `bootstrap` container runs once: it registers the CDC connector, creates the Weaviate
collection, applies the TypeStream pipeline, and registers the Weaviate sink connector,
then exits. After that the pipeline is persisted in Kafka and auto-recovers on every restart.

## The two demo moves

Dry-run all three questions before recording — answers are deterministic (temperature 0),
so only freshness changes on camera.

**1. UPDATE — a clean number flip**

Ask: *"How long is the free trial?"* → **"14 days."**

```bash
docker compose exec -T postgres psql -U typestream -d demo < demo/update-trial.sql
```

Ask the same question again → **"30 days."**

**2. INSERT — a topic that didn't exist**

Ask: *"Do you support single sign-on / Okta?"* → **"I don't have that information."**

```bash
docker compose exec -T postgres psql -U typestream -d demo < demo/insert-sso.sql
```

Ask again seconds later → the bot answers from the brand-new SSO doc.

## Retargeting to another datastore

The reusable core (Postgres, CDC, redpanda, TypeStream server, `bootstrap.sh`, the chatbot
shell) is vendor-neutral. Only two files encode "Weaviate":

| File | What changes |
|------|--------------|
| `bootstrap/target.sh` | `create_collection` + `register_sink_connector` for the new sink |
| `chatbot/retriever.js` | how a question becomes documents (`retrieve(q, k) -> [{title, body}]`) |
| `docker-compose.yml` | swap the `weaviate` service block + the chatbot's `WEAVIATE_HOST` |
| `pipeline/*.json` | (only if the sink topic name changes) |

## Notes & limits

- Requires `OPENAI_API_KEY` (server embeds docs; chatbot embeds queries + generates answers).
- Embedding model is pinned to `text-embedding-3-small` on both sides — keep them in sync.
- **Deletes are out of scope** here (insert + update only). Removing ghost vectors on delete
is a separate, more advanced beat.
- First `docker compose up` pulls/builds images; pre-warm once before recording.
12 changes: 12 additions & 0 deletions examples/weaviate-live-rag/bootstrap/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Tiny one-shot image: curl + jq + grpcurl, plus the bootstrap scripts and pipeline.
# Build context is the example root so we can COPY both bootstrap/ and pipeline/.
FROM fullstorydev/grpcurl:latest AS grpcurl

FROM alpine:3.20
RUN apk add --no-cache bash curl jq
COPY --from=grpcurl /bin/grpcurl /usr/local/bin/grpcurl
WORKDIR /work
COPY bootstrap/bootstrap.sh bootstrap/target.sh ./
COPY pipeline/ ./pipeline/
RUN chmod +x bootstrap.sh target.sh
ENTRYPOINT ["./bootstrap.sh"]
102 changes: 102 additions & 0 deletions examples/weaviate-live-rag/bootstrap/bootstrap.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
#!/usr/bin/env bash
# ---------------------------------------------------------------------------
# One-shot bootstrap (generic / reusable across targets).
#
# A fresh clone has empty Kafka state, so TypeStream has no pipeline to recover
# on first boot. This applies it once; every restart afterwards auto-recovers
# it (PipelineStateStore). Steps:
# 1. register the Postgres CDC source connector
# 2. wait for the CDC topic to carry data
# 3. create the destination collection (target.sh)
# 4. apply the TypeStream pipeline (CDC -> embed -> help_article_embeddings)
# 5. wait for the embeddings topic to carry data
# 6. register the destination sink connector (target.sh)
# ---------------------------------------------------------------------------
set -euo pipefail

KAFKA_CONNECT=${KAFKA_CONNECT:-http://kafka-connect:8083}
SERVER_ADDR=${SERVER_ADDR:-server:4242}
WEAVIATE_REST=${WEAVIATE_REST:-http://weaviate:8080}
SCHEMA_REGISTRY=${SCHEMA_REGISTRY:-http://redpanda:8081}
CDC_TOPIC=${CDC_TOPIC:-dbserver.public.help_articles}
EMBEDDINGS_TOPIC=${EMBEDDINGS_TOPIC:-help_article_embeddings}
COLLECTION=${COLLECTION:-HelpArticle}
PIPELINE_FILE=${PIPELINE_FILE:-pipeline/help-articles.typestream.json}
EXPECTED_DOCS=${EXPECTED_DOCS:-10} # rows in db/01-help-articles.sql; bump if you change the seed
export KAFKA_CONNECT SERVER_ADDR WEAVIATE_REST SCHEMA_REGISTRY EMBEDDINGS_TOPIC COLLECTION

# shellcheck source=target.sh
. ./target.sh

retry() { # retry <seconds> <description> <command...>
local timeout=$1 desc=$2; shift 2
local deadline=$(( $(date +%s) + timeout ))
until "$@" >/dev/null 2>&1; do
if [ "$(date +%s)" -ge "$deadline" ]; then
echo "TIMED OUT waiting for: ${desc}" >&2; exit 1
fi
sleep 2
done
}

subject_exists() { curl -sf "${SCHEMA_REGISTRY}/subjects" | jq -e --arg s "$1" 'index($s)' >/dev/null; }

weaviate_count() {
curl -s "${WEAVIATE_REST}/v1/graphql" -H 'Content-Type: application/json' \
-d "{\"query\":\"{Aggregate{${COLLECTION}{meta{count}}}}\"}" \
| jq -r ".data.Aggregate.${COLLECTION}[0].meta.count // 0"
}
docs_ready() { [ "$(weaviate_count)" -ge "${EXPECTED_DOCS}" ]; }

echo "[1/7] waiting for dependencies (kafka-connect, weaviate, server)..."
retry 120 "kafka-connect" curl -sf "${KAFKA_CONNECT}/connectors"
retry 120 "weaviate" curl -sf "${WEAVIATE_REST}/v1/.well-known/ready"
retry 120 "server reflection" grpcurl -plaintext "${SERVER_ADDR}" list

echo "[2/7] registering Postgres CDC connector (public.help_articles)..."
curl -sf -X PUT "${KAFKA_CONNECT}/connectors/help-articles-cdc/config" \
-H 'Content-Type: application/json' \
-d '{
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "typestream",
"database.password": "typestream",
"database.dbname": "demo",
"topic.prefix": "dbserver",
"schema.include.list": "public",
"table.include.list": "public.help_articles",
"plugin.name": "pgoutput",
"slot.name": "help_articles_slot",
"publication.name": "help_articles_pub",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://redpanda:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://redpanda:8081"
}' >/dev/null
echo " done"

echo "[3/7] waiting for CDC snapshot to populate ${CDC_TOPIC}..."
retry 120 "${CDC_TOPIC} schema" subject_exists "${CDC_TOPIC}-value"
sleep 12 # let the server's filesystem (fsRefreshRate=10s) discover the topic

echo "[4/7] creating destination collection..."
create_collection

echo "[5/7] applying TypeStream pipeline..."
jq '{metadata: {name: .name, version: .version, description: .description}, graph: .graph}' \
"${PIPELINE_FILE}" > /tmp/apply.json
grpcurl -plaintext -d @ "${SERVER_ADDR}" \
io.typestream.grpc.PipelineService/ApplyPipeline < /tmp/apply.json
echo " pipeline applied"

echo "[6/7] waiting for embeddings, then registering sink connector..."
retry 180 "${EMBEDDINGS_TOPIC} schema" subject_exists "${EMBEDDINGS_TOPIC}-value"
register_sink_connector

echo "[7/7] waiting for all ${EXPECTED_DOCS} docs to land in ${COLLECTION}..."
retry 120 "${EXPECTED_DOCS} docs in ${COLLECTION}" docs_ready
echo " ${COLLECTION} now has $(weaviate_count) objects"

echo ""
echo "Demo is live. Open http://localhost:8000 and ask: \"How long is the free trial?\""
50 changes: 50 additions & 0 deletions examples/weaviate-live-rag/bootstrap/target.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
#!/usr/bin/env bash
# ---------------------------------------------------------------------------
# TARGET-SPECIFIC SEAM (Weaviate).
# To retarget this demo to another datastore, this is one of only two files you
# rewrite (the other is chatbot/retriever.js). bootstrap.sh stays unchanged.
#
# Must define:
# create_collection - create the destination collection/schema (idempotent)
# register_sink_connector - register the Kafka Connect sink that drains
# $EMBEDDINGS_TOPIC into the destination
# Provided by bootstrap.sh: $KAFKA_CONNECT, $WEAVIATE_REST, $SCHEMA_REGISTRY,
# $EMBEDDINGS_TOPIC, $COLLECTION
# ---------------------------------------------------------------------------

create_collection() {
if curl -sf "${WEAVIATE_REST}/v1/schema/${COLLECTION}" >/dev/null 2>&1; then
echo " collection ${COLLECTION} already exists"
return 0
fi
# vectorizer: none -> we supply pre-computed vectors from the pipeline.
# Remaining properties are filled by Weaviate auto-schema on first object.
curl -sf -X POST "${WEAVIATE_REST}/v1/schema" \
-H 'Content-Type: application/json' \
-d "{\"class\":\"${COLLECTION}\",\"vectorizer\":\"none\"}" >/dev/null
echo " created collection ${COLLECTION} (vectorizer: none)"
}

# Verbatim mirror of the config TypeStream's ConnectionService builds for the
# installed kafka-connect-weaviate v0.1.2 (server/.../ConnectionService.kt).
register_sink_connector() {
curl -sf -X PUT "${KAFKA_CONNECT}/connectors/weaviate-sink/config" \
-H 'Content-Type: application/json' \
-d "{
\"connector.class\": \"io.weaviate.connector.WeaviateSinkConnector\",
\"tasks.max\": \"1\",
\"topics\": \"${EMBEDDINGS_TOPIC}\",
\"weaviate.connection.url\": \"${WEAVIATE_REST}\",
\"weaviate.grpc.url\": \"weaviate:50051\",
\"weaviate.grpc.secured\": \"false\",
\"collection.mapping\": \"${COLLECTION}\",
\"key.converter\": \"org.apache.kafka.connect.storage.StringConverter\",
\"value.converter\": \"io.confluent.connect.avro.AvroConverter\",
\"value.converter.schema.registry.url\": \"${SCHEMA_REGISTRY}\",
\"document.id.strategy\": \"io.weaviate.connector.idstrategy.FieldIdStrategy\",
\"document.id.field.name\": \"id\",
\"vector.strategy\": \"io.weaviate.connector.vectorstrategy.FieldVectorStrategy\",
\"vector.field.name\": \"embedding\"
}" >/dev/null
echo " registered weaviate-sink connector (topic ${EMBEDDINGS_TOPIC} -> ${COLLECTION})"
}
7 changes: 7 additions & 0 deletions examples/weaviate-live-rag/chatbot/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
FROM node:20-slim
WORKDIR /app
COPY package.json ./
RUN npm install --omit=dev
COPY . .
EXPOSE 8000
CMD ["node", "server.js"]
15 changes: 15 additions & 0 deletions examples/weaviate-live-rag/chatbot/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"name": "live-rag-chatbot",
"version": "1.0.0",
"private": true,
"type": "module",
"main": "server.js",
"scripts": {
"start": "node server.js"
},
"dependencies": {
"express": "^4.19.2",
"openai": "^4.67.3",
"weaviate-ts-client": "^2.2.0"
}
}
63 changes: 63 additions & 0 deletions examples/weaviate-live-rag/chatbot/public/index.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1" />
<title>Northwind Support</title>
<style>
:root { color-scheme: light dark; }
body { font-family: ui-sans-serif, system-ui, sans-serif; max-width: 640px; margin: 8vh auto; padding: 0 20px; }
h1 { font-size: 1.25rem; font-weight: 600; }
.sub { opacity: .6; margin-top: -.5rem; font-size: .9rem; }
form { display: flex; gap: 8px; margin: 1.5rem 0; }
input { flex: 1; padding: 12px 14px; font-size: 1rem; border: 1px solid #8884; border-radius: 10px; }
button { padding: 12px 18px; font-size: 1rem; border: 0; border-radius: 10px; background: #4f46e5; color: #fff; cursor: pointer; }
button:disabled { opacity: .5; cursor: default; }
#answer { white-space: pre-wrap; line-height: 1.5; min-height: 2rem; padding: 16px; border: 1px solid #8884; border-radius: 12px; }
#sources { margin-top: .75rem; font-size: .8rem; opacity: .6; }
</style>
</head>
<body>
<h1>Northwind Support Assistant</h1>
<p class="sub">Answers grounded in the help center — kept fresh by TypeStream.</p>

<form id="chat">
<input id="q" value="How long is the free trial?" placeholder="How long is the free trial?" autocomplete="off" autofocus />
<button type="submit">Ask</button>
</form>

<div id="answer">Ask a question to get started.</div>
<div id="sources"></div>

<script>
const form = document.getElementById("chat");
const input = document.getElementById("q");
const answer = document.getElementById("answer");
const sources = document.getElementById("sources");
const button = form.querySelector("button");

form.addEventListener("submit", async (e) => {
e.preventDefault();
const message = input.value.trim();
if (!message) return;
button.disabled = true;
answer.textContent = "Thinking…";
sources.textContent = "";
try {
const res = await fetch("/chat", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ message }),
});
const data = await res.json();
answer.textContent = data.answer || data.error || "(no answer)";
sources.textContent = data.sources?.length ? "Sources: " + data.sources.join(", ") : "";
} catch {
answer.textContent = "Request failed.";
} finally {
button.disabled = false;
}
});
</script>
</body>
</html>
49 changes: 49 additions & 0 deletions examples/weaviate-live-rag/chatbot/retriever.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// ---------------------------------------------------------------------------
// TARGET-SPECIFIC SEAM (Weaviate).
// Owns the entire "question -> documents" step, embedding included, so server.js
// stays vendor-neutral. To retarget the demo, this is one of only two files you
// rewrite (the other is bootstrap/target.sh). Keep the export shape identical:
//
// retrieve(question: string, k: number) -> Promise<Array<{title, body}>>
//
// (A graph/SQL backend could ignore embeddings entirely and still satisfy this.)
// ---------------------------------------------------------------------------
import weaviate from "weaviate-ts-client";
import OpenAI from "openai";

const COLLECTION = process.env.COLLECTION || "HelpArticle";
const EMBEDDING_MODEL = process.env.EMBEDDING_MODEL || "text-embedding-3-small";

const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });
const client = weaviate.client({
scheme: process.env.WEAVIATE_SCHEME || "http",
host: process.env.WEAVIATE_HOST || "weaviate:8080",
});

export async function retrieve(question, k = 3) {
// Query vector MUST use the same model the pipeline embedded docs with,
// or nearVector search is meaningless.
const embedding = await openai.embeddings.create({
model: EMBEDDING_MODEL,
input: question,
});
const vector = embedding.data[0].embedding;

const result = await client.graphql
.get()
.withClassName(COLLECTION)
.withFields("title body _additional { distance }")
.withNearVector({ vector })
.withLimit(k)
.do();

const docs = result?.data?.Get?.[COLLECTION] ?? [];

// Dump the Weaviate query result to the console (visible via `docker compose logs -f chatbot`).
console.log(`[retrieve] q=${JSON.stringify(question)} ->`);
docs.forEach((d, i) =>
console.log(` ${i + 1}. dist=${d._additional?.distance?.toFixed(4)} ${d.title}`)
);

return docs;
}
Loading
Loading