Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/sacp-conductor/src/conductor/mcp_bridge/actor.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use futures::{SinkExt as _, StreamExt as _, channel::mpsc};
use sacp::mcp::{McpClientToServer, McpServerEnd, McpServerToClient};
use sacp::mcp::{McpClientToServer, McpServerPeer, McpServerToClient};
use sacp::schema::McpDisconnectNotification;
use sacp::{Component, DynComponent, MessageCx};
use tracing::info;
Expand Down Expand Up @@ -67,7 +67,7 @@ impl McpBridgeConnectionActor {
.run_until(async move |mcp_client_cx| {
let mut to_mcp_client_rx = to_mcp_client_rx;
while let Some(message) = to_mcp_client_rx.next().await {
mcp_client_cx.send_proxied_message_to(McpServerEnd, message)?;
mcp_client_cx.send_proxied_message_to(McpServerPeer, message)?;
}
Ok(())
})
Expand Down
309 changes: 309 additions & 0 deletions src/sacp-conductor/tests/standalone_mcp_server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,309 @@
//! Tests for running McpServer as a standalone MCP server (not part of ACP).
//!
//! These tests verify that `McpServer` can be used directly with MCP clients
//! via the `Component<McpServerToClient>` implementation.

use rmcp::{ClientHandler, ServiceExt, model::ClientInfo};
use sacp::{
ByteStreams, Component, mcp::McpServerToClient, mcp_server::McpServer, util::run_until,
};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};

/// Input for the echo tool
#[derive(Debug, Serialize, Deserialize, JsonSchema)]
struct EchoInput {
message: String,
}

/// Input for the add tool
#[derive(Debug, Serialize, Deserialize, JsonSchema)]
struct AddInput {
a: i32,
b: i32,
}

/// Output for the add tool (structured output)
#[derive(Debug, Serialize, Deserialize, JsonSchema)]
struct AddOutput {
result: i32,
}

/// Create a test MCP server with echo and add tools
fn create_test_server() -> McpServer<McpServerToClient, impl sacp::JrResponder<McpServerToClient>> {
McpServer::builder("test-server")
.instructions("A test MCP server")
.tool_fn(
"echo",
"Echo a message back",
async |input: EchoInput, _cx| Ok(format!("Echo: {}", input.message)),
sacp::tool_fn!(),
)
.tool_fn(
"add",
"Add two numbers",
async |input: AddInput, _cx| {
Ok(AddOutput {
result: input.a + input.b,
})
},
sacp::tool_fn!(),
)
.build()
}

/// Minimal client handler for rmcp
#[derive(Debug, Clone, Default)]
struct MinimalClientHandler;

impl ClientHandler for MinimalClientHandler {
fn get_info(&self) -> ClientInfo {
ClientInfo::default()
}
}

#[tokio::test]
async fn test_standalone_server_list_tools() -> Result<(), sacp::Error> {
// Create duplex streams for communication
let (server_stream, client_stream) = tokio::io::duplex(8192);
let (server_read, server_write) = tokio::io::split(server_stream);
let (client_read, client_write) = tokio::io::split(client_stream);

// Create the MCP server
let server = create_test_server();

// Wrap client side as ByteStreams (this is what the MCP server will talk to)
let client_as_component = ByteStreams::new(client_write.compat_write(), client_read.compat());

run_until(
Component::<McpServerToClient>::serve(server, client_as_component),
async move {
// Create rmcp client on the server side of the duplex (the "other end")
let client = MinimalClientHandler
.serve((server_read, server_write))
.await
.map_err(sacp::util::internal_error)?;

// List tools
let tools_result = client
.list_tools(None)
.await
.map_err(sacp::util::internal_error)?;

// Verify we got both tools
assert_eq!(tools_result.tools.len(), 2);

let tool_names: Vec<&str> =
tools_result.tools.iter().map(|t| t.name.as_ref()).collect();
assert!(tool_names.contains(&"echo"));
assert!(tool_names.contains(&"add"));

// Clean up
client.cancel().await.map_err(sacp::util::internal_error)?;
Ok(())
},
)
.await
}

