Skip to content

Commit

Permalink
feat(yugabytedb): configure retries
Browse files Browse the repository at this point in the history
  • Loading branch information
morigs committed Aug 24, 2022
1 parent bcd0d88 commit 76d0305
Showing 1 changed file with 99 additions and 52 deletions.
151 changes: 99 additions & 52 deletions database/yugabytedb/yugabytedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,41 +4,50 @@ import (
"context"
"database/sql"
"errors"
"fmt"
"io"
"io/ioutil"
"net/url"
"regexp"
"strconv"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/golang-migrate/migrate/v4"
"github.com/golang-migrate/migrate/v4/database"
"github.com/hashicorp/go-multierror"
"github.com/lib/pq"
"go.uber.org/atomic"
)

func init() {
db := YugabyteDB{}
database.Register("yugabyte", &db)
database.Register("yugabytedb", &db)
database.Register("ysql", &db)
}

var DefaultMigrationsTable = "migrations"
var DefaultLockTable = "migrations_locks"
const (
DefaultRetryMaxInterval = time.Second * 15
DefaultRetryMaxElapsedTime = time.Second * 30
DefaultRetryMaxRetries = 10
DefaultMigrationsTable = "migrations"
DefaultLockTable = "migrations_locks"
)

var (
ErrNilConfig = errors.New("no config")
ErrNoDatabaseName = errors.New("no database name")
ErrMaxRetriesExceeded = errors.New("max retries exceeded")
)

func init() {
db := YugabyteDB{}
database.Register("yugabyte", &db)
database.Register("yugabytedb", &db)
database.Register("ysql", &db)
}

type Config struct {
MigrationsTable string
LockTable string
ForceLock bool
DatabaseName string
MigrationsTable string
LockTable string
ForceLock bool
DatabaseName string
RetryMaxInterval time.Duration
RetryMaxElapsedTime time.Duration
RetryMaxRetries int
}

