forked from onflow/flow-go
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathfinalized_snapshot.go
More file actions
137 lines (112 loc) · 3.87 KB
/
Copy pathfinalized_snapshot.go
File metadata and controls
137 lines (112 loc) · 3.87 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package synchronization
import (
"fmt"
"sync"
"github.com/rs/zerolog"
"github.com/onflow/flow-go/consensus/hotstuff/model"
"github.com/onflow/flow-go/consensus/hotstuff/notifications/pubsub"
"github.com/onflow/flow-go/engine"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/lifecycle"
"github.com/onflow/flow-go/state/protocol"
)
// FinalizedHeaderCache holds a locally cached copy of the latest finalized header.
// It is used in Engine to access latest valid data without querying the
// protocol state on every read.
type FinalizedHeaderCache struct {
	log   zerolog.Logger // logger tagged with this component's name by the constructor
	state protocol.State // protocol state the finalized header is read from

	// mu guards lastFinalizedHeader: Get takes the read lock,
	// updateHeader takes the write lock.
	mu                  sync.RWMutex
	lastFinalizedHeader *flow.Header

	finalizationEventNotifier engine.Notifier // notifier for finalization events

	lm      *lifecycle.LifecycleManager // drives the Ready/Done lifecycle
	stopped chan struct{}               // closed when finalizationProcessingLoop returns
}
// NewFinalizedHeaderCache creates a new finalized header cache.
//
// The cache is seeded with the finalized header currently reported by state,
// and its onFinalizedBlock callback is registered with finalizationDistributor.
// The goroutine that refreshes the cached header is not started here; it is
// launched from Ready.
//
// An error is returned if the initial finalized header cannot be read from
// state. The underlying cause is wrapped, so callers can inspect it with
// errors.Is / errors.As.
func NewFinalizedHeaderCache(log zerolog.Logger, state protocol.State, finalizationDistributor *pubsub.FinalizationDistributor) (*FinalizedHeaderCache, error) {
	cache := &FinalizedHeaderCache{
		state:                     state,
		lm:                        lifecycle.NewLifecycleManager(),
		log:                       log.With().Str("component", "finalized_snapshot_cache").Logger(),
		finalizationEventNotifier: engine.NewNotifier(),
		stopped:                   make(chan struct{}),
	}

	head, err := cache.getHeader()
	if err != nil {
		// Wrap the cause instead of discarding it, so the failure is diagnosable.
		return nil, fmt.Errorf("could not apply last finalized state: %w", err)
	}
	cache.lastFinalizedHeader = head

	// Subscribe only after the cache holds a valid header.
	finalizationDistributor.AddOnBlockFinalizedConsumer(cache.onFinalizedBlock)

	return cache, nil
}
// Get returns the last locally cached finalized header.
// The read is performed under the cache's read lock.
func (f *FinalizedHeaderCache) Get() *flow.Header {
	f.mu.RLock()
	header := f.lastFinalizedHeader
	f.mu.RUnlock()
	return header
}
// getHeader reads the head of the final snapshot from the protocol state.
// It does not touch the cached value. Any error from the snapshot is wrapped
// and returned together with a nil header.
func (f *FinalizedHeaderCache) getHeader() (*flow.Header, error) {
	header, err := f.state.Final().Head()
	if err != nil {
		return nil, fmt.Errorf("could not get last finalized header: %w", err)
	}
	return header, nil
}
// updateHeader refreshes the locally cached finalized header.
//
// It reads the current finalized header via getHeader and, under the write
// lock, replaces the cached header only if the new one has a strictly greater
// height. If the header cannot be read, the error is logged and returned and
// the cached value is left unchanged.
func (f *FinalizedHeaderCache) updateHeader() error {
	f.log.Debug().Msg("updating header")

	latest, err := f.getHeader()
	if err != nil {
		f.log.Err(err).Msg("failed to get header")
		return err
	}

	blockID := latest.ID().String()
	f.log.Debug().
		Str("block_id", blockID).
		Uint64("height", latest.Height).
		Msg("got new header")

	f.mu.Lock()
	defer f.mu.Unlock()

	// Never move the cache backwards (or sideways) in height.
	if latest.Height > f.lastFinalizedHeader.Height {
		f.lastFinalizedHeader = latest
	}
	return nil
}
// Ready hands the lifecycle manager a start-up action that launches
// finalizationProcessingLoop in its own goroutine, and returns the
// manager's Started channel.
func (f *FinalizedHeaderCache) Ready() <-chan struct{} {
	launchLoop := func() {
		go f.finalizationProcessingLoop()
	}
	f.lm.OnStart(launchLoop)
	return f.lm.Started()
}
// Done hands the lifecycle manager a shutdown action that blocks until
// f.stopped is closed, and returns the manager's Stopped channel.
//
// NOTE(review): f.stopped is closed only by finalizationProcessingLoop, which
// is launched from Ready; confirm the lifecycle manager never runs the stop
// action for a component that was not started, otherwise it would block.
func (f *FinalizedHeaderCache) Done() <-chan struct{} {
	awaitLoopExit := func() {
		<-f.stopped
	}
	f.lm.OnStop(awaitLoopExit)
	return f.lm.Stopped()
}
// onFinalizedBlock implements the `OnFinalizedBlock` callback from the `hotstuff.FinalizationConsumer`.
// It does not modify the cached header itself: it logs the ID of the finalized
// block and signals finalizationEventNotifier. The cached header is refreshed
// by finalizationProcessingLoop, which listens on that notifier's channel.
//
// CAUTION: the input to this callback is treated as trusted; precautions should be taken that messages
// from external nodes cannot be considered as inputs to this function
func (f *FinalizedHeaderCache) onFinalizedBlock(block *model.Block) {
	blockID := block.BlockID.String()
	f.log.Debug().Str("block_id", blockID).Msg("received new block finalization callback")

	// signal that a new finalized block is available
	f.finalizationEventNotifier.Notify()
}
// finalizationProcessingLoop is the body of the goroutine that processes
// finalization events.
//
// Each signal on the notifier channel triggers one updateHeader call; a failed
// update is logged at Fatal level. The loop returns when the lifecycle
// manager's shutdown signal fires, and f.stopped is closed on the way out.
func (f *FinalizedHeaderCache) finalizationProcessingLoop() {
	defer close(f.stopped)

	f.log.Debug().Msg("starting finalization processing loop")

	events := f.finalizationEventNotifier.Channel()
	for {
		select {
		case <-f.lm.ShutdownSignal():
			return
		case <-events:
			if err := f.updateHeader(); err != nil {
				f.log.Fatal().Err(err).Msg("could not process latest finalized block")
			}
		}
	}
}