Skip to content

Commit 05fd91d

Browse files
committed
Remove speculative Arrow overlap trimming
Keep the JDBC-parity row count cap for CloudFetch payloads and avoid adding overlap handling that is not reproducible. Signed-off-by: Madhavendra Rathore <madhavendra.rathore@databricks.com>
1 parent f88a618 commit 05fd91d

2 files changed

Lines changed: 11 additions & 112 deletions

File tree

internal/rows/arrowbased/arrowRecordIterator.go

Lines changed: 11 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -38,67 +38,29 @@ type arrowRecordIterator struct {
3838
isFinished bool
3939
arrowSchemaBytes []byte
4040
arrowSchema *arrow.Schema
41-
nextRowNumber int64
42-
hasNextRowNumber bool
4341
}
4442

4543
var _ rows.ArrowBatchIterator = (*arrowRecordIterator)(nil)
4644

4745
// Retrieve the next arrow record
4846
func (ri *arrowRecordIterator) Next() (arrow.Record, error) {
49-
for {
50-
if !ri.HasNext() {
51-
// returning EOF indicates that there are no more records to iterate
52-
return nil, io.EOF
53-
}
54-
55-
// make sure we have the current batch
56-
err := ri.getCurrentBatch()
57-
if err != nil {
58-
return nil, err
59-
}
60-
61-
// return next record in current batch
62-
r, err := ri.currentBatch.Next()
63-
if err != nil {
64-
ri.checkFinished()
65-
return nil, err
66-
}
67-
68-
r2 := ri.skipReturnedRows(r)
69-
ri.checkFinished()
70-
if r2 == nil {
71-
continue
72-
}
73-
74-
return r2, nil
47+
if !ri.HasNext() {
48+
// returning EOF indicates that there are no more records to iterate
49+
return nil, io.EOF
7550
}
76-
}
7751

78-
func (ri *arrowRecordIterator) skipReturnedRows(r SparkArrowRecord) arrow.Record {
79-
if !ri.hasNextRowNumber {
80-
ri.nextRowNumber = r.Start()
81-
ri.hasNextRowNumber = true
52+
// make sure we have the current batch
53+
err := ri.getCurrentBatch()
54+
if err != nil {
55+
return nil, err
8256
}
8357

84-
if r.End() < ri.nextRowNumber {
85-
r.Release()
86-
return nil
87-
}
58+
// return next record in current batch
59+
r, err := ri.currentBatch.Next()
8860

89-
if r.Start() < ri.nextRowNumber {
90-
start := ri.nextRowNumber - r.Start()
91-
sliced := r.NewSlice(start, r.NumRows())
92-
r.Release()
93-
if sliced == nil {
94-
return nil
95-
}
96-
ri.nextRowNumber += sliced.NumRows()
97-
return sliced
98-
}
61+
ri.checkFinished()
9962

100-
ri.nextRowNumber = r.End() + 1
101-
return r
63+
return r, err
10264
}
10365

10466
// Indicate whether there are any more records available

internal/rows/arrowbased/arrowRecordIterator_test.go

Lines changed: 0 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -194,69 +194,6 @@ func TestArrowRecordIterator(t *testing.T) {
194194
})
195195
}
196196

197-
func TestArrowRecordIterator_SkipsOverlappingReturnedRows(t *testing.T) {
198-
var releasedOverlappingRecord bool
199-
var slicedStart int64
200-
var slicedEnd int64
201-
202-
overlappingRecord := fakeRecord{
203-
fnNumRows: func() int64 { return 10 },
204-
fnRelease: func() {
205-
releasedOverlappingRecord = true
206-
},
207-
fnNewSlice: func(i, j int64) arrow.Record {
208-
slicedStart = i
209-
slicedEnd = j
210-
return fakeRecord{fnNumRows: func() int64 { return j - i }}
211-
},
212-
}
213-
214-
rs := &arrowRecordIterator{
215-
batchIterator: &fakeBatchIterator{
216-
index: -1,
217-
batches: []SparkArrowBatch{
218-
&sparkArrowBatch{
219-
Delimiter: rowscanner.NewDelimiter(0, 5),
220-
arrowRecords: []SparkArrowRecord{
221-
&sparkArrowRecord{
222-
Delimiter: rowscanner.NewDelimiter(0, 5),
223-
Record: fakeRecord{fnNumRows: func() int64 { return 5 }},
224-
},
225-
},
226-
},
227-
&sparkArrowBatch{
228-
Delimiter: rowscanner.NewDelimiter(0, 10),
229-
arrowRecords: []SparkArrowRecord{
230-
&sparkArrowRecord{
231-
Delimiter: rowscanner.NewDelimiter(0, 10),
232-
Record: overlappingRecord,
233-
},
234-
},
235-
},
236-
},
237-
},
238-
}
239-
defer rs.Close()
240-
241-
r1, err := rs.Next()
242-
assert.NoError(t, err)
243-
assert.Equal(t, int64(5), r1.NumRows())
244-
r1.Release()
245-
246-
r2, err := rs.Next()
247-
assert.NoError(t, err)
248-
assert.Equal(t, int64(5), r2.NumRows())
249-
r2.Release()
250-
251-
assert.True(t, releasedOverlappingRecord)
252-
assert.Equal(t, int64(5), slicedStart)
253-
assert.Equal(t, int64(10), slicedEnd)
254-
255-
r3, err := rs.Next()
256-
assert.Nil(t, r3)
257-
assert.ErrorIs(t, err, io.EOF)
258-
}
259-
260197
func TestLimitArrowRecordsUsesExpectedRowCount(t *testing.T) {
261198
var releasedOriginal bool
262199
var releasedExtra bool

0 commit comments

Comments
 (0)