Skip to content

Commit

Permalink
chore: regulation worker avoid panic in case of timeout (#2657)
Browse files Browse the repository at this point in the history
  • Loading branch information
saurav-malani committed Nov 8, 2022
1 parent a0da8f7 commit 073a035
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 35 deletions.
10 changes: 6 additions & 4 deletions regulation-worker/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/rudderlabs/rudder-server/regulation-worker/internal/service"
"github.com/rudderlabs/rudder-server/services/filemanager"
"github.com/rudderlabs/rudder-server/utils/logger"
"github.com/rudderlabs/rudder-server/utils/misc"
)

// pkgLogger is the package-level logger, created as the "regulation-worker" child of a new logger.
var pkgLogger = logger.NewLogger().Child("regulation-worker")
Expand Down Expand Up @@ -57,10 +58,9 @@ func Run(ctx context.Context) {
if err != nil {
panic(fmt.Errorf("error while getting workspaceId: %w", err))
}

svc := service.JobSvc{
API: &client.JobAPI{
Client: &http.Client{Timeout: config.GetDuration("HttpClient.regulationWorker.timeout", 30, time.Second)},
Client: &http.Client{Timeout: config.GetDuration("HttpClient.regulationWorker.regulationManager.timeout", 60, time.Second)},
URLPrefix: config.MustGetString("CONFIG_BACKEND_URL"),
WorkspaceToken: config.MustGetString("CONFIG_BACKEND_TOKEN"),
WorkspaceID: workspaceId,
Expand All @@ -73,14 +73,16 @@ func Run(ctx context.Context) {
FilesLimit: config.GetInt("REGULATION_WORKER_FILES_LIMIT", 1000),
},
&api.APIManager{
Client: &http.Client{Timeout: config.GetDuration("HttpClient.regulationWorker.timeout", 30, time.Second)},
Client: &http.Client{Timeout: config.GetDuration("HttpClient.regulationWorker.transformer.timeout", 60, time.Second)},
DestTransformURL: config.MustGetString("DEST_TRANSFORM_URL"),
}),
}

pkgLogger.Infof("calling looper with service: %v", svc)
l := withLoop(svc)
err = l.Loop(ctx)
err = misc.WithBugsnag(func() error {
return l.Loop(ctx)
})()
if err != nil && !errors.Is(err, context.Canceled) {
pkgLogger.Errorf("error: %v", err)
panic(err)
Expand Down
15 changes: 9 additions & 6 deletions regulation-worker/internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"io"
"net/http"
"os"
"regexp"
"strconv"
"time"
Expand All @@ -30,12 +31,9 @@ type JobAPI struct {
// which is actually returned.
func (j *JobAPI) Get(ctx context.Context) (model.Job, error) {
pkgLogger.Debugf("making http request to regulation manager to get new job")
ctx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()

url := fmt.Sprintf("%s/dataplane/workspaces/%s/regulations/workerJobs", j.URLPrefix, j.WorkspaceID)
pkgLogger.Debugf("making GET request to URL: %v", url)

req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
pkgLogger.Errorf("error while create new http request: %v", err)
Expand All @@ -45,10 +43,12 @@ func (j *JobAPI) Get(ctx context.Context) (model.Job, error) {
req.SetBasicAuth(j.WorkspaceToken, "")
req.Header.Set("Content-Type", "application/json")

pkgLogger.Debugf("making request: %v", req)
resp, err := j.Client.Do(req)
if os.IsTimeout(err) {
stats.Default.NewStat("regulation_manager.request_timeout", stats.CountType).Count(1)
return model.Job{}, model.ErrRequestTimeout
}
if err != nil {
pkgLogger.Errorf("http request failed with error: %v", err)
return model.Job{}, err
}
defer func() {
Expand Down Expand Up @@ -133,8 +133,11 @@ func (j *JobAPI) UpdateStatus(ctx context.Context, status model.JobStatus, jobID
req.Header.Set("Content-Type", "application/json")

resp, err := j.Client.Do(req)
if os.IsTimeout(err) {
stats.Default.NewStat("regulation_manager.request_timeout", stats.CountType).Count(1)
return model.ErrRequestTimeout
}
if err != nil {
pkgLogger.Errorf("error while making http request: %v", err)
return err
}
defer func() { _ = resp.Body.Close() }()
Expand Down
22 changes: 19 additions & 3 deletions regulation-worker/internal/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/rudderlabs/rudder-server/regulation-worker/internal/client"
"github.com/rudderlabs/rudder-server/regulation-worker/internal/initialize"
Expand All @@ -24,6 +25,7 @@ func TestGet(t *testing.T) {
expectedErr error
acutalErr error
expectedUsrAttributeCount int
serverDelay int
}{
{
name: "Get request to get job: successful",
Expand All @@ -44,17 +46,31 @@ func TestGet(t *testing.T) {
respCode: 429,
expectedErr: fmt.Errorf("unexpected response code: 429"),
},
{
name: "Get request to get model.ErrRequestTimeout",
workspaceID: "1001",
expectedErr: model.ErrRequestTimeout,
serverDelay: 1,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(tt.respCode)
if tt.respCode != 0 {
w.WriteHeader(tt.respCode)
}
time.Sleep(time.Duration(tt.serverDelay) * time.Millisecond)
fmt.Fprintf(w, tt.respBody)
}))
defer svr.Close()

httpClient := &http.Client{}
if tt.serverDelay > 0 {
httpClient = &http.Client{
Timeout: time.Duration(tt.serverDelay) * time.Microsecond,
}
}
c := client.JobAPI{
Client: &http.Client{},
Client: httpClient,
WorkspaceID: tt.workspaceID,
URLPrefix: svr.URL,
}
Expand Down
18 changes: 6 additions & 12 deletions regulation-worker/internal/delete/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"fmt"
"io"
"net/http"
"os"
"strings"

"github.com/rudderlabs/rudder-server/regulation-worker/internal/model"
Expand All @@ -31,24 +32,19 @@ type APIManager struct {
// Delete prepares the payload from (job, destConfig, destName) and makes an API call to the transformer.
// The (status, failure_reason) it gets back is converted to the appropriate model.JobStatus and returned to the caller.
func (api *APIManager) Delete(ctx context.Context, job model.Job, destConfig map[string]interface{}, destName string) model.JobStatus {
pkgLogger.Debugf("deleting: %v", job, " from API destination: %v", destName)
method := "POST"
method := http.MethodPost
endpoint := "/deleteUsers"
url := fmt.Sprint(api.DestTransformURL, endpoint)
pkgLogger.Debugf("transformer url: %s", url)

bodySchema := mapJobToPayload(job, strings.ToLower(destName), destConfig)
pkgLogger.Debugf("payload: %#v", bodySchema)

reqBody, err := json.Marshal(bodySchema)
if err != nil {
pkgLogger.Errorf("error while marshalling job request body: %v", err)
return model.JobStatusFailed
}

req, err := http.NewRequestWithContext(ctx, method, url, bytes.NewReader(reqBody))
if err != nil {
pkgLogger.Errorf("error while create new request: %v", err)
return model.JobStatusFailed
}
req.Header.Set("Content-Type", "application/json")
Expand All @@ -62,26 +58,24 @@ func (api *APIManager) Delete(ctx context.Context, job model.Job, destConfig map
fileCleaningTime.Start()
defer fileCleaningTime.End()

pkgLogger.Debugf("sending request: %v", req)
resp, err := api.Client.Do(req)
if err != nil {
pkgLogger.Errorf("error while making http request: %v", err)
if os.IsTimeout(err) {
stats.Default.NewStat("regulation_worker_delete_api_timeout", stats.CountType).Count(1)
}
return model.JobStatusFailed
}
defer resp.Body.Close()
bodyBytes, err := io.ReadAll(resp.Body)
if err != nil {
pkgLogger.Errorf("error while reading response body: %v", err)
return model.JobStatusFailed
}
bodyString := string(bodyBytes)
pkgLogger.Debugf("response body: %s", bodyString)

var jobResp []JobRespSchema
if err := json.Unmarshal(bodyBytes, &jobResp); err != nil {
pkgLogger.Errorf("error while decoding response body: %v", err)
return model.JobStatusFailed
}

switch resp.StatusCode {

case http.StatusOK:
Expand Down
1 change: 1 addition & 0 deletions regulation-worker/internal/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ var (
ErrNoRunnableJob = errors.New("no runnable job found")
ErrDestNotImplemented = errors.New("job deletion not implemented for the destination")
ErrInvalidDestination = errors.New("invalid destination")
ErrRequestTimeout = errors.New("request timeout")
)

// JobStatus is the string-typed status of a regulation job.
type JobStatus string
Expand Down
18 changes: 14 additions & 4 deletions regulation-worker/internal/service/looper.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"strconv"
"time"

backoff "github.com/cenkalti/backoff/v4"
"github.com/rudderlabs/rudder-server/regulation-worker/internal/model"
"github.com/rudderlabs/rudder-server/utils/logger"
"github.com/rudderlabs/rudder-server/utils/misc"
Expand All @@ -16,8 +15,7 @@ import (
// pkgLogger is the package-level logger, created as the "service" child of a new logger.
var pkgLogger = logger.NewLogger().Child("service")

type Looper struct {
Backoff backoff.BackOffContext
Svc JobSvc
Svc JobSvc
}

func (l *Looper) Loop(ctx context.Context) error {
Expand All @@ -27,6 +25,10 @@ func (l *Looper) Loop(ctx context.Context) error {
if err != nil {
return fmt.Errorf("reading value: %s from env: %s", "INTERVAL_IN_MINUTES", err.Error())
}
retryDelay, err := getenvInt("RETRY_DELAY_IN_SECONDS", 60)
if err != nil {
return fmt.Errorf("reading value: %s from env: %s", "INTERVAL_IN_MINUTES", err.Error())
}

for {

Expand All @@ -38,7 +40,15 @@ func (l *Looper) Loop(ctx context.Context) error {
pkgLogger.Debugf("context cancelled... exiting infinite loop %v", err)
return nil
}

continue
}
// This makes sure that we don't panic when any of the API calls fails with a deadline-exceeded error.
if err == model.ErrRequestTimeout {
pkgLogger.Errorf("context deadline exceeded... retrying after %d minute(s): %v", retryDelay, err)
if err := misc.SleepCtx(ctx, time.Duration(retryDelay)*time.Second); err != nil {
pkgLogger.Debugf("context cancelled... exiting infinite loop %v", err)
return nil
}
continue
}

Expand Down
16 changes: 10 additions & 6 deletions regulation-worker/internal/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ package service
// TODO: appropriate status var update and handling via model.status
import (
"context"
"fmt"
"time"

"github.com/cenkalti/backoff"
Expand Down Expand Up @@ -36,6 +35,7 @@ type JobSvc struct {
// calls api-client.getJob(workspaceID)
// calls api-client to get new job with workspaceID, which returns jobID.
func (js *JobSvc) JobSvc(ctx context.Context) error {
loopStart := time.Now()
// API request to get new job
pkgLogger.Debugf("making API request to get job")
job, err := js.API.Get(ctx)
Expand All @@ -44,11 +44,6 @@ func (js *JobSvc) JobSvc(ctx context.Context) error {
return err
}

totalJobTime := stats.Default.NewTaggedStat("total_job_time", stats.TimerType, stats.Tags{"jobId": fmt.Sprintf("%d", job.ID), "workspaceId": job.WorkspaceID})
totalJobTime.Start()
defer totalJobTime.End()

pkgLogger.Debugf("job: %v", job)
// once job is successfully received, calling updatestatus API to update the status of job to running.
status := model.JobStatusRunning
err = js.updateStatus(ctx, status, job.ID)
Expand All @@ -65,7 +60,16 @@ func (js *JobSvc) JobSvc(ctx context.Context) error {
return js.updateStatus(ctx, model.JobStatusFailed, job.ID)
}

deletionStart := time.Now()

status = js.Deleter.Delete(ctx, job, destDetail)

stats.Default.NewTaggedStat("deletion_time", stats.TimerType, stats.Tags{"workspaceId": job.WorkspaceID, "destinationid": destDetail.DestinationID, "destinationType": destDetail.Name, "status": string(status)}).Since(deletionStart)
if status == model.JobStatusComplete {
stats.Default.NewTaggedStat("deleted_user_count", stats.CountType, stats.Tags{"workspaceId": job.WorkspaceID, "destinationid": destDetail.DestinationID, "destinationType": destDetail.Name}).Count(len(job.Users))
}
stats.Default.NewTaggedStat("loop_time", stats.TimerType, stats.Tags{"workspaceId": job.WorkspaceID, "destinationid": destDetail.DestinationID, "destinationType": destDetail.Name, "status": string(status)}).Since(loopStart)

return js.updateStatus(ctx, status, job.ID)
}

Expand Down

0 comments on commit 073a035

Please sign in to comment.