Skip to content

Commit

Permalink
chore(warehouse): use fastUUID with google UUID generation (#2598)
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr committed Nov 11, 2022
1 parent bc0d1c7 commit 07093b1
Show file tree
Hide file tree
Showing 17 changed files with 47 additions and 53 deletions.
3 changes: 1 addition & 2 deletions services/pgnotifier/pgnotifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"time"

"github.com/allisson/go-pglock/v2"
"github.com/gofrs/uuid"
"github.com/lib/pq"
"github.com/spaolacci/murmur3"

Expand Down Expand Up @@ -535,7 +534,7 @@ func (notifier *PgNotifierT) Publish(payload MessagePayload, schema *whUtils.Sch
}
defer stmt.Close()

batchID := uuid.Must(uuid.NewV4()).String()
batchID := misc.FastUUID().String()
pkgLogger.Infof("PgNotifier: Inserting %d records into %s as batch: %s", len(jobs), queueName, batchID)
for _, job := range jobs {
_, err = stmt.Exec(batchID, WaitingState, string(job), notifier.workspaceIdentifier, priority, payload.JobType)
Expand Down
5 changes: 3 additions & 2 deletions warehouse/bigquery/bigquery_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@ import (
"os"
"testing"

"github.com/rudderlabs/rudder-server/utils/misc"

"github.com/stretchr/testify/require"

"cloud.google.com/go/bigquery"

"github.com/gofrs/uuid"
bigquery2 "github.com/rudderlabs/rudder-server/warehouse/bigquery"
"github.com/rudderlabs/rudder-server/warehouse/client"
"github.com/rudderlabs/rudder-server/warehouse/testhelper"
Expand Down Expand Up @@ -106,7 +107,7 @@ func TestBigQueryIntegration(t *testing.T) {
DestinationID: handle.DestinationId,
Schema: handle.Schema,
Tables: handle.Tables,
MessageId: uuid.Must(uuid.NewV4()).String(),
MessageId: misc.FastUUID().String(),
Provider: warehouseutils.BQ,
LatestSourceRunConfig: testhelper.DefaultSourceRunConfig(),
}
Expand Down
7 changes: 4 additions & 3 deletions warehouse/bigquery/bigquery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"os"
"testing"

"github.com/rudderlabs/rudder-server/utils/misc"

backendconfig "github.com/rudderlabs/rudder-server/config/backend-config"

"github.com/rudderlabs/rudder-server/utils/timeutil"
Expand All @@ -17,7 +19,6 @@ import (

"cloud.google.com/go/bigquery"

"github.com/gofrs/uuid"
bigquery2 "github.com/rudderlabs/rudder-server/warehouse/bigquery"
"github.com/rudderlabs/rudder-server/warehouse/client"
"github.com/rudderlabs/rudder-server/warehouse/testhelper"
Expand Down Expand Up @@ -78,7 +79,7 @@ func TestBigQueryIntegration(t *testing.T) {
WriteKey: writeKey,
Schema: schema,
Tables: tables,
MessageId: uuid.Must(uuid.NewV4()).String(),
MessageId: misc.FastUUID().String(),
Provider: warehouseutils.BQ,
SourceID: "24p1HhPk09FW25Kuzxv7GshCLKR",
DestinationID: "26Bgm9FrQDZjvadSwAlpd35atwn",
Expand Down Expand Up @@ -180,7 +181,7 @@ func TestBigQueryIntegration(t *testing.T) {
WriteKey: writeKey,
Schema: schema,
Tables: tables,
MessageId: uuid.Must(uuid.NewV4()).String(),
MessageId: misc.FastUUID().String(),
Provider: warehouseutils.BQ,
SourceID: "24p1HhPk09FW25Kuzxv7GshCLKR",
DestinationID: "26Bgm9FrQDZjvadSwAlpd35atwn",
Expand Down
5 changes: 3 additions & 2 deletions warehouse/constraint.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ package warehouse
import (
"fmt"

"github.com/gofrs/uuid"
"github.com/rudderlabs/rudder-server/utils/misc"

"github.com/iancoleman/strcase"
"github.com/rudderlabs/rudder-server/config"
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
Expand Down Expand Up @@ -102,6 +103,6 @@ func (ic *IndexConstraintT) violates(brEvent *BatchRouterEventT, columnName stri
}
return &ConstraintsViolationT{
IsViolated: concatenatedLength > ic.Limit,
ViolatedIdentifier: fmt.Sprintf(`%s-%s`, strcase.ToKebab(warehouseutils.DiscardsTable), uuid.Must(uuid.NewV4()).String()),
ViolatedIdentifier: fmt.Sprintf(`%s-%s`, strcase.ToKebab(warehouseutils.DiscardsTable), misc.FastUUID().String()),
}
}
3 changes: 1 addition & 2 deletions warehouse/deltalake/deltalake.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (

"github.com/iancoleman/strcase"

"github.com/gofrs/uuid"
"github.com/rudderlabs/rudder-server/config"
proto "github.com/rudderlabs/rudder-server/proto/databricks"
"github.com/rudderlabs/rudder-server/services/stats"
Expand Down Expand Up @@ -205,7 +204,7 @@ func Connect(cred *databricks.CredentialsT, connectTimeout time.Duration) (dbHan
}

ctx := context.Background()
identifier := uuid.Must(uuid.NewV4()).String()
identifier := misc.FastUUID().String()
connConfig := &proto.ConnectionConfig{
Host: cred.Host,
Port: cred.Port,
Expand Down
8 changes: 4 additions & 4 deletions warehouse/deltalake/deltalake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ import (
"os"
"testing"

"github.com/rudderlabs/rudder-server/utils/misc"

backendconfig "github.com/rudderlabs/rudder-server/config/backend-config"

"github.com/rudderlabs/rudder-server/utils/timeutil"

"github.com/gofrs/uuid"

proto "github.com/rudderlabs/rudder-server/proto/databricks"

"github.com/rudderlabs/rudder-server/warehouse/client"
Expand Down Expand Up @@ -89,7 +89,7 @@ func TestDeltalakeIntegration(t *testing.T) {
WriteKey: handle.WriteKey,
Schema: handle.Schema,
Tables: handle.Tables,
MessageId: uuid.Must(uuid.NewV4()).String(),
MessageId: misc.FastUUID().String(),
Provider: warehouseutils.DELTALAKE,
SourceID: "25H5EpYzojqQSepRSaGBrrPx3e4",
DestinationID: "25IDjdnoEus6DDNrth3SWO1FOpu",
Expand Down Expand Up @@ -144,7 +144,7 @@ func TestDeltalakeIntegration(t *testing.T) {
WriteKey: handle.WriteKey,
Schema: handle.Schema,
Tables: handle.Tables,
MessageId: uuid.Must(uuid.NewV4()).String(),
MessageId: misc.FastUUID().String(),
Provider: warehouseutils.DELTALAKE,
SourceID: "25H5EpYzojqQSepRSaGBrrPx3e4",
DestinationID: "25IDjdnoEus6DDNrth3SWO1FOpu",
Expand Down
7 changes: 3 additions & 4 deletions warehouse/identity/identity.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"strings"
"time"

"github.com/gofrs/uuid"
"github.com/lib/pq"
"github.com/rudderlabs/rudder-server/config"
"github.com/rudderlabs/rudder-server/services/filemanager"
Expand Down Expand Up @@ -86,7 +85,7 @@ func (idr *HandleT) applyRule(txn *sql.Tx, ruleID int64, gzWriter *misc.GZipWrit
// generate new one and assign to these two
var rudderID string
if len(rudderIDs) == 0 {
rudderID = uuid.Must(uuid.NewV4()).String()
rudderID = misc.FastUUID().String()
} else {
rudderID = rudderIDs[0]
}
Expand Down Expand Up @@ -173,7 +172,7 @@ func (idr *HandleT) addRules(txn *sql.Tx, loadFileNames []string, gzWriter *misc
// add rules from load files into temp table
// use original table to delete redundant ones from temp table
// insert from temp table into original table
mergeRulesStagingTable := fmt.Sprintf(`rudder_identity_merge_rules_staging_%s`, strings.ReplaceAll(uuid.Must(uuid.NewV4()).String(), "-", ""))
mergeRulesStagingTable := fmt.Sprintf(`rudder_identity_merge_rules_staging_%s`, warehouseutils.RandHex())
sqlStatement := fmt.Sprintf(`CREATE TEMP TABLE %s
ON COMMIT DROP
AS SELECT * FROM %s
Expand Down Expand Up @@ -444,7 +443,7 @@ func (idr *HandleT) createTempGzFile(dirName string) (gzWriter misc.GZipWriter,
panic(err)
}
fileExtension := warehouseutils.GetTempFileExtension(idr.Warehouse.Type)
path = tmpDirPath + dirName + fmt.Sprintf(`%s_%s/%v/`, idr.Warehouse.Destination.DestinationDefinition.Name, idr.Warehouse.Destination.ID, idr.UploadID) + uuid.Must(uuid.NewV4()).String() + "." + fileExtension
path = tmpDirPath + dirName + fmt.Sprintf(`%s_%s/%v/`, idr.Warehouse.Destination.DestinationDefinition.Name, idr.Warehouse.Destination.ID, idr.UploadID) + misc.FastUUID().String() + "." + fileExtension
err = os.MkdirAll(filepath.Dir(path), os.ModePerm)
if err != nil {
panic(err)
Expand Down
5 changes: 2 additions & 3 deletions warehouse/redshift/redshift.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (

"github.com/tidwall/gjson"

"github.com/gofrs/uuid"
"github.com/rudderlabs/rudder-server/config"
"github.com/rudderlabs/rudder-server/services/filemanager"
"github.com/rudderlabs/rudder-server/utils/logger"
Expand Down Expand Up @@ -275,7 +274,7 @@ func (rs *HandleT) generateManifest(tableName string, _ map[string]string) (stri
if err != nil {
panic(err)
}
localManifestPath := fmt.Sprintf("%v%v", tmpDirPath+dirName, uuid.Must(uuid.NewV4()).String())
localManifestPath := fmt.Sprintf("%v%v", tmpDirPath+dirName, misc.FastUUID().String())
err = os.MkdirAll(filepath.Dir(localManifestPath), os.ModePerm)
if err != nil {
panic(err)
Expand All @@ -301,7 +300,7 @@ func (rs *HandleT) generateManifest(tableName string, _ map[string]string) (stri
return "", err
}

uploadOutput, err := uploader.Upload(context.TODO(), file, manifestFolder, rs.Warehouse.Source.ID, rs.Warehouse.Destination.ID, time.Now().Format("01-02-2006"), tableName, uuid.Must(uuid.NewV4()).String())
uploadOutput, err := uploader.Upload(context.TODO(), file, manifestFolder, rs.Warehouse.Source.ID, rs.Warehouse.Destination.ID, time.Now().Format("01-02-2006"), tableName, misc.FastUUID().String())
if err != nil {
return "", err
}
Expand Down
6 changes: 3 additions & 3 deletions warehouse/redshift/redshift_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import (
"os"
"testing"

"github.com/stretchr/testify/require"
"github.com/rudderlabs/rudder-server/utils/misc"

"github.com/gofrs/uuid"
"github.com/stretchr/testify/require"

"github.com/rudderlabs/rudder-server/warehouse/client"
"github.com/rudderlabs/rudder-server/warehouse/redshift"
Expand Down Expand Up @@ -95,7 +95,7 @@ func TestSourceRedshiftIntegration(t *testing.T) {
SourceID: handle.SourceId,
DestinationID: handle.DestinationId,
LatestSourceRunConfig: testhelper.DefaultSourceRunConfig(),
MessageId: uuid.Must(uuid.NewV4()).String(),
MessageId: misc.FastUUID().String(),
Provider: warehouseutils.RS,
}

Expand Down
5 changes: 2 additions & 3 deletions warehouse/slave.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"strings"
"time"

"github.com/gofrs/uuid"
"github.com/rudderlabs/rudder-server/config"
"github.com/rudderlabs/rudder-server/services/filemanager"
"github.com/rudderlabs/rudder-server/services/pgnotifier"
Expand Down Expand Up @@ -187,7 +186,7 @@ func (job *Payload) getDiscardsTable() string {

func (jobRun *JobRunT) getLoadFilePath(tableName string) string {
job := jobRun.job
randomness := uuid.Must(uuid.NewV4()).String()
randomness := misc.FastUUID().String()
return strings.TrimSuffix(jobRun.stagingFilePath, "json.gz") + tableName + fmt.Sprintf(`.%s`, randomness) + fmt.Sprintf(`.%s`, warehouseutils.GetLoadFileFormat(job.DestinationType))
}

Expand Down Expand Up @@ -679,7 +678,7 @@ func processClaimedAsyncJob(claimedJob pgnotifier.ClaimT) {
func setupSlave(ctx context.Context) error {
g, ctx := errgroup.WithContext(ctx)

slaveID := uuid.Must(uuid.NewV4()).String()
slaveID := misc.FastUUID().String()
jobNotificationChannel := notifier.Subscribe(ctx, slaveID, noOfSlaveWorkerRoutines)
for workerIdx := 0; workerIdx <= noOfSlaveWorkerRoutines-1; workerIdx++ {
idx := workerIdx
Expand Down
5 changes: 3 additions & 2 deletions warehouse/snowflake/snowflake_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ import (
"os"
"testing"

"github.com/rudderlabs/rudder-server/utils/misc"

// "github.com/stretchr/testify/require"

"github.com/gofrs/uuid"
"github.com/stretchr/testify/require"

"github.com/rudderlabs/rudder-server/warehouse/client"
Expand Down Expand Up @@ -93,7 +94,7 @@ func TestSnowflakeIntegration(t *testing.T) {
},
Schema: handle.Schema,
Tables: handle.Tables,
MessageId: uuid.Must(uuid.NewV4()).String(),
MessageId: misc.FastUUID().String(),
Provider: warehouseutils.SNOWFLAKE,
SourceID: handle.SourceId,
DestinationID: handle.DestinationId,
Expand Down
5 changes: 2 additions & 3 deletions warehouse/testhelper/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (

"github.com/joho/godotenv"

"github.com/gofrs/uuid"
azuresynapse "github.com/rudderlabs/rudder-server/warehouse/azure-synapse"
"github.com/rudderlabs/rudder-server/warehouse/datalake"

Expand Down Expand Up @@ -621,7 +620,7 @@ func DefaultSourceRunConfig() map[string]string {
}

func GetUserId(userType string) string {
return fmt.Sprintf("userId_%s_%s", strings.ToLower(userType), strings.ReplaceAll(uuid.Must(uuid.NewV4()).String(), "-", ""))
return fmt.Sprintf("userId_%s_%s", strings.ToLower(userType), warehouseutils.RandHex())
}

func CreateBucketForMinio(t testing.TB, bucketName string) {
Expand Down Expand Up @@ -849,7 +848,7 @@ func DatabricksCredentials() (credentials databricks.CredentialsT, err error) {

func (w *WareHouseTest) MsgId() string {
if w.MessageId == "" {
return uuid.Must(uuid.NewV4()).String()
return misc.FastUUID().String()
}
return w.MessageId
}
3 changes: 1 addition & 2 deletions warehouse/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"time"

"github.com/cenkalti/backoff/v4"
"github.com/gofrs/uuid"
"github.com/lib/pq"
"github.com/tidwall/gjson"

Expand Down Expand Up @@ -1846,7 +1845,7 @@ func (job *UploadJobT) createLoadFiles(generateAll bool) (startLoadFileID, endLo

publishBatchSize := config.GetInt("Warehouse.pgNotifierPublishBatchSize", 100)
pkgLogger.Infof("[WH]: Starting batch processing %v stage files for %s:%s", publishBatchSize, destType, destID)
uniqueLoadGenID := uuid.Must(uuid.NewV4()).String()
uniqueLoadGenID := misc.FastUUID().String()
job.upload.LoadFileGenStartTime = timeutil.Now()

// Getting distinct destination revision ID from staging files metadata
Expand Down
12 changes: 10 additions & 2 deletions warehouse/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"crypto/sha512"
"database/sql"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
Expand All @@ -21,7 +22,6 @@ import (
"github.com/Azure/azure-storage-blob-go/azblob"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/sts"
"github.com/gofrs/uuid"
"github.com/iancoleman/strcase"
"github.com/tidwall/gjson"

Expand Down Expand Up @@ -1075,8 +1075,16 @@ func StagingTablePrefix(provider string) string {
}

func StagingTableName(provider, tableName string, tableNameLimit int) string {
randomNess := strings.ReplaceAll(uuid.Must(uuid.NewV4()).String(), "-", "")
randomNess := RandHex()
prefix := StagingTablePrefix(provider)
stagingTableName := fmt.Sprintf(`%s%s_%s`, prefix, tableName, randomNess)
return misc.TruncateStr(stagingTableName, tableNameLimit)
}

// RandHex returns a random hex string of length 32
func RandHex() string {
u := misc.FastUUID()
var buf [32]byte
hex.Encode(buf[:], u[:])
return string(buf[:])
}
6 changes: 1 addition & 5 deletions warehouse/utils/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

"github.com/gofrs/uuid"

"github.com/stretchr/testify/require"

"github.com/rudderlabs/rudder-server/config"
Expand Down Expand Up @@ -1396,9 +1394,7 @@ var _ = Describe("Utils", func() {

It("SSL keys", func() {
destinationID := "destID"
clientKey := uuid.Must(uuid.NewV4()).String()
clientCert := uuid.Must(uuid.NewV4()).String()
serverCA := uuid.Must(uuid.NewV4()).String()
clientKey, clientCert, serverCA := misc.FastUUID().String(), misc.FastUUID().String(), misc.FastUUID().String()

err := WriteSSLKeys(backendconfig.DestinationT{ID: destinationID, Config: map[string]interface{}{"clientKey": clientKey, "clientCert": clientCert, "serverCA": serverCA}})
Expect(err).To(Equal(WriteSSLKeyError{}))
Expand Down
Loading

0 comments on commit 07093b1

Please sign in to comment.