Skip to content

Commit 15562da

Browse files
committed
Maybe JS joining fetch.
1 parent 33da311 commit 15562da

9 files changed

Lines changed: 410 additions & 161 deletions

File tree

js/moq/src/ietf/connection.ts

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import * as Path from "../path.js";
55
import { type Reader, Readers, type Stream } from "../stream.ts";
66
import { unreachable } from "../util/index.ts";
77
import * as Control from "./control.ts";
8-
import { Fetch, FetchCancel, FetchError, FetchOk } from "./fetch.ts";
8+
import { Fetch, FetchCancel, FetchError, FetchHeader, FetchOk } from "./fetch.ts";
99
import { GoAway } from "./goaway.ts";
1010
import { GroupHeader } from "./group.ts";
1111
import { Publish, PublishError, PublishOk } from "./publish.ts";
@@ -183,13 +183,13 @@ export class Connection implements Established {
183183
} else if (msg instanceof PublishError) {
184184
throw new Error("PUBLISH_ERROR messages are not supported");
185185
} else if (msg instanceof Fetch) {
186-
throw new Error("FETCH messages are not supported");
186+
await this.#publisher.handleFetch(msg);
187187
} else if (msg instanceof FetchOk) {
188-
throw new Error("FETCH_OK messages are not supported");
188+
this.#subscriber.handleFetchOk(msg);
189189
} else if (msg instanceof FetchError) {
190-
throw new Error("FETCH_ERROR messages are not supported");
190+
this.#subscriber.handleFetchError(msg);
191191
} else if (msg instanceof FetchCancel) {
192-
throw new Error("FETCH_CANCEL messages are not supported");
192+
this.#publisher.handleFetchCancel(msg);
193193
} else if (msg instanceof MaxRequestId) {
194194
this.#control.maxRequestId(msg.requestId);
195195
} else if (msg instanceof RequestsBlocked) {
@@ -262,10 +262,17 @@ export class Connection implements Established {
262262
*/
263263
async #runObjectStream(stream: Reader) {
264264
try {
265-
// we don't support other stream types yet
266-
const header = await GroupHeader.decode(stream);
267-
console.debug("received group header", header);
268-
await this.#subscriber.handleGroup(header, stream);
265+
// TODO support varints for stream type; this is not correct
266+
const typ = (await stream.peek(1))[0];
267+
if (typ === FetchHeader.id) {
268+
const fetch = await FetchHeader.decode(stream);
269+
console.debug("received fetch header", fetch);
270+
await this.#subscriber.handleFetch(fetch, stream);
271+
} else {
272+
const header = await GroupHeader.decode(stream);
273+
console.debug("received group header", header);
274+
await this.#subscriber.handleGroup(header, stream);
275+
}
269276
} catch (err) {
270277
console.error("error processing object stream", err);
271278
}

js/moq/src/ietf/fetch.ts

Lines changed: 96 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
import type * as Path from "../path.ts";
22
import type { Reader, Writer } from "../stream.ts";
3+
import { GroupOrder } from "./group.ts";
34
import { Location } from "./location.js";
45
import * as Message from "./message.ts";
56
import * as Namespace from "./namespace.ts";
67
import { Parameters } from "./parameters.ts";
78

9+
const FETCH_END = 0x03;
10+
811
export const FetchType = {
912
Standalone: 0x1,
1013
Relative: 0x2,
@@ -35,10 +38,10 @@ export class Fetch {
3538

3639
requestId: bigint;
3740
subscriberPriority: number;
38-
groupOrder: number;
41+
groupOrder: GroupOrder;
3942
fetchType: FetchType;
4043

41-
constructor(requestId: bigint, subscriberPriority: number, groupOrder: number, fetchType: FetchType) {
44+
constructor(requestId: bigint, subscriberPriority: number, groupOrder: GroupOrder, fetchType: FetchType) {
4245
this.requestId = requestId;
4346
this.subscriberPriority = subscriberPriority;
4447
this.groupOrder = groupOrder;
@@ -48,7 +51,7 @@ export class Fetch {
4851
async #encode(w: Writer): Promise<void> {
4952
await w.u62(this.requestId);
5053
await w.u8(this.subscriberPriority);
51-
await w.u8(this.groupOrder);
54+
await this.groupOrder.encode(w);
5255
await w.u53(this.fetchType.type);
5356
if (this.fetchType.type === FetchType.Standalone) {
5457
await Namespace.encode(w, this.fetchType.namespace);
@@ -79,7 +82,7 @@ export class Fetch {
7982
static async #decode(r: Reader): Promise<Fetch> {
8083
const requestId = await r.u62();
8184
const subscriberPriority = await r.u8();
82-
const groupOrder = await r.u8();
85+
const groupOrder = await GroupOrder.decode(r);
8386
const fetchType = await r.u53();
8487

8588
if (fetchType === FetchType.Standalone) {
@@ -127,11 +130,11 @@ export class FetchOk {
127130
static id = 0x18;
128131

129132
requestId: bigint;
130-
groupOrder: number;
133+
groupOrder: GroupOrder;
131134
endOfTrack: boolean;
132135
endLocation: Location;
133136

134-
constructor(requestId: bigint, groupOrder: number, endOfTrack: boolean, endLocation: Location) {
137+
constructor(requestId: bigint, groupOrder: GroupOrder, endOfTrack: boolean, endLocation: Location) {
135138
this.requestId = requestId;
136139
this.groupOrder = groupOrder;
137140
this.endOfTrack = endOfTrack;
@@ -140,7 +143,7 @@ export class FetchOk {
140143

141144
async #encode(w: Writer): Promise<void> {
142145
await w.u62(this.requestId);
143-
await w.u8(this.groupOrder);
146+
await this.groupOrder.encode(w);
144147
await w.bool(this.endOfTrack);
145148
this.endLocation.encode(w);
146149
await w.u53(0); // no parameters
@@ -156,7 +159,7 @@ export class FetchOk {
156159

157160
static async #decode(r: Reader): Promise<FetchOk> {
158161
const requestId = await r.u62();
159-
const groupOrder = await r.u8();
162+
const groupOrder = await GroupOrder.decode(r);
160163
const endOfTrack = await r.bool();
161164
const endLocation = await Location.decode(r);
162165
await Parameters.decode(r); // ignore parameters
@@ -225,3 +228,88 @@ export class FetchCancel {
225228
return new FetchCancel(requestId);
226229
}
227230
}
231+
232+
export class FetchHeader {
233+
static id = 0x5;
234+
235+
requestId: bigint;
236+
237+
constructor(requestId: bigint) {
238+
this.requestId = requestId;
239+
}
240+
241+
async encode(w: Writer): Promise<void> {
242+
await w.u62(this.requestId);
243+
}
244+
245+
static async decode(r: Reader): Promise<FetchHeader> {
246+
const requestId = await r.u62();
247+
return new FetchHeader(requestId);
248+
}
249+
}
250+
251+
export class FetchObject {
252+
groupId: number;
253+
subgroupId: number;
254+
objectId: number;
255+
publisherPriority: number;
256+
payload?: Uint8Array;
257+
258+
constructor(
259+
groupId: number,
260+
subgroupId: number,
261+
objectId: number,
262+
publisherPriority: number,
263+
payload?: Uint8Array,
264+
) {
265+
this.groupId = groupId;
266+
this.subgroupId = subgroupId;
267+
this.objectId = objectId;
268+
this.publisherPriority = publisherPriority;
269+
this.payload = payload;
270+
}
271+
272+
async encode(w: Writer): Promise<void> {
273+
await w.u53(this.groupId);
274+
await w.u53(this.subgroupId);
275+
await w.u53(this.objectId);
276+
await w.u8(this.publisherPriority);
277+
await w.u53(0); // no extension headers
278+
279+
if (this.payload !== undefined) {
280+
await w.u53(this.payload.byteLength);
281+
if (this.payload.byteLength === 0) {
282+
await w.u53(0); // status = normal
283+
} else {
284+
await w.write(this.payload);
285+
}
286+
} else {
287+
await w.u53(0); // no payload, length = 0
288+
await w.u53(FETCH_END); // no payload, status = end
289+
}
290+
}
291+
292+
static async decode(r: Reader): Promise<FetchObject> {
293+
const groupId = await r.u53();
294+
const subgroupId = await r.u53();
295+
const objectId = await r.u53();
296+
const publisherPriority = await r.u8();
297+
const payloadLength = await r.u53();
298+
299+
let payload: Uint8Array | undefined;
300+
if (payloadLength === 0) {
301+
const status = await r.u53();
302+
if (status === 0) {
303+
payload = new Uint8Array(0);
304+
} else if (status === FETCH_END) {
305+
payload = undefined;
306+
} else {
307+
throw new Error(`unexpected status: ${status}`);
308+
}
309+
} else {
310+
payload = await r.read(payloadLength);
311+
}
312+
313+
return new FetchObject(groupId, subgroupId, objectId, publisherPriority, payload);
314+
}
315+
}

js/moq/src/ietf/frame.ts

Lines changed: 0 additions & 68 deletions
This file was deleted.

js/moq/src/ietf/group.ts

Lines changed: 93 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,34 @@
11
import type { Reader, Writer } from "../stream";
22

3-
export const GroupOrder = {
4-
Any: 0x0,
5-
Ascending: 0x1,
6-
Descending: 0x2,
7-
} as const;
3+
export class GroupOrder {
4+
#value: number;
5+
6+
private constructor(value: number) {
7+
this.#value = value;
8+
}
9+
10+
static readonly Any = new GroupOrder(0x0);
11+
static readonly Ascending = new GroupOrder(0x1);
12+
static readonly Descending = new GroupOrder(0x2);
13+
14+
async encode(w: Writer): Promise<void> {
15+
await w.u8(this.#value);
16+
}
17+
18+
static async decode(r: Reader): Promise<GroupOrder> {
19+
const value = await r.u8();
20+
switch (value) {
21+
case 0x0:
22+
return GroupOrder.Any;
23+
case 0x1:
24+
return GroupOrder.Ascending;
25+
case 0x2:
26+
return GroupOrder.Descending;
27+
default:
28+
throw new Error(`Invalid GroupOrder: ${value}`);
29+
}
30+
}
31+
}
832

933
export interface GroupFlags {
1034
hasExtensions: boolean;
@@ -80,3 +104,67 @@ export class GroupHeader {
80104
return new GroupHeader(trackAlias, groupId, subGroupId, publisherPriority, flags);
81105
}
82106
}
107+
108+
const GROUP_END = 0x03;
109+
110+
export class GroupObject {
111+
id_delta: number;
112+
113+
// undefined means end of group
114+
payload?: Uint8Array;
115+
116+
constructor(id_delta: number, payload?: Uint8Array) {
117+
this.id_delta = id_delta;
118+
this.payload = payload;
119+
}
120+
121+
async encode(w: Writer, flags: GroupFlags): Promise<void> {
122+
await w.u53(this.id_delta);
123+
124+
if (flags.hasExtensions) {
125+
await w.u53(0); // extensions length = 0
126+
}
127+
128+
if (this.payload !== undefined) {
129+
await w.u53(this.payload.byteLength);
130+
131+
if (this.payload.byteLength === 0) {
132+
await w.u53(0); // status = normal
133+
} else {
134+
await w.write(this.payload);
135+
}
136+
} else {
137+
await w.u53(0); // length = 0
138+
await w.u53(GROUP_END);
139+
}
140+
}
141+
142+
static async decode(r: Reader, flags: GroupFlags): Promise<GroupObject> {
143+
const delta = await r.u53();
144+
145+
if (flags.hasExtensions) {
146+
const extensionsLength = await r.u53();
147+
// We don't care about extensions
148+
await r.read(extensionsLength);
149+
}
150+
151+
const payloadLength = await r.u53();
152+
153+
if (payloadLength > 0) {
154+
const payload = await r.read(payloadLength);
155+
return new GroupObject(delta, payload);
156+
}
157+
158+
const status = await r.u53();
159+
160+
if (status === 0) {
161+
return new GroupObject(delta, new Uint8Array(0));
162+
}
163+
164+
if (!flags.hasEnd && status === 3) {
165+
return new GroupObject(delta);
166+
}
167+
168+
throw new Error(`Unsupported object status: ${status}`);
169+
}
170+
}

js/moq/src/ietf/index.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
export * from "./connection.ts";
22
export * from "./control.ts";
3-
export * from "./frame.ts";
43
export * from "./goaway.ts";
54
export * from "./parameters.ts";
65
export * from "./publish.ts";

0 commit comments

Comments
 (0)