-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathmain.go
More file actions
104 lines (91 loc) · 2.57 KB
/
Copy pathmain.go
File metadata and controls
104 lines (91 loc) · 2.57 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package main
import (
"context"
"flag"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"
"cloud.google.com/go/spanner"
admin "cloud.google.com/go/spanner/admin/database/apiv1"
"github.com/flowerinthenight/spindle/v3"
)
// main is a small leader-election demo built on spindle.
//
// It parses the -db, -table, -name and -dbg flags, opens a Spanner client and
// a database admin client, creates a spindle lock identified by this process'
// PID, and starts the lock's main loop. While this node is the leader it logs
// a "doing leader work" line every two seconds. On SIGINT or SIGTERM the
// context passed to the lock is cancelled and main waits for the lock to
// report completion on the done channel before exiting.
func main() {
	dbstr := flag.String("db", "", "db, fmt: projects/{v}/instances/{v}/databases/{v}")
	table := flag.String("table", "testlease", "table name")
	name := flag.String("name", "mylock", "lock name")
	dbg := flag.Bool("dbg", false, "enable verbose debug logging")
	flag.Parse()

	// To run, pass -db and, optionally, -table and -name.
	// Auth depends on environment's ADC.
	ctx := context.Background()
	db, err := spanner.NewClient(ctx, *dbstr)
	if err != nil {
		log.Println(err)
		return
	}
	defer db.Close()

	dbAdminClient, err := admin.NewDatabaseAdminClient(ctx)
	if err != nil {
		log.Println(err)
		return
	}
	defer dbAdminClient.Close()

	// quit is cancelled on the first SIGINT/SIGTERM; per the original author's
	// note this triggers lock release if this node is the leader. Registering
	// the handler here (before the lock is created) means signals received
	// during setup are not lost, and the deferred stop releases the context
	// and the signal registration on every return path, including the early
	// return when spindle.New fails.
	quit, stop := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	// Try running multiple instances of this binary in separate terminals
	// pointing to the same database, table, and lock name. You should see:
	// - Only one instance logs "doing leader work" at a time.
	// - When you Ctrl+C the leader, it releases the lock and shuts down.
	// - Another instance picks up leadership and starts doing work.
	id := fmt.Sprintf("node-%d", os.Getpid())
	lock, err := spindle.New(
		db,
		*table,
		*name,
		spindle.WithId(id),
		// NOTE(review): the unit of this value is defined by
		// spindle.WithDuration; confirm 15 is the intended lease length.
		spindle.WithDuration(15),
		spindle.WithDatabaseAdminClient(dbAdminClient),
		spindle.WithDebug(*dbg),
		spindle.WithLeaderCallback(nil, func(ctx context.Context, state spindle.LeaderState) {
			if !state.Leader {
				log.Printf("[%s] lost leadership, stopping work", id)
				return
			}

			log.Printf("[%s] selected as leader, token: %v", id, state.Token)

			// Do leader work using ctx; cancelled when leadership is lost.
			// Use state.Token as a fencing token for downstream conditional writes.
			// The worker runs in its own goroutine so the callback returns
			// promptly; it exits as soon as ctx is done.
			go func() {
				ticker := time.NewTicker(2 * time.Second)
				defer ticker.Stop()
				for {
					select {
					case <-ctx.Done():
						log.Printf("[%s] leader context cancelled", id)
						return
					case <-ticker.C:
						log.Printf("[%s] doing leader work (token=%v)", id, state.Token)
					}
				}
			}()
		}),
	)
	if err != nil {
		log.Println(err)
		return
	}

	// done is buffered so the lock can report its final status without
	// needing main to already be receiving.
	done := make(chan error, 1)

	// NOTE(review): if Run returns a value in this spindle version it is not
	// checked here — confirm against the library's signature.
	lock.Run(quit, done) // start main loop

	// Block until the lock's main loop has finished (after quit is cancelled).
	if err := <-done; err != nil {
		log.Println(err)
	}
	log.Printf("[%s] shut down", id)
}