Skip to content
This repository was archived by the owner on Dec 16, 2022. It is now read-only.
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions go/vt/vtgate/vstream_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,8 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
// Unreachable.
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unexpected number or shards: %v", rss)
}

log.Infof("Slack2: starting to vstream from keyspace: %s, shard: %s", sgtid.Keyspace, sgtid.Shard)
// Safe to access sgtid.Gtid here (because it can't change until streaming begins).
err = rss[0].Gateway.VStream(ctx, rss[0].Target, sgtid.Gtid, sgtid.TablePKs, vs.filter, func(events []*binlogdatapb.VEvent) error {
// We received a valid event. Reset error count.
Expand Down Expand Up @@ -454,7 +456,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
return err
}

if err := vs.sendAll(sgtid, eventss); err != nil {
if err := vs.sendAll(ctx, sgtid, eventss); err != nil {
return err
}
eventss = nil
Expand Down Expand Up @@ -517,7 +519,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
}

// sendAll sends a group of events together while holding the lock.
func (vs *vstream) sendAll(sgtid *binlogdatapb.ShardGtid, eventss [][]*binlogdatapb.VEvent) error {
func (vs *vstream) sendAll(ctx context.Context, sgtid *binlogdatapb.ShardGtid, eventss [][]*binlogdatapb.VEvent) error {
vs.mu.Lock()
defer vs.mu.Unlock()

Expand Down Expand Up @@ -564,7 +566,12 @@ func (vs *vstream) sendAll(sgtid *binlogdatapb.ShardGtid, eventss [][]*binlogdat
}
}
}
vs.eventCh <- events

select {
case <-ctx.Done():
return nil
case vs.eventCh <- events:
}
}
return nil
}
Expand Down