-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpubsub.js
More file actions
168 lines (150 loc) · 4.81 KB
/
Copy pathpubsub.js
File metadata and controls
168 lines (150 loc) · 4.81 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
/**
* @fileoverview Pub/Sub System for api-ape Server
*
* Manages channel subscriptions and message publishing.
*
* @module server/lib/broadcast/pubsub
* @see {@link module:server/lib/broadcast} for the main broadcast module
*/
const { apeLog } = require("../../../utils/apeLogger");
const { _clients } = require("./clients");
/**
* Per-publish log verbosity. By default the pub/sub layer emits ONE
* line per publish ("📣 Published to …"), which is useful during
* channel-wiring debugging but catastrophic once a real producer (e.g.
* a multi-GB Ollama model pull streaming NDJSON progress frames) runs
* in a terminal that also hosts another api-ape server. Gate the log
* behind an opt-in env var so the default posture is quiet and only
* operators who need the trace pay the noise cost.
*
* Subscribe/unsubscribe logs stay on — those are one-shot lifecycle
* events, not per-message chatter, and are valuable at session-start
* debugging even in production.
*
* Evaluated lazily (per call) instead of at module load because test
* harnesses flip the env mid-run; a one-time boot read would stick to
* whatever the first process saw.
*
* @returns {boolean} True when APIAPE_PUBSUB_LOG is set to a truthy value
* (anything other than unset/empty/"0"/"false"/"off", case-insensitive).
* @private
*/
function isPublishLoggingEnabled() {
const v = process.env.APIAPE_PUBSUB_LOG;
if (v === undefined) return false;
const lowered = String(v).toLowerCase();
return !(lowered === "" || lowered === "0" || lowered === "false" || lowered === "off");
}
/**
* Subscription tracking for pub/sub channels
* Maps channel name to Set of subscribed clientIds
* @type {Map<string, Set<string>>}
* @private
*/
const _subscriptions = new Map();
/**
* Reverse lookup: client to their subscribed channels (for cleanup on disconnect)
* @type {Map<string, Set<string>>}
* @private
*/
const _clientSubscriptions = new Map();
/**
* Last published message per channel (sent to new subscribers)
* @type {Map<string, any>}
* @private
*/
const _lastMessages = new Map();
/**
* Subscribe a client to a channel
*
* @param {string} clientId - The client's unique identifier
* @param {string} channel - The channel name to subscribe to
* @returns {{channel: string, lastMessage: any}|null} Last message if exists
*/
function subscribe(clientId, channel) {
if (!_subscriptions.has(channel)) {
_subscriptions.set(channel, new Set());
}
_subscriptions.get(channel).add(clientId);
if (!_clientSubscriptions.has(clientId)) {
_clientSubscriptions.set(clientId, new Set());
}
_clientSubscriptions.get(clientId).add(channel);
apeLog.log(`Client ${clientId} subscribed to "${channel}"`);
if (_lastMessages.has(channel)) {
return { channel, lastMessage: _lastMessages.get(channel) };
}
return null;
}
/**
* Unsubscribe a client from a channel
*
* @param {string} clientId - The client's unique identifier
* @param {string} channel - The channel name to unsubscribe from
*/
function unsubscribe(clientId, channel) {
const subscribers = _subscriptions.get(channel);
if (subscribers) {
subscribers.delete(clientId);
if (subscribers.size === 0) {
_subscriptions.delete(channel);
}
}
const clientChannels = _clientSubscriptions.get(clientId);
if (clientChannels) {
clientChannels.delete(channel);
if (clientChannels.size === 0) {
_clientSubscriptions.delete(clientId);
}
}
apeLog.log(`Client ${clientId} unsubscribed from "${channel}"`);
}
/**
* Publish a message to all subscribers of a channel
*
* @param {string} channel - The channel name (used as message type)
* @param {any} data - Data payload to send
*/
function publish(channel, data) {
_lastMessages.set(channel, data);
const subscribers = _subscriptions.get(channel);
if (!subscribers || subscribers.size === 0) {
if (isPublishLoggingEnabled()) {
apeLog.log(`Published to "${channel}" (0 subscribers)`);
}
return;
}
if (isPublishLoggingEnabled()) {
apeLog.log(`Publishing to "${channel}" (${subscribers.size} subscribers)`);
}
subscribers.forEach((clientId) => {
const wrapper = _clients.get(clientId);
if (wrapper) {
wrapper.send(channel, data);
}
});
}
/**
* Clean up all subscriptions for a disconnected client
*
* @param {string} clientId - The client's unique identifier
*/
function cleanupClientSubscriptions(clientId) {
const clientChannels = _clientSubscriptions.get(clientId);
if (clientChannels) {
clientChannels.forEach((channel) => {
const subscribers = _subscriptions.get(channel);
subscribers.delete(clientId);
if (subscribers.size === 0) {
_subscriptions.delete(channel);
}
});
_clientSubscriptions.delete(clientId);
}
}
module.exports = {
subscribe,
unsubscribe,
publish,
cleanupClientSubscriptions,
};