Skip to content
Open
1 change: 1 addition & 0 deletions .bazelignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ app/project-manager-shim/node_modules
app/table-expression/node_modules
app/rust-ffi/node_modules
app/ydoc-channel/node_modules
app/ydoc-inspect/node_modules
app/ydoc-server/node_modules
app/ydoc-server-nodejs/node_modules
app/ydoc-server-polyglot/node_modules
Expand Down
1 change: 1 addition & 0 deletions app/gui/vite.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const IS_ELECTRON_DEV_MODE = process.env.ELECTRON_DEV_MODE === 'true'

if (isDevMode) {
process.env.ENSO_IDE_YDOC_SERVER_URL ||= 'ws://__HOSTNAME__:5976'
process.env.ENSO_IDE_YDOC_LS_DEBUG ||= 'true'
}

// Used by vite middleware inside devtools plugin. Specifying this by an option doesn't work when `componentInspector` is false.
Expand Down
143 changes: 142 additions & 1 deletion app/ydoc-channel/src/YjsChannel.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { describe, expect, it } from 'vitest'
import * as Y from 'yjs'
import { YjsChannel, type ChannelCodec } from './YjsChannel.js'
import { YjsChannel, type ChannelCodec, type TapDirection } from './YjsChannel.js'

// Mock CloseEvent for Node.js environment
if (typeof globalThis.CloseEvent === 'undefined') {
Expand Down Expand Up @@ -520,6 +520,147 @@ describe('YjsChannel', () => {
})
})

describe('tap', () => {
it('should receive sent messages with direction send', () => {
const doc = new Y.Doc()
const channel = new YjsChannel<string>(doc, 'tap-test')

const tapped: { msg: string; dir: TapDirection }[] = []
channel.tap((msg, dir) => tapped.push({ msg, dir }))

channel.send('hello')

expect(tapped).toEqual([{ msg: 'hello', dir: 'send' }])
})

it('should receive incoming messages with direction receive', () => {
const doc = new Y.Doc()
const sender = new YjsChannel<string>(doc, 'tap-test')
const receiver = new YjsChannel<string>(doc, 'tap-test')

const tapped: { msg: string; dir: TapDirection }[] = []
receiver.tap((msg, dir) => tapped.push({ msg, dir }))

// Receiver needs a subscriber so the observer processes messages
receiver.subscribe(() => {})

sender.send('from sender')

expect(tapped).toEqual([{ msg: 'from sender', dir: 'receive' }])
})

it('should not consume messages (non-destructive)', () => {
const doc = new Y.Doc()
const sender = new YjsChannel<string>(doc, 'tap-test')
const receiver = new YjsChannel<string>(doc, 'tap-test')

const tapped: string[] = []
const received: string[] = []

receiver.tap((msg) => tapped.push(msg))
receiver.subscribe((msg) => received.push(msg))

sender.send('msg1')

// Both tap and subscribe should see the message
expect(tapped).toEqual(['msg1'])
expect(received).toEqual(['msg1'])
})

it('should support unsubscribing', () => {
const doc = new Y.Doc()
const channel = new YjsChannel<string>(doc, 'tap-test')

const tapped: string[] = []
const untap = channel.tap((msg) => tapped.push(msg))

channel.send('before')
untap()
channel.send('after')

expect(tapped).toEqual(['before'])
})

it('should receive incoming messages even when no handlers are subscribed', () => {
const doc = new Y.Doc()
const sender = new YjsChannel<string>(doc, 'tap-test')
const receiver = new YjsChannel<string>(doc, 'tap-test')

const tapped: { msg: string; dir: TapDirection }[] = []
receiver.tap((msg, dir) => tapped.push({ msg, dir }))

// No subscribe() or addEventListener — only a tap is registered.
// This mirrors the inspect scenario: the tap is set up before
// the Java side subscribes to the channel.
sender.send('early message')

// Tap should still be notified
expect(tapped).toEqual([{ msg: 'early message', dir: 'receive' }])

// Message must remain in the array for a future handler
expect(doc.getArray<string>('tap-test').toArray()).toContain('early message')

// A later subscriber should still receive the message
const received: string[] = []
receiver.subscribe((msg) => received.push(msg))
expect(received).toEqual(['early message'])
})

it('should be cleared on close', () => {
const doc = new Y.Doc()
const sender = new YjsChannel<string>(doc, 'tap-test')
const receiver = new YjsChannel<string>(doc, 'tap-test')

const tapped: string[] = []
receiver.tap((msg) => tapped.push(msg))
receiver.subscribe(() => {})

sender.send('before close')
receiver.close()
sender.send('after close')

expect(tapped).toEqual(['before close'])
})
})