#[tokio::test]
async fn test_standalone_server_call_echo_tool() -> Result<(), sacp::Error> {
let (server_stream, client_stream) = tokio::io::duplex(8192);
let (server_read, server_write) = tokio::io::split(server_stream);
let (client_read, client_write) = tokio::io::split(client_stream);

let server = create_test_server();
let client_as_component = ByteStreams::new(client_write.compat_write(), client_read.compat());

run_until(
Component::<McpServerToClient>::serve(server, client_as_component),
async move {
let client = MinimalClientHandler
.serve((server_read, server_write))
.await
.map_err(sacp::util::internal_error)?;

// Call the echo tool
let result = client
.call_tool(rmcp::model::CallToolRequestParam {
name: "echo".into(),
arguments: Some(
serde_json::json!({ "message": "hello world" })
.as_object()
.unwrap()
.clone(),
),
})
.await
.map_err(sacp::util::internal_error)?;

// Verify the result
let text = result
.content
.first()
.and_then(|c| c.raw.as_text())
.map(|t| t.text.as_str())
.expect("Expected text content");

assert_eq!(text, r#""Echo: hello world""#, "Unexpected echo response");

client.cancel().await.map_err(sacp::util::internal_error)?;
Ok(())
},
)
.await
}

#[tokio::test]
async fn test_standalone_server_call_add_tool() -> Result<(), sacp::Error> {
let (server_stream, client_stream) = tokio::io::duplex(8192);
let (server_read, server_write) = tokio::io::split(server_stream);
let (client_read, client_write) = tokio::io::split(client_stream);

let server = create_test_server();
let client_as_component = ByteStreams::new(client_write.compat_write(), client_read.compat());

run_until(
Component::<McpServerToClient>::serve(server, client_as_component),
async move {
let client = MinimalClientHandler
.serve((server_read, server_write))
.await
.map_err(sacp::util::internal_error)?;

// Call the add tool
let result = client
.call_tool(rmcp::model::CallToolRequestParam {
name: "add".into(),
arguments: Some(
serde_json::json!({ "a": 5, "b": 3 })
.as_object()
.unwrap()
.clone(),
),
})
.await
.map_err(sacp::util::internal_error)?;

// The add tool returns structured output (AddOutput)
// Check that we get the expected result
assert!(!result.is_error.unwrap_or(false));

// Structured output should have the result
let content = result.content.first().expect("Expected content");
let text = content.raw.as_text().expect("Expected text content");
assert!(
text.text.contains("8") || text.text.contains("result"),
"Expected result to contain 8, got: {}",
text.text
);

client.cancel().await.map_err(sacp::util::internal_error)?;
Ok(())
},
)
.await
}

#[tokio::test]
async fn test_standalone_server_tool_not_found() -> Result<(), sacp::Error> {
let (server_stream, client_stream) = tokio::io::duplex(8192);
let (server_read, server_write) = tokio::io::split(server_stream);
let (client_read, client_write) = tokio::io::split(client_stream);

let server = create_test_server();
let client_as_component = ByteStreams::new(client_write.compat_write(), client_read.compat());

run_until(
Component::<McpServerToClient>::serve(server, client_as_component),
async move {
let client = MinimalClientHandler
.serve((server_read, server_write))
.await
.map_err(sacp::util::internal_error)?;

// Call a non-existent tool
let result = client
.call_tool(rmcp::model::CallToolRequestParam {
name: "nonexistent".into(),
arguments: None,
})
.await;

// Should get an error
assert!(result.is_err(), "Expected error for non-existent tool");

client.cancel().await.map_err(sacp::util::internal_error)?;
Ok(())
},
)
.await
}

#[tokio::test]
async fn test_standalone_server_with_disabled_tools() -> Result<(), sacp::Error> {
let (server_stream, client_stream) = tokio::io::duplex(8192);
let (server_read, server_write) = tokio::io::split(server_stream);
let (client_read, client_write) = tokio::io::split(client_stream);

// Create server with echo tool disabled
let server = McpServer::builder("test-server")
.tool_fn(
"echo",
"Echo a message",
async |input: EchoInput, _cx| Ok(format!("Echo: {}", input.message)),
sacp::tool_fn!(),
)
.tool_fn(
"add",
"Add two numbers",
async |input: AddInput, _cx| {
Ok(AddOutput {
result: input.a + input.b,
})
},
sacp::tool_fn!(),
)
.disable_tool("echo")?
.build();

let client_as_component = ByteStreams::new(client_write.compat_write(), client_read.compat());

run_until(
Component::<McpServerToClient>::serve(server, client_as_component),
async move {
let client = MinimalClientHandler
.serve((server_read, server_write))
.await
.map_err(sacp::util::internal_error)?;

// List tools - should only show "add"
let tools_result = client
.list_tools(None)
.await
.map_err(sacp::util::internal_error)?;
assert_eq!(tools_result.tools.len(), 1);
assert_eq!(tools_result.tools[0].name.as_ref(), "add");

// Calling disabled tool should fail
let result = client
.call_tool(rmcp::model::CallToolRequestParam {
name: "echo".into(),
arguments: Some(
serde_json::json!({ "message": "test" })
.as_object()
.unwrap()
.clone(),
),
})
.await;

assert!(result.is_err(), "Expected error for disabled tool");

client.cancel().await.map_err(sacp::util::internal_error)?;
Ok(())
},
)
.await
}
Loading