Skip to content

Commit 4aa2f04

Browse files
authored
Merge pull request #12 from lupodevelop/adt-codecs-implementation
v3.1.0 implementation
2 parents bbe7baf + 2a0fe2e commit 4aa2f04

25 files changed

+880
-109
lines changed

.github/workflows/test.yml

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@ name: test
22

33
on:
44
push:
5-
branches: [main]
5+
branches:
6+
- master
7+
- main
68
pull_request:
79

810
jobs:
@@ -12,8 +14,10 @@ jobs:
1214
- uses: actions/checkout@v4
1315
- uses: erlef/setup-beam@v1
1416
with:
15-
otp-version: "27.0"
17+
otp-version: "28"
1618
gleam-version: "1.14.0"
19+
rebar3-version: "3"
20+
# elixir-version: "1"
1721
- run: gleam deps download
1822
- run: gleam test
1923
- run: gleam format --check src test

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,5 @@
44
erl_crash.dump
55
**/build
66
*.log
7+
/notes
78

CHANGELOG.md

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,19 @@
11
# Changelog
22

3+
## v3.1.0 — 2026-03-04
4+
5+
### Added
6+
7+
- **Cluster Monitoring** (`distribute/cluster/monitor`) — Typed event-driven
8+
notifications for `NodeUp` and `NodeDown`.
9+
- `distribute.subscribe(subject)` — Start a monitored subscription.
10+
- `distribute.unsubscribe(monitor_subject)` — Stop a subscription.
11+
- **ADT/Variant Codecs** (`distribute/codec/variant`) — A builder pattern to
12+
easily create codecs for Custom Types (enums) with payload support.
13+
- `variant.new()`, `variant.add()`, `variant.unit()`, `variant.build()`
14+
- **Telemetry** (`distribute/internal/telemetry`) — Erlang `:telemetry` events
15+
for send, receive, encode, decode, registry, and cluster operations.
16+
317
## v3.0.0 — 2026-02-11
418

519
Ground-up rewrite. Smaller API, proper OTP actors, compile-time type safety
@@ -38,4 +52,4 @@ connection pool, retry. Also removed `whereis_global(name, encoder, decoder)`.
3852
- `global.reply(reply_to, response, encoder)` — send a response back.
3953
- `composite.option(c)`, `composite.result(ok, err)`,
4054
`composite.tuple2(a, b)`, `composite.tuple3(a, b, c)`.
41-
- `registry.named(name, codec)` — short form of `typed_name`.
55+
- `registry.named(name, codec)` — short form of `typed_name`.

