Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 35 additions & 32 deletions arena.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (ssa *SingleSegmentArena) Release() {
// buffers, allocating new buffers of exponentially-increasing size when
// full. This avoids the potentially-expensive slice copying of SingleSegment.
type MultiSegmentArena struct {
segs []Segment
segs []*Segment

// rawData is set when the individual segments were all demuxed from
// the passed raw data slice.
Expand Down Expand Up @@ -204,12 +204,16 @@ func (msa *MultiSegmentArena) Release() {
msa.rawData = nil

for i := range msa.segs {
if msa.segs[i] == nil {
continue
}
if msa.bp != nil {
zeroSlice(msa.segs[i].data)
msa.bp.Put(msa.segs[i].data)
}
msa.segs[i].data = nil
msa.segs[i].BindTo(nil)
msa.segs[i] = nil
}

if msa.segs != nil {
Expand All @@ -227,15 +231,17 @@ func (msa *MultiSegmentArena) Release() {
// Like MultiSegment, but doesn't use the pool
func multiSegment(b [][]byte) *MultiSegmentArena {
var bp *bufferpool.Pool
var segs []Segment
var segs []*Segment
if b == nil {
bp = &bufferpool.Default
segs = make([]Segment, 0, 5) // Typical size.
segs = make([]*Segment, 0, 5) // Typical size.
} else {
segs = make([]Segment, len(b))
segs = make([]*Segment, len(b))
for i := range b {
segs[i].data = b[i]
segs[i].id = SegmentID(i)
segs[i] = &Segment{
data: b[i],
id: SegmentID(i),
}
}
}
return &MultiSegmentArena{segs: segs, bp: bp}
Expand Down Expand Up @@ -264,7 +270,7 @@ func (msa *MultiSegmentArena) demux(hdr streamHeader, data []byte, bp *bufferpoo
msa.segs = msa.segs[:numSegs]
} else {
inc := numSegs - len(msa.segs)
msa.segs = append(msa.segs, make([]Segment, inc)...)
msa.segs = append(msa.segs, make([]*Segment, inc)...)
}

rawData := data
Expand All @@ -273,9 +279,13 @@ func (msa *MultiSegmentArena) demux(hdr streamHeader, data []byte, bp *bufferpoo
if err != nil {
return err
}

msa.segs[i].data, data = data[:sz:sz], data[sz:]
msa.segs[i].id = i
seg := msa.segs[i]
if seg == nil {
seg = &Segment{id: i}
msa.segs[i] = seg
}
seg.data, data = data[:sz:sz], data[sz:]
seg.id = i
}

msa.rawData = rawData
Expand All @@ -291,23 +301,14 @@ func (msa *MultiSegmentArena) Segment(id SegmentID) *Segment {
if int(id) >= len(msa.segs) {
return nil
}
return &msa.segs[id]
return msa.segs[id]
}

func (msa *MultiSegmentArena) Allocate(sz Size, msg *Message, seg *Segment) (*Segment, address, error) {
// Prefer allocating in seg if it has capacity.
if seg != nil && hasCapacity(seg.data, sz) {
// Double check this segment is part of this arena.
contains := false
for i := range msa.segs {
if &msa.segs[i] == seg {
contains = true
break
}
}

if !contains {
// This is a usage error.
// Membership check: validate by id and exact pointer equality.
if int(seg.id) >= len(msa.segs) || msa.segs[seg.id] != seg {
return nil, 0, errors.New("preferred segment is not part of the arena")
}

Expand All @@ -325,14 +326,17 @@ func (msa *MultiSegmentArena) Allocate(sz Size, msg *Message, seg *Segment) (*Se

var total int64
for i := range msa.segs {
if msa.segs[i] == nil {
continue
}
data := msa.segs[i].data
if hasCapacity(data, sz) {
// Found segment with spare capacity.
addr := address(len(msa.segs[i].data))
addr := address(len(data))
newLen := int(addr) + int(sz)
msa.segs[i].data = msa.segs[i].data[:newLen]
msa.segs[i].data = data[:newLen]
msa.segs[i].BindTo(msg)
return &msa.segs[i], addr, nil
return msa.segs[i], addr, nil
}

if total += int64(cap(data)); total < 0 {
Expand Down Expand Up @@ -363,20 +367,19 @@ func (msa *MultiSegmentArena) Allocate(sz Size, msg *Message, seg *Segment) (*Se
return nil, 0, err
}

// We have determined this will be a new segment. Get the backing
// buffer for it.
// We have determined this will be a new segment. Get the backing buffer.
buf := msa.bp.Get(n)
buf = buf[:sz]

// Setup the segment.
id := SegmentID(len(msa.segs))
msa.segs = append(msa.segs, Segment{
newSeg := &Segment{
data: buf,
id: id,
})
res := &msa.segs[int(id)]
res.BindTo(msg)
return res, 0, nil
}
msa.segs = append(msa.segs, newSeg)
newSeg.BindTo(msg)
return newSeg, 0, nil
}

func (msa *MultiSegmentArena) String() string {
Expand Down