Skip to content

Commit 33da311

Browse files
committed
Start implementing joining fetch.
1 parent 80a6970 commit 33da311

10 files changed

Lines changed: 251 additions & 119 deletions

File tree

js/moq/src/ietf/connection.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import { unreachable } from "../util/index.ts";
77
import * as Control from "./control.ts";
88
import { Fetch, FetchCancel, FetchError, FetchOk } from "./fetch.ts";
99
import { GoAway } from "./goaway.ts";
10-
import { Group } from "./object.ts";
10+
import { GroupHeader } from "./group.ts";
1111
import { Publish, PublishError, PublishOk } from "./publish.ts";
1212
import {
1313
PublishNamespace,
@@ -263,7 +263,7 @@ export class Connection implements Established {
263263
async #runObjectStream(stream: Reader) {
264264
try {
265265
// we don't support other stream types yet
266-
const header = await Group.decode(stream);
266+
const header = await GroupHeader.decode(stream);
267267
console.debug("received group header", header);
268268
await this.#subscriber.handleGroup(header, stream);
269269
} catch (err) {

js/moq/src/ietf/fetch.ts

Lines changed: 133 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,44 +1,71 @@
11
import type * as Path from "../path.ts";
22
import type { Reader, Writer } from "../stream.ts";
3+
import { Location } from "./location.js";
34
import * as Message from "./message.ts";
5+
import * as Namespace from "./namespace.ts";
6+
import { Parameters } from "./parameters.ts";
7+
8+
export const FetchType = {
9+
Standalone: 0x1,
10+
Relative: 0x2,
11+
Absolute: 0x3,
12+
} as const;
13+
14+
export type FetchType =
15+
| {
16+
type: typeof FetchType.Standalone;
17+
namespace: Path.Valid;
18+
track: string;
19+
start: Location;
20+
end: Location;
21+
}
22+
| {
23+
type: typeof FetchType.Relative;
24+
subscribeId: bigint;
25+
groupOffset: number;
26+
}
27+
| {
28+
type: typeof FetchType.Absolute;
29+
subscribeId: bigint;
30+
groupId: number;
31+
};
432

533
export class Fetch {
634
static id = 0x16;
735

836
requestId: bigint;
9-
trackNamespace: Path.Valid;
10-
trackName: string;
1137
subscriberPriority: number;
1238
groupOrder: number;
13-
startGroup: bigint;
14-
startObject: bigint;
15-
endGroup: bigint;
16-
endObject: bigint;
17-
18-
constructor(
19-
requestId: bigint,
20-
trackNamespace: Path.Valid,
21-
trackName: string,
22-
subscriberPriority: number,
23-
groupOrder: number,
24-
startGroup: bigint,
25-
startObject: bigint,
26-
endGroup: bigint,
27-
endObject: bigint,
28-
) {
39+
fetchType: FetchType;
40+
41+
constructor(requestId: bigint, subscriberPriority: number, groupOrder: number, fetchType: FetchType) {
2942
this.requestId = requestId;
30-
this.trackNamespace = trackNamespace;
31-
this.trackName = trackName;
3243
this.subscriberPriority = subscriberPriority;
3344
this.groupOrder = groupOrder;
34-
this.startGroup = startGroup;
35-
this.startObject = startObject;
36-
this.endGroup = endGroup;
37-
this.endObject = endObject;
38-
}
39-
40-
async #encode(_w: Writer): Promise<void> {
41-
throw new Error("FETCH messages are not supported");
45+
this.fetchType = fetchType;
46+
}
47+
48+
async #encode(w: Writer): Promise<void> {
49+
await w.u62(this.requestId);
50+
await w.u8(this.subscriberPriority);
51+
await w.u8(this.groupOrder);
52+
await w.u53(this.fetchType.type);
53+
if (this.fetchType.type === FetchType.Standalone) {
54+
await Namespace.encode(w, this.fetchType.namespace);
55+
await w.string(this.fetchType.track);
56+
this.fetchType.start.encode(w);
57+
this.fetchType.end.encode(w);
58+
} else if (this.fetchType.type === FetchType.Relative) {
59+
await w.u62(this.fetchType.subscribeId);
60+
await w.u53(this.fetchType.groupOffset);
61+
} else if (this.fetchType.type === FetchType.Absolute) {
62+
await w.u62(this.fetchType.subscribeId);
63+
await w.u53(this.fetchType.groupId);
64+
} else {
65+
const fetchType: never = this.fetchType;
66+
throw new Error(`unknown fetch type: ${fetchType}`);
67+
}
68+
await w.u53(0); // no parameters
4269
}
4370

4471
async encode(w: Writer): Promise<void> {
@@ -49,22 +76,74 @@ export class Fetch {
4976
return Message.decode(r, Fetch.#decode);
5077
}
5178

52-
static async #decode(_r: Reader): Promise<Fetch> {
53-
throw new Error("FETCH messages are not supported");
79+
static async #decode(r: Reader): Promise<Fetch> {
80+
const requestId = await r.u62();
81+
const subscriberPriority = await r.u8();
82+
const groupOrder = await r.u8();
83+
const fetchType = await r.u53();
84+
85+
if (fetchType === FetchType.Standalone) {
86+
const namespace = await Namespace.decode(r);
87+
const track = await r.string();
88+
const start = await Location.decode(r);
89+
const end = await Location.decode(r);
90+
await Parameters.decode(r); // ignore parameters
91+
return new Fetch(requestId, subscriberPriority, groupOrder, {
92+
type: FetchType.Standalone,
93+
namespace,
94+
track,
95+
start,
96+
end,
97+
});
98+
}
99+
100+
if (fetchType === FetchType.Relative) {
101+
const subscribeId = await r.u62();
102+
const groupOffset = await r.u53();
103+
await Parameters.decode(r); // ignore parameters
104+
return new Fetch(requestId, subscriberPriority, groupOrder, {
105+
type: FetchType.Relative,
106+
subscribeId,
107+
groupOffset,
108+
});
109+
}
110+
111+
if (fetchType === FetchType.Absolute) {
112+
const subscribeId = await r.u62();
113+
const groupId = await r.u53();
114+
await Parameters.decode(r); // ignore parameters
115+
return new Fetch(requestId, subscriberPriority, groupOrder, {
116+
type: FetchType.Absolute,
117+
subscribeId,
118+
groupId,
119+
});
120+
}
121+
122+
throw new Error(`unknown fetch type: ${fetchType}`);
54123
}
55124
}
56125

57126
export class FetchOk {
58127
static id = 0x18;
59128

60129
requestId: bigint;
130+
groupOrder: number;
131+
endOfTrack: boolean;
132+
endLocation: Location;
61133

62-
constructor(requestId: bigint) {
134+
constructor(requestId: bigint, groupOrder: number, endOfTrack: boolean, endLocation: Location) {
63135
this.requestId = requestId;
136+
this.groupOrder = groupOrder;
137+
this.endOfTrack = endOfTrack;
138+
this.endLocation = endLocation;
64139
}
65140

66-
async #encode(_w: Writer): Promise<void> {
67-
throw new Error("FETCH_OK messages are not supported");
141+
async #encode(w: Writer): Promise<void> {
142+
await w.u62(this.requestId);
143+
await w.u8(this.groupOrder);
144+
await w.bool(this.endOfTrack);
145+
this.endLocation.encode(w);
146+
await w.u53(0); // no parameters
68147
}
69148

70149
async encode(w: Writer): Promise<void> {
@@ -75,8 +154,13 @@ export class FetchOk {
75154
return Message.decode(r, FetchOk.#decode);
76155
}
77156

78-
static async #decode(_r: Reader): Promise<FetchOk> {
79-
throw new Error("FETCH_OK messages are not supported");
157+
static async #decode(r: Reader): Promise<FetchOk> {
158+
const requestId = await r.u62();
159+
const groupOrder = await r.u8();
160+
const endOfTrack = await r.bool();
161+
const endLocation = await Location.decode(r);
162+
await Parameters.decode(r); // ignore parameters
163+
return new FetchOk(requestId, groupOrder, endOfTrack, endLocation);
80164
}
81165
}
82166

@@ -93,8 +177,10 @@ export class FetchError {
93177
this.reasonPhrase = reasonPhrase;
94178
}
95179

96-
async #encode(_w: Writer): Promise<void> {
97-
throw new Error("FETCH_ERROR messages are not supported");
180+
async #encode(w: Writer): Promise<void> {
181+
await w.u62(this.requestId);
182+
await w.u53(this.errorCode);
183+
await w.string(this.reasonPhrase);
98184
}
99185

100186
async encode(w: Writer): Promise<void> {
@@ -105,8 +191,11 @@ export class FetchError {
105191
return Message.decode(r, FetchError.#decode);
106192
}
107193

108-
static async #decode(_r: Reader): Promise<FetchError> {
109-
throw new Error("FETCH_ERROR messages are not supported");
194+
static async #decode(r: Reader): Promise<FetchError> {
195+
const requestId = await r.u62();
196+
const errorCode = await r.u53();
197+
const reasonPhrase = await r.string();
198+
return new FetchError(requestId, errorCode, reasonPhrase);
110199
}
111200
}
112201

@@ -119,8 +208,8 @@ export class FetchCancel {
119208
this.requestId = requestId;
120209
}
121210

122-
async #encode(_w: Writer): Promise<void> {
123-
throw new Error("FETCH_CANCEL messages are not supported");
211+
async #encode(w: Writer): Promise<void> {
212+
await w.u62(this.requestId);
124213
}
125214

126215
async encode(w: Writer): Promise<void> {
@@ -131,7 +220,8 @@ export class FetchCancel {
131220
return Message.decode(r, FetchCancel.#decode);
132221
}
133222

134-
static async #decode(_r: Reader): Promise<FetchCancel> {
135-
throw new Error("FETCH_CANCEL messages are not supported");
223+
static async #decode(r: Reader): Promise<FetchCancel> {
224+
const requestId = await r.u62();
225+
return new FetchCancel(requestId);
136226
}
137227
}

js/moq/src/ietf/frame.ts

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
import type { Reader, Writer } from "../stream.ts";
2+
import type { GroupFlags } from "./group.ts";
3+
4+
const GROUP_END = 0x03;
5+
6+
export class Frame {
7+
// undefined means end of group
8+
payload?: Uint8Array;
9+
10+
constructor(payload?: Uint8Array) {
11+
this.payload = payload;
12+
}
13+
14+
async encode(w: Writer, flags: GroupFlags): Promise<void> {
15+
await w.u53(0); // id_delta = 0
16+
17+
if (flags.hasExtensions) {
18+
await w.u53(0); // extensions length = 0
19+
}
20+
21+
if (this.payload !== undefined) {
22+
await w.u53(this.payload.byteLength);
23+
24+
if (this.payload.byteLength === 0) {
25+
await w.u53(0); // status = normal
26+
} else {
27+
await w.write(this.payload);
28+
}
29+
} else {
30+
await w.u53(0); // length = 0
31+
await w.u53(GROUP_END);
32+
}
33+
}
34+
35+
static async decode(r: Reader, flags: GroupFlags): Promise<Frame> {
36+
console.debug("reading frame delta");
37+
const delta = await r.u53();
38+
console.debug("read frame delta", delta);
39+
if (delta !== 0) {
40+
console.warn(`object ID delta is not supported, ignoring: ${delta}`);
41+
}
42+
43+
if (flags.hasExtensions) {
44+
const extensionsLength = await r.u53();
45+
// We don't care about extensions
46+
await r.read(extensionsLength);
47+
}
48+
49+
const payloadLength = await r.u53();
50+
51+
if (payloadLength > 0) {
52+
const payload = await r.read(payloadLength);
53+
return new Frame(payload);
54+
}
55+
56+
const status = await r.u53();
57+
58+
if (flags.hasEnd) {
59+
// Empty frame
60+
if (status === 0) return new Frame(new Uint8Array(0));
61+
} else if (status === 0 || status === GROUP_END) {
62+
// TODO status === 0 should be an empty frame, but moq-rs seems to be sending it incorrectly on group end.
63+
return new Frame();
64+
}
65+
66+
throw new Error(`Unsupported object status: ${status}`);
67+
}
68+
}

0 commit comments

Comments
 (0)