@@ -403,6 +403,224 @@ defmodule Electric.Plug.RouterTest do
403403 ] = Jason . decode! ( conn . resp_body )
404404 end
405405
406+ @ all_types_table_name "all_types_table"
407+ @ tag with_sql: [
408+ "CREATE TYPE mood AS ENUM ('sad', 'ok', 'happy')" ,
409+ "CREATE TYPE complex AS (r double precision, i double precision)" ,
410+ "CREATE DOMAIN posint AS integer CHECK (VALUE > 0)" ,
411+ "CREATE TABLE #{ @ all_types_table_name } (
412+ txt VARCHAR,
413+ i2 INT2 PRIMARY KEY,
414+ i4 INT4,
415+ i8 INT8,
416+ f8 FLOAT8,
417+ b BOOLEAN,
418+ json JSON,
419+ jsonb JSONB,
420+ blob BYTEA,
421+ ints INT8[],
422+ ints2 INT8[][],
423+ int4s INT4[],
424+ doubles FLOAT8[],
425+ bools BOOLEAN[],
426+ moods mood[],
427+ moods2 mood[][],
428+ complexes complex[],
429+ posints posint[],
430+ jsons JSONB[],
431+ txts TEXT[]
432+ )"
433+ ]
434+ test "can sync all data types" , % { opts: opts , db_conn: db_conn } do
435+ Postgrex . query! ( db_conn , "
436+ INSERT INTO #{ @ all_types_table_name } (txt, i2, i4, i8, f8, b, json, jsonb, blob, ints, ints2, int4s, doubles, bools, moods, moods2, complexes, posints, jsons, txts)
437+ VALUES ( $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20 )
438+ " , [
439+ "test" ,
440+ 1 ,
441+ 2_147_483_647 ,
442+ 9_223_372_036_854_775_807 ,
443+ 4.5 ,
444+ true ,
445+ % { foo: "bar" } ,
446+ % { foo: "bar" } ,
447+ << 0 , 1 , 255 , 254 >> ,
448+ [ 1 , 2 , 3 ] ,
449+ [
450+ [ 1 , 2 , 3 ] ,
451+ [ 4 , 5 , 6 ]
452+ ] ,
453+ [ 1 , 2 , 3 ] ,
454+ [ 1.2 , - 3.2 , :inf , :"-inf" , :NaN ] ,
455+ [ true , false , true ] ,
456+ [ "sad" , "ok" , "happy" ] ,
457+ [
458+ [ "sad" , "ok" ] ,
459+ [ "ok" , "happy" ]
460+ ] ,
461+ [ { 1.1 , 2.2 } , { 3.3 , 4.4 } ] ,
462+ [ 5 , 9 , 2 ] ,
463+ [ % { foo: "bar" } , % { bar: "baz" } ] ,
464+ [ "foo" , "bar" , "baz" ]
465+ ] )
466+
467+ conn =
468+ conn ( "GET" , "/v1/shape?table=#{ @ all_types_table_name } &offset=-1" ) |> Router . call ( opts )
469+
470+ assert % { status: 200 } = conn
471+ shape_handle = get_resp_shape_handle ( conn )
472+ latest_offset = get_resp_last_offset ( conn )
473+
474+ assert [
475+ % {
476+ "value" => % {
477+ "txt" => "test" ,
478+ "i2" => "1" ,
479+ "i4" => "2147483647" ,
480+ "i8" => "9223372036854775807" ,
481+ "f8" => "4.5" ,
482+ "b" => "true" ,
483+ "json" => "{\" foo\" :\" bar\" }" ,
484+ "jsonb" => "{\" foo\" : \" bar\" }" ,
485+ "blob" => "\\ x0001fffe" ,
486+ "ints" => "{1,2,3}" ,
487+ "ints2" => "{{1,2,3},{4,5,6}}" ,
488+ "int4s" => "{1,2,3}" ,
489+ "doubles" => "{1.2,-3.2,Infinity,-Infinity,NaN}" ,
490+ "bools" => "{t,f,t}" ,
491+ "moods" => "{sad,ok,happy}" ,
492+ "moods2" => "{{sad,ok},{ok,happy}}" ,
493+ "posints" => "{5,9,2}" ,
494+ "complexes" => "{\" (1.1,2.2)\" ,\" (3.3,4.4)\" }" ,
495+ "jsons" => "{\" {\\ \" foo\\ \" : \\ \" bar\\ \" }\" ,\" {\\ \" bar\\ \" : \\ \" baz\\ \" }\" }" ,
496+ "txts" => "{foo,bar,baz}"
497+ } ,
498+ "key" => key
499+ }
500+ ] = Jason . decode! ( conn . resp_body )
501+
502+ task =
503+ Task . async ( fn ->
504+ conn (
505+ "GET" ,
506+ "/v1/shape?table=#{ @ all_types_table_name } &offset=#{ latest_offset } &handle=#{ shape_handle } &live"
507+ )
508+ |> Router . call ( opts )
509+ end )
510+
511+ Postgrex . query! ( db_conn , "UPDATE #{ @ all_types_table_name } SET
512+ txt = $1, i4 = $2, i8 = $3, f8 = $4, b = $5, json = $6,
513+ jsonb = $7, blob = $8, ints = $9, ints2 = $10, int4s = $11,
514+ doubles = $12, bools = $13, moods = $14, moods2 = $15,
515+ complexes = $16, posints = $17, jsons = $18, txts = $19
516+ WHERE i2 = 1
517+ " , [
518+ "changed" ,
519+ 20 ,
520+ 30 ,
521+ 40.5 ,
522+ false ,
523+ % { bar: "foo" } ,
524+ % { bar: "foo" } ,
525+ << 255 , 254 , 0 , 1 >> ,
526+ [ 4 , 5 , 6 ] ,
527+ [
528+ [ 4 , 5 , 6 ] ,
529+ [ 7 , 8 , 9 ]
530+ ] ,
531+ [ 4 , 5 , 6 ] ,
532+ [ - 100.2 , :"-inf" , :NaN , 3.2 ] ,
533+ [ false , true , false ] ,
534+ [ "sad" , "happy" ] ,
535+ [
536+ [ "sad" , "happy" ] ,
537+ [ "happy" , "ok" ]
538+ ] ,
539+ [ { 2.2 , 3.3 } , { 4.4 , 5.5 } ] ,
540+ [ 6 , 10 , 3 ] ,
541+ [ % { bar: "baz" } ] ,
542+ [ "new" , "values" ]
543+ ] )
544+
545+ assert % { status: 200 } = conn = Task . await ( task )
546+
547+ assert [
548+ % {
549+ "key" => ^ key ,
550+ "value" => % {
551+ "txt" => "changed" ,
552+ "i2" => "1" ,
553+ "i4" => "20" ,
554+ "i8" => "30" ,
555+ "f8" => "40.5" ,
556+ "b" => "f" ,
557+ "json" => "{\" bar\" :\" foo\" }" ,
558+ "jsonb" => "{\" bar\" : \" foo\" }" ,
559+ "blob" => "\\ xfffe0001" ,
560+ "ints" => "{4,5,6}" ,
561+ "ints2" => "{{4,5,6},{7,8,9}}" ,
562+ "int4s" => "{4,5,6}" ,
563+ "doubles" => "{-100.2,-Infinity,NaN,3.2}" ,
564+ "bools" => "{f,t,f}" ,
565+ "moods" => "{sad,happy}" ,
566+ "moods2" => "{{sad,happy},{happy,ok}}" ,
567+ "posints" => "{6,10,3}" ,
568+ "complexes" => "{\" (2.2,3.3)\" ,\" (4.4,5.5)\" }" ,
569+ "jsons" => "{\" {\\ \" bar\\ \" : \\ \" baz\\ \" }\" }" ,
570+ "txts" => "{new,values}"
571+ }
572+ } ,
573+ @ up_to_date
574+ ] = Jason . decode! ( conn . resp_body )
575+ end
576+
577+ @ large_binary_table "large_binary_table"
578+ @ tag with_sql: [ "CREATE TABLE #{ @ large_binary_table } (id INT PRIMARY KEY, blob BYTEA)" ]
579+ test "can sync large binaries" , % { opts: opts , db_conn: db_conn } do
580+ # 10 MB
581+ blob_size = 10_000_000
582+
583+ # ensure initial sync works
584+ blob = :rand . bytes ( blob_size )
585+ hex_blob = "\\ x" <> Base . encode16 ( blob , case: :lower )
586+
587+ Postgrex . query! ( db_conn , "INSERT INTO #{ @ large_binary_table } (id, blob) VALUES (1, $1)" , [
588+ blob
589+ ] )
590+
591+ conn =
592+ conn ( "GET" , "/v1/shape?table=#{ @ large_binary_table } &offset=-1" ) |> Router . call ( opts )
593+
594+ assert % { status: 200 } = conn
595+ shape_handle = get_resp_shape_handle ( conn )
596+ latest_offset = get_resp_last_offset ( conn )
597+ assert [ % { "value" => % { "id" => "1" , "blob" => ^ hex_blob } } ] = Jason . decode! ( conn . resp_body )
598+
599+ task =
600+ Task . async ( fn ->
601+ conn (
602+ "GET" ,
603+ "/v1/shape?table=#{ @ large_binary_table } &offset=#{ latest_offset } &handle=#{ shape_handle } &live"
604+ )
605+ |> Router . call ( opts )
606+ end )
607+
608+ # ensure that updates also work
609+ blob = :rand . bytes ( blob_size )
610+ hex_blob = "\\ x" <> Base . encode16 ( blob , case: :lower )
611+
612+ Postgrex . query! ( db_conn , "UPDATE #{ @ large_binary_table } SET blob = $1 WHERE id = 1" , [
613+ blob
614+ ] )
615+
616+ assert % { status: 200 } = conn = Task . await ( task )
617+
618+ assert [
619+ % { "value" => % { "id" => "1" , "blob" => ^ hex_blob } } ,
620+ @ up_to_date
621+ ] = Jason . decode! ( conn . resp_body )
622+ end
623+
406624 @ tag with_sql: [
407625 "CREATE TABLE wide_table (id BIGINT PRIMARY KEY, value1 TEXT NOT NULL, value2 TEXT NOT NULL, value3 TEXT NOT NULL)" ,
408626 "INSERT INTO wide_table VALUES (1, 'test value 1', 'test value 1', 'test value 1')"
0 commit comments