diff --git a/cmd/signozschemamigrator/schema_migrator/manager.go b/cmd/signozschemamigrator/schema_migrator/manager.go index cf3fcb5f..d3c879e2 100644 --- a/cmd/signozschemamigrator/schema_migrator/manager.go +++ b/cmd/signozschemamigrator/schema_migrator/manager.go @@ -784,17 +784,20 @@ func (m *MigrationManager) MigrateDownAsync(ctx context.Context, downVersions [] } func (m *MigrationManager) insertMigrationEntry(ctx context.Context, db string, migrationID uint64, status string) error { - query := fmt.Sprintf("INSERT INTO %s.distributed_schema_migrations_v2 (migration_id, status, created_at) VALUES (%d, '%s', now())", db, migrationID, status) + query := fmt.Sprintf("INSERT INTO %s.distributed_schema_migrations_v2 (migration_id, status, created_at) VALUES (%d, '%s', '%s')", db, migrationID, status, time.Now().UTC().Format("2006-01-02 15:04:05")) + m.logger.Info("Inserting migration entry", zap.String("query", query)) return m.conn.Exec(ctx, query) } func (m *MigrationManager) updateMigrationEntry(ctx context.Context, db string, migrationID uint64, status string, error string) error { - query := fmt.Sprintf("ALTER TABLE %s.schema_migrations_v2 ON CLUSTER %s UPDATE status = '%s', error = '%s', updated_at = now() WHERE migration_id = %d", db, m.clusterName, status, error, migrationID) + query := fmt.Sprintf("ALTER TABLE %s.schema_migrations_v2 ON CLUSTER %s UPDATE status = '%s', error = '%s', updated_at = '%s' WHERE migration_id = %d", db, m.clusterName, status, error, time.Now().UTC().Format("2006-01-02 15:04:05"), migrationID) + m.logger.Info("Updating migration entry", zap.String("query", query)) return m.conn.Exec(ctx, query) } func (m *MigrationManager) RunOperation(ctx context.Context, operation Operation, migrationID uint64, database string, skipStatusUpdate bool) error { m.logger.Info("Running operation", zap.Uint64("migration_id", migrationID), zap.String("database", database), zap.Bool("skip_status_update", skipStatusUpdate)) + start := time.Now() var sql string if m.clusterName != "" { operation = operation.OnCluster(m.clusterName) @@ -803,9 +806,30 @@ func (m *MigrationManager) RunOperation(ctx context.Context, operation Operation operation = operation.WithReplication() } + m.logger.Info("Waiting for running mutations before running the operation") + + if err := m.WaitForRunningMutations(ctx); err != nil { + updateErr := m.updateMigrationEntry(ctx, database, migrationID, failedStatus, err.Error()) + if updateErr != nil { + return errors.Join(err, updateErr) + } + return err + } + if err := m.WaitDistributedDDLQueue(ctx); err != nil { + updateErr := m.updateMigrationEntry(ctx, database, migrationID, failedStatus, err.Error()) + if updateErr != nil { + return errors.Join(err, updateErr) + } + return err + } + if shouldWaitForDistributionQueue, database, table := operation.ShouldWaitForDistributionQueue(); shouldWaitForDistributionQueue { m.logger.Info("Waiting for distribution queue", zap.String("database", database), zap.String("table", table)) if err := m.WaitForDistributionQueue(ctx, database, table); err != nil { + updateErr := m.updateMigrationEntry(ctx, database, migrationID, failedStatus, err.Error()) + if updateErr != nil { + return errors.Join(err, updateErr) + } return err } } @@ -827,6 +851,9 @@ func (m *MigrationManager) RunOperation(ctx context.Context, operation Operation } return err } + + m.logger.Info("Waiting for running mutations after running the operation") + if err := m.WaitForRunningMutations(ctx); err != nil { updateErr := m.updateMigrationEntry(ctx, database, migrationID, failedStatus, err.Error()) if updateErr != nil { @@ -847,6 +874,8 @@ func (m *MigrationManager) RunOperation(ctx context.Context, operation Operation return updateErr } } + duration := time.Since(start) + m.logger.Info("Operation completed", zap.Uint64("migration_id", migrationID), zap.String("database", database), zap.Duration("duration", duration)) return nil } diff --git a/cmd/signozschemamigrator/schema_migrator/traces_migrations.go b/cmd/signozschemamigrator/schema_migrator/traces_migrations.go index 0b551b12..c50249ae 100644 --- a/cmd/signozschemamigrator/schema_migrator/traces_migrations.go +++ b/cmd/signozschemamigrator/schema_migrator/traces_migrations.go @@ -1,13 +1,3 @@ package schemamigrator -var TracesMigrations = []SchemaMigrationRecord{ - { - MigrationID: 1000, - UpItems: []Operation{ - AlterTableDropIndex{Database: "signoz_traces", Table: "signoz_index_v2", Index: Index{Name: "idx_resourceTagsMapKeys"}}, - }, - DownItems: []Operation{ - AlterTableAddIndex{Database: "signoz_traces", Table: "signoz_index_v2", Index: Index{Name: "idx_resourceTagsMapKeys", Expression: "mapKeys(resourceTagsMap)", Type: "bloom_filter(0.01)", Granularity: 64}}, - }, - }, -} +var TracesMigrations = []SchemaMigrationRecord{}