Skip to content

Commit

Permalink
Add wait before running
Browse files Browse the repository at this point in the history
  • Loading branch information
srikanthccv committed Sep 17, 2024
1 parent 86513fc commit ef67e9a
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 13 deletions.
33 changes: 31 additions & 2 deletions cmd/signozschemamigrator/schema_migrator/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -784,17 +784,20 @@ func (m *MigrationManager) MigrateDownAsync(ctx context.Context, downVersions []
}

func (m *MigrationManager) insertMigrationEntry(ctx context.Context, db string, migrationID uint64, status string) error {
query := fmt.Sprintf("INSERT INTO %s.distributed_schema_migrations_v2 (migration_id, status, created_at) VALUES (%d, '%s', now())", db, migrationID, status)
query := fmt.Sprintf("INSERT INTO %s.distributed_schema_migrations_v2 (migration_id, status, created_at) VALUES (%d, '%s', '%s')", db, migrationID, status, time.Now().UTC().Format("2006-01-02 15:04:05"))
m.logger.Info("Inserting migration entry", zap.String("query", query))
return m.conn.Exec(ctx, query)
}

func (m *MigrationManager) updateMigrationEntry(ctx context.Context, db string, migrationID uint64, status string, error string) error {
query := fmt.Sprintf("ALTER TABLE %s.schema_migrations_v2 ON CLUSTER %s UPDATE status = '%s', error = '%s', updated_at = now() WHERE migration_id = %d", db, m.clusterName, status, error, migrationID)
query := fmt.Sprintf("ALTER TABLE %s.schema_migrations_v2 ON CLUSTER %s UPDATE status = '%s', error = '%s', updated_at = '%s' WHERE migration_id = %d", db, m.clusterName, status, error, time.Now().UTC().Format("2006-01-02 15:04:05"), migrationID)
m.logger.Info("Updating migration entry", zap.String("query", query))
return m.conn.Exec(ctx, query)
}

func (m *MigrationManager) RunOperation(ctx context.Context, operation Operation, migrationID uint64, database string, skipStatusUpdate bool) error {
m.logger.Info("Running operation", zap.Uint64("migration_id", migrationID), zap.String("database", database), zap.Bool("skip_status_update", skipStatusUpdate))
start := time.Now()
var sql string
if m.clusterName != "" {
operation = operation.OnCluster(m.clusterName)
Expand All @@ -803,9 +806,30 @@ func (m *MigrationManager) RunOperation(ctx context.Context, operation Operation
operation = operation.WithReplication()
}

m.logger.Info("Waiting for running mutations before running the operation")

if err := m.WaitForRunningMutations(ctx); err != nil {
updateErr := m.updateMigrationEntry(ctx, database, migrationID, failedStatus, err.Error())
if updateErr != nil {
return errors.Join(err, updateErr)
}
return err
}
if err := m.WaitDistributedDDLQueue(ctx); err != nil {
updateErr := m.updateMigrationEntry(ctx, database, migrationID, failedStatus, err.Error())
if updateErr != nil {
return errors.Join(err, updateErr)
}
return err
}

if shouldWaitForDistributionQueue, database, table := operation.ShouldWaitForDistributionQueue(); shouldWaitForDistributionQueue {
m.logger.Info("Waiting for distribution queue", zap.String("database", database), zap.String("table", table))
if err := m.WaitForDistributionQueue(ctx, database, table); err != nil {
updateErr := m.updateMigrationEntry(ctx, database, migrationID, failedStatus, err.Error())
if updateErr != nil {
return errors.Join(err, updateErr)
}
return err
}
}
Expand All @@ -827,6 +851,9 @@ func (m *MigrationManager) RunOperation(ctx context.Context, operation Operation
}
return err
}

m.logger.Info("Waiting for running mutations after running the operation")

if err := m.WaitForRunningMutations(ctx); err != nil {
updateErr := m.updateMigrationEntry(ctx, database, migrationID, failedStatus, err.Error())
if updateErr != nil {
Expand All @@ -847,6 +874,8 @@ func (m *MigrationManager) RunOperation(ctx context.Context, operation Operation
return updateErr
}
}
duration := time.Since(start)
m.logger.Info("Operation completed", zap.Uint64("migration_id", migrationID), zap.String("database", database), zap.Duration("duration", duration))

return nil
}
Expand Down
12 changes: 1 addition & 11 deletions cmd/signozschemamigrator/schema_migrator/traces_migrations.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,3 @@
package schemamigrator

var TracesMigrations = []SchemaMigrationRecord{
{
MigrationID: 1000,
UpItems: []Operation{
AlterTableDropIndex{Database: "signoz_traces", Table: "signoz_index_v2", Index: Index{Name: "idx_resourceTagsMapKeys"}},
},
DownItems: []Operation{
AlterTableAddIndex{Database: "signoz_traces", Table: "signoz_index_v2", Index: Index{Name: "idx_resourceTagsMapKeys", Expression: "mapKeys(resourceTagsMap)", Type: "bloom_filter(0.01)", Granularity: 64}},
},
},
}
var TracesMigrations = []SchemaMigrationRecord{}

0 comments on commit ef67e9a

Please sign in to comment.