Skip to content

Commit

Permalink
fix(warehouse): id resolution index issue (#2676)
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr authored and atzoum committed Nov 11, 2022
1 parent 775fc6e commit ff89dfd
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 8 deletions.
18 changes: 16 additions & 2 deletions warehouse/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,17 +396,31 @@ func (bq *HandleT) loadTable(tableName string, _, getLoadFileLocFromTableUploads
primaryJoinClause := strings.Join(primaryKeyList, " AND ")
bqTable := func(name string) string { return fmt.Sprintf("`%s`.`%s`", bq.namespace, name) }

var orderByClause string
if _, ok := tableColMap["received_at"]; ok {
orderByClause = "ORDER BY received_at DESC"
}

sqlStatement := fmt.Sprintf(`MERGE INTO %[1]s AS original
USING (
SELECT * FROM (
SELECT *, row_number() OVER (PARTITION BY %[7]s ORDER BY RECEIVED_AT DESC) AS _rudder_staging_row_number FROM %[2]s
SELECT *, row_number() OVER (PARTITION BY %[7]s %[8]s) AS _rudder_staging_row_number FROM %[2]s
) AS q WHERE _rudder_staging_row_number = 1
) AS staging
ON (%[3]s)
WHEN MATCHED THEN
UPDATE SET %[6]s
WHEN NOT MATCHED THEN
INSERT (%[4]s) VALUES (%[5]s)`, bqTable(tableName), bqTable(stagingTableName), primaryJoinClause, columnNames, stagingColumnNames, columnsWithValues, partitionKey)
INSERT (%[4]s) VALUES (%[5]s)`,
bqTable(tableName),
bqTable(stagingTableName),
primaryJoinClause,
columnNames,
stagingColumnNames,
columnsWithValues,
partitionKey,
orderByClause,
)
pkgLogger.Infof("BQ: Dedup records for table:%s using staging table: %s\n", tableName, sqlStatement)

q := bq.db.Query(sqlStatement)
Expand Down
12 changes: 9 additions & 3 deletions warehouse/bigquery/bigquery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func TestBigQueryIntegration(t *testing.T) {
testhelper.SendEvents(t, warehouseTest, sendEventsMap)
testhelper.SendIntegratedEvents(t, warehouseTest, sendEventsMap)

testhelper.VerifyEventsInStagingFiles(t, warehouseTest, testhelper.StagingFilesEventsMap())
testhelper.VerifyEventsInStagingFiles(t, warehouseTest, stagingFilesEventsMap())
testhelper.VerifyEventsInLoadFiles(t, warehouseTest, loadFilesEventsMap())
testhelper.VerifyEventsInTableUploads(t, warehouseTest, tableUploadsEventsMap())
testhelper.VerifyEventsInWareHouse(t, warehouseTest, mergeEventsMap())
Expand All @@ -103,7 +103,7 @@ func TestBigQueryIntegration(t *testing.T) {
testhelper.SendModifiedEvents(t, warehouseTest, sendEventsMap)
testhelper.SendIntegratedEvents(t, warehouseTest, sendEventsMap)

testhelper.VerifyEventsInStagingFiles(t, warehouseTest, testhelper.StagingFilesEventsMap())
testhelper.VerifyEventsInStagingFiles(t, warehouseTest, stagingFilesEventsMap())
testhelper.VerifyEventsInLoadFiles(t, warehouseTest, loadFilesEventsMap())
testhelper.VerifyEventsInTableUploads(t, warehouseTest, tableUploadsEventsMap())
testhelper.VerifyEventsInWareHouse(t, warehouseTest, mergeEventsMap())
Expand Down Expand Up @@ -143,7 +143,7 @@ func TestBigQueryIntegration(t *testing.T) {
testhelper.SendModifiedEvents(t, warehouseTest, sendEventsMap)
testhelper.SendModifiedEvents(t, warehouseTest, sendEventsMap)

testhelper.VerifyEventsInStagingFiles(t, warehouseTest, testhelper.StagingFilesEventsMap())
testhelper.VerifyEventsInStagingFiles(t, warehouseTest, stagingFilesEventsMap())
testhelper.VerifyEventsInLoadFiles(t, warehouseTest, loadFilesEventsMap())
testhelper.VerifyEventsInTableUploads(t, warehouseTest, tableUploadsEventsMap())
testhelper.VerifyEventsInWareHouse(t, warehouseTest, appendEventsMap())
Expand Down Expand Up @@ -194,6 +194,12 @@ func tableUploadsEventsMap() testhelper.EventsCountMap {
return eventsMap
}

func stagingFilesEventsMap() testhelper.EventsCountMap {
return testhelper.EventsCountMap{
"wh_staging_files": 34, // Since extra 2 merge events because of ID resolution
}
}

func mergeEventsMap() testhelper.EventsCountMap {
return testhelper.EventsCountMap{
"identifies": 1,
Expand Down
6 changes: 3 additions & 3 deletions warehouse/identities.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func (wh *HandleT) setupIdentityTables(warehouse warehouseutils.Warehouse) {
}

sqlStatement = fmt.Sprintf(`
CREATE INDEX IF NOT EXISTS merge_properties_index_ %[1]s ON %[1]s (
CREATE INDEX IF NOT EXISTS merge_properties_index_%[1]s ON %[1]s (
merge_property_1_type, merge_property_1_value,
merge_property_2_type, merge_property_2_value
);
Expand Down Expand Up @@ -261,7 +261,7 @@ func (wh *HandleT) setupIdentityTables(warehouse warehouseutils.Warehouse) {
}

sqlStatement = fmt.Sprintf(`
CREATE INDEX IF NOT EXISTS rudder_id_index_ %[1]s ON %[1]s (rudder_id);
CREATE INDEX IF NOT EXISTS rudder_id_index_%[1]s ON %[1]s (rudder_id);
`,
warehouseutils.IdentityMappingsTableName(warehouse),
)
Expand All @@ -272,7 +272,7 @@ func (wh *HandleT) setupIdentityTables(warehouse warehouseutils.Warehouse) {
}

sqlStatement = fmt.Sprintf(`
CREATE INDEX IF NOT EXISTS merge_property_index_ %[1]s ON %[1]s (
CREATE INDEX IF NOT EXISTS merge_property_index_%[1]s ON %[1]s (
merge_property_type, merge_property_value
);
`,
Expand Down
1 change: 1 addition & 0 deletions warehouse/testhelper/.env
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ RSERVER_WAREHOUSE_DELTALAKE_MAX_PARALLEL_LOADS=8
RSERVER_WAREHOUSE_WAREHOUSE_SYNC_FREQ_IGNORE=true
RSERVER_WAREHOUSE_UPLOAD_FREQ_IN_S=10
RSERVER_WAREHOUSE_ENABLE_JITTER_FOR_SYNCS=false
RSERVER_WAREHOUSE_ENABLE_IDRESOLUTION=true

RSERVER_EVENT_SCHEMAS_ENABLE_EVENT_SCHEMAS_FEATURE=false
RSERVER_EVENT_SCHEMAS_SYNC_INTERVAL=15
Expand Down

0 comments on commit ff89dfd

Please sign in to comment.