Skip to content

Commit 0a95da1

Browse files
authored
sync-service: improve shapes api encapsulation (#2376)
- Api.predefined_shape/2 allows for configuring an endpoint with a pre-defined shape - All http logic is now encapsulated by the api, including not-modified handling and cache-control headers
1 parent 059a69a commit 0a95da1

File tree

10 files changed

+346
-165
lines changed

10 files changed

+346
-165
lines changed

.changeset/seven-guests-appear.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@core/sync-service": patch
3+
---
4+
5+
Allow for accessing via an api that serves only a pre-configured shape, move all http logic into api

packages/sync-service/lib/electric/plug/serve_shape_plug.ex

Lines changed: 1 addition & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -5,30 +5,19 @@ defmodule Electric.Plug.ServeShapePlug do
55
# The halt/1 function is redefined further down below
66
import Plug.Conn, except: [halt: 1]
77

8-
alias Electric.Plug.Utils
98
alias Electric.Shapes.Api
10-
alias Electric.Replication.LogOffset
119
alias Electric.Telemetry.OpenTelemetry
1210
alias Plug.Conn
1311

1412
require Logger
1513

16-
defguardp is_live_request(conn) when conn.assigns.request.params.live
17-
18-
# Aliasing for pattern matching
19-
@before_all_offset LogOffset.before_all()
20-
2114
plug :fetch_query_params
2215

2316
# start_telemetry_span needs to always be the first plug after fetching query params.
2417
plug :start_telemetry_span
2518
plug :put_resp_content_type, "application/json"
2619

2720
plug :validate_request
28-
plug :put_schema_header
29-
plug :put_resp_cache_headers
30-
plug :generate_etag
31-
plug :validate_and_put_etag
3221
plug :serve_shape_log
3322

3423
# end_telemetry_span needs to always be the last plug here.
@@ -54,98 +43,8 @@ defmodule Electric.Plug.ServeShapePlug do
5443
end
5544
end
5645

57-
# Only adds schema header when not in live mode
58-
defp put_schema_header(conn, _) when not is_live_request(conn) do
59-
%{assigns: %{request: request}} = conn
60-
schema = Api.schema(request) |> Jason.encode!()
61-
put_resp_header(conn, "electric-schema", schema)
62-
end
63-
64-
defp put_schema_header(conn, _), do: conn
65-
66-
defp generate_etag(%Conn{} = conn, _) do
67-
%{assigns: %{request: request}} = conn
68-
69-
%{
70-
handle: active_shape_handle,
71-
offset: chunk_end_offset
72-
} = request.response
73-
74-
conn
75-
|> assign(
76-
:etag,
77-
"#{active_shape_handle}:#{request.params.offset}:#{chunk_end_offset}"
78-
)
79-
end
80-
81-
defp validate_and_put_etag(%Conn{} = conn, _) do
82-
%{assigns: %{request: request}} = conn
83-
84-
if_none_match =
85-
get_req_header(conn, "if-none-match")
86-
|> Enum.flat_map(&String.split(&1, ","))
87-
|> Enum.map(&String.trim/1)
88-
|> Enum.map(&String.trim(&1, <<?">>))
89-
90-
cond do
91-
conn.assigns.etag in if_none_match ->
92-
conn
93-
|> send_resp(304, "")
94-
|> halt()
95-
96-
not request.params.live ->
97-
put_resp_header(conn, "etag", conn.assigns.etag)
98-
99-
true ->
100-
conn
101-
end
102-
end
103-
104-
defp put_resp_cache_headers(%Conn{} = conn, _) do
105-
%{assigns: %{request: request}} = conn
106-
107-
case request do
108-
# If the offset is -1, set a 1 week max-age, 1 hour s-maxage (shared cache)
109-
# and 1 month stale-while-revalidate We want private caches to cache the
110-
# initial offset for a long time but for shared caches to frequently
111-
# revalidate so they're serving a fairly fresh copy of the initials shape
112-
# log.
113-
%{params: %{offset: @before_all_offset}} ->
114-
conn
115-
|> put_resp_header(
116-
"cache-control",
117-
"public, max-age=604800, s-maxage=3600, stale-while-revalidate=2629746"
118-
)
119-
120-
# For live requests we want short cache lifetimes and to update the live cursor
121-
%{params: %{live: true}, api: api} ->
122-
conn
123-
|> put_resp_header(
124-
"cache-control",
125-
"public, max-age=5, stale-while-revalidate=5"
126-
)
127-
|> put_resp_header(
128-
"electric-cursor",
129-
api.long_poll_timeout
130-
|> Utils.get_next_interval_timestamp(conn.query_params["cursor"])
131-
|> Integer.to_string()
132-
)
133-
134-
%{params: %{live: false}, api: api} ->
135-
conn
136-
|> put_resp_header(
137-
"cache-control",
138-
"public, max-age=#{api.max_age}, stale-while-revalidate=#{api.stale_age}"
139-
)
140-
end
141-
end
142-
14346
defp serve_shape_log(%Conn{assigns: %{request: request}} = conn, _) do
144-
response = Api.serve_shape_log(request)
145-
146-
conn
147-
|> assign(:response, response)
148-
|> Api.Response.send(response)
47+
Api.serve_shape_log(conn, request)
14948
end
15049

15150
defp open_telemetry_attrs(%Conn{assigns: assigns} = conn) do

packages/sync-service/lib/electric/shapes/api.ex

Lines changed: 92 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ defmodule Electric.Shapes.Api do
1313

1414
@options [
1515
inspector: [type: :mod_arg, required: true],
16-
pg_id: [type: :string],
16+
pg_id: [type: {:or, [nil, :string]}],
1717
registry: [type: :atom, required: true],
1818
shape_cache: [type: :mod_arg, required: true],
1919
stack_events_registry: [type: :atom, required: true],
@@ -40,6 +40,7 @@ defmodule Electric.Shapes.Api do
4040
:pg_id,
4141
:registry,
4242
:persistent_kv,
43+
:shape,
4344
:shape_cache,
4445
:stack_events_registry,
4546
:stack_id,
@@ -105,19 +106,60 @@ defmodule Electric.Shapes.Api do
105106
end
106107

107108
defp validate_encoder!(%Api{} = api) do
108-
Map.update!(api, :encoder, &Electric.Shapes.Api.Encoder.validate!/1)
109+
Map.update!(api, :encoder, &Shapes.Api.Encoder.validate!/1)
109110
end
110111

112+
shape_schema_options =
113+
Keyword.merge(Keyword.drop(Shapes.Shape.schema_options(), [:inspector]),
114+
table: [type: :string],
115+
schema: [type: :string],
116+
namespace: [type: :string]
117+
)
118+
119+
shape_schema = NimbleOptions.new!(shape_schema_options)
120+
121+
@type shape_opts() :: [unquote(NimbleOptions.option_typespec(shape_schema))]
122+
111123
@doc """
112-
Validate the parameters for the request.
124+
Create a version of the given configured Api instance that is specific to the
125+
given shape.
113126
114-
Options:
127+
This allows you to provide a locked-down version of the API that ignores
128+
shape-definition parameters such as `table`, `where` and `columns` and only
129+
honours the shape-tailing parameters such as `offset` and `handle`.
130+
"""
131+
@spec predefined_shape(t(), shape_opts()) :: {:ok, t()} | {:error, term()}
132+
def predefined_shape(%Api{} = api, shape_params) do
133+
with {:ok, params} <- normalise_shape_params(shape_params),
134+
opts = Keyword.merge(params, inspector: api.inspector),
135+
{:ok, shape} <- Shapes.Shape.new(opts) do
136+
{:ok, %{api | shape: shape}}
137+
end
138+
end
115139

116-
- `seek: boolean()` - (default: true) once validated should we load the shape's
117-
latest offset information.
140+
defp normalise_shape_params(params) do
141+
case Keyword.fetch(params, :relation) do
142+
{:ok, {n, t}} when is_binary(n) and is_binary(t) ->
143+
{:ok, params}
144+
145+
:error ->
146+
{table_params, shape_params} = Keyword.split(params, [:table, :namespace, :schema])
147+
148+
case {table_params[:table], table_params[:namespace] || table_params[:schema]} do
149+
{nil, nil} ->
150+
{:error, "No relation or table specified"}
151+
152+
{table, nil} when is_binary(table) ->
153+
{:ok, Keyword.put(shape_params, :relation, {"public", table})}
154+
155+
{table, namespace} ->
156+
{:ok, Keyword.put(shape_params, :relation, {namespace, table})}
157+
end
158+
end
159+
end
118160

119-
- `load: boolean()` - (default: true) validate and optionallly create a shape
120-
based on the handle and shape parameters.
161+
@doc """
162+
Validate the parameters for the request.
121163
"""
122164
@spec validate(t(), %{(atom() | binary()) => term()}) ::
123165
{:ok, Request.t()} | {:error, Response.t()}
@@ -139,9 +181,15 @@ defmodule Electric.Shapes.Api do
139181

140182
defp validate_params(api, params) do
141183
with {:ok, request_params} <- Api.Params.validate(api, params) do
142-
request_for_params(api, request_params, %Response{
143-
shape_definition: request_params.shape_definition
144-
})
184+
request_for_params(
185+
api,
186+
request_params,
187+
%Response{
188+
api: api,
189+
params: request_params,
190+
shape_definition: request_params.shape_definition
191+
}
192+
)
145193
end
146194
end
147195

@@ -352,6 +400,39 @@ defmodule Electric.Shapes.Api do
352400
end)
353401
end
354402

403+
def serve_shape_log(%Plug.Conn{} = conn, %Request{} = request) do
404+
response =
405+
case if_not_modified(conn, request) do
406+
{:halt, response} ->
407+
response
408+
409+
{:cont, request} ->
410+
serve_shape_log(request)
411+
end
412+
413+
conn
414+
|> Plug.Conn.assign(:response, response)
415+
|> Response.send(response)
416+
end
417+
418+
def if_not_modified(conn, request) do
419+
etag = Response.etag(request.response, quote: false)
420+
421+
if etag in if_none_match(conn) do
422+
%{response: response} = Request.update_response(request, &%{&1 | status: 304, body: []})
423+
{:halt, response}
424+
else
425+
{:cont, request}
426+
end
427+
end
428+
429+
defp if_none_match(%Plug.Conn{} = conn) do
430+
Plug.Conn.get_req_header(conn, "if-none-match")
431+
|> Enum.flat_map(&String.split(&1, ","))
432+
|> Enum.map(&String.trim/1)
433+
|> Enum.map(&String.trim(&1, <<?">>))
434+
end
435+
355436
defp validate_serve_usage!(request) do
356437
case {request.new_changes_pid, self()} do
357438
{nil, _} ->

packages/sync-service/lib/electric/shapes/api/params.ex

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -56,11 +56,11 @@ defmodule Electric.Shapes.Api.Params do
5656
def validate(%Electric.Shapes.Api{} = api, params) do
5757
params
5858
|> cast_params()
59-
|> validate_required([:table, :offset])
59+
|> validate_required([:offset])
6060
|> cast_offset()
6161
|> validate_handle_with_offset()
6262
|> validate_live_with_offset()
63-
|> cast_root_table(inspector: api.inspector)
63+
|> cast_root_table(api)
6464
|> apply_action(:validate)
6565
|> convert_error(api)
6666
end
@@ -77,7 +77,7 @@ defmodule Electric.Shapes.Api.Params do
7777
%{changes: %{table: _table}} = changeset ->
7878
changeset
7979
|> validate_required([:table])
80-
|> cast_root_table(inspector: api.inspector)
80+
|> cast_root_table(api)
8181
|> apply_action(:validate)
8282
|> convert_error(api)
8383

@@ -151,7 +151,21 @@ defmodule Electric.Shapes.Api.Params do
151151

152152
def cast_root_table(%Ecto.Changeset{valid?: false} = changeset, _), do: changeset
153153

154-
def cast_root_table(%Ecto.Changeset{} = changeset, opts) do
154+
def cast_root_table(%Ecto.Changeset{} = changeset, %Api{shape: nil} = api) do
155+
changeset
156+
|> validate_required([:table])
157+
|> define_shape(api)
158+
end
159+
160+
def cast_root_table(%Ecto.Changeset{} = changeset, %Api{shape: %Shape{} = shape}) do
161+
put_change(changeset, :shape_definition, shape)
162+
end
163+
164+
defp define_shape(%Ecto.Changeset{valid?: false} = changeset, _api) do
165+
changeset
166+
end
167+
168+
defp define_shape(%Ecto.Changeset{} = changeset, api) do
155169
table = fetch_change!(changeset, :table)
156170
where = fetch_field!(changeset, :where)
157171
columns = get_change(changeset, :columns, nil)
@@ -160,16 +174,14 @@ defmodule Electric.Shapes.Api.Params do
160174

161175
case Shape.new(
162176
table,
163-
opts ++
164-
[
165-
where: where,
166-
columns: columns,
167-
replica: replica,
168-
storage: %{compaction: if(compaction_enabled?, do: :enabled, else: :disabled)}
169-
]
177+
where: where,
178+
columns: columns,
179+
replica: replica,
180+
inspector: api.inspector,
181+
storage: %{compaction: if(compaction_enabled?, do: :enabled, else: :disabled)}
170182
) do
171-
{:ok, result} ->
172-
put_change(changeset, :shape_definition, result)
183+
{:ok, shape} ->
184+
put_change(changeset, :shape_definition, shape)
173185

174186
{:error, {field, reasons}} ->
175187
Enum.reduce(List.wrap(reasons), changeset, fn

0 commit comments

Comments
 (0)