Skip to content

Commit

Permalink
feat: regulation api support for Universal Analytics (#2632)
Browse files Browse the repository at this point in the history
* feat: regulation api support for Universal Analytics

* Move info log to debug

* name, error string correction & remove unnecessary condition

* separate method for handling refresh flow method implemented

* addresses naming logging concerns

* add OAuth tests

* add tests for oauth destination in regulation-api

* linter fix

* fix tests with dynamic mocking of cp requests, trigger refreshFlow for only oauth destinations

* pass destination struct to Delete method & associated tests

* moving the fetch token to just before sending request to transformer

* remove comments

* refactor fetch token logic & tests

* refactor refresh flow handling

* change test values

* combine oauth setup with new oauth instance initialisation

* fix to retry the job when refresh token fails & add more cases around error scenarios

* update test-case for fetchToken error

* regulation worker API oauth code refactor

* move oauth logic to separate folder
- remove old structure
- update mocks

* - update error message
- add a condition to check for accountId information
- add a new field to capture flowType for stats & update relevant methods
- add functional option pattern to modify options according to the caller

* rename function parameter

* fix: (CR) rename InitAll to init

* add option for oauth init in tests

* fix: code-review changes

* feat: single retry capability added after refresh error occurs

* fix: update if-else block properly

Co-authored-by: Sai Sankeerth <sanpj2292@github.com>
Co-authored-by: saurav-malani <sauravmalani1@gmail.com>
  • Loading branch information
3 people committed Nov 24, 2022
1 parent ef64bba commit 87a5d02
Show file tree
Hide file tree
Showing 18 changed files with 582 additions and 106 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 9 additions & 1 deletion regulation-worker/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,20 @@ import (
"github.com/rudderlabs/rudder-server/regulation-worker/internal/initialize"
"github.com/rudderlabs/rudder-server/regulation-worker/internal/service"
"github.com/rudderlabs/rudder-server/services/filemanager"
"github.com/rudderlabs/rudder-server/services/oauth"
"github.com/rudderlabs/rudder-server/utils/logger"
"github.com/rudderlabs/rudder-server/utils/misc"
)

var pkgLogger = logger.NewLogger().Child("regulation-worker")

func main() {
func init() {
initialize.Init()
backendconfig.Init()
oauth.Init()
}

func main() {
pkgLogger.Info("starting regulation-worker")
ctx, cancel := context.WithCancel(context.Background())

Expand All @@ -58,6 +62,9 @@ func Run(ctx context.Context) {
if err != nil {
panic(fmt.Errorf("error while getting workspaceId: %w", err))
}
// setting up oauth
OAuth := oauth.NewOAuthErrorHandler(backendconfig.DefaultBackendConfig, oauth.WithRudderFlow(oauth.RudderFlow_Delete))

svc := service.JobSvc{
API: &client.JobAPI{
Client: &http.Client{Timeout: config.GetDuration("HttpClient.regulationWorker.regulationManager.timeout", 60, time.Second)},
Expand All @@ -75,6 +82,7 @@ func Run(ctx context.Context) {
&api.APIManager{
Client: &http.Client{Timeout: config.GetDuration("HttpClient.regulationWorker.transformer.timeout", 60, time.Second)},
DestTransformURL: config.MustGetString("DEST_TRANSFORM_URL"),
OAuth: OAuth,
}),
}

Expand Down
116 changes: 107 additions & 9 deletions regulation-worker/internal/delete/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,35 +8,49 @@ import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"os"
"strings"

"github.com/rudderlabs/rudder-server/regulation-worker/internal/model"
"github.com/rudderlabs/rudder-server/services/oauth"
"github.com/rudderlabs/rudder-server/services/stats"
"github.com/rudderlabs/rudder-server/utils/logger"
)

var (
pkgLogger = logger.NewLogger().Child("api")
supportedDestinations = []string{"BRAZE", "AM", "INTERCOM", "CLEVERTAP", "AF", "MP"}
supportedDestinations = []string{"BRAZE", "AM", "INTERCOM", "CLEVERTAP", "AF", "MP", "GA"}
)

type APIManager struct {
Client *http.Client
DestTransformURL string
OAuth oauth.Authorizer
}

type oauthDetail struct {
secretToken *oauth.AuthResponse
id string
}

func (*APIManager) GetSupportedDestinations() []string {
return supportedDestinations
}

// prepares payload based on (job,destDetail) & make an API call to transformer.
// gets (status, failure_reason) which is converted to appropriate model.Error & returned to caller.
func (api *APIManager) Delete(ctx context.Context, job model.Job, destConfig map[string]interface{}, destName string) model.JobStatus {
func (api *APIManager) Delete(ctx context.Context, job model.Job, destination model.Destination) model.JobStatus {
pkgLogger.Debugf("deleting: %v", job, " from API destination: %v", destination.Name)
method := http.MethodPost
endpoint := "/deleteUsers"
url := fmt.Sprint(api.DestTransformURL, endpoint)

bodySchema := mapJobToPayload(job, strings.ToLower(destName), destConfig)
bodySchema := mapJobToPayload(job, strings.ToLower(destination.Name), destination.Config)
pkgLogger.Debugf("payload: %#v", bodySchema)

reqBody, err := json.Marshal(bodySchema)
if err != nil {
Expand All @@ -49,11 +63,27 @@ func (api *APIManager) Delete(ctx context.Context, job model.Job, destConfig map
}
req.Header.Set("Content-Type", "application/json")

// check if OAuth destination
isOAuthEnabled := oauth.GetAuthType(destination.DestDefConfig) == oauth.OAuth
var oAuthDetail oauthDetail
if isOAuthEnabled {
oAuthDetail, err = api.getOAuthDetail(&destination, job.WorkspaceID)
if err != nil {
pkgLogger.Error(err)
return model.JobStatusFailed
}
err = setOAuthHeader(oAuthDetail.secretToken, req)
if err != nil {
pkgLogger.Error(err)
return model.JobStatusFailed
}
}

fileCleaningTime := stats.Default.NewTaggedStat("file_cleaning_time", stats.TimerType, stats.Tags{
"jobId": fmt.Sprintf("%d", job.ID),
"workspaceId": job.WorkspaceID,
"destType": "api",
"destName": strings.ToLower(destName),
"destName": strings.ToLower(destination.Name),
})
fileCleaningTime.Start()
defer fileCleaningTime.End()
Expand All @@ -75,8 +105,25 @@ func (api *APIManager) Delete(ctx context.Context, job model.Job, destConfig map
if err := json.Unmarshal(bodyBytes, &jobResp); err != nil {
return model.JobStatusFailed
}
jobStatus := getJobStatus(resp.StatusCode, jobResp)
pkgLogger.Debugf("[%v] JobStatus for %v: %v", destination.Name, destination.DestinationID, jobStatus)

if isOAuthEnabled && isTokenExpired(jobResp) {
err = api.refreshOAuthToken(destination.Name, job.WorkspaceID, oAuthDetail)
if err != nil {
pkgLogger.Error(err)
return model.JobStatusFailed
}
// retry the request
pkgLogger.Debug("Retrying deleteRequest job for the whole batch")
return api.Delete(ctx, job, destination)
}

return jobStatus
}

switch resp.StatusCode {
func getJobStatus(statusCode int, jobResp []JobRespSchema) model.JobStatus {
switch statusCode {

case http.StatusOK:
return model.JobStatusComplete
Expand All @@ -103,10 +150,6 @@ func (api *APIManager) Delete(ctx context.Context, job model.Job, destConfig map
}
}

func (*APIManager) GetSupportedDestinations() []string {
return supportedDestinations
}

func mapJobToPayload(job model.Job, destName string, destConfig map[string]interface{}) []apiDeletionPayloadSchema {
uas := make([]userAttributesSchema, len(job.Users))
for i, ua := range job.Users {
Expand All @@ -126,3 +169,58 @@ func mapJobToPayload(job model.Job, destName string, destConfig map[string]inter
},
}
}

func isTokenExpired(jobResponses []JobRespSchema) bool {
for _, jobResponse := range jobResponses {
if jobResponse.AuthErrorCategory == oauth.REFRESH_TOKEN {
return true
}
}
return false
}

func setOAuthHeader(secretToken *oauth.AuthResponse, req *http.Request) error {
payload, marshalErr := json.Marshal(secretToken.Account)
if marshalErr != nil {
marshalFailErr := fmt.Sprintf("error while marshalling account secret information: %v", marshalErr)
pkgLogger.Errorf(marshalFailErr)
return errors.New(marshalFailErr)
}
req.Header.Set("X-Rudder-Dest-Info", string(payload))
return nil
}

func (api *APIManager) getOAuthDetail(destDetail *model.Destination, workspaceId string) (oauthDetail, error) {
id := oauth.GetAccountId(destDetail.Config, oauth.DeleteAccountIdKey)
if strings.TrimSpace(id) == "" {
return oauthDetail{}, fmt.Errorf("%v is not present for %v", oauth.DeleteAccountIdKey, destDetail.Name)
}
tokenStatusCode, secretToken := api.OAuth.FetchToken(&oauth.RefreshTokenParams{
AccountId: id,
WorkspaceId: workspaceId,
DestDefName: destDetail.Name,
EventNamePrefix: "fetch_token",
})
if tokenStatusCode != http.StatusOK {
return oauthDetail{}, fmt.Errorf("[%s][FetchToken] Error in Token Fetch statusCode: %d\t error: %s", destDetail.Name, tokenStatusCode, secretToken.Err)
}
return oauthDetail{
id: id,
secretToken: secretToken,
}, nil
}

func (api *APIManager) refreshOAuthToken(destName, workspaceId string, oAuthDetail oauthDetail) error {
refTokenParams := &oauth.RefreshTokenParams{
Secret: oAuthDetail.secretToken.Account.Secret,
WorkspaceId: workspaceId,
AccountId: oAuthDetail.id,
DestDefName: destName,
EventNamePrefix: "refresh_token",
}
statusCode, _ := api.OAuth.RefreshToken(refTokenParams)
if statusCode != http.StatusOK {
return fmt.Errorf("failed to refresh token for destination: %v", destName)
}
return nil
}
Loading

0 comments on commit 87a5d02

Please sign in to comment.