Skip to content

Commit 4098571

Browse files
authored
Close servers that haven't finished reading/writing (#566)
* Close servers that haven't finished reading/writing * fix read cancellation safety * Test * More tests * fix test * remove the bad test * reset bans * Refactor cleanup to be cleaner * Fix sharded copy to stdout * preserve health check failure on io error * Remove * remove unreachable code * more confusing code * more dead code
1 parent 5167249 commit 4098571

File tree

25 files changed

+1172
-202
lines changed

25 files changed

+1172
-202
lines changed

integration/python/test_asyncpg.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import random
66
import string
77
import pytest_asyncio
8+
from io import BytesIO
89

910

1011
@pytest_asyncio.fixture
@@ -231,13 +232,24 @@ async def test_copy(conns):
231232
records = 250
232233
for i in range(50):
233234
for conn in conns:
235+
# Test COPY FROM (TO table)
234236
rows = [[x, f"value_{x}", datetime.now()] for x in range(records)]
235237
await conn.copy_records_to_table(
236238
"sharded", records=rows, columns=["id", "value", "created_at"]
237239
)
238240
count = await conn.fetch("SELECT COUNT(*) FROM sharded")
239241
assert len(count) == 1
240242
assert count[0][0] == records
243+
244+
# Test COPY TO STDOUT
245+
buffer = BytesIO()
246+
copied_data = await conn.copy_from_table(
247+
"sharded", columns=["id", "value", "created_at"], output=buffer
248+
)
249+
buffer.seek(0)
250+
lines = buffer.read().decode('utf-8').strip().split('\n')
251+
assert len(lines) == records, f"expected {records} lines in COPY output, got {len(lines)}"
252+
241253
await conn.execute("DELETE FROM sharded")
242254

243255

integration/ruby/ar_spec.rb

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
require_relative 'rspec_helper'
44
require 'pp'
5+
require 'timeout'
56

67
class Sharded < ActiveRecord::Base
78
self.table_name = 'sharded'
@@ -187,4 +188,129 @@ def conn(db, prepared)
187188
end
188189
end
189190
end
191+
192+
describe 'chaos testing with interrupted queries' do
193+
before do
194+
conn('failover', false)
195+
ActiveRecord::Base.connection.execute 'DROP TABLE IF EXISTS sharded'
196+
ActiveRecord::Base.connection.execute 'CREATE TABLE sharded (id BIGSERIAL PRIMARY KEY, value TEXT)'
197+
end
198+
199+
it 'handles interrupted queries and continues operating normally' do
200+
interrupted_count = 0
201+
successful_count = 0
202+
mutex = Mutex.new
203+
204+
# Apply latency toxic to slow down query transmission,
205+
# making it easier to interrupt queries mid-flight
206+
Toxiproxy[:primary].toxic(:latency, latency: 100, jitter: 50).apply do
207+
# Phase 1: Chaos - interrupt queries randomly with thread kills
208+
chaos_threads = []
209+
killer_threads = []
210+
211+
# Start 10 query threads
212+
10.times do |thread_id|
213+
t = Thread.new do
214+
100.times do |i|
215+
begin
216+
case rand(3)
217+
when 0
218+
# SELECT query
219+
Sharded.where('id > ?', 0).limit(10).to_a
220+
when 1
221+
# INSERT query
222+
Sharded.create value: "thread_#{thread_id}_iter_#{i}"
223+
when 2
224+
# Transaction with multiple operations
225+
Sharded.transaction do
226+
rec = Sharded.create value: "tx_#{thread_id}_#{i}"
227+
Sharded.where(id: rec.id).first if rec.id
228+
end
229+
end
230+
mutex.synchronize { successful_count += 1 }
231+
rescue StandardError => e
232+
# Killed mid-query or other error
233+
mutex.synchronize { interrupted_count += 1 }
234+
end
235+
end
236+
end
237+
chaos_threads << t
238+
end
239+
240+
# Start killer thread that randomly kills query threads
241+
killer = Thread.new do
242+
50.times do
243+
sleep(rand(0.01..0.05))
244+
alive_threads = chaos_threads.select(&:alive?)
245+
if alive_threads.any?
246+
victim = alive_threads.sample
247+
victim.kill
248+
mutex.synchronize { interrupted_count += 1 }
249+
end
250+
end
251+
end
252+
killer_threads << killer
253+
254+
# Wait for killer to finish
255+
killer_threads.each(&:join)
256+
257+
# Wait for remaining threads (with timeout)
258+
chaos_threads.each { |t| t.join(0.1) }
259+
260+
puts "Chaos phase complete: #{successful_count} successful, #{interrupted_count} interrupted"
261+
expect(interrupted_count).to be > 0
262+
end # End toxiproxy latency
263+
264+
# Give PgDog time to clean up broken connections
265+
sleep(0.5)
266+
267+
# Disconnect all connections to clear bad state
268+
ActiveRecord::Base.connection_pool.disconnect!
269+
270+
# Wait a bit more for cleanup
271+
sleep(0.5)
272+
273+
# Phase 2: Verify database continues to operate normally
274+
verification_errors = []
275+
errors_mutex = Mutex.new
276+
277+
verification_threads = 10.times.map do |thread_id|
278+
Thread.new do
279+
20.times do |i|
280+
begin
281+
# Simple queries that don't depend on finding specific records
282+
# INSERT
283+
rec = Sharded.create value: "verify_#{thread_id}_#{i}"
284+
expect(rec.id).to be > 0
285+
286+
# SELECT with basic query
287+
results = Sharded.where('value LIKE ?', 'verify_%').limit(5).to_a
288+
expect(results).to be_a(Array)
289+
290+
# COUNT query
291+
count = Sharded.where('id > ?', 0).count
292+
expect(count).to be >= 0
293+
rescue PG::Error => e
294+
# PG errors should fail the test
295+
raise
296+
rescue StandardError => e
297+
errors_mutex.synchronize { verification_errors << e }
298+
end
299+
end
300+
end
301+
end
302+
303+
verification_threads.each(&:join)
304+
305+
# Verify no errors occurred during verification
306+
expect(verification_errors).to be_empty, "Verification errors: #{verification_errors.map(&:message).join(', ')}"
307+
308+
# Verify we can still execute basic queries
309+
ActiveRecord::Base.connection.execute('SELECT 1')
310+
311+
# Verify count works
312+
count = Sharded.count
313+
expect(count).to be >= 0
314+
end
315+
end
190316
end

