Skip to content

Commit

Permalink
pubsub/awssnssqs: Fix race condition in new test, and call ack to avo…
Browse files Browse the repository at this point in the history
…id warning (#3463)
  • Loading branch information
vangent committed Aug 10, 2024
1 parent b70270a commit 47ba1ac
Showing 1 changed file with 17 additions and 12 deletions.
29 changes: 17 additions & 12 deletions pubsub/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,35 +288,40 @@ type secondReceiveBlockedDriverSub struct {
}

func (s *secondReceiveBlockedDriverSub) ReceiveBatch(ctx context.Context, _ int) ([]*driver.Message, error) {
s.receiveCounter.Add(1)
if s.receiveCounter.Load() > 1 {
// wait after 1st request for the context to finish before returning the batch result
// The first request will return a message right away.
// The second one will block ~forever.
if n := s.receiveCounter.Add(1); n > 1 {
<-ctx.Done()
}
msg := &driver.Message{Body: []byte(fmt.Sprintf("message #%d", s.receiveCounter.Load()))}
return []*driver.Message{msg}, nil
}
func (*secondReceiveBlockedDriverSub) CanNack() bool { return false }
func (*secondReceiveBlockedDriverSub) IsRetryable(error) bool { return false }
func (*secondReceiveBlockedDriverSub) Close() error { return nil }
func (*secondReceiveBlockedDriverSub) CanNack() bool { return false }
func (*secondReceiveBlockedDriverSub) SendAcks(_ context.Context, _ []driver.AckID) error { return nil }
func (*secondReceiveBlockedDriverSub) IsRetryable(error) bool { return false }
func (*secondReceiveBlockedDriverSub) Close() error { return nil }

// TestIndependentBatchReturn verifies that when multiple batch requests are sent,
// as long as one of them succeeds it should not block Subscription.Receive.
func TestIndependentBatchReturn(t *testing.T) {
// We want to test the scenario when multiple batch requests are sent, as long as one of them succeeds, it should
// not block the Subscription.Receive result
s := NewSubscription(
&secondReceiveBlockedDriverSub{},
&batcher.Options{MaxBatchSize: 1, MaxHandlers: 2}, // force 2 batches, by allowing 2 handlers and 1 msg per batch
&batcher.Options{MaxBatchSize: 1, MaxHandlers: 2}, // force 2 batches by allowing 2 handlers and 1 msg per batch
nil,
)
// set the false calculated subscription batch size to force 2 batches to be called
s.runningBatchSize = 2
ctx := context.Background()
defer s.Shutdown(ctx)
_, err := s.Receive(ctx)

// Set the batch size to force 2 batches to be called.
s.runningBatchSize = 2
ctxTimeout, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
m, err := s.Receive(ctxTimeout)
if err != nil {
t.Fatal("Receive should not fail", err)
return
}
m.Ack()
}

func TestRetryTopic(t *testing.T) {
Expand Down

0 comments on commit 47ba1ac

Please sign in to comment.