type YugabyteDB struct {
Expand Down Expand Up @@ -80,6 +89,18 @@ func WithInstance(instance *sql.DB, config *Config) (database.Driver, error) {
config.LockTable = DefaultLockTable
}

if config.RetryMaxInterval == 0 {
config.RetryMaxInterval = DefaultRetryMaxInterval
}

if config.RetryMaxElapsedTime == 0 {
config.RetryMaxElapsedTime = DefaultRetryMaxElapsedTime
}

if config.RetryMaxRetries == 0 {
config.RetryMaxRetries = DefaultRetryMaxRetries
}

px := &YugabyteDB{
db: instance,
config: config,
Expand Down Expand Up @@ -129,11 +150,32 @@ func (c *YugabyteDB) Open(dbURL string) (database.Driver, error) {
forceLock = false
}

maxIntervalStr := purl.Query().Get("x-retry-max-interval")
maxInterval, err := time.ParseDuration(maxIntervalStr)
if err != nil {
maxInterval = DefaultRetryMaxInterval
}

maxElapsedTimeStr := purl.Query().Get("x-retry-max-elapsed-time")
maxElapsedTime, err := time.ParseDuration(maxElapsedTimeStr)
if err != nil {
maxElapsedTime = DefaultRetryMaxElapsedTime
}

maxRetriesStr := purl.Query().Get("x-retry-max-retries")
maxRetries, err := strconv.Atoi(maxRetriesStr)
if err != nil {
maxRetries = DefaultRetryMaxRetries
}

px, err := WithInstance(db, &Config{
DatabaseName: purl.Path,
MigrationsTable: migrationsTable,
LockTable: lockTable,
ForceLock: forceLock,
DatabaseName: purl.Path,
MigrationsTable: migrationsTable,
LockTable: lockTable,
ForceLock: forceLock,
RetryMaxInterval: maxInterval,
RetryMaxElapsedTime: maxElapsedTime,
RetryMaxRetries: maxRetries,
})
if err != nil {
return nil, err
Expand All @@ -150,7 +192,7 @@ func (c *YugabyteDB) Close() error {
// See: https://github.com/yugabyte/yugabyte-db/issues/3642
func (c *YugabyteDB) Lock() error {
return database.CasRestoreOnErr(&c.isLocked, false, true, database.ErrLocked, func() (err error) {
return ExecuteInTx(context.Background(), c.db, &sql.TxOptions{Isolation: sql.LevelSerializable}, func(tx *sql.Tx) (err error) {
return c.doTxWithRetry(context.Background(), &sql.TxOptions{Isolation: sql.LevelSerializable}, func(tx *sql.Tx) (err error) {
aid, err := database.GenerateAdvisoryLockId(c.config.DatabaseName)
if err != nil {
return err
Expand Down Expand Up @@ -228,7 +270,7 @@ func (c *YugabyteDB) Run(migration io.Reader) error {
}

func (c *YugabyteDB) SetVersion(version int, dirty bool) error {
return ExecuteInTx(context.Background(), c.db, &sql.TxOptions{Isolation: sql.LevelSerializable}, func(tx *sql.Tx) error {
return c.doTxWithRetry(context.Background(), &sql.TxOptions{Isolation: sql.LevelSerializable}, func(tx *sql.Tx) error {
if _, err := tx.Exec(`DELETE FROM "` + c.config.MigrationsTable + `"`); err != nil {
return err
}
Expand Down Expand Up @@ -363,50 +405,55 @@ func (c *YugabyteDB) ensureLockTable() error {
return nil
}

func ExecuteInTx(ctx context.Context, db *sql.DB, opts *sql.TxOptions, fn func(tx *sql.Tx) error) (err error) {
// It looks like currently, YugabyteDB doesn't return an error on savepoint release.
// So, we have to retry the whole transaction.
// TODO: do more research on savepoints support, see https://github.com/yugabyte/yugabyte-db/issues/9219
func (c *YugabyteDB) doTxWithRetry(
ctx context.Context,
txOpts *sql.TxOptions,
fn func(tx *sql.Tx) error,
) error {
backOff := c.newBackoff(ctx)

const maxRetries = 50
retryCount := 0

for {
if retryCount-1 >= maxRetries {
return fmt.Errorf("%w: reached %d attempts", ErrMaxRetriesExceeded, retryCount-1)
}

tx, err := db.BeginTx(ctx, opts)
return backoff.Retry(func() error {
tx, err := c.db.BeginTx(ctx, txOpts)
if err != nil {
return err
return backoff.Permanent(err)
}

if err := fn(tx); err != nil {
_ = tx.Rollback()

if !errIsRetryable(err) {
return err
}
defer tx.Rollback()

retryCount++
if err := fn(tx); err != nil && !errIsRetryable(err) {
return backoff.Permanent(err)
} else if err != nil {
return err
}

continue
if err := tx.Commit(); err != nil && !errIsRetryable(err) {
return backoff.Permanent(err)
} else if err != nil {
return err
}

if err := tx.Commit(); err != nil {
_ = tx.Rollback()
return nil
}, backOff)
}

if !errIsRetryable(err) {
return err
}
func (c *YugabyteDB) newBackoff(ctx context.Context) backoff.BackOff {
if ctx == nil {
ctx = context.Background()
}

retryCount++
retrier := backoff.WithMaxRetries(backoff.WithContext(&backoff.ExponentialBackOff{
InitialInterval: backoff.DefaultInitialInterval,
RandomizationFactor: backoff.DefaultRandomizationFactor,
Multiplier: backoff.DefaultMultiplier,
MaxInterval: c.config.RetryMaxInterval,
MaxElapsedTime: c.config.RetryMaxElapsedTime,
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}, ctx), uint64(c.config.RetryMaxRetries))

continue
}
retrier.Reset()

return nil
}
return retrier
}

func errIsRetryable(err error) bool {
Expand Down

0 comments on commit 76d0305

Please sign in to comment.