-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathexample_test.go
More file actions
194 lines (159 loc) · 4.92 KB
/
example_test.go
File metadata and controls
194 lines (159 loc) · 4.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
package phase_test
import (
"context"
"fmt"
"time"
"github.com/aelse/phase"
)
// ExamplePhaser walks through the life of a single phase: derive it from a
// context with phase.Next, wait for either the phase to be done or a ticker
// to fire, wait for child phases, and finally Close the phase.
func ExamplePhaser() {
	f := func(ctx context.Context) {
		// The second value returned by phase.Next is discarded to keep the
		// example short.
		next, _ := phase.Next(ctx)

		// Keep a handle on the ticker so it can be stopped when f returns;
		// a ticker created inline in the select could never be stopped.
		ticker := time.NewTicker(10 * time.Millisecond)
		defer ticker.Stop()

		select {
		case <-next.Done():
			fmt.Println("context ended")
		case <-ticker.C:
			fmt.Println("ticker fired")
		}

		// Wait until child phases end.
		// There are none in this example but we demonstrate correct behaviour.
		<-next.ChildrenDone()

		// Signal that our phase has ended.
		next.Close()
	}

	// First run: the background context is never cancelled, so the expected
	// output shows the ticker branch being taken.
	ctx := context.Background()
	f(ctx)

	// Second run: the context is cancelled before f is called, so the
	// expected output shows the Done branch being taken.
	ctx, cancel := context.WithCancel(ctx)
	cancel()
	f(ctx)

	// Output: ticker fired
	// context ended
}
// ExamplePhaser_orderedShutdown chains three phases beneath a root phaser,
// cancels the context at the top of the chain, and prints a line as each
// phase finishes. The expected output lists the innermost phase first.
func ExamplePhaser_orderedShutdown() {
	// awaitEnd blocks until its phase is done and its children are done,
	// prints msg, and closes the phase on the way out.
	awaitEnd := func(ph phase.Phaser, msg string) {
		defer ph.Close()

		<-ph.Done()
		<-ph.ChildrenDone()
		fmt.Println(msg)
	}

	rootCtx, stop := context.WithCancel(context.Background())
	root, _ := phase.Next(rootCtx)
	defer root.Close()

	// Each phase is created from the one before it, forming a chain.
	outer, _ := phase.Next(root)
	middle, _ := phase.Next(outer)
	inner, _ := phase.Next(middle)

	waiters := []struct {
		ph  phase.Phaser
		msg string
	}{
		{outer, "p0 ended"},
		{middle, "p1 ended"},
		{inner, "p2 ended"},
	}
	for _, w := range waiters {
		go awaitEnd(w.ph, w.msg)
	}

	// Cancelling the top context ends the whole chain; the phases are
	// expected to finish innermost first: p2, p1, p0.
	stop()

	<-root.ChildrenDone()
	fmt.Println("finished!")

	// Output: p2 ended
	// p1 ended
	// p0 ended
	// finished!
}
// ExamplePhaser_funcTree starts a chain of nested goroutines, each owning a
// phase derived from its caller's phase. When the top level context times
// out, every function waits for its own children before returning.
func ExamplePhaser_funcTree() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	phaser, _ := phase.Next(ctx)
	defer phaser.Close()

	// spawn is declared before it is assigned so the closure can refer to
	// itself. The context is the first parameter, following the convention
	// described in the context package documentation.
	var spawn func(ctx context.Context, i, depth int)
	spawn = func(ctx context.Context, i, depth int) {
		if i >= depth {
			return
		}

		// Create a phase to cover this function scope, and defer Close.
		p, _ := phase.Next(ctx)
		defer p.Close()

		// Create child funcs up to depth.
		go spawn(p, i+1, depth)

		// Wait on phase to be done, either due to propagated cancelation
		// or a call to phase.Cancel targeting `p` (n/a in this example).
		fmt.Printf("func<%d> doing work, waiting for context end\n", i)
		<-p.Done()

		// Child phases must return first.
		<-p.ChildrenDone()
		fmt.Printf("func<%d> returning\n", i)
	}

	go spawn(phaser, 0, 5)

	// Top level context timeout propagates down and we wait on the phaser.
	<-ctx.Done()
	fmt.Println("Waiting on top level phaser to complete")
	<-phaser.ChildrenDone()
	fmt.Println("Top level phaser ended. Bye!")
}
// ExamplePhaser_contexts shows a phaser being handed to code that only knows
// about context.Context. The worker goroutines receive a plain context
// derived from a phaser and report completion over a channel.
func ExamplePhaser_contexts() {
	// Create a top level phaser; its parent context is cancelled further down.
	ctx, cancel := context.WithCancel(context.Background())
	p0, _ := phase.Next(ctx)
	defer p0.Close()

	go func(ctx context.Context) {
		fmt.Println("started p0 func")
		p1, _ := phase.Next(ctx)
		defer p1.Close()

		type ctxStr string

		// Each worker signals on finished when it returns. The buffer is
		// sized to the number of workers so a send never blocks.
		const workers = 5
		finished := make(chan struct{}, workers)

		// Run some other goroutines which take an ordinary context.
		for i := range workers {
			// Phasers can be used like any other context. Let's set a value.
			ctx := context.WithValue(p1, ctxStr("goroutine"), i)
			go func(ctx context.Context) {
				defer func() { finished <- struct{}{} }()

				//nolint:forcetypeassert
				num := ctx.Value(ctxStr("goroutine")).(int)
				fmt.Printf("goroutine(%d) started\n", num)
				<-ctx.Done()
				fmt.Printf("goroutine(%d) finished\n", num)
			}(ctx)
		}

		// Wait for our initiating context cancellation to perform cleanup.
		<-ctx.Done()
		fmt.Println("Waiting for descendent phases to end (none in this example)")
		<-p1.ChildrenDone()

		// There is no guarantee on order of completion for the goroutines
		// as they only deal in contexts not Phasers, so wait for each one
		// to report completion instead of sleeping for a fixed time.
		fmt.Println("Waiting for goroutines to finish")
		for range workers {
			<-finished
		}
	}(p0)

	fmt.Println("Shutdown in 5 seconds")
	time.Sleep(5 * time.Second)
	cancel()

	fmt.Println("Waiting on children")
	<-p0.ChildrenDone() // Wait until everything has finished.
	fmt.Println("Bye!")
}
// ExamplePhaser_phaserDI passes phasers into components by dependency
// injection. Each component receives its own phase, created from the phase
// of the component started before it.
func ExamplePhaser_phaserDI() {
	const (
		cleanupTime = time.Second
		runTime     = 2 * time.Second
	)

	// runComponent stands in for any component that must clean up when its
	// context ends. Its first parameter is a phase.Phaser rather than a
	// context.Context so that the programmer knows the phase has to be
	// handled. This is a hint to the programmer, not a difference in
	// implementation.
	runComponent := func(ph phase.Phaser, label string) {
		defer ph.Close()

		fmt.Printf("%s started\n", label)
		<-ph.Done()

		// There might be child phases, so wait for them to finish first.
		<-ph.ChildrenDone()

		fmt.Printf("%s shutting down\n", label)
		time.Sleep(cleanupTime)
	}

	// The root phase hangs off a context that is cancelled further down.
	rootCtx, stop := context.WithCancel(context.Background())
	root, _ := phase.Next(rootCtx)
	defer root.Close()

	// Build a tree of phasers below the root and inject one into each
	// component.
	dbPhase, _ := phase.Next(root)
	go runComponent(dbPhase, "db")

	pipelinePhase, _ := phase.Next(dbPhase)
	go runComponent(pipelinePhase, "data pipeline")

	webPhase, _ := phase.Next(pipelinePhase)
	go runComponent(webPhase, "web server")

	fmt.Println("Shutdown in 2 seconds")
	time.Sleep(runTime)

	// Cancel the top context, which cascades down.
	stop()

	// Wait until everything has finished.
	<-root.ChildrenDone()
	fmt.Println("Bye!")
}