describe('notifyHandlers', () => {
it('should deliver message to subscribers without touching the Y.Array', () => {
const doc = new Y.Doc()
const channel = new YjsChannel<string>(doc, 'notify-test')

const received: string[] = []
channel.subscribe((msg) => received.push(msg))

channel.notifyHandlers('injected message')

expect(received).toEqual(['injected message'])
// Message should NOT be in the array
expect(doc.getArray<string>('notify-test').length).toBe(0)
})

it('should deliver message to addEventListener listeners', () => {
const doc = new Y.Doc()
const channel = new YjsChannel<string>(doc, 'notify-test')

const received: string[] = []
channel.addEventListener('message', (event) => {
received.push(event.data)
})

channel.notifyHandlers('injected message')

expect(received).toEqual(['injected message'])
})
})

describe('channelName', () => {
it('should expose the channel name', () => {
const doc = new Y.Doc()
const channel = new YjsChannel<string>(doc, 'my-channel')
expect(channel.channelName).toBe('my-channel')
})
})

describe('ChannelCodec', () => {
// A simple codec that doubles numbers on encode and halves on decode
const doubleCodec: ChannelCodec<number, number> = {
Expand Down
77 changes: 66 additions & 11 deletions app/ydoc-channel/src/YjsChannel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,17 @@ interface AddEventListenerOptions {
*/
export type MessageHandler<T = unknown> = (message: T) => void

/**
* Direction of a tapped message.
*/
export type TapDirection = 'send' | 'receive'

/**
* Non-consuming message observer callback type.
* Receives a copy of every message passing through the channel.
*/
export type TapHandler<T> = (message: T, direction: TapDirection) => void

/**
* Codec for converting between the external message type and the internal storage type.
*/
Expand Down Expand Up @@ -60,10 +71,12 @@ export class YjsChannel<
TMessage = unknown,
TStored = TMessage,
> extends ObservableV2<WebSocketEventHandlers> {
readonly channelName: string
private readonly senderId: string
private readonly doc: Y.Doc
private readonly array: Y.Array<TStored>
private readonly handlers: Set<MessageHandler<TMessage>> = new Set()
private readonly tapHandlers: Set<TapHandler<TMessage>> = new Set()
private readonly observeHandler: (event: Y.YArrayEvent<TStored>, tr: Y.Transaction) => void
private readonly codec: ChannelCodec<TMessage, TStored>

Expand All @@ -79,6 +92,7 @@ export class YjsChannel<
codec: ChannelCodec<TMessage, TStored> = identityCodec,
) {
super()
this.channelName = channelName
this.senderId = crypto.randomUUID()
this.doc = doc
this.array = doc.getArray<TStored>(channelName)
Expand All @@ -87,9 +101,11 @@ export class YjsChannel<
this.observeHandler = (event: Y.YArrayEvent<TStored>, transaction: Y.Transaction) => {
// Only notify handlers if the message is from another sender
if (transaction.origin !== this.senderId) {
// If no handlers are subscribed, leave items in the array for later processing.
const hasHandlers = this.handlers.size > 0 || this.hasActiveEventListeners()

// If no handlers or taps are subscribed, leave items in the array for later processing.
// This handles the race condition where messages arrive before handlers are attached.
if (this.handlers.size === 0 && !this.hasActiveEventListeners()) {
if (!hasHandlers && this.tapHandlers.size === 0) {
return
}

Expand All @@ -113,16 +129,23 @@ export class YjsChannel<
}
}

doc.transact(() => {
// Delete the processed items in reverse index order to preserve correct positions
for (let i = inserted.length - 1; i >= 0; i--) {
this.array.delete(inserted[i]!.index, 1)
}
}, this.senderId)
// Only consume (delete) messages when regular handlers are present.
// When only taps exist, leave items for later handler subscription.
if (hasHandlers) {
doc.transact(() => {
// Delete the processed items in reverse index order to preserve correct positions
for (let i = inserted.length - 1; i >= 0; i--) {
this.array.delete(inserted[i]!.index, 1)
}
}, this.senderId)
}

// Notify handlers after deletion
for (const { value } of inserted) {
this.notifyHandlers(this.codec.decode(value))
const decoded = this.codec.decode(value)
if (hasHandlers) {
this.notifyHandlers(decoded)
}
this.notifyTaps(decoded, 'receive')
}
}
}
Expand All @@ -136,6 +159,7 @@ export class YjsChannel<
*/
send(message: TMessage): void {
this.doc.transact(() => this.array.push([this.codec.encode(message)]), this.senderId)
this.notifyTaps(message, 'send')
}

/**
Expand Down Expand Up @@ -170,12 +194,26 @@ export class YjsChannel<
}
}

/**
* Registers a non-consuming observer that receives copies of all messages
* passing through the channel (both sent and received).
* @param handler - The callback to invoke with each message and its direction
* @returns A function to unsubscribe the tap handler
*/
tap(handler: TapHandler<TMessage>): () => void {
this.tapHandlers.add(handler)
return () => {
this.tapHandlers.delete(handler)
}
}

/**
* Removes all message handlers and stops observing the Y.Array.
*/
close(): void {
this.array.unobserve(this.observeHandler)
this.handlers.clear()
this.tapHandlers.clear()
this.emitClose()
}

Expand Down Expand Up @@ -265,8 +303,10 @@ export class YjsChannel<

/**
* Notifies all subscribed handlers with the received message.
* Used by {@link InspectManager} to inject messages. Not part of the public API.
* @internal
*/
private notifyHandlers(message: TMessage): void {
notifyHandlers(message: TMessage): void {
// Create a MessageEvent-like object for WebSocket compatibility
const messageEvent = { data: message } as MessageEvent

Expand All @@ -284,6 +324,21 @@ export class YjsChannel<
}
}

/**
* Notifies all tap handlers with the message and direction.
* Used by {@link InspectManager} to inject messages. Not part of the public API.
* @internal
*/
notifyTaps(message: TMessage, direction: TapDirection): void {
for (const handler of this.tapHandlers) {
try {
handler(message, direction)
} catch (e) {
console.error('Tap handler error', e)
}
}
}

/**
* Returns true if there are active 'message' event listeners registered via on/addEventListener.
* Queries the ObservableV2 internal observer map for a live count.
Expand Down
2 changes: 2 additions & 0 deletions app/ydoc-channel/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,7 @@ export {
identityCodec,
type ChannelCodec,
type MessageHandler,
type TapDirection,
type TapHandler,
type YjsChannelServer,
} from './YjsChannel.js'
21 changes: 21 additions & 0 deletions app/ydoc-inspect/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
"name": "ydoc-inspect",
"version": "0.1.0",
"private": true,
"type": "module",
"scripts": {
"start": "node --inspect dist/main.js",
"compile": "tsc -p tsconfig.build.json",
"typecheck": "tsc -b tsconfig.json --noEmit",
"lint": "eslint . --cache --max-warnings=0"
},
"dependencies": {
"ws": "^8.18.0",
"y-websocket": "^1.5.4",
"yjs": "^13.6.21"
},
"devDependencies": {
"@types/ws": "^8.5.13",
"typescript": "catalog:"
}
}
Loading
Loading