integration/ruby/lb_spec.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
describe 'random' do
1313
it 'distributes traffic evenly' do
1414
conn = failover
15+
# Reset stats and bans
16+
admin.exec "RECONNECT"
1517

1618
before = admin_stats('failover')
1719
250.times do

pgdog/src/admin/show_pools.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ impl Command for ShowPools {
4040
Field::numeric("errors"),
4141
Field::numeric("re_synced"),
4242
Field::numeric("out_of_sync"),
43+
Field::numeric("force_closed"),
4344
Field::bool("online"),
4445
Field::text("replica_lag"),
4546
Field::bool("schema_admin"),
@@ -73,6 +74,7 @@ impl Command for ShowPools {
7374
.add(state.errors)
7475
.add(state.re_synced)
7576
.add(state.out_of_sync)
77+
.add(state.force_close)
7678
.add(state.online)
7779
.add(state.replica_lag.simple_display())
7880
.add(cluster.schema_admin());

pgdog/src/admin/tests/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ async fn show_pools_reports_schema_admin_flag() {
114114
"errors",
115115
"re_synced",
116116
"out_of_sync",
117+
"force_closed",
117118
"online",
118119
"replica_lag",
119120
"schema_admin",

pgdog/src/backend/error.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,9 @@ pub enum Error {
8484
#[error("protocol is out of sync")]
8585
ProtocolOutOfSync,
8686

87+
#[error("rollback left server in inconsistent state")]
88+
RollbackFailed,
89+
8790
#[error("decoder is missing required data to decode row")]
8891
DecoderRowError,
8992

pgdog/src/backend/pool/connection/binding.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ impl Binding {
9797
}
9898

9999
let message = server.read().await?;
100+
100101
read = true;
101102
if let Some(message) = state.forward(message)? {
102103
return Ok(message);

pgdog/src/backend/pool/connection/mirror/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ impl Mirror {
5454
prepared_statements: PreparedStatements::new(),
5555
params: params.clone(),
5656
timeouts: Timeouts::from_config(&config.config.general),
57-
stream: Stream::DevNull,
57+
stream: Stream::dev_null(),
5858
transaction: None,
5959
cross_shard_disabled: config.config.general.cross_shard_disabled,
6060
}

pgdog/src/backend/pool/connection/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,12 +229,14 @@ impl Connection {
229229
/// Only await this future inside a `select!`. One of the conditions
230230
/// suspends this loop indefinitely and expects another `select!` branch
231231
/// to cancel it.
232+
///
232233
pub(crate) async fn read(&mut self) -> Result<Message, Error> {
233234
select! {
234235
notification = self.pub_sub.recv() => {
235236
Ok(notification.ok_or(Error::ProtocolOutOfSync)?.message()?)
236237
}
237238

239+
// BUG: This is not cancellation-safe.
238240
message = self.binding.read() => {
239241
message
240242
}

pgdog/src/backend/pool/connection/multi_shard/mod.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@ struct Counters {
3939
bind_complete: usize,
4040
command_complete: Option<Message>,
4141
transaction_error: bool,
42+
copy_done: usize,
43+
copy_out: usize,
44+
copy_data: usize,
4245
}
4346

4447
/// Multi-shard state.
@@ -246,6 +249,25 @@ impl MultiShard {
246249
}
247250
}
248251

252+
'c' => {
253+
self.counters.copy_done += 1;
254+
if self.counters.copy_done.is_multiple_of(self.shards) {
255+
forward = Some(message);
256+
}
257+
}
258+
259+
'd' => {
260+
self.counters.copy_data += 1;
261+
forward = Some(message);
262+
}
263+
264+
'H' => {
265+
self.counters.copy_out += 1;
266+
if self.counters.copy_out.is_multiple_of(self.shards) {
267+
forward = Some(message);
268+
}
269+
}
270+
249271
't' => {
250272
self.counters.parameter_description += 1;
251273
if self

0 commit comments

Comments
 (0)