README.md

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,8 +110,8 @@ let user_id_codec = codec.map(codec.int(), UserId, fn(uid) {
110110
```
111111

112112
Gleam has no derive macros or reflection, so codecs for complex types
113-
are manual. The combinators handle the serialization you just wire
114-
the fields together.
113+
are manual. The combinators handle the serialization so you just wire
114+
the fields together!
115115

116116
## Modules
117117

@@ -122,11 +122,57 @@ the fields together.
122122
| `distribute/cluster` | `net_kernel` start/connect/ping |
123123
| `distribute/codec` | Binary codecs for primitives + `subject()` |
124124
| `distribute/codec/composite` | Option, Result, Tuple codecs |
125+
| `distribute/codec/variant` | Build codecs for Custom Types (ADTs) |
125126
| `distribute/codec/tagged` | Tagged messages with version field |
126127
| `distribute/global` | `GlobalSubject(msg)`, `call`, `reply` |
128+
| `distribute/cluster/monitor` | `NodeUp`, `NodeDown` typed events |
127129
| `distribute/registry` | `TypedName(msg)`, `:global` registration |
128130
| `distribute/receiver` | Typed receive, OTP actor wrappers |
129131

132+
### Custom Type Codecs
133+
134+
Seamlessly encode and decode your Algebraic Data Types (enums) with a fluent builder.
135+
136+
```gleam
137+
pub type MyMessage {
138+
Text(String)
139+
Ping
140+
}
141+
142+
import distribute/codec
143+
import distribute/codec/variant
144+
145+
let my_codec =
146+
variant.new()
147+
|> variant.add(0, "Text", codec.string(), Text, fn(m) {
148+
case m { Text(s) -> Ok(s); _ -> Error(Nil) }
149+
})
150+
|> variant.unit(1, "Ping", Ping, fn(m) { m == Ping })
151+
|> variant.build()
152+
```
153+
154+
### Cluster Monitoring
155+
156+
Subscribe to cluster events (`NodeUp`, `NodeDown`) to react to node topology changes.
157+
158+
```gleam
159+
import distribute
160+
import distribute/cluster/monitor
161+
162+
let subj = process.new_subject()
163+
let assert Ok(m) = distribute.subscribe(subj)
164+
165+
// In your actor/process
166+
case process.receive(subj, 5000) {
167+
Ok(monitor.NodeUp(node)) -> io.println("Node joined: " <> node)
168+
Ok(monitor.NodeDown(node)) -> io.println("Node left: " <> node)
169+
_ -> Nil
170+
}
171+
172+
// Later
173+
distribute.unsubscribe(m)
174+
```
175+
130176
## Caveats
131177

132178
**What the types catch** — within one codebase, `TypedName` and

gleam.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
name = "distribute"
2-
version = "3.0.0"
2+
version = "3.1.0"
33
description = "Typed distributed messaging for Gleam on the BEAM."
44
licences = ["MIT"]
55
repository = { type = "github", user = "lupodevelop", repo = "distribute" }
@@ -13,6 +13,7 @@ target = "erlang"
1313
gleam_stdlib = ">= 0.60.0 and < 2.0.0"
1414
gleam_erlang = ">= 1.0.0 and < 2.0.0"
1515
gleam_otp = ">= 1.0.0 and < 2.0.0"
16+
telemetry = ">= 1.0.0 and < 2.0.0"
1617

1718
[dev-dependencies]
1819
gleeunit = ">= 1.0.0 and < 2.0.0"

manifest.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,12 @@ packages = [
66
{ name = "gleam_otp", version = "1.2.0", build_tools = ["gleam"], requirements = ["gleam_erlang", "gleam_stdlib"], otp_app = "gleam_otp", source = "hex", outer_checksum = "BA6A294E295E428EC1562DC1C11EA7530DCB981E8359134BEABC8493B7B2258E" },
77
{ name = "gleam_stdlib", version = "0.68.1", build_tools = ["gleam"], requirements = [], otp_app = "gleam_stdlib", source = "hex", outer_checksum = "F7FAEBD8EF260664E86A46C8DBA23508D1D11BB3BCC6EE1B89B3BC3E5C83FF1E" },
88
{ name = "gleeunit", version = "1.9.0", build_tools = ["gleam"], requirements = ["gleam_stdlib"], otp_app = "gleeunit", source = "hex", outer_checksum = "DA9553CE58B67924B3C631F96FE3370C49EB6D6DC6B384EC4862CC4AAA718F3C" },
9+
{ name = "telemetry", version = "1.3.0", build_tools = ["rebar3"], requirements = [], otp_app = "telemetry", source = "hex", outer_checksum = "7015FC8919DBE63764F4B4B87A95B7C0996BD539E0D499BE6EC9D7F3875B79E6" },
910
]
1011

1112
[requirements]
1213
gleam_erlang = { version = ">= 1.0.0 and < 2.0.0" }
1314
gleam_otp = { version = ">= 1.0.0 and < 2.0.0" }
1415
gleam_stdlib = { version = ">= 0.60.0 and < 2.0.0" }
1516
gleeunit = { version = ">= 1.0.0 and < 2.0.0" }
17+
telemetry = { version = ">= 1.0.0 and < 2.0.0" }

src/cluster_ffi.erl

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
-module(cluster_ffi).
22
-export([start_node/2, connect/1, nodes/0, self_node/0, ping/1,
3-
is_ok_atom/1, get_error_reason/1, is_true/1, is_ignored/1]).
3+
is_ok_atom/1, get_error_reason/1, is_true/1, is_ignored/1,
4+
nodeup_atom/0, nodedown_atom/0, monitor_nodes/1,
5+
atom_to_string/1, get_node_from_tuple/1,
6+
simulate_node_event/3]).
47

58
-import(distribute_ffi_utils, [to_atom_safe/1]).
69

@@ -80,3 +83,18 @@ to_atom_force(Atom) when is_atom(Atom) ->
8083
is_valid_node_input(Bin) when is_binary(Bin) ->
8184
byte_size(Bin) =< 512 andalso
8285
binary:match(Bin, <<"\0">>) =:= nomatch.
86+
87+
nodeup_atom() -> nodeup.
88+
nodedown_atom() -> nodedown.
89+
90+
monitor_nodes(Flag) ->
91+
net_kernel:monitor_nodes(Flag).
92+
93+
atom_to_string(Atom) when is_atom(Atom) ->
94+
atom_to_binary(Atom, utf8).
95+
96+
get_node_from_tuple({_, Node}) -> Node.
97+
98+
simulate_node_event(Pid, Tag, NodeName) ->
99+
NodeAtom = binary_to_atom(NodeName, utf8),
100+
Pid ! {Tag, NodeAtom}.

src/distribute.gleam

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,17 @@
1-
//// Facade for common distributed operations: start a node, register
2-
//// actors, send messages, look things up.
1+
//// Facade for common distributed operations.
32

43
import distribute/actor as dist_actor
54
import distribute/cluster
5+
import distribute/cluster/monitor
66
import distribute/codec
77
import distribute/global
88
import distribute/receiver
99
import distribute/registry
10+
import gleam/erlang/process
1011
import gleam/otp/actor
1112

1213
pub fn version() -> String {
13-
"3.0.0"
14+
"3.1.0"
1415
}
1516

1617
// -- Cluster -----------------------------------------------------------------
@@ -84,3 +85,17 @@ pub fn lookup(
8485
pub fn unregister(name: String) -> Result(Nil, registry.RegisterError) {
8586
registry.unregister(name)
8687
}
88+
89+
// -- Cluster Monitoring ------------------------------------------------------
90+
91+
pub fn subscribe(
92+
user_subject: process.Subject(monitor.ClusterEvent),
93+
) -> Result(process.Subject(monitor.ControlMessage), actor.StartError) {
94+
monitor.subscribe(user_subject)
95+
}
96+
97+
pub fn unsubscribe(
98+
monitor_subject: process.Subject(monitor.ControlMessage),
99+
) -> Nil {
100+
monitor.unsubscribe(monitor_subject)
101+
}

src/distribute/actor.gleam

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ pub fn start_supervised(
119119
}
120120
}
121121

122-
/// Start N supervised actors, registered as `name_1` .. `name_N`.
122+
/// Start N supervised actors, registered as name_1 .. name_N.
123123
pub fn pool(
124124
typed_name: registry.TypedName(msg),
125125
size: Int,

src/distribute/cluster.gleam

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,8 @@ fn get_error_reason(value: dynamic.Dynamic) -> String
6060
// ---------------------------------------------------------------------------
6161

6262
/// Start a distributed BEAM node.
63-
///
64-
/// `name` must contain `@` (e.g. `"myapp@127.0.0.1"`).
65-
/// `cookie` must be ≤ 255 characters.
63+
/// Name must contain @ (e.g. myapp@127.0.0.1).
64+
/// Cookie must be 255 characters or fewer.
6665
pub fn start_node(name: String, cookie: String) -> Result(Nil, StartError) {
6766
case validate_node_name(name) {
6867
Error(e) -> Error(e)
@@ -80,7 +79,7 @@ pub fn start_node(name: String, cookie: String) -> Result(Nil, StartError) {
8079
}
8180
}
8281

83-
/// Connect to a remote node. Returns `Ok(Nil)` on success.
82+
/// Connect to a remote node. Returns Ok(Nil) on success.
8483
pub fn connect(node: String) -> Result(Nil, ConnectError) {
8584
case string.contains(node, "@") {
8685
False -> Error(NodeNotFound)
@@ -108,7 +107,7 @@ pub fn self_node() -> String {
108107
self_node_ffi()
109108
}
110109

111-
/// Ping a remote node. Returns `True` if it responds.
110+
/// Ping a remote node. Returns True if it responds.
112111
pub fn ping(node: String) -> Bool {
113112
ping_ffi(node)
114113
}

0 commit comments

Comments
 (0)