Skip to content

Commit

Permalink
feat(router): avoid worker starvation during job pickup
Browse files Browse the repository at this point in the history
  • Loading branch information
atzoum committed Nov 1, 2022
1 parent a6e253b commit fee04b7
Show file tree
Hide file tree
Showing 32 changed files with 1,925 additions and 1,144 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ jobs:

- name: Integration test for enterprise
if: matrix.FEATURES == 'enterprise'
run: go test -v ./docker_test.go -count 1
run: go test -v ./integration_test/docker_test/docker_test.go -count 1
env:
ENTERPRISE_TOKEN: ${{ secrets.ENTERPRISE_TOKEN }}

- name: Integration test for oss
if: matrix.FEATURES == 'oss'
run: go test -v ./docker_test.go -count 1
run: go test -v ./integration_test/docker_test/docker_test.go -count 1
env:
RSERVER_ENABLE_MULTITENANCY: ${{ matrix.MULTITENANCY }}

Expand Down
3 changes: 1 addition & 2 deletions app/cluster/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,7 @@ func TestDynamicClusterManager(t *testing.T) {
return ch
}).AnyTimes()
mockMTI.EXPECT().UpdateWorkspaceLatencyMap(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
mockMTI.EXPECT().GetRouterPickupJobs(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(),
gomock.Any()).AnyTimes()
mockMTI.EXPECT().GetRouterPickupJobs(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()

provider := &mockModeProvider{modeCh: make(chan servermode.ChangeEvent)}
dCM := &cluster.Dynamic{
Expand Down
17 changes: 10 additions & 7 deletions app/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@ type Options struct {
}

// LoadOptions loads application's initialisation options based on command line flags and environment
func LoadOptions() *Options {
func LoadOptions(args []string) *Options {
flagSet := flag.NewFlagSet(args[0], flag.ExitOnError)
// Parse command line options
normalMode := flag.Bool("normal-mode", false, "a bool")
degradedMode := flag.Bool("degraded-mode", false, "a bool")
clearDB := flag.Bool("cleardb", false, "a bool")
cpuprofile := flag.String("cpuprofile", "", "write cpu profile to `file`")
memprofile := flag.String("memprofile", "", "write memory profile to `file`")
versionFlag := flag.Bool("v", false, "Print the current version and exit")
normalMode := flagSet.Bool("normal-mode", false, "a bool")
degradedMode := flagSet.Bool("degraded-mode", false, "a bool")
clearDB := flagSet.Bool("cleardb", false, "a bool")
cpuprofile := flagSet.String("cpuprofile", "", "write cpu profile to `file`")
memprofile := flagSet.String("memprofile", "", "write memory profile to `file`")
versionFlag := flagSet.Bool("v", false, "Print the current version and exit")

serverMode := os.Getenv("RSERVER_MODE")
if serverMode == "normal" {
Expand All @@ -33,6 +34,8 @@ func LoadOptions() *Options {
*degradedMode = true
}

// Ignore errors; flagSet is set for ExitOnError.
_ = flagSet.Parse(args[1:])
flag.Parse()

return &Options{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// It then runs the service ensuring it is configured to use the dependencies.
// Finally, it sends events and observe the destinations expecting to get the events back.

package main_test
package docker_test

import (
"context"
Expand All @@ -22,6 +22,7 @@ import (
"testing"
"time"

"github.com/rudderlabs/rudder-server/runner"
"github.com/rudderlabs/rudder-server/testhelper"
"github.com/rudderlabs/rudder-server/testhelper/workspaceConfig"

Expand All @@ -33,7 +34,6 @@ import (
"github.com/tidwall/gjson"
"golang.org/x/sync/errgroup"

main "github.com/rudderlabs/rudder-server"
"github.com/rudderlabs/rudder-server/config"
kafkaClient "github.com/rudderlabs/rudder-server/services/streammanager/kafka/client"
"github.com/rudderlabs/rudder-server/services/streammanager/kafka/client/testutil"
Expand Down Expand Up @@ -422,7 +422,7 @@ func setupMainFlow(svcCtx context.Context, t *testing.T) <-chan struct{} {
})
require.NoError(t, containersGroup.Wait())

if err := godotenv.Load("testhelper/.env"); err != nil {
if err := godotenv.Load("../../testhelper/.env"); err != nil {
t.Log("INFO: No .env file found.")
}

Expand Down Expand Up @@ -484,7 +484,8 @@ func setupMainFlow(svcCtx context.Context, t *testing.T) <-chan struct{} {

svcDone := make(chan struct{})
go func() {
_ = main.Run(svcCtx)
r := runner.New(runner.ReleaseInfo{})
_ = r.Run(svcCtx, []string{"docker-test-rudder-server"})
close(svcDone)
}()

Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package main
package multi_tenant_test

import (
"context"
Expand Down Expand Up @@ -157,7 +157,7 @@ func testMultiTenantByAppType(t *testing.T, appType string) {
go func() {
defer close(done)
defer cancel()
cmd := exec.CommandContext(ctx, "go", "run", "main.go")
cmd := exec.CommandContext(ctx, "go", "run", "../../main.go")
cmd.Env = append(os.Environ(),
"APP_TYPE="+appType,
"INSTANCE_ID="+serverInstanceID,
Expand Down Expand Up @@ -238,7 +238,7 @@ func testMultiTenantByAppType(t *testing.T, appType string) {
require.NoError(t, err)
require.Equal(t, "RELOADED", v.Status)
require.Equal(t, "", v.Error)
case <-time.After(20 * time.Second):
case <-time.After(60 * time.Second):
_, err = clientv3.New(clientv3.Config{
Endpoints: etcdContainer.Hosts,
DialOptions: []grpc.DialOption{
Expand Down Expand Up @@ -303,7 +303,7 @@ func testMultiTenantByAppType(t *testing.T, appType string) {
require.Len(t, ack.Events, 1)
require.Equal(t, "test-ack/normal", string(ack.Events[0].Kv.Key))
require.Equal(t, `{"status":"NORMAL"}`, string(ack.Events[0].Kv.Value))
case <-time.After(20 * time.Second):
case <-time.After(60 * time.Second):
t.Fatal("Timeout waiting for server-mode test-ack")
}
sendEventsToGateway(t, httpPort, writeKey)
Expand All @@ -330,7 +330,7 @@ func testMultiTenantByAppType(t *testing.T, appType string) {
require.Len(t, ack.Events, 1)
require.Equal(t, "test-ack/2", string(ack.Events[0].Kv.Key))
require.Equal(t, `{"status":"DEGRADED"}`, string(ack.Events[0].Kv.Value))
case <-time.After(20 * time.Second):
case <-time.After(60 * time.Second):
t.Fatal("Timeout waiting for server-mode test-ack")
}

Expand Down Expand Up @@ -359,7 +359,7 @@ func testMultiTenantByAppType(t *testing.T, appType string) {
require.NoError(t, err)
require.Equal(t, "RELOADED", v.Status)
require.Equal(t, "", v.Error)
case <-time.After(20 * time.Second):
case <-time.After(60 * time.Second):
t.Fatal("Timeout waiting for test-ack/3")
}
})
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
40 changes: 20 additions & 20 deletions jobsdb/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -827,49 +827,49 @@ func TestMultiTenantLegacyGetAllJobs(t *testing.T) {

t.Run("GetAllJobs with large limits", func(t *testing.T) {
params := GetQueryParamsT{JobsLimit: 30}
allJobs, err := mtl.GetAllJobs(context.Background(), map[string]int{defaultWorkspaceID: 30}, params, 0)
allJobs, err := mtl.GetAllJobs(context.Background(), map[string]int{defaultWorkspaceID: 30}, params, 0, nil)
require.NoError(t, err, "failed to get all jobs")
require.Equal(t, 30, len(allJobs), "should get all 30 jobs")
require.Equal(t, 30, len(allJobs.Jobs), "should get all 30 jobs")
})

t.Run("GetAllJobs with only jobs limit", func(t *testing.T) {
jobsLimit := 10
params := GetQueryParamsT{JobsLimit: 10}
allJobs, err := mtl.GetAllJobs(context.Background(), map[string]int{defaultWorkspaceID: jobsLimit}, params, 0)
allJobs, err := mtl.GetAllJobs(context.Background(), map[string]int{defaultWorkspaceID: jobsLimit}, params, 0, nil)
require.NoError(t, err, "failed to get all jobs")
require.Truef(t, len(allJobs)-jobsLimit == 0, "should get %d jobs", jobsLimit)
require.Truef(t, len(allJobs.Jobs)-jobsLimit == 0, "should get %d jobs", jobsLimit)
})

t.Run("GetAllJobs with events limit", func(t *testing.T) {
jobsLimit := 10
params := GetQueryParamsT{JobsLimit: 10, EventsLimit: 3 * eventsPerJob}
allJobs, err := mtl.GetAllJobs(context.Background(), map[string]int{defaultWorkspaceID: jobsLimit}, params, 0)
allJobs, err := mtl.GetAllJobs(context.Background(), map[string]int{defaultWorkspaceID: jobsLimit}, params, 0, nil)
require.NoError(t, err, "failed to get all jobs")
require.Equal(t, 3, len(allJobs), "should get 3 jobs")
require.Equal(t, 3, len(allJobs.Jobs), "should get 3 jobs")
})

t.Run("GetAllJobs with events limit less than the events of the first job get one job", func(t *testing.T) {
jobsLimit := 10
params := GetQueryParamsT{JobsLimit: jobsLimit, EventsLimit: eventsPerJob - 1}
allJobs, err := mtl.GetAllJobs(context.Background(), map[string]int{defaultWorkspaceID: jobsLimit}, params, 0)
allJobs, err := mtl.GetAllJobs(context.Background(), map[string]int{defaultWorkspaceID: jobsLimit}, params, 0, nil)
require.NoError(t, err, "failed to get all jobs")
require.Equal(t, 1, len(allJobs), "should get 1 overflown job")
require.Equal(t, 1, len(allJobs.Jobs), "should get 1 overflown job")
})

t.Run("GetAllJobs with payload limit", func(t *testing.T) {
jobsLimit := 10
params := GetQueryParamsT{JobsLimit: jobsLimit, PayloadSizeLimit: 3 * payloadSize}
allJobs, err := mtl.GetAllJobs(context.Background(), map[string]int{defaultWorkspaceID: jobsLimit}, params, 0)
allJobs, err := mtl.GetAllJobs(context.Background(), map[string]int{defaultWorkspaceID: jobsLimit}, params, 0, nil)
require.NoError(t, err, "failed to get all jobs")
require.Equal(t, 3, len(allJobs), "should get 3 jobs")
require.Equal(t, 3, len(allJobs.Jobs), "should get 3 jobs")
})

t.Run("GetAllJobs with payload limit less than the payload size should get one job", func(t *testing.T) {
jobsLimit := 10
params := GetQueryParamsT{JobsLimit: jobsLimit, PayloadSizeLimit: payloadSize - 1}
allJobs, err := mtl.GetAllJobs(context.Background(), map[string]int{defaultWorkspaceID: jobsLimit}, params, 0)
allJobs, err := mtl.GetAllJobs(context.Background(), map[string]int{defaultWorkspaceID: jobsLimit}, params, 0, nil)
require.NoError(t, err, "failed to get all jobs")
require.Equal(t, 1, len(allJobs), "should get 1 overflown job")
require.Equal(t, 1, len(allJobs.Jobs), "should get 1 overflown job")
})
}

Expand Down Expand Up @@ -933,9 +933,9 @@ func TestMultiTenantGetAllJobs(t *testing.T) {
workspaceC: 30,
}
params := GetQueryParamsT{JobsLimit: 90}
allJobs, err := mtl.GetAllJobs(context.Background(), workspaceLimits, params, 100)
allJobs, err := mtl.GetAllJobs(context.Background(), workspaceLimits, params, 100, nil)
require.NoError(t, err, "failed to get all jobs")
require.Equal(t, 90, len(allJobs), "should get all 90 jobs")
require.Equal(t, 90, len(allJobs.Jobs), "should get all 90 jobs")
})

t.Run("GetAllJobs with only jobs limit", func(t *testing.T) {
Expand All @@ -946,9 +946,9 @@ func TestMultiTenantGetAllJobs(t *testing.T) {
workspaceC: 0,
}
params := GetQueryParamsT{JobsLimit: jobsLimit * 2}
allJobs, err := mtl.GetAllJobs(context.Background(), workspaceLimits, params, 100)
allJobs, err := mtl.GetAllJobs(context.Background(), workspaceLimits, params, 100, nil)
require.NoError(t, err, "failed to get all jobs")
require.Truef(t, len(allJobs)-2*jobsLimit == 0, "should get %d jobs", 2*jobsLimit)
require.Truef(t, len(allJobs.Jobs)-2*jobsLimit == 0, "should get %d jobs", 2*jobsLimit)
})

t.Run("GetAllJobs with payload limit", func(t *testing.T) {
Expand All @@ -958,9 +958,9 @@ func TestMultiTenantGetAllJobs(t *testing.T) {
workspaceC: 30,
}
params := GetQueryParamsT{JobsLimit: 90, PayloadSizeLimit: 6 * payloadSize}
allJobs, err := mtl.GetAllJobs(context.Background(), workspaceLimits, params, 100)
allJobs, err := mtl.GetAllJobs(context.Background(), workspaceLimits, params, 100, nil)
require.NoError(t, err, "failed to get all jobs")
require.Equal(t, 6+3, len(allJobs), "should get limit jobs +1 (overflow) per workspace")
require.Equal(t, 6+3, len(allJobs.Jobs), "should get limit jobs +1 (overflow) per workspace")
})

t.Run("GetAllJobs with payload limit less than the payload size should get one job", func(t *testing.T) {
Expand All @@ -970,9 +970,9 @@ func TestMultiTenantGetAllJobs(t *testing.T) {
workspaceC: 30,
}
params := GetQueryParamsT{JobsLimit: 90, PayloadSizeLimit: payloadSize - 1}
allJobs, err := mtl.GetAllJobs(context.Background(), workspaceLimits, params, 100)
allJobs, err := mtl.GetAllJobs(context.Background(), workspaceLimits, params, 100, nil)
require.NoError(t, err, "failed to get all jobs")
require.Equal(t, 3, len(allJobs), "should get limit+1 jobs")
require.Equal(t, 3, len(allJobs.Jobs), "should get limit+1 jobs")
})
}

Expand Down
Loading

0 comments on commit fee04b7

Please sign in to comment.