Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions buffer/buffer_needs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package buffer

import "math"

func BestRoot(availableBufs, size int32) int32 {
// いくらかは余裕を持たせておく
actualAvailable := availableBufs - 2
if actualAvailable <= 1 {
return 1
}

var k int32 = math.MaxInt32
i := 1.0
for k > actualAvailable {
i++
k = int32(math.Ceil(math.Pow(float64(size), 1/i)))
}
return k
}

func BestFactor(availableBufs, size int32) int32 {
// いくらかは余裕を持たせておく
actualAvailable := availableBufs - 2
if actualAvailable <= 1 {
return 1
}

var k int32 = size
i := 1.0
for k > actualAvailable {
i++
k = int32(math.Ceil(float64(size) / i))
}

return k
}
128 changes: 128 additions & 0 deletions plan/multi_buffer_product_plan.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
package plan

import (
"fmt"

"github.com/team-gonyo/gonyo-db/query"
"github.com/team-gonyo/gonyo-db/record"
"github.com/team-gonyo/gonyo-db/scan"
"github.com/team-gonyo/gonyo-db/tx"
)

type MultiBufferProductPlan struct {
tx *tx.Transaction
lhs, rhs Plan
schema *record.Schema
}

var _ Plan = (*MultiBufferProductPlan)(nil)

func NewMultiBufferProductPcan(tx *tx.Transaction, lhs, rhs Plan) *MultiBufferProductPlan {
sch := record.NewSchema()
sch.AddAll(lhs.Schema())
sch.AddAll(rhs.Schema())

return &MultiBufferProductPlan{
tx: tx,
lhs: lhs,
rhs: rhs,
schema: sch,
}
}

func (m *MultiBufferProductPlan) Open() (scan.Scan, error) {
leftScan, err := m.lhs.Open()
if err != nil {
return nil, fmt.Errorf("tried to open lhs scan for multi buffer product: %w", err)
}

temp, err := m.copyRecordsFrom(m.rhs)
if err != nil {
return nil, fmt.Errorf("tried to create copy records from rhs in MultiBuffProduct: %w", err)
}

return query.NewMultiBufferProductScan(m.tx, leftScan, temp.TableName() + ".tbl", temp.Layout())
}

func (m *MultiBufferProductPlan) copyRecordsFrom(p Plan) (*query.TempTable, error) {
src, err := p.Open()
if err != nil {
return nil, fmt.Errorf("tried to open TablePlan in MultiBufferProduct: %w", err)
}
schema := p.Schema()
tt, err := query.NewTempTable(m.tx, schema)
if err != nil {
return nil, fmt.Errorf("tried to create TempTable in MultiBufferProduct: %w", err)
}
dest, err := tt.Open()
if err != nil {
return nil, fmt.Errorf("tried to open TempTable in MultiBufferProduct: %w", err)
}

for {
hasMore, err := src.Next()
if err != nil {
return nil, fmt.Errorf("error on advancing src table in MultiBufferProduct: %w", err)
}
if !hasMore {
break
}

err = dest.Insert()
if err != nil {
return nil, fmt.Errorf("failed to insert record in TempTable: %w", err)
}
for _, fieldName := range schema.Fields() {
val, err := src.GetVal(fieldName)
if err != nil {
return nil, fmt.Errorf("failed to retrieve data from src in MultiBufferProduct#copy: %w", err)
}
err = dest.SetVal(fieldName, val)
if err != nil {
return nil, err
}
}
}

src.Close()
dest.Close()
return tt, nil
}

func (m *MultiBufferProductPlan) BlocksAccessed() (int, error) {
// this guesses at the # of chunks
availabble := m.tx.AvailableBuffs()
size, err := NewMaterializePlan(m.tx, m.rhs).BlocksAccessed()
if err != nil {
return 0, fmt.Errorf("failed to calculate MaterializedPlan's block access in MultiBufferProductPlan#BlockAccess: %w", err)
}
numChunks := size / int(availabble)

rhsBlockAccesses, err := m.rhs.BlocksAccessed()
if err != nil {
return 0, fmt.Errorf("failed to calculate rhs block access in MultiBufferProductPlan#BlockAccess: %w", err)
}

lhsBlockAccesses, err := m.lhs.BlocksAccessed()
if err != nil {
return 0, fmt.Errorf("failed to calculate lhs block access in MultiBufferProductPlan#BlockAccess: %w", err)
}

return rhsBlockAccesses + (lhsBlockAccesses * numChunks), nil
}

func (m *MultiBufferProductPlan) DistinctValues(fldName string) int {
if m.lhs.Schema().HasField(fldName) {
return m.lhs.DistinctValues(fldName)
} else {
return m.rhs.DistinctValues(fldName)
}
}

func (m *MultiBufferProductPlan) RecordsOutput() int {
return m.lhs.RecordsOutput() * m.rhs.RecordsOutput()
}

func (m *MultiBufferProductPlan) Schema() *record.Schema {
return m.schema
}
68 changes: 68 additions & 0 deletions plan/multi_buffer_product_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package plan

import (
"testing"
)

