Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
236 changes: 236 additions & 0 deletions server/src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,234 @@ use crate::engine::session_log::{SessionLog, SessionLogEntry};
use wasmparser::Validator;

pub const DEFAULT_EXECUTION_TIMEOUT_SECS: u64 = 30;

// ── Web API polyfills ────────────────────────────────────────────────────
//
// deno_core provides a bare V8 runtime without Web APIs like Blob,
// TextEncoder, etc. Libraries such as isomorphic-git expect these to exist.

const WEB_POLYFILLS_JS: &str = r#"
(function() {
// ── Buffer polyfill ─────────────────────────────────────────────────
// Node.js Buffer is expected by many npm packages (isomorphic-git, etc.).
// This is a minimal polyfill wrapping Uint8Array.
if (typeof globalThis.Buffer === "undefined") {
class Buffer extends Uint8Array {
static alloc(size, fill = 0) {
const buf = new Buffer(size);
if (fill !== 0) buf.fill(fill);
return buf;
}
static allocUnsafe(size) { return new Buffer(size); }
static from(value, encodingOrOffset, length) {
if (typeof value === "string") {
const encoding = (encodingOrOffset || "utf-8").toLowerCase().replace("-", "");
if (encoding === "base64") {
const bin = atob(value);
const buf = new Buffer(bin.length);
for (let i = 0; i < bin.length; i++) buf[i] = bin.charCodeAt(i);
return buf;
}
if (encoding === "hex") {
const buf = new Buffer(value.length / 2);
for (let i = 0; i < buf.length; i++) buf[i] = parseInt(value.substr(i * 2, 2), 16);
return buf;
}
// utf8
const encoded = new TextEncoder().encode(value);
const buf = new Buffer(encoded.buffer, encoded.byteOffset, encoded.byteLength);
return buf;
}
if (value instanceof ArrayBuffer) {
return new Buffer(value, encodingOrOffset || 0, length !== undefined ? length : value.byteLength - (encodingOrOffset || 0));
}
if (ArrayBuffer.isView(value)) {
return new Buffer(value.buffer, value.byteOffset, value.byteLength);
}
if (Array.isArray(value)) {
const buf = new Buffer(value.length);
for (let i = 0; i < value.length; i++) buf[i] = value[i];
return buf;
}
return new Buffer(value);
}
static isBuffer(obj) { return obj instanceof Buffer; }
static isEncoding(enc) { return ["utf8","utf-8","ascii","hex","base64","binary","latin1"].includes((enc||"").toLowerCase()); }
static concat(list, totalLength) {
if (totalLength === undefined) totalLength = list.reduce((s, b) => s + b.length, 0);
const result = Buffer.alloc(totalLength);
let offset = 0;
for (const buf of list) {
result.set(buf, offset);
offset += buf.length;
if (offset >= totalLength) break;
}
return result;
}
static byteLength(string, encoding) {
if (typeof string !== "string") return string.length || string.byteLength || 0;
return new TextEncoder().encode(string).byteLength;
}
toString(encoding = "utf-8", start = 0, end = this.length) {
const slice = this.subarray(start, end);
const enc = encoding.toLowerCase().replace("-", "");
if (enc === "base64") {
let bin = "";
for (let i = 0; i < slice.length; i++) bin += String.fromCharCode(slice[i]);
return btoa(bin);
}
if (enc === "hex") {
let hex = "";
for (let i = 0; i < slice.length; i++) hex += slice[i].toString(16).padStart(2, "0");
return hex;
}
return new TextDecoder(encoding === "binary" || encoding === "latin1" ? "latin1" : encoding).decode(slice);
}
write(string, offset = 0, length, encoding = "utf-8") {
const encoded = new TextEncoder().encode(string);
const len = Math.min(encoded.length, length !== undefined ? length : this.length - offset, this.length - offset);
this.set(encoded.subarray(0, len), offset);
return len;
}
copy(target, targetStart = 0, sourceStart = 0, sourceEnd = this.length) {
const sub = this.subarray(sourceStart, sourceEnd);
target.set(sub, targetStart);
return sub.length;
}
slice(start, end) { return Buffer.from(this.subarray(start, end)); }
readUInt8(offset = 0) { return this[offset]; }
readUInt16BE(offset = 0) { return (this[offset] << 8) | this[offset + 1]; }
readUInt32BE(offset = 0) { return ((this[offset] << 24) | (this[offset+1] << 16) | (this[offset+2] << 8) | this[offset+3]) >>> 0; }
readUInt16LE(offset = 0) { return this[offset] | (this[offset + 1] << 8); }
readUInt32LE(offset = 0) { return (this[offset] | (this[offset+1] << 8) | (this[offset+2] << 16) | (this[offset+3] << 24)) >>> 0; }
readInt8(offset = 0) { const v = this[offset]; return v > 127 ? v - 256 : v; }
writeUInt8(value, offset = 0) { this[offset] = value & 0xff; return offset + 1; }
writeUInt16BE(value, offset = 0) { this[offset] = (value >> 8) & 0xff; this[offset+1] = value & 0xff; return offset + 2; }
writeUInt32BE(value, offset = 0) { this[offset] = (value >> 24) & 0xff; this[offset+1] = (value >> 16) & 0xff; this[offset+2] = (value >> 8) & 0xff; this[offset+3] = value & 0xff; return offset + 4; }
equals(other) {
if (this.length !== other.length) return false;
for (let i = 0; i < this.length; i++) if (this[i] !== other[i]) return false;
return true;
}
compare(other) {
const len = Math.min(this.length, other.length);
for (let i = 0; i < len; i++) {
if (this[i] < other[i]) return -1;
if (this[i] > other[i]) return 1;
}
if (this.length < other.length) return -1;
if (this.length > other.length) return 1;
return 0;
}
toJSON() { return { type: "Buffer", data: Array.from(this) }; }
}
globalThis.Buffer = Buffer;
}

// ── btoa / atob ───────────────────────────────────────────────────
// Required by esm.sh's node/buffer.mjs shim which does btoa.bind(globalThis).
if (typeof globalThis.btoa === "undefined") {
const chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/=";
globalThis.btoa = function(input) {
let str = String(input), output = "";
for (let i = 0; i < str.length; ) {
const a = str.charCodeAt(i++), b = str.charCodeAt(i++), c = str.charCodeAt(i++);
const triplet = (a << 16) | ((b || 0) << 8) | (c || 0);
output += chars[(triplet >> 18) & 63] + chars[(triplet >> 12) & 63];
output += isNaN(b) ? "=" : chars[(triplet >> 6) & 63];
output += isNaN(c) ? "=" : chars[triplet & 63];
}
return output;
};
}
if (typeof globalThis.atob === "undefined") {
const chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/=";
globalThis.atob = function(input) {
let str = String(input).replace(/=+$/, ""), output = "";
for (let i = 0; i < str.length; ) {
const a = chars.indexOf(str[i++]), b = chars.indexOf(str[i++]);
const c = chars.indexOf(str[i++]), d = chars.indexOf(str[i++]);
const triplet = (a << 18) | (b << 12) | (c << 6) | d;
output += String.fromCharCode((triplet >> 16) & 255);
if (c !== 64) output += String.fromCharCode((triplet >> 8) & 255);
if (d !== 64) output += String.fromCharCode(triplet & 255);
}
return output;
};
}

// ── process stub ────────────────────────────────────────────────────
// Some npm packages check process.env or process.browser.
if (typeof globalThis.process === "undefined") {
globalThis.process = { env: {}, browser: true, version: "", versions: {} };
}

// ── Blob polyfill ───────────────────────────────────────────────────
if (typeof globalThis.Blob === "undefined") {
class Blob {
#parts;
#type;
constructor(parts = [], options = {}) {
this.#type = options.type || "";
const buffers = [];
for (const part of parts) {
if (part instanceof Blob) {
buffers.push(part.#parts);
} else if (part instanceof ArrayBuffer) {
buffers.push(new Uint8Array(part));
} else if (ArrayBuffer.isView(part)) {
buffers.push(new Uint8Array(part.buffer, part.byteOffset, part.byteLength));
} else {
buffers.push(new TextEncoder().encode(String(part)));
}
}
let totalLen = 0;
const arrays = [];
const flatten = (item) => {
if (item instanceof Uint8Array) { arrays.push(item); totalLen += item.byteLength; }
else if (Array.isArray(item)) { for (const sub of item) flatten(sub); }
};
for (const b of buffers) flatten(b);
const merged = new Uint8Array(totalLen);
let offset = 0;
for (const arr of arrays) { merged.set(arr, offset); offset += arr.byteLength; }
this.#parts = merged;
}
get size() { return this.#parts.byteLength; }
get type() { return this.#type; }
async arrayBuffer() { return this.#parts.buffer.slice(this.#parts.byteOffset, this.#parts.byteOffset + this.#parts.byteLength); }
async text() { return new TextDecoder().decode(this.#parts); }
slice(start = 0, end = this.size, type = "") {
const sliced = this.#parts.slice(start, end);
const b = new Blob([], { type });
b.#parts = sliced;
return b;
}
stream() {
const data = this.#parts;
return new ReadableStream({ start(c) { c.enqueue(data); c.close(); } });
}
}
globalThis.Blob = Blob;
}
})();
"#;

/// Inject Web API polyfills (Blob, etc.) into the runtime.
fn inject_web_polyfills(runtime: &mut JsRuntime) -> Result<(), String> {
runtime
.execute_script("<web-polyfills>", WEB_POLYFILLS_JS.to_string())
.map_err(|e| format!("Failed to install web polyfills: {}", e))?;
Ok(())
}

/// Overload for JsRuntimeForSnapshot (stateful mode).
fn inject_web_polyfills_snapshot(runtime: &mut JsRuntimeForSnapshot) -> Result<(), String> {
runtime
.execute_script("<web-polyfills>", WEB_POLYFILLS_JS.to_string())
.map_err(|e| format!("Failed to install web polyfills: {}", e))?;
Ok(())
}
/// Default maximum native memory (bytes) a WASM module may declare when no
/// per-module limit is set. 16 MiB.
pub const DEFAULT_WASM_MAX_BYTES: usize = 16 * 1024 * 1024;
Expand Down Expand Up @@ -834,6 +1062,10 @@ pub fn execute_stateless(
if let Err(e) = console::neutralize_dangerous_ops(&mut runtime) {
return Err(e);
}
// Inject Web API polyfills (Blob, etc.).
if let Err(e) = inject_web_polyfills(&mut runtime) {
return Err(e);
}
// Inject fetch() JS wrapper if OPA is configured.
if fetch_config.is_some() {
if let Err(e) = fetch::inject_fetch(&mut runtime) {
Expand Down Expand Up @@ -1005,6 +1237,10 @@ pub fn execute_stateful(
if let Err(e) = console::neutralize_dangerous_ops(&mut runtime) {
return Err(e);
}
// Inject Web API polyfills (Blob, etc.).
if let Err(e) = inject_web_polyfills_snapshot(&mut runtime) {
return Err(e);
}
// Inject fetch() JS wrapper if OPA is configured.
if fetch_config.is_some() {
if let Err(e) = fetch::inject_fetch(&mut runtime) {
Expand Down
38 changes: 31 additions & 7 deletions server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use engine::heap_storage::{AnyHeapStorage, S3HeapStorage, WriteThroughCacheHeapS
use engine::heap_tags::HeapTagStore;
use engine::session_log::SessionLog;
use mcp::{McpService, StatelessMcpService};
use session::{SessionVerifier, JwksKeyStore};
use session::{SessionVerifier, JwksKeyStore, OAuthTokenVerifier};
use cluster::{ClusterConfig, ClusterNode};

fn default_max_concurrent() -> usize {
Expand Down Expand Up @@ -55,6 +55,18 @@ struct Cli {
#[arg(long, env = "JWKS_URL")]
jwks_url: Option<String>,

/// OAuth userinfo endpoint URL for validating opaque bearer tokens.
/// When set, Authorization: Bearer tokens are validated by calling this URL
/// with the token. The response claims are merged into mcp_headers for OPA
/// policy evaluation. Example: https://api.github.com/user
#[arg(long, env = "OAUTH_USERINFO_URL")]
oauth_userinfo_url: Option<String>,

/// JSON key in the userinfo response to use as the `sub` claim.
/// Defaults to "id" (appropriate for GitHub). Use "sub" for OIDC-compliant providers.
#[arg(long, default_value = "id")]
oauth_sub_key: String,

/// HTTP port using Streamable HTTP transport (MCP 2025-03-26+, load-balanceable)
#[arg(long, conflicts_with = "sse_port")]
http_port: Option<u16>,
Expand Down Expand Up @@ -494,37 +506,49 @@ async fn main() -> Result<()> {
None
};

// ── Build OAuth token verifier (if --oauth-userinfo-url) ─────────
let oauth_verifier: Option<Arc<OAuthTokenVerifier>> = if let Some(ref url) = cli.oauth_userinfo_url {
tracing::info!("OAuth token verification enabled via {}", url);
Some(Arc::new(OAuthTokenVerifier::new(url.clone(), Some(cli.oauth_sub_key.clone()))))
} else {
None
};

// ── Start transport ─────────────────────────────────────────────────
if let Some(port) = cli.http_port {
tracing::info!("Starting Streamable HTTP transport on port {}", port);
if engine.is_stateful() {
let verifier = session_verifier.clone();
start_streamable_http(engine, port, move |e| McpService::new(e, verifier.clone())).await?;
let oauth = oauth_verifier.clone();
start_streamable_http(engine, port, move |e| McpService::new(e, verifier.clone(), oauth.clone())).await?;
} else {
let verifier = session_verifier.clone();
start_streamable_http(engine, port, move |e| StatelessMcpService::new(e, verifier.clone())).await?;
let oauth = oauth_verifier.clone();
start_streamable_http(engine, port, move |e| StatelessMcpService::new(e, verifier.clone(), oauth.clone())).await?;
}
} else if let Some(port) = cli.sse_port {
tracing::info!("Starting SSE transport on port {}", port);
if engine.is_stateful() {
let verifier = session_verifier.clone();
start_sse_server(engine, port, move |e| McpService::new(e, verifier.clone())).await?;
let oauth = oauth_verifier.clone();
start_sse_server(engine, port, move |e| McpService::new(e, verifier.clone(), oauth.clone())).await?;
} else {
let verifier = session_verifier.clone();
start_sse_server(engine, port, move |e| StatelessMcpService::new(e, verifier.clone())).await?;
let oauth = oauth_verifier.clone();
start_sse_server(engine, port, move |e| StatelessMcpService::new(e, verifier.clone(), oauth.clone())).await?;
}
} else {
tracing::info!("Starting stdio transport");
if engine.is_stateful() {
let service = McpService::new(engine, None)
let service = McpService::new(engine, None, None)
.serve(stdio())
.await
.inspect_err(|e| {
tracing::error!("serving error: {:?}", e);
})?;
service.waiting().await?;
} else {
let service = StatelessMcpService::new(engine, None)
let service = StatelessMcpService::new(engine, None, None)
.serve(stdio())
.await
.inspect_err(|e| {
Expand Down
Loading
Loading