Skip to content

Commit d09d4a7

Browse files
authored
test: unit test for start_rpc (#6493)
1 parent b2c3068 commit d09d4a7

File tree

6 files changed

+238
-103
lines changed

6 files changed

+238
-103
lines changed

src/daemon/context.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -334,12 +334,13 @@ fn handle_admin_token(
334334
ki.private_key(),
335335
token_exp,
336336
)?;
337-
info!("Admin token: {token}");
338337
let default_token_path = config.client.default_rpc_token_path();
339338
if let Err(e) =
340339
crate::utils::io::write_new_sensitive_file(token.as_bytes(), &default_token_path)
341340
{
342341
tracing::warn!("Failed to save the default admin token file: {e}");
342+
} else {
343+
info!("Admin token is saved to {}", default_token_path.display());
343344
}
344345
if let Some(path) = opts.save_token.as_ref() {
345346
if let Some(dir) = path.parent()
@@ -354,6 +355,7 @@ fn handle_admin_token(
354355
}
355356
std::fs::write(path, &token)
356357
.with_context(|| format!("Failed to save admin token to {}", path.display()))?;
358+
info!("Admin token is saved to {}", path.display());
357359
}
358360

359361
Ok(token)

src/daemon/mod.rs

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,6 @@ pub async fn start_interruptable(opts: CliOpts, config: Config) -> anyhow::Resul
8080
let start_time = chrono::Utc::now();
8181
let mut terminate = signal(SignalKind::terminate())?;
8282
let (shutdown_send, mut shutdown_recv) = mpsc::channel(1);
83-
8483
let result = tokio::select! {
8584
ret = start(start_time, opts, config, shutdown_send) => ret,
8685
_ = ctrl_c() => {
@@ -375,6 +374,7 @@ fn maybe_start_rpc_service(
375374
chain_follower: &ChainFollower<DbType>,
376375
start_time: chrono::DateTime<chrono::Utc>,
377376
shutdown: mpsc::Sender<()>,
377+
rpc_stop_handle: jsonrpsee::server::StopHandle,
378378
ctx: &AppContext,
379379
) -> anyhow::Result<()> {
380380
if config.client.enable_rpc {
@@ -402,6 +402,12 @@ fn maybe_start_rpc_service(
402402
let snapshot_progress_tracker = ctx.snapshot_progress_tracker.clone();
403403
let msgs_in_tipset = Arc::new(crate::chain::MsgsInTipsetCache::default());
404404
async move {
405+
let rpc_listener = tokio::net::TcpListener::bind(rpc_address)
406+
.await
407+
.map_err(|e| {
408+
anyhow::anyhow!("Unable to listen on RPC endpoint {rpc_address}: {e}")
409+
})
410+
.unwrap();
405411
start_rpc(
406412
RPCState {
407413
state_manager,
@@ -417,7 +423,8 @@ fn maybe_start_rpc_service(
417423
tipset_send,
418424
snapshot_progress_tracker,
419425
},
420-
rpc_address,
426+
rpc_listener,
427+
rpc_stop_handle,
421428
filter_list,
422429
)
423430
.await
@@ -559,11 +566,16 @@ pub(super) async fn start(
559566
});
560567
}
561568
loop {
569+
let (rpc_stop_handle, rpc_server_handle) = jsonrpsee::server::stop_channel();
562570
tokio::select! {
563571
_ = snap_gc_reboot_rx.recv_async() => {
572+
// gracefully shutdown RPC server
573+
if let Err(e) = rpc_server_handle.stop() {
574+
tracing::warn!("failed to stop RPC server: {e}");
575+
}
564576
snap_gc.cleanup_before_reboot().await;
565577
}
566-
result = start_services(start_time, &opts, config.clone(), shutdown_send.clone(), |ctx, sync_status| {
578+
result = start_services(start_time, &opts, config.clone(), shutdown_send.clone(), rpc_stop_handle, |ctx, sync_status| {
567579
snap_gc.set_db(ctx.db.clone());
568580
snap_gc.set_sync_status(sync_status);
569581
snap_gc.set_car_db_head_epoch(ctx.db.heaviest_tipset().map(|ts|ts.epoch()).unwrap_or_default());
@@ -579,6 +591,7 @@ pub(super) async fn start_services(
579591
opts: &CliOpts,
580592
mut config: Config,
581593
shutdown_send: mpsc::Sender<()>,
594+
rpc_stop_handle: jsonrpsee::server::StopHandle,
582595
on_app_context_and_db_initialized: impl FnOnce(&AppContext, SyncStatus),
583596
) -> anyhow::Result<()> {
584597
// Cleanup the collector prometheus metrics registry on start
@@ -608,6 +621,7 @@ pub(super) async fn start_services(
608621
&chain_follower,
609622
start_time,
610623
shutdown_send.clone(),
624+
rpc_stop_handle,
611625
&ctx,
612626
)?;
613627

src/rpc/methods/auth.rs

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
// Copyright 2019-2026 ChainSafe Systems
22
// SPDX-License-Identifier: Apache-2.0, MIT
33

4-
use crate::auth::*;
5-
use crate::lotus_json::lotus_json_with_self;
6-
use crate::rpc::{ApiPaths, Ctx, Permission, RpcMethod, ServerError};
4+
use crate::{
5+
KeyStore,
6+
auth::*,
7+
lotus_json::lotus_json_with_self,
8+
rpc::{ApiPaths, Ctx, Permission, RpcMethod, ServerError},
9+
};
710
use anyhow::Result;
811
use chrono::Duration;
912
use enumflags2::BitFlags;
@@ -14,6 +17,18 @@ use serde_with::{DurationSeconds, serde_as};
1417

1518
/// RPC call to create a new JWT Token
1619
pub enum AuthNew {}
20+
21+
impl AuthNew {
22+
pub fn create_token(
23+
keystore: &KeyStore,
24+
token_exp: Duration,
25+
permissions: Vec<String>,
26+
) -> anyhow::Result<String> {
27+
let ki = keystore.get(JWT_IDENTIFIER)?;
28+
Ok(create_token(permissions, ki.private_key(), token_exp)?)
29+
}
30+
}
31+
1732
impl RpcMethod<2> for AuthNew {
1833
const NAME: &'static str = "Filecoin.AuthNew";
1934
const N_REQUIRED_PARAMS: usize = 1;
@@ -28,14 +43,13 @@ impl RpcMethod<2> for AuthNew {
2843
(permissions, expiration_secs): Self::Params,
2944
) -> Result<Self::Ok, ServerError> {
3045
let ks = ctx.keystore.read();
31-
let ki = ks.get(JWT_IDENTIFIER)?;
3246
// Lotus admin tokens do not expire but Forest requires all JWT tokens to
3347
// have an expiration date. So we set the expiration date to 100 years in
3448
// the future to match user-visible behavior of Lotus.
3549
let token_exp = expiration_secs
3650
.map(chrono::Duration::seconds)
3751
.unwrap_or_else(|| chrono::Duration::days(365 * 100));
38-
let token = create_token(permissions, ki.private_key(), token_exp)?;
52+
let token = Self::create_token(&ks, token_exp, permissions)?;
3953
Ok(token.as_bytes().to_vec())
4054
}
4155
}

src/rpc/mod.rs

Lines changed: 70 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -444,11 +444,10 @@ use fvm_ipld_blockstore::Blockstore;
444444
use jsonrpsee::{
445445
Methods,
446446
core::middleware::RpcServiceBuilder,
447-
server::{RpcModule, Server, StopHandle, TowerServiceBuilder, stop_channel},
447+
server::{RpcModule, Server, StopHandle, TowerServiceBuilder},
448448
};
449449
use parking_lot::RwLock;
450450
use std::env;
451-
use std::net::SocketAddr;
452451
use std::sync::{Arc, LazyLock};
453452
use std::time::Duration;
454453
use tokio::sync::mpsc;
@@ -530,7 +529,8 @@ struct PerConnection<RpcMiddleware, HttpMiddleware> {
530529

531530
pub async fn start_rpc<DB>(
532531
state: RPCState<DB>,
533-
rpc_endpoint: SocketAddr,
532+
rpc_listener: tokio::net::TcpListener,
533+
stop_handle: StopHandle,
534534
filter_list: Option<FilterList>,
535535
) -> anyhow::Result<()>
536536
where
@@ -557,8 +557,6 @@ where
557557
let methods: Arc<HashMap<ApiPaths, Methods>> =
558558
Arc::new(modules.into_iter().map(|(k, v)| (k, v.into())).collect());
559559

560-
let (stop_handle, _server_handle) = stop_channel();
561-
562560
let per_conn = PerConnection {
563561
stop_handle: stop_handle.clone(),
564562
svc_builder: Server::builder()
@@ -581,12 +579,10 @@ where
581579
.to_service_builder(),
582580
keystore,
583581
};
584-
585-
let listener = tokio::net::TcpListener::bind(rpc_endpoint).await.unwrap();
586582
tracing::info!("Ready for RPC connections");
587583
loop {
588584
let sock = tokio::select! {
589-
res = listener.accept() => {
585+
res = rpc_listener.accept() => {
590586
match res {
591587
Ok((stream, _remote_addr)) => stream,
592588
Err(e) => {
@@ -789,7 +785,14 @@ pub fn openrpc(path: ApiPaths, include: Option<&[&str]>) -> openrpc_types::OpenR
789785

790786
#[cfg(test)]
791787
mod tests {
792-
use crate::rpc::ApiPaths;
788+
use super::*;
789+
use crate::{
790+
db::MemoryDB, networks::NetworkChain, rpc::common::ShiftingVersion,
791+
tool::offline_server::server::offline_rpc_state,
792+
};
793+
use jsonrpsee::server::stop_channel;
794+
use std::net::{Ipv4Addr, SocketAddr};
795+
use tokio::task::JoinSet;
793796

794797
// `cargo test --lib -- --exact 'rpc::tests::openrpc'`
795798
// `cargo insta review`
@@ -808,4 +811,62 @@ mod tests {
808811
insta::assert_yaml_snapshot!(_spec);
809812
}
810813
}
814+
815+
#[tokio::test(flavor = "multi_thread")]
816+
async fn test_rpc_server() {
817+
let chain = NetworkChain::Calibnet;
818+
let db = Arc::new(MemoryDB::default());
819+
let mut services = JoinSet::new();
820+
let (state, mut shutdown_recv) = offline_rpc_state(chain, db, None, None, &mut services)
821+
.await
822+
.unwrap();
823+
let block_delay_secs = state.chain_config().block_delay_secs;
824+
let shutdown_send = state.shutdown.clone();
825+
let jwt_read_permissions = vec!["read".to_owned()];
826+
let jwt_read = super::methods::auth::AuthNew::create_token(
827+
&state.keystore.read(),
828+
chrono::Duration::hours(1),
829+
jwt_read_permissions.clone(),
830+
)
831+
.unwrap();
832+
let rpc_listener =
833+
tokio::net::TcpListener::bind(SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 0))
834+
.await
835+
.unwrap();
836+
let rpc_address = rpc_listener.local_addr().unwrap();
837+
let (stop_handle, server_handle) = stop_channel();
838+
839+
// Start an RPC server
840+
841+
let handle = tokio::spawn(start_rpc(state, rpc_listener, stop_handle, None));
842+
843+
// Send a few requests
844+
845+
let client = Client::from_url(
846+
format!("http://{}:{}/", rpc_address.ip(), rpc_address.port())
847+
.parse()
848+
.unwrap(),
849+
);
850+
851+
let response = super::methods::common::Version::call(&client, ())
852+
.await
853+
.unwrap();
854+
assert_eq!(
855+
&response.version,
856+
&*crate::utils::version::FOREST_VERSION_STRING
857+
);
858+
assert_eq!(response.block_delay, block_delay_secs);
859+
assert_eq!(response.api_version, ShiftingVersion::new(2, 3, 0));
860+
861+
let response = super::methods::auth::AuthVerify::call(&client, (jwt_read,))
862+
.await
863+
.unwrap();
864+
assert_eq!(response, jwt_read_permissions);
865+
866+
// Gracefully shutdown the RPC server
867+
shutdown_send.send(()).await.unwrap();
868+
shutdown_recv.recv().await;
869+
server_handle.stop().unwrap();
870+
handle.await.unwrap().unwrap();
871+
}
811872
}

src/tool/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
// Copyright 2019-2026 ChainSafe Systems
22
// SPDX-License-Identifier: Apache-2.0, MIT
33
pub mod main;
4-
mod offline_server;
4+
pub mod offline_server;
55
pub mod subcommands;

0 commit comments

Comments
 (0)