Skip to content

Commit

Permalink
fix(warehouse): syncs issues (#2732)
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr committed Nov 28, 2022
1 parent cf80d67 commit 0941cc0
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 27 deletions.
89 changes: 62 additions & 27 deletions warehouse/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ import (
"strings"
"time"

"google.golang.org/genproto/googleapis/rpc/code"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/rudderlabs/rudder-server/warehouse/internal/model"
"github.com/rudderlabs/rudder-server/warehouse/validations"

Expand Down Expand Up @@ -106,6 +110,7 @@ const (
TriggeredSuccessfully = "Triggered successfully"
NoPendingEvents = "No pending events to sync for this destination"
DownloadFileNamePattern = "downloadfile.*.tmp"
NoSuchSync = "No such sync exist"
)

func InitWarehouseAPI(dbHandle *sql.DB, log logger.Logger) error {
Expand Down Expand Up @@ -144,7 +149,7 @@ func InitWarehouseAPI(dbHandle *sql.DB, log logger.Logger) error {

func (uploadsReq *UploadsReqT) validateReq() error {
if !uploadsReq.API.enabled || uploadsReq.API.log == nil || uploadsReq.API.dbHandle == nil {
return errors.New(`warehouse api's are not initialized`)
return errors.New(`warehouse api are not initialized`)
}
if uploadsReq.Limit < 1 {
uploadsReq.Limit = 10
Expand Down Expand Up @@ -250,16 +255,21 @@ func (uploadsReq *UploadsReqT) TriggerWhUploads() (response *proto.TriggerWhUplo
func (uploadReq UploadReqT) GetWHUpload() (*proto.WHUploadResponse, error) {
err := uploadReq.validateReq()
if err != nil {
return &proto.WHUploadResponse{}, err
return &proto.WHUploadResponse{}, status.Errorf(codes.Code(code.Code_INVALID_ARGUMENT), err.Error())
}

query := uploadReq.generateQuery(`id, source_id, destination_id, destination_type, namespace, status, error, created_at, first_event_at, last_event_at, last_exec_at, updated_at, timings, metadata->>'nextRetryTime', metadata->>'archivedStagingAndLoadFiles'`)
uploadReq.API.log.Debug(query)
var upload proto.WHUploadResponse
var nextRetryTimeStr sql.NullString
var firstEventAt, lastEventAt, createdAt, lastExecAt, updatedAt sql.NullTime
var timingsObject sql.NullString
var uploadError string
var isUploadArchived sql.NullBool

var (
upload proto.WHUploadResponse
nextRetryTimeStr sql.NullString
firstEventAt, lastEventAt, createdAt, lastExecAt, updatedAt sql.NullTime
timingsObject sql.NullString
uploadError string
isUploadArchived sql.NullBool
)

row := uploadReq.API.dbHandle.QueryRow(query)
err = row.Scan(
&upload.Id,
Expand All @@ -278,14 +288,19 @@ func (uploadReq UploadReqT) GetWHUpload() (*proto.WHUploadResponse, error) {
&nextRetryTimeStr,
&isUploadArchived,
)
if err == sql.ErrNoRows {
return &proto.WHUploadResponse{}, status.Errorf(codes.Code(code.Code_NOT_FOUND), "sync not found")
}
if err != nil {
uploadReq.API.log.Errorf(err.Error())
return &proto.WHUploadResponse{}, err
return &proto.WHUploadResponse{}, status.Errorf(codes.Code(code.Code_INTERNAL), err.Error())
}

if !uploadReq.authorizeSource(upload.SourceId) {
pkgLogger.Errorf(`Unauthorized request for upload:%d with sourceId:%s in workspaceId:%s`, uploadReq.UploadId, upload.SourceId, uploadReq.WorkspaceID)
return &proto.WHUploadResponse{}, errors.New("unauthorized request")
return &proto.WHUploadResponse{}, status.Error(codes.Code(code.Code_UNAUTHENTICATED), "unauthorized request")
}

upload.CreatedAt = timestamppb.New(createdAt.Time)
upload.FirstEventAt = timestamppb.New(firstEventAt.Time)
upload.LastEventAt = timestamppb.New(lastEventAt.Time)
Expand Down Expand Up @@ -317,16 +332,17 @@ func (uploadReq UploadReqT) GetWHUpload() (*proto.WHUploadResponse, error) {
} else {
upload.Duration = int32(timeutil.Now().Sub(lastExecAt.Time) / time.Second)
}

tableUploadReq := TableUploadReqT{
UploadID: upload.Id,
Name: "",
API: uploadReq.API,
}
tables, err := tableUploadReq.GetWhTableUploads()
upload.Tables, err = tableUploadReq.GetWhTableUploads()
if err != nil {
return &proto.WHUploadResponse{}, err
return &proto.WHUploadResponse{}, status.Errorf(codes.Code(code.Code_INTERNAL), err.Error())
}
upload.Tables = tables

return &upload, nil
}

Expand All @@ -343,10 +359,14 @@ func (uploadReq UploadReqT) TriggerWHUpload() (response *proto.TriggerWhUploadsR
if err != nil {
return
}

var (
uploadJobT UploadJobT
upload Upload
)

query := uploadReq.generateQuery(`id, source_id, destination_id, metadata`)
uploadReq.API.log.Debug(query)
var uploadJobT UploadJobT
var upload Upload

row := uploadReq.API.dbHandle.QueryRow(query)
err = row.Scan(
Expand All @@ -355,15 +375,23 @@ func (uploadReq UploadReqT) TriggerWHUpload() (response *proto.TriggerWhUploadsR
&upload.DestinationID,
&upload.Metadata,
)
if err == sql.ErrNoRows {
return &proto.TriggerWhUploadsResponse{
Message: NoSuchSync,
StatusCode: http.StatusOK,
}, nil
}
if err != nil {
uploadReq.API.log.Errorf(err.Error())
return
}

if !uploadReq.authorizeSource(upload.SourceID) {
pkgLogger.Errorf(`Unauthorized request for upload:%d with sourceId:%s in workspaceId:%s`, uploadReq.UploadId, upload.SourceID, uploadReq.WorkspaceID)
err = errors.New("unauthorized request")
return
}

uploadJobT.upload = &upload
uploadJobT.dbHandle = uploadReq.API.dbHandle
err = uploadJobT.triggerUploadNow()
Expand Down Expand Up @@ -449,7 +477,7 @@ func (tableUploadReq TableUploadReqT) generateQuery(selectFields string) string

func (tableUploadReq TableUploadReqT) validateReq() error {
if !tableUploadReq.API.enabled || tableUploadReq.API.log == nil || tableUploadReq.API.dbHandle == nil {
return errors.New("warehouse api's are not initialized")
return errors.New("warehouse api are not initialized")
}
if tableUploadReq.UploadID == 0 {
return errors.New("upload_id is empty or should be greater than 0")
Expand All @@ -474,7 +502,7 @@ func (uploadReq UploadReqT) generateQuery(selectedFields string) string {

func (uploadReq UploadReqT) validateReq() error {
if !uploadReq.API.enabled || uploadReq.API.log == nil || uploadReq.API.dbHandle == nil {
return errors.New("warehouse api's are not initialized")
return errors.New("warehouse api are not initialized")
}
if uploadReq.UploadId < 1 {
return errors.New(`upload_id is empty or should be greater than 0 `)
Expand Down Expand Up @@ -628,11 +656,16 @@ func (uploadsReq *UploadsReqT) getTotalUploadCount(whereClause string) (int32, e

// for hosted workspaces - we get the uploads and the total upload count using the same query
func (uploadsReq *UploadsReqT) warehouseUploadsForHosted(authorizedSourceIDs []string, selectFields string) (uploadsRes *proto.WHUploadsResponse, err error) {
var uploads []*proto.WHUploadResponse
var totalUploadCount int32
var (
uploads []*proto.WHUploadResponse
totalUploadCount int32
whereClauses []string
subQuery string
query string
)

// create query
subQuery := fmt.Sprintf(`
subQuery = fmt.Sprintf(`
SELECT
%s,
COUNT(*) OVER() AS total_uploads
Expand All @@ -643,7 +676,6 @@ func (uploadsReq *UploadsReqT) warehouseUploadsForHosted(authorizedSourceIDs []s
selectFields,
warehouseutils.WarehouseUploadsTable,
)
var whereClauses []string
if uploadsReq.SourceID == "" {
whereClauses = append(whereClauses, fmt.Sprintf(`source_id IN (%v)`, misc.SingleQuoteLiteralJoin(authorizedSourceIDs)))
} else if misc.Contains(authorizedSourceIDs, uploadsReq.SourceID) {
Expand All @@ -660,7 +692,7 @@ func (uploadsReq *UploadsReqT) warehouseUploadsForHosted(authorizedSourceIDs []s
}

subQuery = subQuery + strings.Join(whereClauses, " AND ")
query := fmt.Sprintf(`
query = fmt.Sprintf(`
SELECT
*
FROM
Expand Down Expand Up @@ -697,11 +729,16 @@ func (uploadsReq *UploadsReqT) warehouseUploadsForHosted(authorizedSourceIDs []s

// for non hosted workspaces - we get the uploads and the total upload count using separate queries
func (uploadsReq *UploadsReqT) warehouseUploads(selectFields string) (uploadsRes *proto.WHUploadsResponse, err error) {
var uploads []*proto.WHUploadResponse
var totalUploadCount int32
var (
uploads []*proto.WHUploadResponse
totalUploadCount int32
query string
whereClause string
whereClauses []string
)

// create query
query := fmt.Sprintf(`
query = fmt.Sprintf(`
select
%s
from
Expand All @@ -710,8 +747,6 @@ func (uploadsReq *UploadsReqT) warehouseUploads(selectFields string) (uploadsRes
selectFields,
warehouseutils.WarehouseUploadsTable,
)
whereClause := ""
var whereClauses []string
if uploadsReq.SourceID != "" {
whereClauses = append(whereClauses, fmt.Sprintf(`source_id = '%s'`, uploadsReq.SourceID))
}
Expand Down
29 changes: 29 additions & 0 deletions warehouse/warehousegrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ func (*warehouseGRPC) GetWHUploads(_ context.Context, request *proto.WHUploadsRe
Offset: request.Offset,
API: UploadAPI,
}
uploadsReq.API.log.Info(
"[GetWHUploads] Fetching warehouse uploads for WorkspaceId: %s, SourceId: %s, DestinationId: %s",
uploadsReq.WorkspaceID,
uploadsReq.SourceID,
uploadsReq.DestinationID,
)
res, err := uploadsReq.GetWhUploads()
return res, err
}
Expand All @@ -41,6 +47,12 @@ func (*warehouseGRPC) TriggerWHUploads(_ context.Context, request *proto.WHUploa
DestinationID: request.DestinationId,
API: UploadAPI,
}
uploadsReq.API.log.Info(
"[TriggerWHUploads] Triggering warehouse uploads for WorkspaceId: %s, SourceId: %s, DestinationId: %s",
uploadsReq.WorkspaceID,
uploadsReq.SourceID,
uploadsReq.DestinationID,
)
res, err := uploadsReq.TriggerWhUploads()
return res, err
}
Expand All @@ -51,6 +63,11 @@ func (*warehouseGRPC) GetWHUpload(_ context.Context, request *proto.WHUploadRequ
WorkspaceID: request.WorkspaceId,
API: UploadAPI,
}
uploadReq.API.log.Info(
"[GetWHUpload] Fetching warehouse upload for WorkspaceId: %s, UploadId: %d",
uploadReq.WorkspaceID,
uploadReq.UploadId,
)
res, err := uploadReq.GetWHUpload()
return res, err
}
Expand All @@ -65,6 +82,11 @@ func (*warehouseGRPC) TriggerWHUpload(_ context.Context, request *proto.WHUpload
WorkspaceID: request.WorkspaceId,
API: UploadAPI,
}
uploadReq.API.log.Info(
"[TriggerWHUpload] Triggering warehouse upload for WorkspaceId: %s, UploadId: %d",
uploadReq.WorkspaceID,
uploadReq.UploadId,
)
res, err := uploadReq.TriggerWHUpload()
return res, err
}
Expand Down Expand Up @@ -169,6 +191,13 @@ func (*warehouseGRPC) CountWHUploadsToRetry(ctx context.Context, req *proto.Retr
UploadIds: req.UploadIds,
API: UploadAPI,
}
retryReq.API.log.Info(
"[RetryWHUploads] Retrying warehouse uploads for WorkspaceId: %s, SourceId: %s, DestinationId: %s, IntervalInHours: %d",
retryReq.WorkspaceID,
retryReq.SourceID,
retryReq.DestinationID,
retryReq.IntervalInHours,
)
r, err := retryReq.UploadsToRetry(ctx)
response = &proto.RetryWHUploadsResponse{
Count: r.Count,
Expand Down

0 comments on commit 0941cc0

Please sign in to comment.