func TestMultiBufferProduct(t *testing.T) {
db, planner, _ := createTester(t)

// Create a transaction
tx, err := db.NewTx()
if err != nil {
t.Fatalf("Failed to create transaction: %v", err)
}
defer tx.Commit()

// Create a query plan for todo table
plan1, err := planner.CreateQueryPlan("SELECT id, title FROM todo", tx)
if err != nil {
t.Fatalf("Failed to create query plan: %v", err)
}

// create a query plan for todo_detail table
plan2, err := planner.CreateQueryPlan("SELECT todo_id, detail, some_score FROM todo_detail", tx)
if err != nil {
t.Fatalf("Failed to create query plan for todo_detail: %v", err)
}

// Create a MultiBufferProductPlan
multiBufferProductPlan := NewMultiBufferProductPcan(tx, plan1, plan2)
if multiBufferProductPlan == nil {
t.Fatal("Failed to create MultiBufferProductPlan")
}

// check schema
if len(multiBufferProductPlan.Schema().Fields()) != 5 {
t.Errorf("Expected 5 fields, got %d", len(multiBufferProductPlan.Schema().Fields()))
}
if !multiBufferProductPlan.Schema().HasField("id") ||
!multiBufferProductPlan.Schema().HasField("title") ||
!multiBufferProductPlan.Schema().HasField("todo_id") ||
!multiBufferProductPlan.Schema().HasField("detail") {
t.Error("Schema does not contain expected fields")
}

// Open the scan
scan, err := multiBufferProductPlan.Open()
if err != nil {
t.Fatalf("Failed to open scan: %v", err)
}
defer scan.Close()

// check scan length
length := 0
for {
ok, err := scan.Next()
if err != nil {
t.Fatalf("Failed to get next record: %v", err)
}
if !ok {
break
}
length++
}
if length != 12 {
t.Errorf("Expected 6 records, got %d", length)
}
}
136 changes: 136 additions & 0 deletions query/multi_buffer_product_scan.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package query

import (
"fmt"
"log"

"github.com/team-gonyo/gonyo-db/buffer"
"github.com/team-gonyo/gonyo-db/record"
"github.com/team-gonyo/gonyo-db/scan"
"github.com/team-gonyo/gonyo-db/tx"
)

type MultiBufferProductScan struct {
tx *tx.Transaction
lhsScan scan.Scan
rhsScan scan.Scan
productScan scan.Scan
fileName string
layout *record.Layout
chunkSize int32 // チャンクに含まれるブロック数
nextBlockNum int32 // 次のチャンク開始ブロック
fileSize int32
}

var _ scan.Scan = (*MultiBufferProductScan)(nil)

func NewMultiBufferProductScan(
tx *tx.Transaction,
lhsScan scan.Scan,
fileName string,
layout *record.Layout,
) (*MultiBufferProductScan, error) {
available := tx.AvailableBuffs()
fileSize, err := tx.Size(fileName)
if err != nil {
return nil, fmt.Errorf("failed to construct MultiBufferProductScan: %w", err)
}

scan := &MultiBufferProductScan{
tx: tx,
lhsScan: lhsScan,
fileName: fileName,
layout: layout,
chunkSize: buffer.BestFactor(available, fileSize),
fileSize: fileSize,
}
scan.BeforeFirst()

return scan, nil
}

func (m *MultiBufferProductScan) BeforeFirst() {
m.nextBlockNum = 0
// FIXME: エラー握りつぶし
_, err := m.useNextChunk()
if err != nil {
log.Printf("failed to invoke MultiBufferProductScan#BeforeFirst(): %v", err)
}
}

// チャンクを移動する
func (m *MultiBufferProductScan) useNextChunk() (bool, error) {
if m.rhsScan != nil {
m.rhsScan.Close() // productScanをcloseするとlhsもcloseされちゃう
}
if m.nextBlockNum >= m.fileSize {
return false, nil
}

// 境界値揃える
end := m.nextBlockNum + m.chunkSize - 1
if end >= m.fileSize {
end = m.fileSize - 1
}

// 新しいチャンク作る
newRhsScan, err := scan.NewChunkScan(m.tx, m.fileName, m.layout, m.nextBlockNum, end)
if err != nil {
return false, fmt.Errorf("failed to move to next chunk: %w", err)
}
m.rhsScan = newRhsScan

// 左を最初に持ってきて
m.lhsScan.BeforeFirst()
m.productScan, err = NewProductScan(m.lhsScan, m.rhsScan)
if err != nil {
return false, fmt.Errorf("failed to open ProdScan internally in MultiBuffProdScan: %w", err)
}
m.nextBlockNum = end + 1
return true, nil
}

func (m *MultiBufferProductScan) Next() (bool, error) {
for {
// 今見てるproduct scanにレコードがあればそれで良い
hasMore, err := m.productScan.Next()
if err != nil {
return false, fmt.Errorf("error on advancing next record in MultiBuffProdScan: %w", err)
}
if hasMore {
break
}

// 今見てるproduct scanにレコードがなかったら、新しいチャンクを用意する
// 新しいチャンクを用意できなければスキャンが終了したということ
hasMore, err = m.useNextChunk()
if err != nil {
return false, fmt.Errorf("MultiBuffProdScan failed to move to next chunk on executing Next(): %w", err)
}
if !hasMore {
return false, nil
}
}

return true, nil
}

func (m *MultiBufferProductScan) Close() {
m.productScan.Close() // lhsもrhsもこいつがCloseしてくれる
}

func (m *MultiBufferProductScan) GetInt(fldName string) (int32, error) {
return m.productScan.GetInt(fldName)
}

func (m *MultiBufferProductScan) GetString(fldName string) (string, error) {
return m.productScan.GetString(fldName)
}

func (m *MultiBufferProductScan) GetVal(fldName string) (scan.Constant, error) {
return m.productScan.GetVal(fldName)
}

func (m *MultiBufferProductScan) HasField(fldName string) bool {
return m.productScan.HasField(fldName)
}
Loading
Loading