@@ -127,8 +127,8 @@ impl Guard {
127127
128128 if cleanup. needed ( ) {
129129 debug ! (
130- "[cleanup] {} , server in \" {}\" state [{}]" ,
131- cleanup,
130+ "[cleanup] running {} cleanup queries , server in \" {}\" state [{}]" ,
131+ cleanup. len ( ) ,
132132 server. stats( ) . state,
133133 server. addr( )
134134 ) ;
@@ -191,11 +191,16 @@ impl Drop for Guard {
191191mod test {
192192 use std:: time:: Duration ;
193193
194- use tokio:: time:: sleep;
194+ use tokio:: time:: { sleep, timeout , Instant } ;
195195
196196 use crate :: {
197- backend:: pool:: { test:: pool, Address , Config , Pool , PoolConfig , Request } ,
198- net:: { Describe , Flush , Parse , Protocol , Query , Sync } ,
197+ backend:: {
198+ pool:: {
199+ cleanup:: Cleanup , test:: pool, Address , Config , Guard , Pool , PoolConfig , Request ,
200+ } ,
201+ server:: test:: test_server,
202+ } ,
203+ net:: { Describe , Flush , Parse , Protocol , ProtocolMessage , Query , Sync } ,
199204 } ;
200205
201206 #[ tokio:: test]
@@ -292,14 +297,7 @@ mod test {
292297 } ;
293298
294299 let pool = Pool :: new ( & PoolConfig {
295- address : Address {
296- host : "127.0.0.1" . into ( ) ,
297- port : 5432 ,
298- database_name : "pgdog" . into ( ) ,
299- user : "pgdog" . into ( ) ,
300- password : "pgdog" . into ( ) ,
301- ..Default :: default ( )
302- } ,
300+ address : Address :: new_test ( ) ,
303301 config,
304302 } ) ;
305303 pool. launch ( ) ;
@@ -318,10 +316,79 @@ mod test {
318316
319317 sleep ( Duration :: from_millis ( 500 ) ) . await ;
320318
321- let state = pool. lock ( ) ;
322- assert_eq ! ( state. errors, 0 ) ;
323- assert_eq ! ( state. idle( ) , 0 ) ;
324- assert_eq ! ( state. total( ) , 0 ) ;
325- assert_eq ! ( state. force_close, 1 ) ;
319+ {
320+ let state = pool. lock ( ) ;
321+ assert_eq ! ( state. errors, 0 ) ;
322+ assert_eq ! ( state. idle( ) , 0 ) ;
323+ assert_eq ! ( state. total( ) , 0 ) ;
324+ assert_eq ! ( state. force_close, 1 ) ;
325+ }
326+
327+ // Will create new connection.
328+ let mut server = pool. get ( & Request :: default ( ) ) . await . unwrap ( ) ;
329+ let one: Vec < i32 > = server. fetch_all ( "SELECT 1" ) . await . unwrap ( ) ;
330+ assert_eq ! ( one[ 0 ] , 1 ) ;
331+ }
332+
333+ #[ tokio:: test]
334+ async fn test_cleanup_close_drain ( ) {
335+ crate :: logger ( ) ;
336+
337+ let mut server = Guard :: new (
338+ Pool :: new_test ( ) ,
339+ Box :: new ( test_server ( ) . await ) ,
340+ Instant :: now ( ) ,
341+ ) ;
342+ server. prepared_statements_mut ( ) . set_capacity ( 1 ) ;
343+
344+ for i in 0 ..5 {
345+ server
346+ . send (
347+ & vec ! [
348+ ProtocolMessage :: from( Parse :: named( format!( "test_{}" , i) , "SELECT 1" ) ) ,
349+ Flush . into( ) ,
350+ ]
351+ . into ( ) ,
352+ )
353+ . await
354+ . unwrap ( ) ;
355+
356+ let ok = server. read ( ) . await . unwrap ( ) ;
357+ assert_eq ! ( ok. code( ) , '1' ) ;
358+ assert ! ( server. done( ) ) ;
359+ }
360+ assert_eq ! ( server. prepared_statements( ) . len( ) , 5 ) ;
361+ server
362+ . send ( & vec ! [ Query :: new( "SHOW prepared_statements" ) . into( ) ] . into ( ) )
363+ . await
364+ . unwrap ( ) ;
365+ let mut guard = server;
366+ let mut server = guard. server . take ( ) . unwrap ( ) ;
367+ let cleanup = Cleanup :: new ( & guard, & mut server) ;
368+ assert_eq ! ( cleanup. close( ) . len( ) , 4 ) ;
369+ assert ! ( server. needs_drain( ) ) ;
370+
371+ Guard :: cleanup_internal ( & mut server, cleanup) . await . unwrap ( ) ;
372+
373+ assert ! ( server. done( ) ) ;
374+ assert ! ( !server. needs_drain( ) ) ;
375+
376+ let one: Vec < i32 > = server. fetch_all ( "SELECT 1" ) . await . unwrap ( ) ;
377+ assert_eq ! ( one[ 0 ] , 1 ) ;
378+ }
379+
380+ #[ tokio:: test]
381+ async fn test_cancel_safety_partial_send ( ) {
382+ let mut server = test_server ( ) . await ;
383+ let select = ( 0 ..50_000_000 ) . into_iter ( ) . map ( |_| 'b' ) . collect :: < String > ( ) ;
384+ let select = Query :: new ( format ! ( "SELECT '{}'" , select) ) ;
385+ let res = timeout (
386+ Duration :: from_millis ( 1 ) ,
387+ server. send ( & vec ! [ select. into( ) ] . into ( ) ) ,
388+ )
389+ . await ;
390+ assert ! ( res. is_err( ) ) ;
391+ assert ! ( server. force_close( ) ) ;
392+ assert ! ( server. io_in_progress( ) )
326393 }
327394}
0 commit comments