Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Testing] flaky tests fix test async uploader - Test_AsyncUploader/stopping_component_stops_retrying #2453

Merged
merged 10 commits into from
May 27, 2022
Merged
87 changes: 67 additions & 20 deletions engine/execution/computation/computer/uploader/uploader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"io/ioutil"
"os"
"runtime/debug"
"sync"
"testing"
"time"
Expand All @@ -19,25 +20,26 @@ import (
"github.com/onflow/flow-go/engine/execution/state/unittest"
"github.com/onflow/flow-go/module/metrics"
testutils "github.com/onflow/flow-go/utils/unittest"
unittest2 "github.com/onflow/flow-go/utils/unittest"
)

func Test_AsyncUploader(t *testing.T) {

computationResult := unittest.ComputationResultFixture(nil)

t.Run("uploads are run in parallel and emit metrics", func(t *testing.T) {
wgCalled := sync.WaitGroup{}
wgCalled.Add(3)
wgUploadStarted := sync.WaitGroup{}
wgUploadStarted.Add(3)

wgAllDone := sync.WaitGroup{}
wgAllDone.Add(1)
wgContinueUpload := sync.WaitGroup{}
wgContinueUpload.Add(1)

uploader := &DummyUploader{
f: func() error {
// this should be called 3 times
wgCalled.Done()
wgUploadStarted.Done()

wgAllDone.Wait()
wgContinueUpload.Wait()

return nil
},
Expand All @@ -55,12 +57,13 @@ func Test_AsyncUploader(t *testing.T) {
err = async.Upload(computationResult)
require.NoError(t, err)

wgCalled.Wait() // all three are in progress, check metrics
wgUploadStarted.Wait() // all three are in progress, check metrics

require.Equal(t, int64(3), metrics.Counter.Load())

wgAllDone.Done() //release all
wgContinueUpload.Done() //release all

// shut down component
<-async.Done()

require.Equal(t, int64(0), metrics.Counter.Load())
Expand All @@ -76,6 +79,7 @@ func Test_AsyncUploader(t *testing.T) {

uploader := &DummyUploader{
f: func() error {
// force an upload error to test that upload is retried 3 times
if callCount < 3 {
callCount++
return fmt.Errorf("artificial upload error")
Expand All @@ -95,35 +99,78 @@ func Test_AsyncUploader(t *testing.T) {
require.Equal(t, 3, callCount)
})

time.Sleep(1 * time.Second)

// This test shuts down the async uploader right after the upload has started. The upload has an error to force
// the retry mechanism to kick in (under normal circumstances). Since the component is shutting down, the retry
// should not kick in.
//
// sequence of events:
// 1. create async uploader and initiate upload with an error - to force retrying
// 2. shut down async uploader right after upload initiated (not completed)
// 3. assert that upload is called only once even though the retry mechanism would normally kick in
t.Run("stopping component stops retrying", func(t *testing.T) {
testutils.SkipUnless(t, testutils.TEST_FLAKY, "flaky")

callCount := 0
t.Log("test started grID:", string(bytes.Fields(debug.Stack())[1]))

wg := sync.WaitGroup{}
wg.Add(1)
// this wait group ensures that async uploader has a chance to start the upload before component is shut down
// otherwise, there's a race condition that can happen where the component can shut down before the async uploader
// has a chance to start the upload
wgUploadStarted := sync.WaitGroup{}
wgUploadStarted.Add(1)

// this wait group ensures that async uploader won't send an error (to test if retry will kick in) until
// the component has initiated shutting down (which should stop retry from working)
wgShutdownStarted := sync.WaitGroup{}
wgShutdownStarted.Add(1)
t.Log("added 1 to wait group grID:", string(bytes.Fields(debug.Stack())[1]))

uploader := &DummyUploader{
f: func() error {
defer func() {
t.Log("DummyUploader func() - about to call wgUploadStarted.Done() grID:", string(bytes.Fields(debug.Stack())[1]))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure we should use debug.Stack() in normal operation. I get that it helps, but it seems like a potential pitfall.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's strictly for debugging purposes to log the goroutine id - it was very helpful in debugging this test and I think it's something we should use for tests with multiple goroutines. What's the potential pitfall?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there's a better way to log the goroutine ID, let's use that, but this is the most convenient way I found.

// signal to main goroutine that upload started, so it can initiate shutting down component
wgUploadStarted.Done()

t.Log("DummyUpload func() waiting for component shutdown to start grID:", string(bytes.Fields(debug.Stack())[1]))
wgShutdownStarted.Wait()
t.Log("DummyUploader func() component shutdown started, about to return error grID:", string(bytes.Fields(debug.Stack())[1]))

// force an upload error to test that upload is never retried (because component is shut down)
// normally, we would see retry mechanism kick in and the callCount would be > 1
// but since component has started shutting down, we expect callCount to be 1
// In summary, the upload function SHOULD be called only once - but we want the test to TRY to call it more than once to prove that it
// was only called once. If we changed the condition to 'callCount < 1', that wouldn't prove that the test tried to call it more than once
// and wouldn't prove that stopping the component stopped the retry mechanism.
if callCount < 5 {
gomisha marked this conversation as resolved.
Show resolved Hide resolved
t.Logf("DummyUploader func() incrementing callCount=%d grID: %s", callCount, string(bytes.Fields(debug.Stack())[1]))
callCount++
}()
wg.Wait()
return fmt.Errorf("this should return only once")
t.Logf("DummyUploader func() about to return error callCount=%d grID: %s", callCount, string(bytes.Fields(debug.Stack())[1]))
return fmt.Errorf("this should return only once")
}
return nil
},
}

t.Log("about to create NewAsyncUploader grID:", string(bytes.Fields(debug.Stack())[1]))
async := NewAsyncUploader(uploader, 1*time.Nanosecond, 5, zerolog.Nop(), &metrics.NoopCollector{})

t.Log("about to call async.Upload() grID:", string(bytes.Fields(debug.Stack())[1]))
err := async.Upload(computationResult) // doesn't matter what we upload
require.NoError(t, err)

// stop component and check that it's fully stopped
t.Log("about to close async uploader grID:", string(bytes.Fields(debug.Stack())[1]))

// wait until upload has started before shutting down the component
wgUploadStarted.Wait()

// stop component and check that it's fully stopped
t.Log("about to initiate shutdown grID: ", string(bytes.Fields(debug.Stack())[1]))
c := async.Done()
wg.Done()
<-c
gomisha marked this conversation as resolved.
Show resolved Hide resolved
t.Log("about to notify upload() that shutdown started and can continue uploading grID:", string(bytes.Fields(debug.Stack())[1]))
wgShutdownStarted.Done()
t.Log("about to check async done channel is closed grID:", string(bytes.Fields(debug.Stack())[1]))
unittest2.RequireCloseBefore(t, c, 1*time.Second, "async uploader not closed in time")

t.Log("about to check if callCount is 1 grID:", string(bytes.Fields(debug.Stack())[1]))
require.Equal(t, 1, callCount)
})

Expand Down