feat: support ctx in sqlx/sqlc, listed in ROADMAP (#1535)

* feat: support ctx in sqlx/sqlc

* chore: update roadmap

* fix: context.Canceled should be acceptable

* use %w to wrap errors

* chore: remove unused vars
This commit is contained in:
Kevin Wan
2022-02-16 19:31:43 +08:00
committed by GitHub
parent 7c63676be4
commit 607bae27fa
12 changed files with 458 additions and 152 deletions

View File

@@ -1,6 +1,7 @@
package sqlx
import (
"context"
"database/sql"
"github.com/zeromicro/go-zero/core/breaker"
@@ -14,11 +15,17 @@ type (
// Session stands for raw connections or transaction sessions
Session interface {
Exec(query string, args ...interface{}) (sql.Result, error)
ExecCtx(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
Prepare(query string) (StmtSession, error)
PrepareCtx(ctx context.Context, query string) (StmtSession, error)
QueryRow(v interface{}, query string, args ...interface{}) error
QueryRowCtx(ctx context.Context, v interface{}, query string, args ...interface{}) error
QueryRowPartial(v interface{}, query string, args ...interface{}) error
QueryRowPartialCtx(ctx context.Context, v interface{}, query string, args ...interface{}) error
QueryRows(v interface{}, query string, args ...interface{}) error
QueryRowsCtx(ctx context.Context, v interface{}, query string, args ...interface{}) error
QueryRowsPartial(v interface{}, query string, args ...interface{}) error
QueryRowsPartialCtx(ctx context.Context, v interface{}, query string, args ...interface{}) error
}
// SqlConn only stands for raw connections, so Transact method can be called.
@@ -27,7 +34,8 @@ type (
// RawDB is for other ORM to operate with, use it with caution.
// Notice: don't close it.
RawDB() (*sql.DB, error)
Transact(func(session Session) error) error
Transact(fn func(Session) error) error
TransactCtx(ctx context.Context, fn func(context.Context, Session) error) error
}
// SqlOption defines the method to customize a sql connection.
@@ -37,10 +45,15 @@ type (
StmtSession interface {
Close() error
Exec(args ...interface{}) (sql.Result, error)
ExecCtx(ctx context.Context, args ...interface{}) (sql.Result, error)
QueryRow(v interface{}, args ...interface{}) error
QueryRowCtx(ctx context.Context, v interface{}, args ...interface{}) error
QueryRowPartial(v interface{}, args ...interface{}) error
QueryRowPartialCtx(ctx context.Context, v interface{}, args ...interface{}) error
QueryRows(v interface{}, args ...interface{}) error
QueryRowsCtx(ctx context.Context, v interface{}, args ...interface{}) error
QueryRowsPartial(v interface{}, args ...interface{}) error
QueryRowsPartialCtx(ctx context.Context, v interface{}, args ...interface{}) error
}
// thread-safe
@@ -58,7 +71,9 @@ type (
sessionConn interface {
Exec(query string, args ...interface{}) (sql.Result, error)
ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error)
Query(query string, args ...interface{}) (*sql.Rows, error)
QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
}
statement struct {
@@ -68,7 +83,9 @@ type (
stmtConn interface {
Exec(args ...interface{}) (sql.Result, error)
ExecContext(ctx context.Context, args ...interface{}) (sql.Result, error)
Query(args ...interface{}) (*sql.Rows, error)
QueryContext(ctx context.Context, args ...interface{}) (*sql.Rows, error)
}
)
@@ -112,6 +129,11 @@ func NewSqlConnFromDB(db *sql.DB, opts ...SqlOption) SqlConn {
}
func (db *commonSqlConn) Exec(q string, args ...interface{}) (result sql.Result, err error) {
return db.ExecCtx(context.Background(), q, args...)
}
func (db *commonSqlConn) ExecCtx(ctx context.Context, q string, args ...interface{}) (
result sql.Result, err error) {
err = db.brk.DoWithAcceptable(func() error {
var conn *sql.DB
conn, err = db.connProv()
@@ -120,7 +142,7 @@ func (db *commonSqlConn) Exec(q string, args ...interface{}) (result sql.Result,
return err
}
result, err = exec(conn, q, args...)
result, err = exec(ctx, conn, q, args...)
return err
}, db.acceptable)
@@ -128,6 +150,10 @@ func (db *commonSqlConn) Exec(q string, args ...interface{}) (result sql.Result,
}
func (db *commonSqlConn) Prepare(query string) (stmt StmtSession, err error) {
return db.PrepareCtx(context.Background(), query)
}
func (db *commonSqlConn) PrepareCtx(ctx context.Context, query string) (stmt StmtSession, err error) {
err = db.brk.DoWithAcceptable(func() error {
var conn *sql.DB
conn, err = db.connProv()
@@ -136,7 +162,7 @@ func (db *commonSqlConn) Prepare(query string) (stmt StmtSession, err error) {
return err
}
st, err := conn.Prepare(query)
st, err := conn.PrepareContext(ctx, query)
if err != nil {
return err
}
@@ -152,25 +178,45 @@ func (db *commonSqlConn) Prepare(query string) (stmt StmtSession, err error) {
}
func (db *commonSqlConn) QueryRow(v interface{}, q string, args ...interface{}) error {
return db.queryRows(func(rows *sql.Rows) error {
return db.QueryRowCtx(context.Background(), v, q, args...)
}
func (db *commonSqlConn) QueryRowCtx(ctx context.Context, v interface{}, q string,
args ...interface{}) error {
return db.queryRows(ctx, func(rows *sql.Rows) error {
return unmarshalRow(v, rows, true)
}, q, args...)
}
func (db *commonSqlConn) QueryRowPartial(v interface{}, q string, args ...interface{}) error {
return db.queryRows(func(rows *sql.Rows) error {
return db.QueryRowPartialCtx(context.Background(), v, q, args...)
}
func (db *commonSqlConn) QueryRowPartialCtx(ctx context.Context, v interface{},
q string, args ...interface{}) error {
return db.queryRows(ctx, func(rows *sql.Rows) error {
return unmarshalRow(v, rows, false)
}, q, args...)
}
func (db *commonSqlConn) QueryRows(v interface{}, q string, args ...interface{}) error {
return db.queryRows(func(rows *sql.Rows) error {
return db.QueryRowsCtx(context.Background(), v, q, args...)
}
func (db *commonSqlConn) QueryRowsCtx(ctx context.Context, v interface{}, q string,
args ...interface{}) error {
return db.queryRows(ctx, func(rows *sql.Rows) error {
return unmarshalRows(v, rows, true)
}, q, args...)
}
func (db *commonSqlConn) QueryRowsPartial(v interface{}, q string, args ...interface{}) error {
return db.queryRows(func(rows *sql.Rows) error {
return db.QueryRowsPartialCtx(context.Background(), v, q, args...)
}
func (db *commonSqlConn) QueryRowsPartialCtx(ctx context.Context, v interface{},
q string, args ...interface{}) error {
return db.queryRows(ctx, func(rows *sql.Rows) error {
return unmarshalRows(v, rows, false)
}, q, args...)
}
@@ -180,13 +226,19 @@ func (db *commonSqlConn) RawDB() (*sql.DB, error) {
}
func (db *commonSqlConn) Transact(fn func(Session) error) error {
return db.TransactCtx(context.Background(), func(_ context.Context, session Session) error {
return fn(session)
})
}
func (db *commonSqlConn) TransactCtx(ctx context.Context, fn func(context.Context, Session) error) error {
return db.brk.DoWithAcceptable(func() error {
return transact(db, db.beginTx, fn)
return transact(ctx, db, db.beginTx, fn)
}, db.acceptable)
}
func (db *commonSqlConn) acceptable(err error) bool {
ok := err == nil || err == sql.ErrNoRows || err == sql.ErrTxDone
ok := err == nil || err == sql.ErrNoRows || err == sql.ErrTxDone || err == context.Canceled
if db.accept == nil {
return ok
}
@@ -194,7 +246,8 @@ func (db *commonSqlConn) acceptable(err error) bool {
return ok || db.accept(err)
}
func (db *commonSqlConn) queryRows(scanner func(*sql.Rows) error, q string, args ...interface{}) error {
func (db *commonSqlConn) queryRows(ctx context.Context, scanner func(*sql.Rows) error,
q string, args ...interface{}) error {
var qerr error
return db.brk.DoWithAcceptable(func() error {
conn, err := db.connProv()
@@ -203,7 +256,7 @@ func (db *commonSqlConn) queryRows(scanner func(*sql.Rows) error, q string, args
return err
}
return query(conn, func(rows *sql.Rows) error {
return query(ctx, conn, func(rows *sql.Rows) error {
qerr = scanner(rows)
return qerr
}, q, args...)
@@ -217,29 +270,49 @@ func (s statement) Close() error {
}
func (s statement) Exec(args ...interface{}) (sql.Result, error) {
return execStmt(s.stmt, s.query, args...)
return s.ExecCtx(context.Background(), args...)
}
func (s statement) ExecCtx(ctx context.Context, args ...interface{}) (sql.Result, error) {
return execStmt(ctx, s.stmt, s.query, args...)
}
func (s statement) QueryRow(v interface{}, args ...interface{}) error {
return queryStmt(s.stmt, func(rows *sql.Rows) error {
return s.QueryRowCtx(context.Background(), v, args...)
}
func (s statement) QueryRowCtx(ctx context.Context, v interface{}, args ...interface{}) error {
return queryStmt(ctx, s.stmt, func(rows *sql.Rows) error {
return unmarshalRow(v, rows, true)
}, s.query, args...)
}
func (s statement) QueryRowPartial(v interface{}, args ...interface{}) error {
return queryStmt(s.stmt, func(rows *sql.Rows) error {
return s.QueryRowPartialCtx(context.Background(), v, args...)
}
func (s statement) QueryRowPartialCtx(ctx context.Context, v interface{}, args ...interface{}) error {
return queryStmt(ctx, s.stmt, func(rows *sql.Rows) error {
return unmarshalRow(v, rows, false)
}, s.query, args...)
}
func (s statement) QueryRows(v interface{}, args ...interface{}) error {
return queryStmt(s.stmt, func(rows *sql.Rows) error {
return s.QueryRowsCtx(context.Background(), v, args...)
}
func (s statement) QueryRowsCtx(ctx context.Context, v interface{}, args ...interface{}) error {
return queryStmt(ctx, s.stmt, func(rows *sql.Rows) error {
return unmarshalRows(v, rows, true)
}, s.query, args...)
}
func (s statement) QueryRowsPartial(v interface{}, args ...interface{}) error {
return queryStmt(s.stmt, func(rows *sql.Rows) error {
return s.QueryRowsPartialCtx(context.Background(), v, args...)
}
func (s statement) QueryRowsPartialCtx(ctx context.Context, v interface{}, args ...interface{}) error {
return queryStmt(ctx, s.stmt, func(rows *sql.Rows) error {
return unmarshalRows(v, rows, false)
}, s.query, args...)
}