Skip to content

Commit

Permalink
feat(warehouse): batching of alter add statements (#2484)
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr committed Oct 31, 2022
1 parent b6ce9b9 commit 37d32f1
Show file tree
Hide file tree
Showing 22 changed files with 460 additions and 306 deletions.
49 changes: 38 additions & 11 deletions warehouse/azure-synapse/azure-synapse.go
Original file line number Diff line number Diff line change
Expand Up @@ -632,14 +632,6 @@ func (as *HandleT) createTable(name string, columns map[string]string) (err erro
return
}

// addColumn issues an ALTER TABLE ... ADD for columnName on tableName, wrapped
// in an IF NOT EXISTS lookup against SYS.COLUMNS so the ALTER only runs when
// no column with that name is listed for the table. The SQL type is looked up
// in rudderDataTypesMapToMssql using columnType as the key. The statement is
// logged before it is executed; the error from Exec is returned as-is.
func (as *HandleT) addColumn(tableName, columnName, columnType string) (err error) {
	mssqlType := rudderDataTypesMapToMssql[columnType]

	query := fmt.Sprintf(`IF NOT EXISTS (SELECT 1 FROM SYS.COLUMNS WHERE OBJECT_ID = OBJECT_ID(N'%[1]s') AND name = '%[2]s')
ALTER TABLE %[1]s ADD %[2]s %[3]s`, tableName, columnName, mssqlType)

	pkgLogger.Infof("AZ: Adding column in synapse for AZ:%s : %v", as.Warehouse.Destination.ID, query)

	if _, execErr := as.Db.Exec(query); execErr != nil {
		return execErr
	}
	return nil
}

func (as *HandleT) CreateTable(tableName string, columnMap map[string]string) (err error) {
// Search paths doesn't exist unlike Postgres, default is dbo. Hence, use namespace wherever possible
err = as.createTable(as.Namespace+"."+tableName, columnMap)
Expand All @@ -653,9 +645,44 @@ func (as *HandleT) DropTable(tableName string) (err error) {
return
}

func (as *HandleT) AddColumn(tableName, columnName, columnType string) (err error) {
err = as.addColumn(as.Namespace+"."+tableName, columnName, columnType)
return err
// AddColumns adds every column in columnsInfo to as.Namespace.tableName with a
// single ALTER TABLE ... ADD statement.
//
// When exactly one column is requested, the statement is prefixed with an
// IF NOT EXISTS lookup against SYS.COLUMNS, so the ALTER only runs if that
// column is not already listed for the table. When several columns are
// requested no such guard is emitted and the ALTER is sent unconditionally.
//
// An empty columnsInfo is treated as a no-op and returns nil: there is nothing
// to add, and the statement that would otherwise be built ("ALTER TABLE ... ADD;")
// is not valid SQL.
//
// The SQL type of each column is looked up in rudderDataTypesMapToMssql using
// the column's Type as the key. The final query is logged before execution and
// the error from Exec is returned unchanged.
func (as *HandleT) AddColumns(tableName string, columnsInfo []warehouseutils.ColumnInfo) (err error) {
	if len(columnsInfo) == 0 {
		return nil
	}

	var queryBuilder strings.Builder

	// Only the single-column form gets the existence guard.
	if len(columnsInfo) == 1 {
		queryBuilder.WriteString(fmt.Sprintf(`
IF NOT EXISTS (
SELECT
1
FROM
SYS.COLUMNS
WHERE
OBJECT_ID = OBJECT_ID(N'%[1]s.%[2]s')
AND name = '%[3]s'
)
`,
			as.Namespace,
			tableName,
			columnsInfo[0].Name,
		))
	}

	queryBuilder.WriteString(fmt.Sprintf(`
ALTER TABLE
%s.%s
ADD`,
		as.Namespace,
		tableName,
	))

	// Write the separator before every column except the first, so no
	// trailing comma has to be trimmed afterwards.
	for i, columnInfo := range columnsInfo {
		if i > 0 {
			queryBuilder.WriteByte(',')
		}
		fmt.Fprintf(&queryBuilder, ` %s %s`, columnInfo.Name, rudderDataTypesMapToMssql[columnInfo.Type])
	}
	queryBuilder.WriteByte(';')

	query := queryBuilder.String()

	pkgLogger.Infof("AZ: Adding columns for destinationID: %s, tableName: %s with query: %v", as.Warehouse.Destination.ID, tableName, query)
	_, err = as.Db.Exec(query)
	return err
}

func (*HandleT) AlterColumn(_, _, _ string) (err error) {
Expand Down
13 changes: 13 additions & 0 deletions warehouse/azure-synapse/azure_synapse_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package azuresynapse_test

import (
"testing"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

// TestAzureSynapse is the go test entry point for this package's specs: it
// registers Fail as the failure handler and then runs the specs under the
// suite description "AzureSynapse Suite".
func TestAzureSynapse(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "AzureSynapse Suite")
}
50 changes: 27 additions & 23 deletions warehouse/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,23 +165,6 @@ func (bq *HandleT) createTableView(tableName string, columnMap map[string]string
return
}

// addColumn appends a single field (columnName, typed via dataTypesMap[columnType])
// to the schema of tableName in the bq.namespace dataset.
//
// It first reads the table's current metadata, then submits an update whose
// schema is the existing schema plus the new field, passing the ETag from the
// metadata it just read. A failure to read the metadata is returned
// immediately; otherwise the error from the update call is returned.
func (bq *HandleT) addColumn(tableName, columnName, columnType string) (err error) {
	pkgLogger.Infof("BQ: Adding columns in table %s in bigquery dataset: %s in project: %s", tableName, bq.namespace, bq.projectID)

	table := bq.db.Dataset(bq.namespace).Table(tableName)

	metadata, err := table.Metadata(bq.backgroundContext)
	if err != nil {
		return err
	}

	field := &bigquery.FieldSchema{Name: columnName, Type: dataTypesMap[columnType]}
	_, err = table.Update(
		bq.backgroundContext,
		bigquery.TableMetadataToUpdate{Schema: append(metadata.Schema, field)},
		metadata.ETag,
	)
	return err
}

func (bq *HandleT) schemaExists(_, _ string) (exists bool, err error) {
ds := bq.db.Dataset(bq.namespace)
_, err = ds.Metadata(bq.backgroundContext)
Expand Down Expand Up @@ -826,15 +809,36 @@ func (bq *HandleT) LoadTable(tableName string) error {
return err
}

func (bq *HandleT) AddColumn(tableName, columnName, columnType string) (err error) {
err = bq.addColumn(tableName, columnName, columnType)
func (bq *HandleT) AddColumns(tableName string, columnsInfo []warehouseutils.ColumnInfo) (err error) {
pkgLogger.Infof("BQ: Adding columns for destinationID: %s, tableName: %s, dataset: %s, project: %s", bq.warehouse.Destination.ID, tableName, bq.namespace, bq.projectID)
tableRef := bq.db.Dataset(bq.namespace).Table(tableName)
meta, err := tableRef.Metadata(bq.backgroundContext)
if err != nil {
if checkAndIgnoreAlreadyExistError(err) {
pkgLogger.Infof("BQ: Column %s already exists on %s.%s \nResponse: %v", columnName, bq.namespace, tableName, err)
err = nil
return
}

newSchema := meta.Schema
for _, columnInfo := range columnsInfo {
newSchema = append(newSchema,
&bigquery.FieldSchema{Name: columnInfo.Name, Type: dataTypesMap[columnInfo.Type]},
)
}

tableMetadataToUpdate := bigquery.TableMetadataToUpdate{
Schema: newSchema,
}
_, err = tableRef.Update(bq.backgroundContext, tableMetadataToUpdate, meta.ETag)

// Handle error in case of single column
if len(columnsInfo) == 1 {
if err != nil {
if checkAndIgnoreAlreadyExistError(err) {
pkgLogger.Infof("BQ: Column %s already exists on %s.%s \nResponse: %v", columnsInfo[0].Name, bq.namespace, tableName, err)
err = nil
}
}
}
return err
return
}

func (*HandleT) AlterColumn(_, _, _ string) (err error) {
Expand Down
34 changes: 27 additions & 7 deletions warehouse/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -843,16 +843,36 @@ func (ch *HandleT) DropTable(tableName string) (err error) {
return
}

// AddColumn adds column:columnName with dataType columnType to the tableName
func (ch *HandleT) AddColumn(tableName, columnName, columnType string) (err error) {
cluster := warehouseutils.GetConfigValue(Cluster, ch.Warehouse)
clusterClause := ""
func (ch *HandleT) AddColumns(tableName string, columnsInfo []warehouseutils.ColumnInfo) (err error) {
var (
query string
cluster string
clusterClause string
)

cluster = warehouseutils.GetConfigValue(Cluster, ch.Warehouse)
if len(strings.TrimSpace(cluster)) > 0 {
clusterClause = fmt.Sprintf(`ON CLUSTER %q`, cluster)
}
sqlStatement := fmt.Sprintf(`ALTER TABLE %q.%q %s ADD COLUMN IF NOT EXISTS %q %s`, ch.Namespace, tableName, clusterClause, columnName, getClickHouseColumnTypeForSpecificTable(tableName, columnName, rudderDataTypesMapToClickHouse[columnType], false))
pkgLogger.Infof("CH: Adding column in clickhouse for ch:%s : %v", ch.Warehouse.Destination.ID, sqlStatement)
_, err = ch.Db.Exec(sqlStatement)

query = fmt.Sprintf(`
ALTER TABLE
%q.%q %s`,
ch.Namespace,
tableName,
clusterClause,
)

for _, columnInfo := range columnsInfo {
columnType := getClickHouseColumnTypeForSpecificTable(tableName, columnInfo.Name, rudderDataTypesMapToClickHouse[columnInfo.Type], false)
query += fmt.Sprintf(` ADD COLUMN IF NOT EXISTS %q %s,`, columnInfo.Name, columnType)
}

query = strings.TrimSuffix(query, ",")
query += ";"

pkgLogger.Infof("CH: Adding columns for destinationID: %s, tableName: %s with query: %v", ch.Warehouse.Destination.ID, tableName, query)
_, err = ch.Db.Exec(query)
return
}

Expand Down
4 changes: 2 additions & 2 deletions warehouse/constraint.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ func (ic *IndexConstraintT) violates(brEvent *BatchRouterEventT, columnName stri
if !ok {
continue
}
if columnInfo.ColumnType == "string" {
columnVal, ok := columnInfo.ColumnVal.(string)
if columnInfo.Type == "string" {
columnVal, ok := columnInfo.Value.(string)
if !ok {
continue
}
Expand Down
4 changes: 2 additions & 2 deletions warehouse/datalake/datalake.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ func (*HandleT) DropTable(_ string) (err error) {
return fmt.Errorf("datalake err :not implemented")
}

func (wh *HandleT) AddColumn(tableName, columnName, columnType string) (err error) {
return wh.SchemaRepository.AddColumn(tableName, columnName, columnType)
func (wh *HandleT) AddColumns(tableName string, columnsInfo []warehouseutils.ColumnInfo) (err error) {
return wh.SchemaRepository.AddColumns(tableName, columnsInfo)
}

func (wh *HandleT) AlterColumn(tableName, columnName, columnType string) (err error) {
Expand Down
10 changes: 6 additions & 4 deletions warehouse/datalake/schema-repository/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (gl *GlueSchemaRepository) CreateTable(tableName string, columnMap map[stri
return
}

func (gl *GlueSchemaRepository) AddColumn(tableName, columnName, columnType string) (err error) {
func (gl *GlueSchemaRepository) AddColumns(tableName string, columnsInfo []warehouseutils.ColumnInfo) (err error) {
updateTableInput := glue.UpdateTableInput{
DatabaseName: aws.String(gl.Namespace),
TableInput: &glue.TableInput{
Expand All @@ -148,8 +148,10 @@ func (gl *GlueSchemaRepository) AddColumn(tableName, columnName, columnType stri
return fmt.Errorf("table %s not found in schema", tableName)
}

// add new column to tableSchema
tableSchema[columnName] = columnType
// add new columns to table schema
for _, columnInfo := range columnsInfo {
tableSchema[columnInfo.Name] = columnInfo.Type
}

// add storage descriptor to update table request
updateTableInput.TableInput.StorageDescriptor = gl.getStorageDescriptor(tableName, tableSchema)
Expand All @@ -160,7 +162,7 @@ func (gl *GlueSchemaRepository) AddColumn(tableName, columnName, columnType stri
}

func (gl *GlueSchemaRepository) AlterColumn(tableName, columnName, columnType string) (err error) {
return gl.AddColumn(tableName, columnName, columnType)
return gl.AddColumns(tableName, []warehouseutils.ColumnInfo{{Name: columnName, Type: columnType}})
}

func getGlueClient(wh warehouseutils.Warehouse) (*glue.Glue, error) {
Expand Down
6 changes: 4 additions & 2 deletions warehouse/datalake/schema-repository/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (ls *LocalSchemaRepository) CreateTable(tableName string, columnMap map[str
return ls.uploader.UpdateLocalSchema(schema)
}

func (ls *LocalSchemaRepository) AddColumn(tableName, columnName, columnType string) (err error) {
func (ls *LocalSchemaRepository) AddColumns(tableName string, columnsInfo []warehouseutils.ColumnInfo) (err error) {
// fetch schema from local db
schema, err := ls.FetchSchema(ls.warehouse)
if err != nil {
Expand All @@ -62,7 +62,9 @@ func (ls *LocalSchemaRepository) AddColumn(tableName, columnName, columnType str
return fmt.Errorf("failed to add column: table %s does not exist", tableName)
}

schema[tableName][columnName] = columnType
for _, columnInfo := range columnsInfo {
schema[tableName][columnInfo.Name] = columnInfo.Type
}

// update schema
return ls.uploader.UpdateLocalSchema(schema)
Expand Down
2 changes: 1 addition & 1 deletion warehouse/datalake/schema-repository/schema-repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type SchemaRepository interface {
FetchSchema(warehouse warehouseutils.Warehouse) (warehouseutils.SchemaT, error)
CreateSchema() (err error)
CreateTable(tableName string, columnMap map[string]string) (err error)
AddColumn(tableName, columnName, columnType string) (err error)
AddColumns(tableName string, columnsInfo []warehouseutils.ColumnInfo) (err error)
AlterColumn(tableName, columnName, columnType string) (err error)
}

Expand Down
Loading

0 comments on commit 37d32f1

Please sign in to comment.