Skip to content

Commit aca0fa4

Browse files
reduce startup time by ~47% and memory usage by up to 150-300MB (#2204)
Co-authored-by: Julius Marminge <julius0216@outlook.com>
1 parent 0576820 commit aca0fa4

30 files changed

Lines changed: 1850 additions & 458 deletions

apps/server/src/checkpointing/Layers/CheckpointDiffQuery.test.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,10 +86,13 @@ describe("CheckpointDiffQueryLive", () => {
8686
Layer.provideMerge(Layer.succeed(CheckpointStore, checkpointStore)),
8787
Layer.provideMerge(
8888
Layer.succeed(ProjectionSnapshotQuery, {
89+
getCommandReadModel: () =>
90+
Effect.die("CheckpointDiffQuery should not request the command read model"),
8991
getSnapshot: () =>
9092
Effect.die("CheckpointDiffQuery should not request the full orchestration snapshot"),
9193
getShellSnapshot: () =>
9294
Effect.die("CheckpointDiffQuery should not request the orchestration shell snapshot"),
95+
getSnapshotSequence: () => Effect.succeed({ snapshotSequence: 0 }),
9396
getCounts: () => Effect.succeed({ projectCount: 0, threadCount: 0 }),
9497
getActiveProjectByWorkspaceRoot: () => Effect.succeed(Option.none()),
9598
getProjectShellById: () => Effect.succeed(Option.none()),
@@ -163,10 +166,13 @@ describe("CheckpointDiffQueryLive", () => {
163166
Layer.provideMerge(Layer.succeed(CheckpointStore, checkpointStore)),
164167
Layer.provideMerge(
165168
Layer.succeed(ProjectionSnapshotQuery, {
169+
getCommandReadModel: () =>
170+
Effect.die("CheckpointDiffQuery should not request the command read model"),
166171
getSnapshot: () =>
167172
Effect.die("CheckpointDiffQuery should not request the full orchestration snapshot"),
168173
getShellSnapshot: () =>
169174
Effect.die("CheckpointDiffQuery should not request the orchestration shell snapshot"),
175+
getSnapshotSequence: () => Effect.succeed({ snapshotSequence: 0 }),
170176
getCounts: () => Effect.succeed({ projectCount: 0, threadCount: 0 }),
171177
getActiveProjectByWorkspaceRoot: () => Effect.succeed(Option.none()),
172178
getProjectShellById: () => Effect.succeed(Option.none()),
@@ -208,10 +214,13 @@ describe("CheckpointDiffQueryLive", () => {
208214
Layer.provideMerge(Layer.succeed(CheckpointStore, checkpointStore)),
209215
Layer.provideMerge(
210216
Layer.succeed(ProjectionSnapshotQuery, {
217+
getCommandReadModel: () =>
218+
Effect.die("CheckpointDiffQuery should not request the command read model"),
211219
getSnapshot: () =>
212220
Effect.die("CheckpointDiffQuery should not request the full orchestration snapshot"),
213221
getShellSnapshot: () =>
214222
Effect.die("CheckpointDiffQuery should not request the orchestration shell snapshot"),
223+
getSnapshotSequence: () => Effect.succeed({ snapshotSequence: 0 }),
215224
getCounts: () => Effect.succeed({ projectCount: 0, threadCount: 0 }),
216225
getActiveProjectByWorkspaceRoot: () => Effect.succeed(Option.none()),
217226
getProjectShellById: () => Effect.succeed(Option.none()),

apps/server/src/cli.test.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ import { Command } from "effect/unstable/cli";
1717

1818
import { cli } from "./cli.ts";
1919
import { deriveServerPaths, ServerConfig, type ServerConfigShape } from "./config.ts";
20-
import { OrchestrationEngineService } from "./orchestration/Services/OrchestrationEngine.ts";
2120
import { ProjectionSnapshotQuery } from "./orchestration/Services/ProjectionSnapshotQuery.ts";
2221
import { OrchestrationLayerLive } from "./orchestration/runtimeLayer.ts";
2322
import {
@@ -316,8 +315,8 @@ it.layer(NodeServices.layer)("cli log-level parsing", (it) => {
316315
"--base-dir",
317316
baseDir,
318317
]);
319-
const orchestrationEngine = yield* OrchestrationEngineService;
320-
const readModel = yield* orchestrationEngine.getReadModel();
318+
const projectionSnapshotQuery = yield* ProjectionSnapshotQuery;
319+
const readModel = yield* projectionSnapshotQuery.getSnapshot();
321320
const addedProject = readModel.projects.find(
322321
(project) => project.workspaceRoot === workspaceRoot && project.deletedAt === null,
323322
);

apps/server/src/orchestration/Layers/CheckpointReactor.test.ts

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ import {
4141
type OrchestrationEngineShape,
4242
} from "../Services/OrchestrationEngine.ts";
4343
import { CheckpointReactor } from "../Services/CheckpointReactor.ts";
44+
import { ProjectionSnapshotQuery } from "../Services/ProjectionSnapshotQuery.ts";
4445
import {
4546
ProviderService,
4647
type ProviderServiceShape,
@@ -132,7 +133,14 @@ function createProviderServiceHarness(
132133
}
133134

134135
async function waitForThread(
135-
engine: OrchestrationEngineShape,
136+
readModel: () => Promise<{
137+
readonly threads: ReadonlyArray<{
138+
readonly id: ThreadId;
139+
readonly latestTurn: { readonly turnId: string } | null;
140+
readonly checkpoints: ReadonlyArray<{ readonly checkpointTurnCount: number }>;
141+
readonly activities: ReadonlyArray<{ readonly kind: string }>;
142+
}>;
143+
}>,
136144
predicate: (thread: {
137145
latestTurn: { turnId: string } | null;
138146
checkpoints: ReadonlyArray<{ checkpointTurnCount: number }>;
@@ -146,8 +154,8 @@ async function waitForThread(
146154
checkpoints: ReadonlyArray<{ checkpointTurnCount: number }>;
147155
activities: ReadonlyArray<{ kind: string }>;
148156
}> => {
149-
const readModel = await Effect.runPromise(engine.getReadModel());
150-
const thread = readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1"));
157+
const snapshot = await readModel();
158+
const thread = snapshot.threads.find((entry) => entry.id === ThreadId.make("thread-1"));
151159
if (thread && predicate(thread)) {
152160
return thread;
153161
}
@@ -231,7 +239,7 @@ async function waitForGitRefExists(cwd: string, ref: string, timeoutMs = 15_000)
231239

232240
describe("CheckpointReactor", () => {
233241
let runtime: ManagedRuntime.ManagedRuntime<
234-
OrchestrationEngineService | CheckpointReactor | CheckpointStore,
242+
OrchestrationEngineService | CheckpointReactor | CheckpointStore | ProjectionSnapshotQuery,
235243
unknown
236244
> | null = null;
237245
let scope: Scope.Closeable | null = null;
@@ -279,6 +287,10 @@ describe("CheckpointReactor", () => {
279287
Layer.provide(RepositoryIdentityResolverLive),
280288
Layer.provide(SqlitePersistenceMemory),
281289
);
290+
const projectionSnapshotLayer = OrchestrationProjectionSnapshotQueryLive.pipe(
291+
Layer.provide(RepositoryIdentityResolverLive),
292+
Layer.provide(SqlitePersistenceMemory),
293+
);
282294

283295
const ServerConfigLayer = ServerConfig.layerTest(process.cwd(), {
284296
prefix: "t3-checkpoint-reactor-test-",
@@ -304,6 +316,7 @@ describe("CheckpointReactor", () => {
304316

305317
const layer = CheckpointReactorLive.pipe(
306318
Layer.provideMerge(orchestrationLayer),
319+
Layer.provideMerge(projectionSnapshotLayer),
307320
Layer.provideMerge(RuntimeReceiptBusLive),
308321
Layer.provideMerge(Layer.succeed(ProviderService, provider.service)),
309322
Layer.provideMerge(vcsStatusBroadcasterLayer),
@@ -322,6 +335,7 @@ describe("CheckpointReactor", () => {
322335

323336
runtime = ManagedRuntime.make(layer);
324337
const engine = await runtime.runPromise(Effect.service(OrchestrationEngineService));
338+
const snapshotQuery = await runtime.runPromise(Effect.service(ProjectionSnapshotQuery));
325339
const reactor = await runtime.runPromise(Effect.service(CheckpointReactor));
326340
const checkpointStore = await runtime.runPromise(Effect.service(CheckpointStore));
327341
scope = await Effect.runPromise(Scope.make("sequential"));
@@ -387,6 +401,7 @@ describe("CheckpointReactor", () => {
387401

388402
return {
389403
engine,
404+
readModel: () => Effect.runPromise(snapshotQuery.getSnapshot()),
390405
provider,
391406
cwd,
392407
drain,
@@ -443,7 +458,7 @@ describe("CheckpointReactor", () => {
443458

444459
await waitForEvent(harness.engine, (event) => event.type === "thread.turn-diff-completed");
445460
const thread = await waitForThread(
446-
harness.engine,
461+
harness.readModel,
447462
(entry) => entry.latestTurn?.turnId === "turn-1" && entry.checkpoints.length === 1,
448463
);
449464
expect(thread.checkpoints[0]?.checkpointTurnCount).toBe(1);
@@ -541,7 +556,7 @@ describe("CheckpointReactor", () => {
541556
});
542557

543558
await harness.drain();
544-
const midReadModel = await Effect.runPromise(harness.engine.getReadModel());
559+
const midReadModel = await harness.readModel();
545560
const midThread = midReadModel.threads.find((entry) => entry.id === ThreadId.make("thread-1"));
546561
expect(midThread?.checkpoints).toHaveLength(0);
547562

@@ -557,7 +572,7 @@ describe("CheckpointReactor", () => {
557572
});
558573

559574
const thread = await waitForThread(
560-
harness.engine,
575+
harness.readModel,
561576
(entry) => entry.latestTurn?.turnId === "turn-main" && entry.checkpoints.length === 1,
562577
);
563578
expect(thread.checkpoints[0]?.checkpointTurnCount).toBe(1);
@@ -614,7 +629,7 @@ describe("CheckpointReactor", () => {
614629

615630
await waitForEvent(harness.engine, (event) => event.type === "thread.turn-diff-completed");
616631
const thread = await waitForThread(
617-
harness.engine,
632+
harness.readModel,
618633
(entry) => entry.latestTurn?.turnId === "turn-claude-1" && entry.checkpoints.length === 1,
619634
);
620635

@@ -659,7 +674,7 @@ describe("CheckpointReactor", () => {
659674

660675
await waitForEvent(harness.engine, (event) => event.type === "thread.turn-diff-completed");
661676
const thread = await waitForThread(
662-
harness.engine,
677+
harness.readModel,
663678
(entry) =>
664679
entry.checkpoints.length === 1 &&
665680
entry.activities.some((activity) => activity.kind === "checkpoint.capture.failed"),
@@ -794,7 +809,7 @@ describe("CheckpointReactor", () => {
794809
});
795810

796811
await harness.drain();
797-
const readModel = await Effect.runPromise(harness.engine.getReadModel());
812+
const readModel = await harness.readModel();
798813
const thread = readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1"));
799814
expect(thread?.checkpoints.some((checkpoint) => checkpoint.checkpointTurnCount === 3)).toBe(
800815
false,
@@ -923,7 +938,10 @@ describe("CheckpointReactor", () => {
923938
);
924939

925940
await waitForEvent(harness.engine, (event) => event.type === "thread.reverted");
926-
const thread = await waitForThread(harness.engine, (entry) => entry.checkpoints.length === 1);
941+
const thread = await waitForThread(
942+
harness.readModel,
943+
(entry) => entry.checkpoints.length === 1,
944+
);
927945

928946
expect(thread.latestTurn?.turnId).toBe("turn-1");
929947
expect(thread.checkpoints).toHaveLength(1);
@@ -1105,7 +1123,7 @@ describe("CheckpointReactor", () => {
11051123
}),
11061124
);
11071125

1108-
const thread = await waitForThread(harness.engine, (entry) =>
1126+
const thread = await waitForThread(harness.readModel, (entry) =>
11091127
entry.activities.some((activity) => activity.kind === "checkpoint.revert.failed"),
11101128
);
11111129

apps/server/src/orchestration/Layers/CheckpointReactor.ts

Lines changed: 32 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import { CheckpointStore } from "../../checkpointing/Services/CheckpointStore.ts
2020
import { ProviderService } from "../../provider/Services/ProviderService.ts";
2121
import { CheckpointReactor, type CheckpointReactorShape } from "../Services/CheckpointReactor.ts";
2222
import { OrchestrationEngineService } from "../Services/OrchestrationEngine.ts";
23+
import { ProjectionSnapshotQuery } from "../Services/ProjectionSnapshotQuery.ts";
2324
import { RuntimeReceiptBus } from "../Services/RuntimeReceiptBus.ts";
2425
import type { CheckpointStoreError } from "../../checkpointing/Errors.ts";
2526
import type { OrchestrationDispatchError } from "../Errors.ts";
@@ -66,6 +67,7 @@ const serverCommandId = (tag: string): CommandId =>
6667

6768
const make = Effect.gen(function* () {
6869
const orchestrationEngine = yield* OrchestrationEngineService;
70+
const projectionSnapshotQuery = yield* ProjectionSnapshotQuery;
6971
const providerService = yield* ProviderService;
7072
const checkpointStore = yield* CheckpointStore;
7173
const receiptBus = yield* RuntimeReceiptBus;
@@ -124,29 +126,26 @@ const make = Effect.gen(function* () {
124126
const resolveSessionRuntimeForThread = Effect.fn("resolveSessionRuntimeForThread")(function* (
125127
threadId: ThreadId,
126128
): Effect.fn.Return<Option.Option<{ readonly threadId: ThreadId; readonly cwd: string }>> {
127-
const readModel = yield* orchestrationEngine.getReadModel();
128-
const thread = readModel.threads.find((entry) => entry.id === threadId);
129-
130129
const sessions = yield* providerService.listSessions();
130+
const session = sessions.find((entry) => entry.threadId === threadId);
131+
return session?.cwd
132+
? Option.some({ threadId: session.threadId, cwd: session.cwd })
133+
: Option.none();
134+
});
131135

132-
const findSessionWithCwd = (
133-
session: (typeof sessions)[number] | undefined,
134-
): Option.Option<{ readonly threadId: ThreadId; readonly cwd: string }> => {
135-
if (!session?.cwd) {
136-
return Option.none();
137-
}
138-
return Option.some({ threadId: session.threadId, cwd: session.cwd });
139-
};
140-
141-
if (thread) {
142-
const projectedSession = sessions.find((session) => session.threadId === thread.id);
143-
const fromProjected = findSessionWithCwd(projectedSession);
144-
if (Option.isSome(fromProjected)) {
145-
return fromProjected;
146-
}
147-
}
136+
const resolveThreadDetail = Effect.fn("resolveThreadDetail")(function* (threadId: ThreadId) {
137+
return yield* projectionSnapshotQuery
138+
.getThreadDetailById(threadId)
139+
.pipe(Effect.map(Option.getOrUndefined));
140+
});
148141

149-
return Option.none();
142+
const resolveThreadProjects = Effect.fn("resolveThreadProjects")(function* (
143+
projectId: ProjectId,
144+
) {
145+
const project = yield* projectionSnapshotQuery
146+
.getProjectShellById(projectId)
147+
.pipe(Effect.map(Option.getOrUndefined));
148+
return project ? [project] : [];
150149
});
151150

152151
const isGitWorkspace = (cwd: string) => isGitRepository(cwd);
@@ -331,8 +330,7 @@ const make = Effect.gen(function* () {
331330
return;
332331
}
333332

334-
const readModel = yield* orchestrationEngine.getReadModel();
335-
const thread = readModel.threads.find((entry) => entry.id === event.threadId);
333+
const thread = yield* resolveThreadDetail(event.threadId);
336334
if (!thread) {
337335
return;
338336
}
@@ -353,10 +351,11 @@ const make = Effect.gen(function* () {
353351
return;
354352
}
355353

354+
const projects = yield* resolveThreadProjects(thread.projectId);
356355
const checkpointCwd = yield* resolveCheckpointCwd({
357356
threadId: thread.id,
358357
thread,
359-
projects: readModel.projects,
358+
projects,
360359
preferSessionRuntime: true,
361360
});
362361
if (!checkpointCwd) {
@@ -407,8 +406,7 @@ const make = Effect.gen(function* () {
407406
return;
408407
}
409408

410-
const readModel = yield* orchestrationEngine.getReadModel();
411-
const thread = readModel.threads.find((entry) => entry.id === threadId);
409+
const thread = yield* resolveThreadDetail(threadId);
412410
if (!thread) {
413411
yield* Effect.logWarning("checkpoint capture from placeholder skipped: thread not found", {
414412
threadId,
@@ -429,10 +427,11 @@ const make = Effect.gen(function* () {
429427
return;
430428
}
431429

430+
const projects = yield* resolveThreadProjects(thread.projectId);
432431
const checkpointCwd = yield* resolveCheckpointCwd({
433432
threadId,
434433
thread,
435-
projects: readModel.projects,
434+
projects,
436435
preferSessionRuntime: true,
437436
});
438437
if (!checkpointCwd) {
@@ -458,16 +457,16 @@ const make = Effect.gen(function* () {
458457
return;
459458
}
460459

461-
const readModel = yield* orchestrationEngine.getReadModel();
462-
const thread = readModel.threads.find((entry) => entry.id === event.threadId);
460+
const thread = yield* resolveThreadDetail(event.threadId);
463461
if (!thread) {
464462
return;
465463
}
466464

465+
const projects = yield* resolveThreadProjects(thread.projectId);
467466
const checkpointCwd = yield* resolveCheckpointCwd({
468467
threadId: thread.id,
469468
thread,
470-
projects: readModel.projects,
469+
projects,
471470
preferSessionRuntime: false,
472471
});
473472
if (!checkpointCwd) {
@@ -540,16 +539,16 @@ const make = Effect.gen(function* () {
540539
}
541540

542541
const threadId = event.payload.threadId;
543-
const readModel = yield* orchestrationEngine.getReadModel();
544-
const thread = readModel.threads.find((entry) => entry.id === threadId);
542+
const thread = yield* resolveThreadDetail(threadId);
545543
if (!thread) {
546544
return;
547545
}
548546

547+
const projects = yield* resolveThreadProjects(thread.projectId);
549548
const checkpointCwd = yield* resolveCheckpointCwd({
550549
threadId,
551550
thread,
552-
projects: readModel.projects,
551+
projects,
553552
preferSessionRuntime: false,
554553
});
555554
if (!checkpointCwd) {
@@ -587,8 +586,7 @@ const make = Effect.gen(function* () {
587586
) {
588587
const now = new Date().toISOString();
589588

590-
const readModel = yield* orchestrationEngine.getReadModel();
591-
const thread = readModel.threads.find((entry) => entry.id === event.payload.threadId);
589+
const thread = yield* resolveThreadDetail(event.payload.threadId);
592590
if (!thread) {
593591
yield* appendRevertFailureActivity({
594592
threadId: event.payload.threadId,

0 commit comments

Comments
 (0)