Skip to content

Commit 7830d6d

Browse files
wangzhigang1999pan3793
authored andcommitted
[KYUUBI #7379][2b/4] Data Agent Engine: agent runtime, middleware stack, and OpenAI provider
### Why are the changes needed? Part 2b of 4 for the Data Agent Engine ([umbrella](#7379), [KPIP-7373](#7373)). This PR adds the ReAct agent runtime that drives the LLM <-> tool loop, a composable middleware stack around it, and a production `OpenAiProvider`. It sits on top of the tool system and data source abstraction introduced in PR 2a, and is consumed by the REST layer in PR 3. Changes include: - `ReactAgent` — ReAct loop with streaming, tool-call dispatch, turn budget, malformed-tool-call recovery - `ConversationMemory` — message history with cumulative prompt-token tracking - `AgentRunContext` / `AgentInvocation` / `ApprovalMode` — per-run state plumbing - `ToolOutputStore` — size-gated tool-output offload, keyed by session+call-id, with `ReadToolOutputTool` / `GrepToolOutputTool` for LLM-driven retrieval - `AgentMiddleware` interface with `onRegister` hook for tool wiring, plus four middlewares: - `LoggingMiddleware` — structured request/response logging - `ApprovalMiddleware` — risk-level-based approval gate - `CompactionMiddleware` — token-threshold-driven history summarization keyed by session - `ToolResultOffloadMiddleware` — transparently owns the `ToolOutputStore` and registers retrieval tools - `OpenAiProvider` — OpenAI-compatible chat completions with streaming and tool calls - `ExecuteStatement.scala` — SSE encoding extended to emit `Compaction` events - Dialects moved under `datasource.dialect` package for organization - New `kyuubi.engine.data.agent.compaction.trigger.tokens` configuration entry - `MockLlmProvider` — deterministic mock for middleware and runtime tests - `mysql-connector-j` moved to `test` scope (GPL-licensed; cannot be bundled in an Apache binary release — addresses review feedback on #7417) ### How was this patch tested? - **Unit tests (Java)**: `ConversationMemoryTest`, `ToolOutputStoreTest`, `ApprovalMiddlewareTest`, `CompactionMiddlewareTest`, `ToolResultOffloadMiddlewareTest`, `event/EventTest`, plus updates to `ToolRegistryThreadSafetyTest` / `ToolTest` / `RunSelectQueryToolTest` / `RunMutationQueryToolTest` / `JdbcDialectTest` / MySQL `DialectTest` - **Live LLM tests** (opt-in, require `DATA_AGENT_LLM_API_KEY` / `DATA_AGENT_LLM_API_URL` / `DATA_AGENT_LLM_MODEL`): `ReactAgentLiveTest`, `CompactionMiddlewareLiveTest` — exercise the full loop against a real OpenAI-compatible endpoint - **E2E (Scala)**: `DataAgentE2ESuite` extended with OpenAI-provider paths; new `DataAgentCompactionE2ESuite` observes compaction via JDBC - Existing unit + MySQL Testcontainers tests from PR 2a remain green ### Was this patch authored or co-authored using generative AI tooling? Partially assisted by Claude Code (Claude Opus 4.7) for test generation, code review, and PR formatting. Core design and implementation are human-authored. Closes #7417 from wangzhigang1999/pr2b/data-agent-runtime. Closes #7379 1aac6aa [wangzhigang] [KYUUBI #7379][2b/4][FOLLOWUP] Split ReactAgent into LlmStreamClient + composite MiddlewareDispatcher 9fe9625 [wangzhigang] [KYUUBI #7379][2b/4][FOLLOWUP] Instruct LLM to SELECT after UPDATE in approval live test 211e867 [wangzhigang] [KYUUBI #7379][2b/4][FOLLOWUP] Unify AgentMiddleware hook return types under Decision<T> b9b4208 [wangzhigang] [KYUUBI #7379][2b/4][FOLLOWUP] Move ToolContext to first parameter in AgentTool.execute 108e9cd [wangzhigang] [KYUUBI #7379][2b/4][FOLLOWUP] Replace null-as-noop with explicit sentinel actions in AgentMiddleware d1777fc [wangzhigang] [KYUUBI #7379][2b/4][FOLLOWUP] Capitalize SQLite and MySQL in dialect class names 87c1f44 [wangzhigang] [KYUUBI #7379][2b/4][FOLLOWUP] Adopt Trino-style config keys, rename OpenAiProvider to ChatCompletionProvider 39fb9c5 [wangzhigang] [KYUUBI #7379][2b/4][FOLLOWUP] Tighten data-agent dependencies: drop SQLite/PostgreSQL bundle, pin kotlin/okhttp/okio 5223561 [wangzhigang] [KYUUBI #7379][2b/4] Move mysql-connector-j to test scope ce4eecc [wangzhigang] [KYUUBI #7379][2b/4] Data Agent Engine: agent runtime, middleware stack, OpenAI provider, and live E2E tests Authored-by: wangzhigang <iamzhigangwang@gmail.com> Signed-off-by: Cheng Pan <chengpan@apache.org>
1 parent 73a1af1 commit 7830d6d

60 files changed

Lines changed: 5566 additions & 180 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

docs/configuration/settings.md

Lines changed: 4 additions & 3 deletions
Large diffs are not rendered by default.

externals/kyuubi-data-agent-engine/pom.xml

Lines changed: 87 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,15 @@
3030
<name>Kyuubi Project Engine Data Agent</name>
3131
<url>https://kyuubi.apache.org/</url>
3232

33+
<properties>
34+
<!-- Versions transitively pulled by openai-java; pinned here so any drift
35+
across openai-java upgrades is a deliberate change. -->
36+
<kotlin.stdlib.version>1.8.0</kotlin.stdlib.version>
37+
<kotlin.reflect.version>2.0.21</kotlin.reflect.version>
38+
<okhttp.version>4.12.0</okhttp.version>
39+
<okio.version>3.6.0</okio.version>
40+
</properties>
41+
3342
<dependencies>
3443
<!-- kyuubi dependency -->
3544
<dependency>
@@ -50,45 +59,111 @@
5059
<version>${project.version}</version>
5160
</dependency>
5261

62+
<!-- OpenAI official Java SDK -->
5363
<dependency>
5464
<groupId>com.openai</groupId>
5565
<artifactId>openai-java</artifactId>
66+
<version>${openai.sdk.version}</version>
5667
</dependency>
5768

69+
<!-- kotlin / okhttp / okio versions transitively introduced by openai-java; pinned for visibility. -->
70+
<dependency>
71+
<groupId>org.jetbrains.kotlin</groupId>
72+
<artifactId>kotlin-stdlib</artifactId>
73+
<version>${kotlin.stdlib.version}</version>
74+
</dependency>
75+
<dependency>
76+
<groupId>org.jetbrains.kotlin</groupId>
77+
<artifactId>kotlin-stdlib-common</artifactId>
78+
<version>${kotlin.stdlib.version}</version>
79+
</dependency>
80+
<dependency>
81+
<groupId>org.jetbrains.kotlin</groupId>
82+
<artifactId>kotlin-stdlib-jdk7</artifactId>
83+
<version>${kotlin.stdlib.version}</version>
84+
</dependency>
85+
<dependency>
86+
<groupId>org.jetbrains.kotlin</groupId>
87+
<artifactId>kotlin-stdlib-jdk8</artifactId>
88+
<version>${kotlin.stdlib.version}</version>
89+
</dependency>
90+
<dependency>
91+
<groupId>org.jetbrains.kotlin</groupId>
92+
<artifactId>kotlin-reflect</artifactId>
93+
<version>${kotlin.reflect.version}</version>
94+
</dependency>
95+
<dependency>
96+
<groupId>com.squareup.okhttp3</groupId>
97+
<artifactId>okhttp</artifactId>
98+
<version>${okhttp.version}</version>
99+
</dependency>
100+
<dependency>
101+
<groupId>com.squareup.okhttp3</groupId>
102+
<artifactId>logging-interceptor</artifactId>
103+
<version>${okhttp.version}</version>
104+
</dependency>
105+
<dependency>
106+
<groupId>com.squareup.okio</groupId>
107+
<artifactId>okio</artifactId>
108+
<version>${okio.version}</version>
109+
</dependency>
110+
<dependency>
111+
<groupId>com.squareup.okio</groupId>
112+
<artifactId>okio-jvm</artifactId>
113+
<version>${okio.version}</version>
114+
</dependency>
115+
116+
<!-- JSON Schema generation from Jackson-annotated classes -->
58117
<dependency>
59118
<groupId>com.github.victools</groupId>
60119
<artifactId>jsonschema-generator</artifactId>
120+
<version>${victools.jsonschema.version}</version>
61121
</dependency>
62-
63122
<dependency>
64123
<groupId>com.github.victools</groupId>
65124
<artifactId>jsonschema-module-jackson</artifactId>
125+
<version>${victools.jsonschema.version}</version>
66126
</dependency>
67127

68-
<!-- test dependencies -->
128+
<!-- JDBC drivers are not bundled; users provide them at runtime via extra classpath.
129+
SQLite and MySQL drivers are kept here in test scope only. -->
69130
<dependency>
70-
<groupId>org.apache.kyuubi</groupId>
71-
<artifactId>kyuubi-common_${scala.binary.version}</artifactId>
72-
<version>${project.version}</version>
73-
<type>test-jar</type>
131+
<groupId>org.xerial</groupId>
132+
<artifactId>sqlite-jdbc</artifactId>
133+
<version>${sqlite.version}</version>
74134
<scope>test</scope>
75135
</dependency>
76136

77137
<dependency>
78-
<groupId>org.xerial</groupId>
79-
<artifactId>sqlite-jdbc</artifactId>
138+
<groupId>com.mysql</groupId>
139+
<artifactId>mysql-connector-j</artifactId>
80140
<scope>test</scope>
81141
</dependency>
82142

143+
<!-- Trino JDBC driver -->
83144
<dependency>
84-
<groupId>org.testcontainers</groupId>
85-
<artifactId>testcontainers-mysql</artifactId>
145+
<groupId>io.trino</groupId>
146+
<artifactId>trino-jdbc</artifactId>
147+
</dependency>
148+
149+
<!-- Connection pool -->
150+
<dependency>
151+
<groupId>com.zaxxer</groupId>
152+
<artifactId>HikariCP</artifactId>
153+
</dependency>
154+
155+
<!-- test dependencies -->
156+
<dependency>
157+
<groupId>org.apache.kyuubi</groupId>
158+
<artifactId>kyuubi-common_${scala.binary.version}</artifactId>
159+
<version>${project.version}</version>
160+
<type>test-jar</type>
86161
<scope>test</scope>
87162
</dependency>
88163

89164
<dependency>
90-
<groupId>com.mysql</groupId>
91-
<artifactId>mysql-connector-j</artifactId>
165+
<groupId>org.testcontainers</groupId>
166+
<artifactId>testcontainers-mysql</artifactId>
92167
<scope>test</scope>
93168
</dependency>
94169

externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/datasource/JdbcDialect.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,12 @@
1717

1818
package org.apache.kyuubi.engine.dataagent.datasource;
1919

20+
import org.apache.kyuubi.engine.dataagent.datasource.dialect.GenericDialect;
21+
import org.apache.kyuubi.engine.dataagent.datasource.dialect.MySQLDialect;
22+
import org.apache.kyuubi.engine.dataagent.datasource.dialect.SQLiteDialect;
23+
import org.apache.kyuubi.engine.dataagent.datasource.dialect.SparkDialect;
24+
import org.apache.kyuubi.engine.dataagent.datasource.dialect.TrinoDialect;
25+
2026
/**
2127
* SQL dialect abstraction for datasource-specific SQL generation.
2228
*
@@ -83,9 +89,9 @@ static JdbcDialect fromUrl(String jdbcUrl) {
8389
case "trino":
8490
return TrinoDialect.INSTANCE;
8591
case "mysql":
86-
return MysqlDialect.INSTANCE;
92+
return MySQLDialect.INSTANCE;
8793
case "sqlite":
88-
return SqliteDialect.INSTANCE;
94+
return SQLiteDialect.INSTANCE;
8995
default:
9096
return new GenericDialect(name);
9197
}

externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/datasource/GenericDialect.java renamed to externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/datasource/dialect/GenericDialect.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,9 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.kyuubi.engine.dataagent.datasource;
18+
package org.apache.kyuubi.engine.dataagent.datasource.dialect;
19+
20+
import org.apache.kyuubi.engine.dataagent.datasource.JdbcDialect;
1921

2022
/**
2123
* Fallback dialect for JDBC subprotocols that have no dedicated implementation. Carries the

externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/datasource/MysqlDialect.java renamed to externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/datasource/dialect/MySQLDialect.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,16 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.kyuubi.engine.dataagent.datasource;
18+
package org.apache.kyuubi.engine.dataagent.datasource.dialect;
19+
20+
import org.apache.kyuubi.engine.dataagent.datasource.JdbcDialect;
1921

2022
/** MySQL dialect. Uses backtick quoting for identifiers. */
21-
public final class MysqlDialect implements JdbcDialect {
23+
public final class MySQLDialect implements JdbcDialect {
2224

23-
static final MysqlDialect INSTANCE = new MysqlDialect();
25+
public static final MySQLDialect INSTANCE = new MySQLDialect();
2426

25-
private MysqlDialect() {}
27+
private MySQLDialect() {}
2628

2729
@Override
2830
public String datasourceName() {

externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/datasource/SqliteDialect.java renamed to externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/datasource/dialect/SQLiteDialect.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,16 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.kyuubi.engine.dataagent.datasource;
18+
package org.apache.kyuubi.engine.dataagent.datasource.dialect;
19+
20+
import org.apache.kyuubi.engine.dataagent.datasource.JdbcDialect;
1921

2022
/** SQLite dialect. Uses double-quote quoting for identifiers. */
21-
public final class SqliteDialect implements JdbcDialect {
23+
public final class SQLiteDialect implements JdbcDialect {
2224

23-
static final SqliteDialect INSTANCE = new SqliteDialect();
25+
public static final SQLiteDialect INSTANCE = new SQLiteDialect();
2426

25-
private SqliteDialect() {}
27+
private SQLiteDialect() {}
2628

2729
@Override
2830
public String datasourceName() {

externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/datasource/SparkDialect.java renamed to externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/datasource/dialect/SparkDialect.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,14 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.kyuubi.engine.dataagent.datasource;
18+
package org.apache.kyuubi.engine.dataagent.datasource.dialect;
19+
20+
import org.apache.kyuubi.engine.dataagent.datasource.JdbcDialect;
1921

2022
/** Spark SQL dialect. Uses backtick quoting for identifiers. */
2123
public final class SparkDialect implements JdbcDialect {
2224

23-
static final SparkDialect INSTANCE = new SparkDialect();
25+
public static final SparkDialect INSTANCE = new SparkDialect();
2426

2527
private SparkDialect() {}
2628

externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/datasource/TrinoDialect.java renamed to externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/datasource/dialect/TrinoDialect.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,14 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.kyuubi.engine.dataagent.datasource;
18+
package org.apache.kyuubi.engine.dataagent.datasource.dialect;
19+
20+
import org.apache.kyuubi.engine.dataagent.datasource.JdbcDialect;
1921

2022
/** Trino SQL dialect. Uses double-quote quoting for identifiers. */
2123
public final class TrinoDialect implements JdbcDialect {
2224

23-
static final TrinoDialect INSTANCE = new TrinoDialect();
25+
public static final TrinoDialect INSTANCE = new TrinoDialect();
2426

2527
private TrinoDialect() {}
2628

externals/kyuubi-data-agent-engine/src/main/java/org/apache/kyuubi/engine/dataagent/provider/ProviderRunRequest.java

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,23 @@
1717

1818
package org.apache.kyuubi.engine.dataagent.provider;
1919

20+
import org.apache.kyuubi.engine.dataagent.runtime.ApprovalMode;
21+
import org.slf4j.Logger;
22+
import org.slf4j.LoggerFactory;
23+
2024
/**
2125
* User-facing request parameters for a provider-level agent invocation. Only contains fields from
2226
* the caller (question, model override, etc.). Adding new per-request options does not require
2327
* changing the {@link DataAgentProvider} interface.
28+
*
29+
* <p>The approval mode is accepted as a raw string (natural for config-driven callers) and parsed
30+
* into {@link ApprovalMode} by {@link #getApprovalMode()}. Unrecognised values fall back to {@link
31+
* ApprovalMode#NORMAL} with a warning.
2432
*/
2533
public class ProviderRunRequest {
2634

35+
private static final Logger LOG = LoggerFactory.getLogger(ProviderRunRequest.class);
36+
2737
private final String question;
2838
private String modelName;
2939
private String approvalMode;
@@ -45,8 +55,20 @@ public ProviderRunRequest modelName(String modelName) {
4555
return this;
4656
}
4757

48-
public String getApprovalMode() {
49-
return approvalMode;
58+
/**
59+
* Resolved approval mode. Returns {@link ApprovalMode#NORMAL} when the caller did not set one or
60+
* supplied an unknown value.
61+
*/
62+
public ApprovalMode getApprovalMode() {
63+
if (approvalMode == null || approvalMode.isEmpty()) {
64+
return ApprovalMode.NORMAL;
65+
}
66+
try {
67+
return ApprovalMode.valueOf(approvalMode.toUpperCase());
68+
} catch (IllegalArgumentException e) {
69+
LOG.warn("Unknown approval mode '{}', using default NORMAL", approvalMode);
70+
return ApprovalMode.NORMAL;
71+
}
5072
}
5173

5274
public ProviderRunRequest approvalMode(String approvalMode) {

0 commit comments

Comments
 (0)