Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ if(BUILD_TESTS)
add_cloudsql_test(btree_index_tests tests/btree_index_tests.cpp)
add_cloudsql_test(storage_manager_tests tests/storage_manager_tests.cpp)
add_cloudsql_test(rpc_server_tests tests/rpc_server_tests.cpp)
add_cloudsql_test(rpc_client_tests tests/rpc_client_tests.cpp)
add_cloudsql_test(operator_tests tests/operator_tests.cpp)
add_cloudsql_test(query_executor_tests tests/query_executor_tests.cpp)
add_cloudsql_test(distributed_executor_tests tests/distributed_executor_tests.cpp)
Expand Down
13 changes: 7 additions & 6 deletions tests/operator_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -800,9 +800,9 @@ TEST_F(OperatorTests, AggregateMultipleAggregates) {

Tuple tuple;
EXPECT_TRUE(agg->next(tuple));
EXPECT_EQ(tuple.get(0).to_int64(), 60); // SUM
EXPECT_EQ(tuple.get(1).to_int64(), 3); // COUNT
EXPECT_EQ(tuple.get(2).to_float64(), 20.0); // AVG
EXPECT_EQ(tuple.get(0).to_int64(), 60); // SUM
EXPECT_EQ(tuple.get(1).to_int64(), 3); // COUNT
EXPECT_EQ(tuple.get(2).to_float64(), 20.0); // AVG
EXPECT_FALSE(agg->next(tuple));
agg->close();
}
Expand Down Expand Up @@ -905,7 +905,8 @@ TEST_F(OperatorTests, HashJoinRightOuter) {
// RIGHT join output: matched rows + unmatched right rows with NULLs
// Matched: (2, 2)
// Unmatched right: (NULL, 3), (NULL, 4)
std::vector<std::pair<int64_t, int64_t>> results; // (left_value, right_value); use INT64_MIN as sentinel for NULL
std::vector<std::pair<int64_t, int64_t>>
results; // (left_value, right_value); use INT64_MIN as sentinel for NULL
Tuple tuple;
while (join->next(tuple)) {
int64_t left_val = tuple.get(0).is_null() ? INT64_MIN : tuple.get(0).to_int64();
Expand Down Expand Up @@ -991,11 +992,11 @@ TEST_F(OperatorTests, HashJoinNullKeys) {
Schema left_schema = make_schema({{"id", common::ValueType::TYPE_INT64}});
std::vector<Tuple> left_data;
left_data.push_back(make_tuple({common::Value::make_int64(1)})); // matches 1
left_data.push_back(make_tuple({common::Value()})); // NULL - currently matches NULL
left_data.push_back(make_tuple({common::Value()})); // NULL - currently matches NULL

Schema right_schema = make_schema({{"id", common::ValueType::TYPE_INT64}});
std::vector<Tuple> right_data;
right_data.push_back(make_tuple({common::Value()})); // NULL - currently matches
right_data.push_back(make_tuple({common::Value()})); // NULL - currently matches
right_data.push_back(make_tuple({common::Value::make_int64(1)})); // matches 1

auto left_scan = make_buffer_scan("left_table", left_data, left_schema);
Expand Down
229 changes: 229 additions & 0 deletions tests/rpc_client_tests.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
/**
* @file rpc_client_tests.cpp
* @brief Unit tests for RpcClient - internal RPC client for node-to-node communication
*/

#include <gtest/gtest.h>

#include <atomic>
#include <csignal>
#include <cstdint>
#include <cstring>
#include <memory>
#include <thread>
#include <vector>

#include "network/rpc_client.hpp"
#include "network/rpc_message.hpp"
#include "network/rpc_server.hpp"

using namespace cloudsql::network;

namespace {

// Ignore SIGPIPE to prevent crashes when writing to closed sockets
struct SigpipeGuard {
SigpipeGuard() { std::signal(SIGPIPE, SIG_IGN); }
};
SigpipeGuard g_sigpipe;

class RpcClientTests : public ::testing::Test {
protected:
void SetUp() override {
port_ = TEST_PORT_BASE_ + next_port_++;
server_ = std::make_unique<RpcServer>(port_);
handler_called_ = false;
}

void TearDown() override {
if (server_) {
server_->stop();
}
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}

static constexpr uint16_t TEST_PORT_BASE_ = 6400;
static std::atomic<uint16_t> next_port_;
uint16_t port_;
std::unique_ptr<RpcServer> server_;
std::atomic<bool> handler_called_{false};
};

std::atomic<uint16_t> RpcClientTests::next_port_{0};

TEST_F(RpcClientTests, ConnectAndDisconnect) {
server_->start();

RpcClient client("127.0.0.1", port_);
EXPECT_TRUE(client.connect());
EXPECT_TRUE(client.is_connected());

client.disconnect();
EXPECT_FALSE(client.is_connected());
}

TEST_F(RpcClientTests, ConnectRefused) {
// No server started - connection should fail
RpcClient client("127.0.0.1", port_);
EXPECT_FALSE(client.connect());
EXPECT_FALSE(client.is_connected());
}

TEST_F(RpcClientTests, ConnectInvalidAddress) {
// Use an address that nothing is listening on
RpcClient client("127.0.0.1", port_);
// Port not in use, but connection refused happens at TCP level
EXPECT_FALSE(client.connect());
}
Comment on lines +65 to +77

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

ConnectInvalidAddress duplicates ConnectRefused.

Both tests connect to 127.0.0.1:port_ with no server running and assert connect() returns false — they exercise the same TCP-connection-refused path. The comment at Line 75 even acknowledges this. If the intent is to cover a genuinely invalid/unreachable address, use e.g. an unroutable IP (192.0.2.1, TEST-NET-1) or an invalid hostname so connect() fails in DNS / routing rather than with ECONNREFUSED; otherwise this test is redundant and can be removed.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/rpc_client_tests.cpp` around lines 65 - 77, The ConnectInvalidAddress
test is redundant with ConnectRefused; either remove it or change it to exercise
a different failure mode: update the RpcClient instantiation in
TEST_F(RpcClientTests, ConnectInvalidAddress) to use an unroutable TEST-NET-1 IP
(e.g. "192.0.2.1") or an invalid hostname so RpcClient::connect() fails due to
DNS/routing rather than TCP ECONNREFUSED, then assert
EXPECT_FALSE(client.connect()) and EXPECT_FALSE(client.is_connected()) as
appropriate; keep the original ConnectRefused test using "127.0.0.1" unchanged
if you choose to add the unroutable address instead of deleting the test.


TEST_F(RpcClientTests, CallAfterServerStop) {
server_->start();

// Set a handler that responds immediately
server_->set_handler(RpcType::Heartbeat,
[](const RpcHeader&, const std::vector<uint8_t>&, int fd) {
RpcHeader resp_h;
resp_h.type = RpcType::Heartbeat;
resp_h.payload_len = 0;
char h_buf[RpcHeader::HEADER_SIZE];
resp_h.encode(h_buf);
send(fd, h_buf, RpcHeader::HEADER_SIZE, 0);
});

RpcClient client("127.0.0.1", port_);
ASSERT_TRUE(client.connect());
ASSERT_TRUE(client.is_connected());

// Stop the server
server_->stop();
std::this_thread::sleep_for(std::chrono::milliseconds(50));

// Call after server stop should fail (connection refused/reset)
// Note: is_connected() returns true because it only checks if fd_ >= 0,
// not whether the server is still connected
std::vector<uint8_t> response;
EXPECT_FALSE(client.call(RpcType::Heartbeat, {}, response, 0));
}

TEST_F(RpcClientTests, FullRoundTrip) {
server_->start();

server_->set_handler(RpcType::QueryResults,
[](const RpcHeader& h, const std::vector<uint8_t>& p, int fd) {
// Echo back the payload
RpcHeader resp_h;
resp_h.type = RpcType::QueryResults;
resp_h.payload_len = static_cast<uint16_t>(p.size());
char h_buf[RpcHeader::HEADER_SIZE];
resp_h.encode(h_buf);
send(fd, h_buf, RpcHeader::HEADER_SIZE, 0);
if (!p.empty()) {
send(fd, p.data(), p.size(), 0);
}
});

RpcClient client("127.0.0.1", port_);
ASSERT_TRUE(client.connect());

std::vector<uint8_t> payload = {1, 2, 3, 4, 5};
std::vector<uint8_t> response;
ASSERT_TRUE(client.call(RpcType::QueryResults, payload, response, 0));

EXPECT_EQ(response.size(), 5U);
EXPECT_EQ(response[0], 1);
EXPECT_EQ(response[4], 5);
}

TEST_F(RpcClientTests, ConcurrentCalls) {
server_->start();

std::atomic<int> call_count{0};
server_->set_handler(RpcType::QueryResults,
[&](const RpcHeader& h, const std::vector<uint8_t>& p, int fd) {
call_count++;
RpcHeader resp_h;
resp_h.type = RpcType::QueryResults;
resp_h.payload_len = static_cast<uint16_t>(p.size());
char h_buf[RpcHeader::HEADER_SIZE];
resp_h.encode(h_buf);
send(fd, h_buf, RpcHeader::HEADER_SIZE, 0);
if (!p.empty()) {
send(fd, p.data(), p.size(), 0);
}
});

RpcClient client("127.0.0.1", port_);
ASSERT_TRUE(client.connect());

// Make 5 concurrent calls - RpcClient is single-threaded so they serialize
for (int i = 0; i < 5; i++) {
std::vector<uint8_t> payload = {static_cast<uint8_t>(i)};
std::vector<uint8_t> response;
ASSERT_TRUE(client.call(RpcType::QueryResults, payload, response, 0));
EXPECT_EQ(response.size(), 1U);
EXPECT_EQ(response[0], static_cast<uint8_t>(i));
}

EXPECT_EQ(call_count, 5);
}

TEST_F(RpcClientTests, ReconnectAfterServerRestart) {
// Use a different port to avoid conflicts with other tests
constexpr uint16_t reconnect_port = TEST_PORT_BASE_ + 100;
auto reconnect_server = std::make_unique<RpcServer>(reconnect_port);
reconnect_server->start();

server_->set_handler(RpcType::QueryResults,
[](const RpcHeader& h, const std::vector<uint8_t>& p, int fd) {
RpcHeader resp_h;
resp_h.type = RpcType::QueryResults;
resp_h.payload_len = static_cast<uint16_t>(p.size());
char h_buf[RpcHeader::HEADER_SIZE];
resp_h.encode(h_buf);
send(fd, h_buf, RpcHeader::HEADER_SIZE, 0);
if (!p.empty()) {
send(fd, p.data(), p.size(), 0);
}
});

RpcClient client("127.0.0.1", reconnect_port);
ASSERT_TRUE(client.connect());

std::vector<uint8_t> response;
ASSERT_TRUE(client.call(RpcType::QueryResults, {}, response, 0));

// Stop server
reconnect_server->stop();
std::this_thread::sleep_for(std::chrono::milliseconds(50));

// Start server again on same port
reconnect_server = std::make_unique<RpcServer>(reconnect_port);
reconnect_server->start();

// Reconnect
ASSERT_TRUE(client.connect());
ASSERT_TRUE(client.is_connected());
ASSERT_TRUE(client.call(RpcType::QueryResults, {}, response, 0));

Comment thread
coderabbitai[bot] marked this conversation as resolved.
reconnect_server->stop();
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated

TEST_F(RpcClientTests, SendOnlyWithoutResponse) {
server_->start();

std::atomic<int> call_count{0};
server_->set_handler(RpcType::Heartbeat, [&](const RpcHeader& h, const std::vector<uint8_t>& p,
int fd) { call_count++; });

RpcClient client("127.0.0.1", port_);
ASSERT_TRUE(client.connect());

// send_only doesn't wait for response
ASSERT_TRUE(client.send_only(RpcType::Heartbeat, {}, 0));

// Give server time to process
std::this_thread::sleep_for(std::chrono::milliseconds(50));
EXPECT_EQ(call_count, 1);
}

} // namespace