Skip to content

Commit

Permalink
test(queue): improve test stability (#48)
Browse files Browse the repository at this point in the history
* test(queue): improve test stability
  • Loading branch information
Reasno committed Feb 23, 2021
1 parent 41c2b88 commit b13eb0b
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 33 deletions.
2 changes: 1 addition & 1 deletion ots3/uploader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func TestManager_UploadFromUrl(t *testing.T) {
t.Parallel()
tracer := mocktracer.New()
m := setupManagerWithTracer(tracer)
newURL, err := m.UploadFromUrl(context.Background(), "https://box.bdimg.com/static/fisp_static/common/img/searchbox/logo_news_276_88_1f9876a.png")
newURL, err := m.UploadFromUrl(context.Background(), "https://www.donews.com/static/v2/images/full-logo.png")
assert.NoError(t, err)
assert.NotEmpty(t, newURL)
assert.NotEmpty(t, tracer.FinishedSpans())
Expand Down
85 changes: 53 additions & 32 deletions queue/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package queue
import (
"context"
"errors"
"go.uber.org/atomic"
"math/rand"
"time"

Expand Down Expand Up @@ -74,7 +73,7 @@ func setUp() *QueueableDispatcher {
}

func TestDispatcher_work(t *testing.T) {
rand.Seed(int64(time.Now().Unix()))
rand.Seed(time.Now().Unix())
cases := []struct {
name string
value contract.Event
Expand Down Expand Up @@ -156,7 +155,7 @@ func TestDispatcher_work(t *testing.T) {
func TestDispatcher_Consume(t *testing.T) {
consumer := setUp()

var called atomic.String
var called = make(chan string)
cases := []struct {
name string
evt contract.Event
Expand All @@ -169,12 +168,12 @@ func TestDispatcher_Consume(t *testing.T) {
func(ctx context.Context, event contract.Event) error {
assert.IsType(t, MockEvent{}, event.Data())
assert.Equal(t, "hello", event.Data().(MockEvent).Value)
called.Store("ordinary message")
called <- "ordinary message"
return nil
},
func() {
assert.Equal(t, "ordinary message", called.String())
called.Store("")
str := <-called
assert.Equal(t, "ordinary message", str)
},
},
{
Expand All @@ -183,49 +182,59 @@ func TestDispatcher_Consume(t *testing.T) {
func(ctx context.Context, event contract.Event) error {
assert.IsType(t, MockEvent{}, event.Data())
assert.Equal(t, "hello", event.Data().(MockEvent).Value)
called.Store("persist message")
called <- "persist message"
return nil
},
func() {
time.Sleep(5 * time.Millisecond)
assert.Equal(t, "persist message", called.String())
called.Store("")
str := <-called
assert.Equal(t, "persist message", str)
},
},
{
"deferred message",
Persist(events.Of(MockEvent{Value: "hello", Called: new(bool)}), Defer(2*time.Second)),
func(ctx context.Context, event contract.Event) error {
called.Store("deferred message")
called <- "deferred message"
return nil
},
func() {
time.Sleep(1 * time.Second)
assert.NotEqual(t, "deferred message", called.String())
called.Store("")
time.Sleep(2 * time.Second)
var str string
select {
case str = <-called:
case <-time.After(time.Second):
}
assert.NotEqual(t, "deferred message", str)
str = <-called
assert.Equal(t, "deferred message", str)
},
},
{
"deferred message but called",
Persist(events.Of(MockEvent{Value: "hello", Called: new(bool)}), Defer(time.Second)),
func(ctx context.Context, event contract.Event) error {
called.Store("deferred message but called")
called <- "deferred message but called"
return nil
},
func() {
time.Sleep(2 * time.Second)
assert.Equal(t, "deferred message but called", called.String())
called.Store("")
var str string
select {
case str = <-called:
case <-time.After(2 * time.Second):
}
assert.Equal(t, "deferred message but called", str)
},
},
{
"failed message",
Persist(events.Of(MockEvent{Value: "hello"})),
func(ctx context.Context, event contract.Event) error {
defer func() {
called <- "failed message"
}()
return errors.New("some err")
},
func() {
<-called
time.Sleep(100 * time.Millisecond)
info, _ := consumer.driver.Info(context.Background())
assert.Equal(t, int64(1), info.Failed)
Expand All @@ -237,14 +246,18 @@ func TestDispatcher_Consume(t *testing.T) {
"retry message",
Persist(events.Of(MockEvent{Value: "hello"}), MaxAttempts(2)),
func(ctx context.Context, event contract.Event) error {
if called.String() != "retry message" {
called.Store("retry message")
var firstTry = make(chan struct{}, 1)
select {
case <-firstTry:
return nil
default:
firstTry <- struct{}{}
called <- "retry message"
return errors.New("some err")
}
return nil
},
func() {
time.Sleep(5 * time.Millisecond)
<-called
info, _ := consumer.driver.Info(context.Background())
assert.Equal(t, int64(0), info.Failed)
},
Expand All @@ -253,32 +266,40 @@ func TestDispatcher_Consume(t *testing.T) {
"reload message",
Persist(events.Of(MockEvent{Value: "hello"}), Timeout(time.Second)),
func(ctx context.Context, event contract.Event) error {
if called.String() != "reload message" {
var firstTry = make(chan struct{}, 1)
select {
case <-firstTry:
return nil
default:
firstTry <- struct{}{}
called <- "reload message"
return errors.New("some err")
}
return nil
},
func() {
time.Sleep(5 * time.Millisecond)
called.Store("reload message")
<-called
time.Sleep(100 * time.Millisecond)
num, _ := consumer.driver.Reload(context.Background(), "failed")
assert.Equal(t, int64(1), num)
time.Sleep(5 * time.Millisecond)
info, _ := consumer.driver.Info(context.Background())
assert.Equal(t, int64(0), info.Failed)
called.Store("")
},
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
dispatcher := setUp()
ctx, cancel := context.WithCancel(context.Background())
go dispatcher.Consume(ctx)
defer cancel()
dispatcher.Subscribe(c.ln)
err := dispatcher.Dispatch(context.Background(), c.evt)
assert.NoError(t, err)

go dispatcher.Consume(ctx)
go func() {
dispatcher.Subscribe(c.ln)
err := dispatcher.Dispatch(context.Background(), c.evt)
assert.NoError(t, err)
}()

c.called()
})
}
Expand Down

0 comments on commit b13eb0b

Please sign in to comment.