-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathpackets.go
More file actions
247 lines (236 loc) · 7.93 KB
/
packets.go
File metadata and controls
247 lines (236 loc) · 7.93 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
package chotki
import (
"errors"
"github.com/cockroachdb/pebble"
"github.com/drpcorg/chotki/host"
"github.com/drpcorg/chotki/indexes"
"github.com/drpcorg/chotki/protocol"
"github.com/drpcorg/chotki/rdx"
)
// Updates the Vkey0, which stores version vectors for different replicas.
// But also updates the corresponding block (check replication protocol description) version vector.
// Version vector is a map: replica src id -> latest seen rdx.ID
func (cho *Chotki) UpdateVTree(id, ref rdx.ID, pb *pebble.Batch) (err error) {
// id itself contains src and id, the final value will be done through merge.
v := protocol.Record('V', id.ZipBytes())
// updating block version vector
err = pb.Merge(host.VKey(ref), v, cho.opts.PebbleWriteOptions)
if err == nil {
// updating global version vector
err = pb.Merge(host.VKey0, v, cho.opts.PebbleWriteOptions)
}
return
}
// During the diff sync it handles the 'D' packets which most of the time contains a single block (look at the replication protocol description).
// It does not immediately apply the changes to DB, instead using a batch.
// The batch will be applied when we finish the diffsync, when we receive the 'V' packet.
func (cho *Chotki) ApplyD(id, ref rdx.ID, body []byte, batch *pebble.Batch) (err error) {
rest := body
var rdt byte
for len(rest) > 0 && err == nil {
var dzip, bare []byte
// this is id, but its stored as an offset of ref to save some bytes
dzip, rest = protocol.Take('F', rest)
// It also stored as zigzagged
d := rdx.UnzipUint64(dzip)
// we now restore original id
at := ref.ProPlus(d)
rdt, bare, rest = protocol.TakeAny(rest)
// we updated some classes, so dropping cache
if rdt == 'C' {
cho.types.Clear()
}
err = batch.Merge(host.OKey(at, rdt), bare, cho.opts.PebbleWriteOptions)
// adding full scan index if the object was created
if err == nil && rdt == 'O' {
cid := rdx.IDFromZipBytes(bare)
err = cho.IndexManager.AddFullScanIndex(cid, at, batch)
} else {
// check if we need add other types of indexes
err = cho.IndexManager.OnFieldUpdate(rdt, at, rdx.BadId, bare, batch)
}
}
return
}
// During the diff sync it handles the 'H' packets which contains the version vector of
// other replica. So we put their version vector into the batch to update ours after diff sync.
func (cho *Chotki) ApplyH(id, ref rdx.ID, body []byte, batch *pebble.Batch) (err error) {
_, rest := protocol.Take('M', body)
var vbody []byte
vbody, _ = protocol.Take('V', rest)
err = batch.Merge(host.VKey0, vbody, cho.opts.PebbleWriteOptions)
return
}
// During the diff sync it handles the 'V' packets. This packet effectively completes the diffy sync.
// It contains the version vector of the blocks we synced during the diff sync.
// We put then in the batch to update our blocks version vectors.
func (cho *Chotki) ApplyV(id, ref rdx.ID, body []byte, batch *pebble.Batch) (err error) {
rest := body
for len(rest) > 0 {
var rec, idb []byte
// take block version vector
rec, rest = protocol.Take('V', rest)
// take block id
idb, rec = protocol.Take('R', rec)
id := rdx.IDFromZipBytes(idb)
key := host.VKey(id)
if !rdx.VValid(rec) {
err = ErrBadVPacket
} else {
err = batch.Merge(key, rec, cho.opts.PebbleWriteOptions)
}
}
return
}
// Handles 'C' packets which can occur from applying local changes or live sync.
// Creates or updates class definition.
// Classes are special objects. They are stored in separate key range in pebble.
// They also have more simplified behaviour in create/update scenarios: they are just created/replaced as is.
// All classes are created with rid = rdx.ID0, if you pass other ref, it should be real ref of what you want to edit.
func (cho *Chotki) ApplyC(id, ref rdx.ID, body []byte, batch *pebble.Batch, calls *[]CallHook) (err error) {
cid := id
// editing class
if ref != rdx.ID0 {
cid = ref
}
err = batch.Merge(
host.OKey(cid, 'C'),
body,
cho.opts.PebbleWriteOptions)
if err == nil {
err = cho.UpdateVTree(id, cid, batch)
}
if err == nil {
var tasks []indexes.ReindexTask
tasks, err = cho.IndexManager.HandleClassUpdate(id, cid, body)
if err == nil {
for _, task := range tasks {
err = batch.Merge(task.Key(), task.Value(), cho.opts.PebbleWriteOptions)
if err != nil {
break
}
}
}
}
return
}
// Handles 'O' packets which can occur from applying local changes or live sync.
// Creates objects 'O'. 'Y' are special kind of objects, unused at the moment.
// Typically expects an 'O' package:
// 0 — 'O' record that contains class rdx.ID
// 1 - ... - class fields
// First it creates 'O' field.
// Then it goes through the rest of the fields encoded as TLV. The only transformation it does:
// it sets the current replica src id for FIRST/MEL types, because historically they are not set
// when creating those fields (for convinience?)
func (cho *Chotki) ApplyOY(lot byte, id, ref rdx.ID, body []byte, batch *pebble.Batch) (err error) {
// creating 'O' field, ref is class rdx.ID
err = batch.Merge(
host.OKey(id, lot),
ref.ZipBytes(),
cho.opts.PebbleWriteOptions)
rest := body
var fid rdx.ID
// 0 is 'O', so we start from 1
for fno := 1; len(rest) > 0 && err == nil; fno++ {
lit, hlen, blen := protocol.ProbeHeader(rest)
if lit == 0 || lit == '-' {
return rdx.ErrBadPacket
}
var bare, rebar []byte
rlen := hlen + blen
if len(rest) < rlen {
return ErrBadOPacket
}
bare = rest[hlen:rlen]
fid = id.ToOff(uint64(fno))
fkey := host.OKey(fid, lit)
// setting current replica src id for FIRST/MEL types
switch lit {
case 'F', 'I', 'R', 'S', 'T':
rebar, err = rdx.SetSourceFIRST(bare, id.Src())
case 'E', 'L', 'M':
rebar, err = rdx.MelReSource(bare, id.Src())
default:
// for NZ types and mauy be others, we expect that the src id is already set
rebar = bare
}
if err != nil {
break
}
err = batch.Merge(
fkey,
rebar,
cho.opts.PebbleWriteOptions)
rest = rest[rlen:]
if err == nil {
err = cho.IndexManager.OnFieldUpdate(lit, fid, ref, rebar, batch)
}
}
if err == nil {
err = cho.UpdateVTree(fid, id, batch)
}
if err == nil && lot == 'O' {
err = cho.IndexManager.AddFullScanIndex(ref, id, batch)
}
return
}
var ErrOffsetOpId = errors.New("op id is offset")
// Handles 'E' packets which can occur from applying local changes or live sync.
// Edits obkject fields. Unlike ApplyOY, it does not assume that we update whole object,
// as we can update individual fields.
// It also sets the current replica src id for FIRST/MEL types. Otherwise its just merges bytes into the batch.
func (cho *Chotki) ApplyE(id, r rdx.ID, body []byte, batch *pebble.Batch, calls *[]CallHook) (err error) {
// we either supply id of the object (0 offset) or ref should be an id of the object
if id.Off() != 0 || r.Off() != 0 {
return ErrOffsetOpId
}
rest := body
for len(rest) > 0 && err == nil {
var fint, bare, rebar []byte
var lit byte
// field always starts with 'F', which encodes field offset
fint, rest = protocol.Take('F', rest)
if fint == nil {
return ErrBadEPacket
}
field := rdx.UnzipUint64(fint)
if field > uint64(rdx.OffMask) {
return ErrBadEPacket
}
lit, bare, rest = protocol.TakeAny(rest)
// setting current replica src id for FIRST/MEL types
switch lit {
case 'F', 'I', 'R', 'S', 'T':
rebar, err = rdx.SetSourceFIRST(bare, id.Src())
case 'E', 'L', 'M':
rebar, err = rdx.MelReSource(bare, id.Src())
default:
// for NZ types and mauy be others, we expect that the src id is already set
rebar = bare
}
if err != nil {
break
}
fid := r.ToOff(field)
fkey := host.OKey(fid, lit)
err = batch.Merge(
fkey,
rebar,
cho.opts.PebbleWriteOptions)
if err == nil {
err = cho.IndexManager.OnFieldUpdate(lit, fid, rdx.BadId, rebar, batch)
}
// hooks are used for REPL sometimes (or where used), otherwise unused
hook, ok := cho.hooks.Load(fid)
if ok {
for _, h := range hook {
(*calls) = append((*calls), CallHook{h, fid})
}
}
}
if err == nil {
err = cho.UpdateVTree(id, r, batch)
}
return
}