Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
c1faca1
feat(pam): real-time session log sync via incremental batch uploads
bernie-g Apr 7, 2026
705acde
docs(pam): update session recording docs to reflect incremental uploads
bernie-g Apr 7, 2026
1920591
fix(pam): add org-level gateway permission check to uploadEventBatch
bernie-g Apr 7, 2026
e05feb7
style(pam): reformat refetchInterval ternary
bernie-g Apr 7, 2026
d890840
fix(pam): remove PAM_SESSION_GET audit log from polled read endpoint
bernie-g Apr 8, 2026
74253b6
Merge remote-tracking branch 'origin/main' into feat/pam-session-real…
bernie-g Apr 8, 2026
0ce5cf3
fix(pam): paginate session logs to prevent unbounded memory load
bernie-g Apr 8, 2026
f2ce797
fix(pam): remove live refresh from session view, restore PAM_SESSION_…
bernie-g Apr 8, 2026
0305400
fix(pam): validate event batch payload schema before encrypting
bernie-g Apr 8, 2026
8c15154
fix(pam): skip audit log for event batch upserts (retries)
bernie-g Apr 8, 2026
05b16f3
fix(pam): handle invalid JSON in event-batch upload with 400
bernie-g Apr 9, 2026
53ac562
fix(pam): cast xmax result to satisfy TypeScript
bernie-g Apr 9, 2026
a684151
feat(pam): replace infinite scroll with cursor-based live log polling
bernie-g Apr 9, 2026
2f4a8e1
feat(pam): live poll for active sessions, load more for completed
bernie-g Apr 9, 2026
b1c146f
feat(pam): paginate completed session logs by event count (5000/page)
bernie-g Apr 9, 2026
34d3565
feat(pam): add LIVE indicator to session logs and revert page size to…
bernie-g Apr 9, 2026
fd32ef2
feat(pam): move LIVE badge inline next to session logs header
bernie-g Apr 9, 2026
f4929d5
feat(pam): animate LIVE badge with pulse
bernie-g Apr 9, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions backend/src/@types/knex.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -712,6 +712,11 @@ import {
TPamResourceRotationRulesUpdate
} from "@app/db/schemas/pam-resource-rotation-rules";
import { TPamResources, TPamResourcesInsert, TPamResourcesUpdate } from "@app/db/schemas/pam-resources";
import {
TPamSessionEventBatches,
TPamSessionEventBatchesInsert,
TPamSessionEventBatchesUpdate
} from "@app/db/schemas/pam-session-event-batches";
import { TPamSessions, TPamSessionsInsert, TPamSessionsUpdate } from "@app/db/schemas/pam-sessions";
import {
TProjectMicrosoftTeamsConfigs,
Expand Down Expand Up @@ -1592,6 +1597,11 @@ declare module "knex/types/tables" {
>;
[TableName.PamAccount]: KnexOriginal.CompositeTableType<TPamAccounts, TPamAccountsInsert, TPamAccountsUpdate>;
[TableName.PamSession]: KnexOriginal.CompositeTableType<TPamSessions, TPamSessionsInsert, TPamSessionsUpdate>;
[TableName.PamSessionEventBatch]: KnexOriginal.CompositeTableType<
TPamSessionEventBatches,
TPamSessionEventBatchesInsert,
TPamSessionEventBatchesUpdate
>;
[TableName.PamDiscoverySource]: KnexOriginal.CompositeTableType<
TPamDiscoverySources,
TPamDiscoverySourcesInsert,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import { Knex } from "knex";

import { TableName } from "../schemas";
import { createOnUpdateTrigger, dropOnUpdateTrigger } from "../utils";

export async function up(knex: Knex): Promise<void> {
  // Idempotent: bail out if a previous (possibly partial) run already created the table.
  const alreadyExists = await knex.schema.hasTable(TableName.PamSessionEventBatch);
  if (alreadyExists) return;

  await knex.schema.createTable(TableName.PamSessionEventBatch, (tb) => {
    tb.uuid("id", { primaryKey: true }).defaultTo(knex.fn.uuid());

    // Owning session; batches are deleted along with their session.
    tb.uuid("sessionId").notNullable();
    tb.foreign("sessionId").references("id").inTable(TableName.PamSession).onDelete("CASCADE");
    tb.index("sessionId");

    // Start offset of this batch within the session's event stream, plus the
    // encrypted payload of the events it covers.
    tb.bigInteger("startOffset").notNullable();
    tb.binary("encryptedEventsBlob").notNullable();

    tb.timestamps(true, true, true);

    // At most one batch per (session, offset) — re-uploads target the same row.
    tb.unique(["sessionId", "startOffset"]);
  });

  // Keeps updatedAt current on row updates (shared project trigger helper).
  await createOnUpdateTrigger(knex, TableName.PamSessionEventBatch);
}

// Reverses `up`: drop the on-update trigger first, then the table itself.
export async function down(knex: Knex): Promise<void> {
  await dropOnUpdateTrigger(knex, TableName.PamSessionEventBatch);
  await knex.schema.dropTableIfExists(TableName.PamSessionEventBatch);
}
1 change: 1 addition & 0 deletions backend/src/db/schemas/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ export * from "./pam-folders";
export * from "./pam-resource-favorites";
export * from "./pam-resource-rotation-rules";
export * from "./pam-resources";
export * from "./pam-session-event-batches";
export * from "./pam-sessions";
export * from "./pki-acme-accounts";
export * from "./pki-acme-auths";
Expand Down
1 change: 1 addition & 0 deletions backend/src/db/schemas/models.ts
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ export enum TableName {
PamResource = "pam_resources",
PamAccount = "pam_accounts",
PamSession = "pam_sessions",
PamSessionEventBatch = "pam_session_event_batches",
PamDiscoverySource = "pam_discovery_sources",
PamDiscoverySourceRun = "pam_discovery_source_runs",
PamDiscoverySourceResource = "pam_discovery_source_resources",
Expand Down
25 changes: 25 additions & 0 deletions backend/src/db/schemas/pam-session-event-batches.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Code generated by automation script, DO NOT EDIT.
// Automated by pulling database and generating zod schema
// To update, just run npm run generate:schema
// Written by akhilmhdh.

import { z } from "zod";

import { zodBuffer } from "@app/lib/zod";

import { TImmutableDBKeys } from "./models";

// Row shape for pam_session_event_batches (generated; see header above).
export const PamSessionEventBatchesSchema = z.object({
  id: z.string().uuid(),
  sessionId: z.string().uuid(),
  // Column is a bigint — NOTE(review): drivers commonly return bigint as a
  // string, which z.coerce.number() handles; values beyond 2^53 would lose
  // precision. Presumably fine for stream offsets — confirm.
  startOffset: z.coerce.number(),
  encryptedEventsBlob: zodBuffer,
  createdAt: z.date(),
  updatedAt: z.date()
});

export type TPamSessionEventBatches = z.infer<typeof PamSessionEventBatchesSchema>;
export type TPamSessionEventBatchesInsert = Omit<z.input<typeof PamSessionEventBatchesSchema>, TImmutableDBKeys>;
export type TPamSessionEventBatchesUpdate = Partial<
  Omit<z.input<typeof PamSessionEventBatchesSchema>, TImmutableDBKeys>
>;
94 changes: 94 additions & 0 deletions backend/src/ee/routes/v1/pam-session-router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ import {
HttpEventSchema,
PamSessionCommandLogSchema,
SanitizedSessionSchema,
SessionLogsPageSchema,
TerminalEventSchema
} from "@app/ee/services/pam-session/pam-session-schemas";
import { BadRequestError } from "@app/lib/errors";
import { readLimit, writeLimit } from "@app/server/config/rateLimiter";
import { getTelemetryDistinctId } from "@app/server/lib/telemetry";
import { verifyAuth } from "@app/server/plugins/auth/verify-auth";
Expand All @@ -30,6 +32,10 @@ const SessionCredentialsSchema = z.union([
]);

export const registerPamSessionRouter = async (server: FastifyZodProvider) => {
server.addContentTypeParser("application/octet-stream", { parseAs: "buffer" }, (_req, body, done) => {
done(null, body);
});

// Meant to be hit solely by gateway identities
server.route({
method: "GET",
Expand Down Expand Up @@ -249,6 +255,36 @@ export const registerPamSessionRouter = async (server: FastifyZodProvider) => {
}
});

// Read-only, paginated access to a session's logs. Writes no audit log entry
// here — this endpoint is read-polled, so logging each call would flood the
// audit trail.
server.route({
  method: "GET",
  url: "/:sessionId/logs",
  config: {
    rateLimit: readLimit
  },
  schema: {
    description: "Get paginated PAM session logs",
    params: z.object({
      sessionId: z.string().uuid()
    }),
    querystring: z.object({
      // Coerced from query-string strings; capped at 100 items per page.
      offset: z.coerce.number().int().nonnegative().default(0),
      limit: z.coerce.number().int().min(1).max(100).default(20)
    }),
    response: {
      200: SessionLogsPageSchema
    }
  },
  onRequest: verifyAuth([AuthMode.JWT]),
  handler: async (req) => {
    // NOTE(review): assumes getSessionLogs enforces access control from
    // req.permission — confirm in the service implementation.
    return server.services.pamSession.getSessionLogs(
      req.params.sessionId,
      req.query.offset,
      req.query.limit,
      req.permission
    );
  }
});

server.route({
method: "GET",
url: "/:sessionId",
Expand Down Expand Up @@ -322,4 +358,62 @@ export const registerPamSessionRouter = async (server: FastifyZodProvider) => {
return response;
}
});

// Meant to be hit solely by gateway identities
server.route({
  method: "POST",
  url: "/:sessionId/event-batches",
  config: {
    rateLimit: writeLimit
  },
  schema: {
    description: "Upload a PAM session event batch",
    params: z.object({
      sessionId: z.string().uuid()
    }),
    querystring: z.object({
      startOffset: z.coerce.number().int().nonnegative()
    }),
    // Raw application/octet-stream body (see the content-type parser above);
    // parsed and validated manually in the handler.
    body: z.instanceof(Buffer),
    response: {
      200: z.object({ ok: z.literal(true) })
    }
  },
  onRequest: verifyAuth([AuthMode.IDENTITY_ACCESS_TOKEN]),
  handler: async (req) => {
    const EventBatchSchema = z.array(z.union([PamSessionCommandLogSchema, TerminalEventSchema, HttpEventSchema]));

    // Malformed JSON is a client error, not an internal one.
    let parsedBody: unknown;
    try {
      parsedBody = JSON.parse(req.body.toString());
    } catch {
      throw new BadRequestError({ message: "Invalid JSON in request body" });
    }

    // Fix: previously only SyntaxError was mapped to a 400; a well-formed JSON
    // body that failed schema validation rethrew the ZodError and surfaced as
    // an internal error. Map schema failures to 400 as well.
    const validation = EventBatchSchema.safeParse(parsedBody);
    if (!validation.success) {
      throw new BadRequestError({ message: `Invalid event batch payload: ${validation.error.message}` });
    }

    // The service receives the raw buffer, not the parsed events, so it stores
    // exactly the bytes the gateway uploaded.
    const { projectId, wasInserted } = await server.services.pamSession.uploadEventBatch(
      {
        sessionId: req.params.sessionId,
        startOffset: req.query.startOffset,
        events: req.body
      },
      req.permission
    );

    // Audit-log first-time inserts only; a retried upload of the same offset
    // upserts the existing row and is intentionally not re-logged.
    if (wasInserted) {
      await server.services.auditLog.createAuditLog({
        ...req.auditLogInfo,
        orgId: req.permission.orgId,
        projectId,
        event: {
          type: EventType.PAM_SESSION_EVENT_BATCH_UPLOAD,
          metadata: {
            sessionId: req.params.sessionId,
            startOffset: req.query.startOffset
          }
        }
      });
    }

    return { ok: true as const };
  }
});
};
10 changes: 10 additions & 0 deletions backend/src/ee/services/audit-log/audit-log-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,7 @@ export enum EventType {
PAM_SESSION_TERMINATE = "pam-session-terminate",
PAM_SESSION_GET = "pam-session-get",
PAM_SESSION_LIST = "pam-session-list",
PAM_SESSION_EVENT_BATCH_UPLOAD = "pam-session-event-batch-upload",
PAM_FOLDER_CREATE = "pam-folder-create",
PAM_FOLDER_UPDATE = "pam-folder-update",
PAM_FOLDER_DELETE = "pam-folder-delete",
Expand Down Expand Up @@ -4747,6 +4748,14 @@ interface PamSessionListEvent {
};
}

// Emitted when a session event batch row is first inserted (retried upserts
// of the same offset are not re-logged; see the upload route handler).
interface PamSessionEventBatchUploadEvent {
  type: EventType.PAM_SESSION_EVENT_BATCH_UPLOAD;
  metadata: {
    sessionId: string;
    // Offset within the session's event stream where this batch starts.
    startOffset: number;
  };
}

interface PamFolderCreateEvent {
type: EventType.PAM_FOLDER_CREATE;
metadata: {
Expand Down Expand Up @@ -6163,6 +6172,7 @@ export type Event =
| PamSessionTerminateEvent
| PamSessionGetEvent
| PamSessionListEvent
| PamSessionEventBatchUploadEvent
| PamFolderCreateEvent
| PamFolderUpdateEvent
| PamFolderDeleteEvent
Expand Down
35 changes: 35 additions & 0 deletions backend/src/ee/services/pam-session/pam-session-event-batch-dal.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import { Knex } from "knex";

import { TDbClient } from "@app/db";
import { TableName } from "@app/db/schemas";
import { ormify } from "@app/lib/knex";

export type TPamSessionEventBatchDALFactory = ReturnType<typeof pamSessionEventBatchDALFactory>;

// Data-access layer for pam_session_event_batches: standard ORM helpers plus
// ordered pagination and an insert-or-overwrite upsert used by gateway retries.
export const pamSessionEventBatchDALFactory = (db: TDbClient) => {
  const orm = ormify(db, TableName.PamSessionEventBatch);

  // Batches for one session ordered by startOffset so callers can reassemble
  // the event stream. offset/limit paginate over batch ROWS, not individual
  // events. Reads go to the replica when no transaction is supplied.
  const findBySessionIdPaginated = async (
    sessionId: string,
    { offset, limit }: { offset: number; limit: number },
    tx?: Knex
  ) => {
    return (tx || db.replicaNode())(TableName.PamSessionEventBatch)
      .where("sessionId", sessionId)
      .orderBy("startOffset", "asc")
      .limit(limit)
      .offset(offset)
      .select("*");
  };

  // Insert a batch, or overwrite the blob when (sessionId, startOffset) already
  // exists. Returns whether the row was freshly inserted (vs. updated) so the
  // caller can decide whether to audit-log.
  const upsertBatch = async (sessionId: string, startOffset: number, encryptedEventsBlob: Buffer, tx?: Knex) => {
    const result = await (tx || db)(TableName.PamSessionEventBatch)
      .insert({ sessionId, startOffset, encryptedEventsBlob })
      .onConflict(["sessionId", "startOffset"])
      .merge(["encryptedEventsBlob"]) // on re-upload of the same offset, overwrite the blob instead of erroring or skipping
      // Postgres-specific: xmax is 0 only for a freshly inserted row, so the
      // aliased `inserted` column distinguishes an insert from a conflict-update.
      .returning(db.raw("(xmax = 0) as inserted"));
    // The double cast works around knex's returning() typing for raw
    // expressions; `inserted` is the boolean aliased above.
    return { wasInserted: (result[0] as unknown as { inserted: boolean })?.inserted === true };
  };

  return { ...orm, findBySessionIdPaginated, upsertBatch };
};
21 changes: 20 additions & 1 deletion backend/src/ee/services/pam-session/pam-session-fns.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { TPamSessions } from "@app/db/schemas";
import { TPamSessionEventBatches, TPamSessions } from "@app/db/schemas";
import { TKmsServiceFactory } from "@app/services/kms/kms-service";
import { KmsDataKey } from "@app/services/kms/kms-types";

Expand All @@ -25,6 +25,25 @@ export const decryptSessionCommandLogs = async ({
return JSON.parse(decryptedPlainTextBlob.toString()) as (TPamSessionCommandLog | TTerminalEvent)[];
};

// Decrypt a set of session event batches and flatten them into a single
// event list, preserving the order of `batches` as supplied by the caller.
export const decryptBatches = async (
  batches: TPamSessionEventBatches[],
  projectId: string,
  kmsService: Pick<TKmsServiceFactory, "createCipherPairWithDataKey">
) => {
  // One project-scoped data-key cipher pair serves every batch.
  const { decryptor } = await kmsService.createCipherPairWithDataKey({
    type: KmsDataKey.SecretManager,
    projectId
  });

  return batches.flatMap((batch) => {
    const plaintext = decryptor({ cipherTextBlob: batch.encryptedEventsBlob });
    return JSON.parse(plaintext.toString()) as (TPamSessionCommandLog | TTerminalEvent)[];
  });
};

export const decryptSession = async (
session: TPamSessions,
projectId: string,
Expand Down
6 changes: 6 additions & 0 deletions backend/src/ee/services/pam-session/pam-session-schemas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,9 @@ export const SanitizedSessionSchema = PamSessionsSchema.omit({
logs: z.array(z.union([PamSessionCommandLogSchema, HttpEventSchema, TerminalEventSchema])),
gatewayIdentityId: z.string().nullable().optional()
});

// One page of decrypted session log events plus paging metadata, returned by
// the GET /:sessionId/logs route.
export const SessionLogsPageSchema = z.object({
  logs: z.array(z.union([PamSessionCommandLogSchema, TerminalEventSchema, HttpEventSchema])),
  // True when more logs exist beyond this page.
  hasMore: z.boolean(),
  // NOTE(review): presumably the number of stored batches for the session —
  // confirm against the service that builds this page.
  batchCount: z.number()
});
Loading
Loading