Skip to content

Commit

Permalink
chore: fixing a flaky rsources test (#2680)
Browse files Browse the repository at this point in the history
  • Loading branch information
atzoum committed Nov 17, 2022
1 parent 87f57f8 commit ad2626f
Show file tree
Hide file tree
Showing 4 changed files with 188 additions and 85 deletions.
76 changes: 36 additions & 40 deletions services/rsources/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,41 +258,39 @@ func (sh *sourcesHandler) init() error {
return time.After(config.GetDuration("Rsources.stats.cleanup.interval", 1, time.Hour))
}
}
err := setupTables(ctx, sh.localDB, sh.config.LocalHostname)
if err != nil {
sh.log.Debugf("setting up rsources tables in %s", sh.config.LocalHostname)
if err := setupTables(ctx, sh.localDB, sh.config.LocalHostname, sh.log); err != nil {
return err
}
sh.log.Debugf("rsources tables setup successfully in %s", sh.config.LocalHostname)
if sh.sharedDB != nil {
err = setupTables(ctx, sh.sharedDB, "shared")
if err != nil {
sh.log.Debugf("setting up rsources tables for shared db %s", sh.config.SharedConn)
if err := setupTables(ctx, sh.sharedDB, "shared", sh.log); err != nil {
return err
}
err = sh.setupLogicalReplication(ctx)
if err != nil {
return fmt.Errorf("failed to setup logical replication: %w", err)
sh.log.Debugf("rsources tables for shared db %s setup successfully", sh.config.SharedConn)

sh.log.Debugf("setting up rsources logical replication in %s", sh.config.LocalHostname)
if err := sh.setupLogicalReplication(ctx); err != nil {
return fmt.Errorf("failed to setup rsources logical replication in %s: %w", sh.config.LocalHostname, err)
}
sh.log.Debugf("rsources logical replication setup successfully in %s", sh.config.LocalHostname)
}
return nil
}

func setupTables(ctx context.Context, db *sql.DB, defaultDbName string) error {
err := setupFailedKeysTable(ctx, db, defaultDbName)
if err != nil {
return fmt.Errorf("failed to setup %s failed keys table: %w", defaultDbName, err)
func setupTables(ctx context.Context, db *sql.DB, defaultDbName string, log logger.Logger) error {
if err := setupFailedKeysTable(ctx, db, defaultDbName, log); err != nil {
return fmt.Errorf("failed to setup %s rsources failed keys table: %w", defaultDbName, err)
}
err = setupStatsTable(ctx, db, defaultDbName)
if err != nil {
return fmt.Errorf("failed to setup %s stats table: %w", defaultDbName, err)
if err := setupStatsTable(ctx, db, defaultDbName, log); err != nil {
return fmt.Errorf("failed to setup %s rsources stats table: %w", defaultDbName, err)
}
return nil
}

func setupFailedKeysTable(ctx context.Context, db *sql.DB, defaultDbName string) error {
tx, err := db.Begin()
if err != nil {
return err
}
sqlStatement := fmt.Sprintf(`create table if not exists "rsources_failed_keys" (
func setupFailedKeysTable(ctx context.Context, db *sql.DB, defaultDbName string, log logger.Logger) error {
sqlStatement := fmt.Sprintf(`create table "rsources_failed_keys" (
id BIGSERIAL,
db_name text not null default '%s',
job_run_id text not null,
Expand All @@ -303,25 +301,22 @@ func setupFailedKeysTable(ctx context.Context, db *sql.DB, defaultDbName string)
ts timestamp not null default NOW(),
primary key (job_run_id, task_run_id, source_id, destination_id, db_name, id)
)`, defaultDbName)
_, err = tx.ExecContext(ctx, sqlStatement)
_, err := db.ExecContext(ctx, sqlStatement)
if err != nil {
_ = tx.Rollback()
return err
if pqError, ok := err.(*pq.Error); ok && pqError.Code == "42P07" {
log.Debugf("table rsources_stats already exists in %s", defaultDbName)
} else {
return err
}
}
_, err = tx.ExecContext(ctx, `create index if not exists rsources_failed_keys_job_run_id_idx on "rsources_failed_keys" (job_run_id)`)
if err != nil {
_ = tx.Rollback()
if _, err := db.ExecContext(ctx, `create index if not exists rsources_failed_keys_job_run_id_idx on "rsources_failed_keys" (job_run_id)`); err != nil {
return err
}
return tx.Commit()
return nil
}

func setupStatsTable(ctx context.Context, db *sql.DB, defaultDbName string) error {
tx, err := db.Begin()
if err != nil {
return err
}
sqlStatement := fmt.Sprintf(`create table if not exists "rsources_stats" (
func setupStatsTable(ctx context.Context, db *sql.DB, defaultDbName string, log logger.Logger) error {
sqlStatement := fmt.Sprintf(`create table "rsources_stats" (
db_name text not null default '%s',
job_run_id text not null,
task_run_id text not null,
Expand All @@ -333,17 +328,18 @@ func setupStatsTable(ctx context.Context, db *sql.DB, defaultDbName string) erro
ts timestamp not null default NOW(),
primary key (db_name, job_run_id, task_run_id, source_id, destination_id)
)`, defaultDbName)
_, err = tx.ExecContext(ctx, sqlStatement)
_, err := db.ExecContext(ctx, sqlStatement)
if err != nil {
_ = tx.Rollback()
return err
if pqError, ok := err.(*pq.Error); ok && pqError.Code == "42P07" {
log.Debugf("table rsources_stats already exists in db %s", defaultDbName)
} else {
return err
}
}
_, err = tx.ExecContext(ctx, `create index if not exists rsources_stats_job_run_id_idx on "rsources_stats" (job_run_id)`)
if err != nil {
_ = tx.Rollback()
if _, err := db.ExecContext(ctx, `create index if not exists rsources_stats_job_run_id_idx on "rsources_stats" (job_run_id)`); err != nil {
return err
}
return tx.Commit()
return nil
}

func (sh *sourcesHandler) setupLogicalReplication(ctx context.Context) error {
Expand Down
Loading

0 comments on commit ad2626f

Please sign in to comment.