Skip to content

Commit

Permalink
[azservicebus] Updating batching to allow for a configurable wait time (
Browse files Browse the repository at this point in the history
#22154)

Updating batching to allow for a configurable wait time. Can lead to fuller batches for people that want to tune it.

Fixes #19172
  • Loading branch information
richardpark-msft committed Jan 9, 2024
1 parent bdf62de commit 46b3614
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 18 deletions.
8 changes: 2 additions & 6 deletions sdk/messaging/azservicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
# Release History

## 1.5.1 (Unreleased)
## 1.5.1 (2024-01-16)

### Features Added

### Breaking Changes

### Bugs Fixed

### Other Changes
- ReceiverOptions.TimeAfterFirstMessage lets you configure the amount of time, after the first message in a batch is received, before we return messages. (PR#22154)

## 1.5.0 (2023-10-10)

Expand Down
27 changes: 16 additions & 11 deletions sdk/messaging/azservicebus/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ type Receiver struct {
amqpLinks internal.AMQPLinks
cancelReleaser *atomic.Value
cleanupOnClose func()
defaultTimeAfterFirstMsg time.Duration
entityPath string
lastPeekedSequenceNumber int64
maxAllowedCredits uint32
Expand Down Expand Up @@ -131,7 +130,6 @@ func newReceiver(args newReceiverArgs, options *ReceiverOptions) (*Receiver, err
receiver := &Receiver{
cancelReleaser: &atomic.Value{},
cleanupOnClose: args.cleanupOnClose,
defaultTimeAfterFirstMsg: 20 * time.Millisecond,
lastPeekedSequenceNumber: 0,
maxAllowedCredits: defaultLinkRxBuffer,
retryOptions: args.retryOptions,
Expand All @@ -143,13 +141,6 @@ func newReceiver(args newReceiverArgs, options *ReceiverOptions) (*Receiver, err
return nil, err
}

if receiver.receiveMode == ReceiveModeReceiveAndDelete {
// TODO: there appears to be a bit more overhead when receiving messages
// in ReceiveAndDelete. Need to investigate if this is related to our
// auto-accepting logic in go-amqp.
receiver.defaultTimeAfterFirstMsg = time.Second
}

newLinkFn := receiver.newReceiverLink

if args.newLinkFn != nil {
Expand Down Expand Up @@ -181,7 +172,13 @@ func (r *Receiver) newReceiverLink(ctx context.Context, session amqpwrap.AMQPSes

// ReceiveMessagesOptions are options for the ReceiveMessages function.
type ReceiveMessagesOptions struct {
// For future expansion
// TimeAfterFirstMessage controls how long, after a message has been received, before we return the
// accumulated batch of messages.
//
// Default value depends on the receive mode:
// - 20ms when the receiver is in ReceiveModePeekLock
// - 1s when the receiver is in ReceiveModeReceiveAndDelete
TimeAfterFirstMessage time.Duration
}

// ReceiveMessages receives a fixed number of messages, up to numMessages.
Expand Down Expand Up @@ -400,7 +397,15 @@ func (r *Receiver) receiveMessagesImpl(ctx context.Context, maxMessages int, opt
r.amqpLinks.Writef(EventReceiver, "Have %d credits, no new credits needed", currentReceiverCredits)
}

result := r.fetchMessages(ctx, linksWithID.Receiver, maxMessages, r.defaultTimeAfterFirstMsg)
timeAfterFirstMessage := 20 * time.Millisecond

if options != nil && options.TimeAfterFirstMessage > 0 {
timeAfterFirstMessage = options.TimeAfterFirstMessage
} else if r.receiveMode == ReceiveModeReceiveAndDelete {
timeAfterFirstMessage = time.Second
}

result := r.fetchMessages(ctx, linksWithID.Receiver, maxMessages, timeAfterFirstMessage)

r.amqpLinks.Writef(EventReceiver, "Received %d/%d messages", len(result.Messages), maxMessages)

Expand Down
72 changes: 72 additions & 0 deletions sdk/messaging/azservicebus/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package azservicebus

import (
"context"
"errors"
"fmt"
"regexp"
"sort"
Expand Down Expand Up @@ -941,6 +942,77 @@ func TestReceiveAndSendAndReceive(t *testing.T) {
require.Equal(t, msgs[0].Message(), rereceivedMsgs[0].Message(), "all sendable fields are preserved when resending")
}

func TestReceiveWithDifferentWaitTime(t *testing.T) {
setup := func(t *testing.T, timeAfterFirstMessage *time.Duration) int {
serviceBusClient, cleanup, queueName := setupLiveTest(t, nil)
defer cleanup()

sender, err := serviceBusClient.NewSender(queueName, nil)
require.NoError(t, err)
defer sender.Close(context.Background())

batch, err := sender.NewMessageBatch(context.Background(), nil)
require.NoError(t, err)

bigBody := make([]byte, 1000)

// send a bunch of messages
for i := 0; i < 1000; i++ {
err := batch.AddMessage(&Message{
Body: bigBody,
}, nil)

if errors.Is(err, ErrMessageTooLarge) {
err = sender.SendMessageBatch(context.Background(), batch, nil)
require.NoError(t, err)

batch, err = sender.NewMessageBatch(context.Background(), nil)
require.NoError(t, err)

i--
}
}

if batch.NumMessages() > 0 {
err = sender.SendMessageBatch(context.Background(), batch, nil)
require.NoError(t, err)
}

receiver, err := serviceBusClient.NewReceiverForQueue(queueName, nil)
require.NoError(t, err)

var opts *ReceiveMessagesOptions

if timeAfterFirstMessage != nil {
opts = &ReceiveMessagesOptions{
TimeAfterFirstMessage: *timeAfterFirstMessage,
}

t.Logf("Setting time after first message: %s", *timeAfterFirstMessage)
} else {
t.Log("Using default time after first message")
}

messages, err := receiver.ReceiveMessages(context.Background(), 1000, opts)
require.NoError(t, err)

return len(messages)
}

base := setup(t, nil)
require.NotZero(t, base)
t.Logf("Base case: %d messages", base)

base2 := setup(t, to.Ptr[time.Duration](0))
require.NotZero(t, base2)
t.Logf("Base case2: %d messages", base2)

bigger := setup(t, to.Ptr(20*time.Second))
t.Logf("Bigger: %d messages", bigger)
require.Greater(t, bigger, base)
require.Greater(t, bigger, base2)
}

type receivedMessageSlice []*ReceivedMessage

func (messages receivedMessageSlice) Len() int {
Expand Down
1 change: 0 additions & 1 deletion sdk/messaging/azservicebus/receiver_unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ func TestReceiverCancellationUnitTests(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())

r := &Receiver{
defaultTimeAfterFirstMsg: time.Second,
amqpLinks: &internal.FakeAMQPLinks{
Receiver: &internal.FakeAMQPReceiver{
ReceiveFn: func(ctx context.Context) (*amqp.Message, error) {
Expand Down

0 comments on commit 46b3614

Please sign in to comment.