-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathconnection.go
More file actions
123 lines (106 loc) · 3.54 KB
/
Copy pathconnection.go
File metadata and controls
123 lines (106 loc) · 3.54 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package flink
import (
"context"
"database/sql/driver"
"fmt"
"time"
)
const fetchResultsPollInterval = time.Second
type flinkConn struct {
client GatewayClient
sessionHandle string
}
func (c *flinkConn) Prepare(query string) (driver.Stmt, error) {
return c.PrepareContext(context.Background(), query)
}
func (c *flinkConn) PrepareContext(ctx context.Context, query string) (driver.Stmt, error) {
return &flinkStmt{conn: c, query: query, ctx: ctx}, nil
}
// Close is a no-op for flinkConn, as the session is managed by the connector.
func (c *flinkConn) Close() error {
return nil
}
func (c *flinkConn) Begin() (driver.Tx, error) {
tx, err := c.BeginTx(context.Background(), driver.TxOptions{})
if err != nil {
return nil, err
}
return tx, nil
}
// BeginTx is not supported by Flink SQL Gateway.
func (c *flinkConn) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx, error) {
return nil, driver.ErrSkip
}
func (c *flinkConn) Ping(ctx context.Context) error {
err := c.client.Heartbeat(ctx, c.sessionHandle)
if err != nil {
return driver.ErrBadConn
}
return err
}
func (c *flinkConn) closeOperation(ctx context.Context, operationHandle string) {
if operationHandle != "" {
c.client.CloseOperation(ctx, c.sessionHandle, operationHandle)
}
}
func (c *flinkConn) cancelOperation(ctx context.Context, operationHandle string) {
if operationHandle != "" {
c.client.CancelOperation(ctx, c.sessionHandle, operationHandle)
}
}
// ExecContext submits a statement and ignores result rows.
func (c *flinkConn) ExecContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Result, error) {
// todo: handle args
opHandle, err := c.client.ExecuteStatement(ctx, c.sessionHandle, &ExecuteStatementRequest{
Statement: query,
})
defer c.closeOperation(ctx, opHandle)
if err != nil {
return nil, fmt.Errorf("flink: ExecContext failed to submit statement: %w", err)
}
_, err = c.fetchUntilResultsReady(ctx, opHandle)
if err != nil {
return nil, fmt.Errorf("flink: ExecContext failed: %w", err)
}
return driver.RowsAffected(0), nil
}
func (c *flinkConn) fetchUntilResultsReady(ctx context.Context, opHandle string) (*FetchResultsResponseBody, error) {
for {
res, err := c.client.FetchResults(ctx, c.sessionHandle, opHandle, "0", "json")
if err != nil {
return nil, fmt.Errorf("flink: FetchResults failed: %w", err)
}
// Retry fetching until the query is ready, honoring context cancellation
if res.ResultType == ResultTypeNotReady {
select {
case <-ctx.Done():
c.closeOperation(ctx, opHandle)
return nil, ctx.Err()
case <-time.After(fetchResultsPollInterval):
// keep polling at a fixed interval
}
continue
}
return res, nil
}
}
// QueryContext submits a statement and fetches its result rows.
func (c *flinkConn) QueryContext(ctx context.Context, query string, args []driver.NamedValue) (driver.Rows, error) {
// todo: handle args
opHandle, err := c.client.ExecuteStatement(ctx, c.sessionHandle, &ExecuteStatementRequest{
Statement: query,
})
if err != nil {
return nil, fmt.Errorf("flink: QueryContext failed to submit statement: %w", err)
}
res, err := c.fetchUntilResultsReady(ctx, opHandle)
if err != nil {
c.cancelOperation(ctx, opHandle)
return nil, fmt.Errorf("flink: FetchResults failed: %w", err)
}
if !res.IsQueryResult && res.ResultKind != ResultKindSuccessWithContent {
c.closeOperation(ctx, opHandle)
return nil, fmt.Errorf("flink: statement [%s] is not a query", query)
}
return newRows(ctx, c, opHandle, res.Results.Data, res.Results.Columns, res.NextToken())
}