Commit 85b863a
fix: Synchronously refresh publication and clear cache if failed (#2490)
Follow-up to #2487. In that PR I fixed an issue that would cause the publication manager to exit, which sent Electric into a restart loop. Two issues compounded that problem:

1. I had previously made the recovery of shapes on the publication manager asynchronous, using `cast` instead of `call`. This meant Electric went ahead with initialisation and marked itself healthy despite failing to actually recover - if it had waited for the recovery to finish and fail, the container would never have been marked as healthy and never deployed.
2. If a recovery fails, for whatever reason, it can enter an infinite recovery loop, since some of the persisted state we are trying to recover from is likely what is causing the issue. We should be able to recover from that, as we consider the storage/cache disposable.

To solve the above, I implement the following:

1. Convert the calls back to synchronous calls, slightly increasing the timeout for `ShapeCache` initialisation, since allowing slow recovery was the original reason for making them asynchronous.
   a. I'm not entirely sure whether this is a bit of an anti-pattern - we could place the initialisation in a `{:continue, :restore_shapes}` after the init - but my concern is that if the rest of Electric continues on and initialises everything else before the restore is done, we might have a problem.
2. Use `try..rescue..catch..after` blocks to ensure cleanup occurs if any recovery operation fails, so that Electric still manages to start after the supervisor restarts the process, with the old, problematic state cleaned up.
   a. It's a bit of a brutal approach. I've tested it by tweaking the publication manager to fail every now and then, and it works really nicely: Electric erases the stored shapes, restarts, and continues on like normal.
   b. We could theoretically place this at a higher level, such as the `start_link` of the `ShapeCache`, so that we always clear the cache if initialisation fails, but I wanted to keep the scope tight and limited to failures we have observed during real operation. I can spend time adding an integration test with a failing `PublicationManager` to better simulate this situation in the test suite rather than ad hoc.

Tagging @balegas as well, as we discussed this change.
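The cleanup-on-failure approach in point 2 boils down to a standard Elixir pattern: run the recovery step, and on any failure, whether a raised error or an exit signal, wipe the disposable cache before re-propagating, so the supervisor's restart begins from a clean slate. A minimal sketch, where `recover!/0` and `wipe_cache!/0` are hypothetical stand-ins for the real recovery and storage-cleanup calls:

```elixir
# Sketch only: recover!/0 and wipe_cache!/0 are placeholder names.
try do
  :ok = recover!()
rescue
  # a raised exception: clean up, then re-raise with the original stacktrace
  error ->
    wipe_cache!()
    reraise error, __STACKTRACE__
catch
  # an exit signal (e.g. a GenServer.call timeout): clean up, then re-exit
  :exit, reason ->
    wipe_cache!()
    exit(reason)
end
```

Because the cache is wiped before the process dies, the supervisor's next restart attempt no longer sees the corrupted persisted state.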
1 parent 077aad2 commit 85b863a

5 files changed: +91 −28 lines

.changeset/hungry-beers-jog.md (5 additions, 0 deletions)

```diff
@@ -0,0 +1,5 @@
+---
+"@core/sync-service": patch
+---
+
+Synchronously recover shapes and publication to ensure Electric boot is successful, and clear the cache if it fails.
```

packages/sync-service/lib/electric/replication/publication_manager.ex (13 additions, 10 deletions)

```diff
@@ -107,8 +107,7 @@ defmodule Electric.Replication.PublicationManager do
   @spec recover_shape(Shape.t(), Keyword.t()) :: :ok
   def recover_shape(shape, opts \\ []) do
     server = Access.get(opts, :server, name(opts))
-    GenServer.cast(server, {:recover_shape, shape})
-    :ok
+    GenServer.call(server, {:recover_shape, shape})
   end

   @spec remove_shape(Shape.t(), Keyword.t()) :: :ok
@@ -124,8 +123,12 @@ defmodule Electric.Replication.PublicationManager do
   @spec refresh_publication(Keyword.t()) :: :ok
   def refresh_publication(opts \\ []) do
     server = Access.get(opts, :server, name(opts))
-    GenServer.cast(server, :refresh_publication)
-    :ok
+    timeout = Access.get(opts, :timeout, 10_000)
+
+    case GenServer.call(server, :refresh_publication, timeout) do
+      :ok -> :ok
+      {:error, err} -> raise err
+    end
   end

   def start_link(opts) do
@@ -194,15 +197,15 @@ defmodule Electric.Replication.PublicationManager do
     {:noreply, state}
   end

-  @impl true
-  def handle_cast({:recover_shape, shape}, state) do
-    state = update_relation_filters_for_shape(shape, :add, state)
+  def handle_call(:refresh_publication, from, state) do
+    state = add_waiter(from, state)
+    state = schedule_update_publication(state.update_debounce_timeout, state)
     {:noreply, state}
   end

-  def handle_cast(:refresh_publication, state) do
-    state = schedule_update_publication(state.update_debounce_timeout, state)
-    {:noreply, state}
+  def handle_call({:recover_shape, shape}, _from, state) do
+    state = update_relation_filters_for_shape(shape, :add, state)
+    {:reply, :ok, state}
   end

   @impl true
```
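Note that the new `handle_call(:refresh_publication, from, state)` clause returns `{:noreply, state}` rather than replying immediately: the caller stays blocked until the debounced publication update actually runs, at which point the manager answers every parked waiter. This deferred-reply idiom looks roughly like the sketch below, where `waiters` and `schedule_update/1` are illustrative placeholders, not the module's actual internals:

```elixir
# Deferred GenServer reply: park the caller's `from` tag now,
# answer it later with GenServer.reply/2.
def handle_call(:refresh_publication, from, state) do
  state = %{state | waiters: [from | state.waiters]}
  # schedule_update/1 is a placeholder for the debounced publication update
  {:noreply, schedule_update(state)}
end

# Called once the debounced publication update completes (or fails):
defp reply_to_waiters(result, state) do
  Enum.each(state.waiters, &GenServer.reply(&1, result))
  %{state | waiters: []}
end
```

This is what makes the refresh effectively synchronous from the caller's point of view while still allowing the manager to batch several requests into one publication update.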

packages/sync-service/lib/electric/shape_cache.ex (37 additions, 14 deletions)

```diff
@@ -88,7 +88,10 @@ defmodule Electric.ShapeCache do
     GenServer.start_link(
       __MODULE__,
       Map.new(opts) |> Map.put(:db_pool, db_pool) |> Map.put(:name, name),
-      name: name
+      name: name,
+      # allow extra time for shape cache to initialise as it needs to
+      # restore existing shapes
+      timeout: Keyword.get(opts, :timeout, 20_000)
     )
   end
 end
@@ -209,9 +212,21 @@ defmodule Electric.ShapeCache do
       recover_shapes(state)
     end

-    # ensure publication filters are in line with existing shapes
+    # ensure publication filters are in line with existing shapes,
+    # and clean up cache if publication fails to update
     {publication_manager, publication_manager_opts} = opts.publication_manager
-    publication_manager.refresh_publication(publication_manager_opts)
+
+    try do
+      :ok = publication_manager.refresh_publication(publication_manager_opts)
+    rescue
+      error ->
+        clean_up_all_shapes(state)
+        reraise error, __STACKTRACE__
+    catch
+      :exit, reason ->
+        clean_up_all_shapes(state)
+        exit(reason)
+    end

     # do this after finishing this function so that we're subscribed to the
     # producer before it starts forwarding its demand
@@ -270,15 +285,21 @@ defmodule Electric.ShapeCache do
   end

   defp clean_up_shape(state, shape_handle) do
-    Electric.Shapes.DynamicConsumerSupervisor.stop_shape_consumer(
-      state.consumer_supervisor,
-      state.stack_id,
-      shape_handle
-    )
+    try do
+      Electric.Shapes.DynamicConsumerSupervisor.stop_shape_consumer(
+        state.consumer_supervisor,
+        state.stack_id,
+        shape_handle
+      )
+    after
+      # another failsafe for ensuring data is cleaned if consumer fails to
+      # clean itself - this is not guaranteed to run in case of an actual exit
+      # but provides an additional safeguard

-    shape_handle
-    |> Electric.ShapeCache.Storage.for_shape(state.storage)
-    |> Electric.ShapeCache.Storage.unsafe_cleanup!()
+      shape_handle
+      |> Electric.ShapeCache.Storage.for_shape(state.storage)
+      |> Electric.ShapeCache.Storage.unsafe_cleanup!()
+    end

     :ok
   end
@@ -304,9 +325,11 @@ defmodule Electric.ShapeCache do
       # recover publication filter state
       publication_manager.recover_shape(shape, publication_manager_opts)
       [LogOffset.extract_lsn(latest_offset)]
-    rescue
-      e ->
-        Logger.error("Failed to recover shape #{shape_handle}: #{inspect(e)}")
+    catch
+      kind, reason when kind in [:exit, :error] ->
+        Logger.error(
+          "Failed to recover shape #{shape_handle}: #{Exception.format(kind, reason, __STACKTRACE__)}"
+        )

         # clean up corrupted data to avoid persisting bad state
         Electric.ShapeCache.Storage.for_shape(shape_handle, state.storage)
```
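The switch from `rescue` to `catch kind, reason when kind in [:exit, :error]` in the last hunk matters because `rescue` only matches raised exceptions, while a call into the publication manager can also fail with an exit signal (for example, a `GenServer.call` timeout), which `rescue` silently lets through. A `catch` clause guarded on both kinds covers either failure mode; a minimal sketch, with `server` standing in for the real process:

```elixir
require Logger

# `rescue` misses exits; `catch` with a kind guard handles both raised
# errors and exit signals (e.g. a GenServer.call timeout).
try do
  GenServer.call(server, :refresh_publication, 10_000)
catch
  kind, reason when kind in [:exit, :error] ->
    Logger.error(Exception.format(kind, reason, __STACKTRACE__))
    :error
end
```

`Exception.format/3` renders either kind of failure with its stacktrace, which is why the diff also replaces the bare `inspect(e)` in the log message.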

packages/sync-service/test/electric/shape_cache_test.exs (33 additions, 1 deletion)

```diff
@@ -893,6 +893,36 @@ defmodule Electric.ShapeCacheTest do
       assert {^shape_handle, ^offset} = ShapeCache.get_or_create_shape_handle(@shape, opts)
     end

+    test "invalidates shapes that we fail to restore", %{shape_cache_opts: opts} = context do
+      {shape_handle1, _} = ShapeCache.get_or_create_shape_handle(@shape, opts)
+      :started = ShapeCache.await_snapshot_start(shape_handle1, opts)
+
+      Mock.PublicationManager
+      |> stub(:remove_shape, fn _, _ -> :ok end)
+      |> expect(:recover_shape, 1, fn _, _ -> :ok end)
+      |> expect(:refresh_publication, 1, fn _ -> raise "failed recovery" end)
+      |> allow(self(), fn -> Shapes.Consumer.whereis(context[:stack_id], shape_handle1) end)
+      |> allow(self(), fn -> Process.whereis(opts[:server]) end)
+
+      # Should fail to start shape cache and clean up shapes
+      Process.flag(:trap_exit, true)
+
+      assert_raise MatchError, ~r/%RuntimeError{message: \"failed recovery\"/, fn ->
+        restart_shape_cache(%{
+          context
+          | publication_manager: {Mock.PublicationManager, []}
+        })
+      end
+
+      Process.flag(:trap_exit, false)
+
+      # Next restart should not recover shape
+      restart_shape_cache(context)
+      {shape_handle2, _} = ShapeCache.get_or_create_shape_handle(@shape, opts)
+      :started = ShapeCache.await_snapshot_start(shape_handle2, opts)
+      assert shape_handle1 != shape_handle2
+    end
+
     defp restart_shape_cache(context) do
       stop_shape_cache(context)

@@ -916,7 +946,9 @@ defmodule Electric.ShapeCacheTest do
         {pid, Process.monitor(pid)}
       end

-      Shapes.DynamicConsumerSupervisor.stop_all_consumers(ctx.consumer_supervisor)
+      if Enum.count(consumers) > 0 do
+        Shapes.DynamicConsumerSupervisor.stop_all_consumers(ctx.consumer_supervisor)
+      end

       for {pid, ref} <- consumers do
         assert_receive {:DOWN, ^ref, :process, ^pid, _}
```
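The `trap_exit` dance in the new test is needed because a GenServer started with `start_link` is linked to the test process: if `init/1` raises, the exit signal would normally kill the test too. Trapping exits converts that signal into a message, letting the test observe the failure instead. A minimal illustration of the mechanism, using a made-up `Crashy` server:

```elixir
defmodule Crashy do
  use GenServer

  # Deliberately crash during initialisation
  def init(_opts), do: raise("boom in init")
end

# Trap exits so the linked crash becomes a mailbox message
# instead of killing the caller (here, the test process).
Process.flag(:trap_exit, true)

# start_link reports the init failure as {:error, {exception, stacktrace}}
{:error, {%RuntimeError{message: "boom in init"}, _stacktrace}} =
  GenServer.start_link(Crashy, [])

Process.flag(:trap_exit, false)
```

Resetting the flag afterwards, as the test does, restores the default behaviour so later crashes in the same process are not silently absorbed.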

packages/typescript-client/test/integration.test.ts (3 additions, 3 deletions)

```diff
@@ -635,8 +635,8 @@ describe(`HTTP Sync`, () => {
       const shapeOffset = res.headers.get(`electric-offset`)!
       const fakeMidOffset = shapeOffset
         .split(`_`)
-        .map(Number)
-        .map((x, i) => (i === 0 ? x - 1 : x))
+        .map(BigInt)
+        .map((x, i) => (i === 0 ? x - BigInt(1) : x))
         .join(`_`)
       const etag = res.headers.get(`etag`)
       expect(etag, `Response should have etag header`).not.toBe(null)
@@ -927,7 +927,7 @@ describe(`HTTP Sync`, () => {
       // the next up to date message should have had
       // a 409 interleaved before it that instructed the
       // client to go and fetch data from scratch
-      expect(statusCodesReceived).toHaveLength(5)
+      expect(statusCodesReceived.length).greaterThanOrEqual(5)
       expect(statusCodesReceived[2]).toBe(409)
       expect(statusCodesReceived[3]).toBe(200)
       return res()
```