From a3596a9875aa7b5e226489c9ef3567886f8b95de Mon Sep 17 00:00:00 2001 From: Richard Park Date: Fri, 15 Dec 2023 18:42:03 -0800 Subject: [PATCH 1/2] Updating batching to allow for a configurable wait time. Can lead to fuller batches for people that want to tune it. --- sdk/messaging/azservicebus/receiver.go | 27 ++++--- sdk/messaging/azservicebus/receiver_test.go | 72 +++++++++++++++++++ .../azservicebus/receiver_unit_test.go | 1 - 3 files changed, 88 insertions(+), 12 deletions(-) diff --git a/sdk/messaging/azservicebus/receiver.go b/sdk/messaging/azservicebus/receiver.go index b1adb4188b9b..a03395e10854 100644 --- a/sdk/messaging/azservicebus/receiver.go +++ b/sdk/messaging/azservicebus/receiver.go @@ -47,7 +47,6 @@ type Receiver struct { amqpLinks internal.AMQPLinks cancelReleaser *atomic.Value cleanupOnClose func() - defaultTimeAfterFirstMsg time.Duration entityPath string lastPeekedSequenceNumber int64 maxAllowedCredits uint32 @@ -131,7 +130,6 @@ func newReceiver(args newReceiverArgs, options *ReceiverOptions) (*Receiver, err receiver := &Receiver{ cancelReleaser: &atomic.Value{}, cleanupOnClose: args.cleanupOnClose, - defaultTimeAfterFirstMsg: 20 * time.Millisecond, lastPeekedSequenceNumber: 0, maxAllowedCredits: defaultLinkRxBuffer, retryOptions: args.retryOptions, @@ -143,13 +141,6 @@ func newReceiver(args newReceiverArgs, options *ReceiverOptions) (*Receiver, err return nil, err } - if receiver.receiveMode == ReceiveModeReceiveAndDelete { - // TODO: there appears to be a bit more overhead when receiving messages - // in ReceiveAndDelete. Need to investigate if this is related to our - // auto-accepting logic in go-amqp. - receiver.defaultTimeAfterFirstMsg = time.Second - } - newLinkFn := receiver.newReceiverLink if args.newLinkFn != nil { @@ -181,7 +172,13 @@ func (r *Receiver) newReceiverLink(ctx context.Context, session amqpwrap.AMQPSes // ReceiveMessagesOptions are options for the ReceiveMessages function. type ReceiveMessagesOptions struct { - // For future expansion + // TimeAfterFirstMessage controls how long, after a message has been received, before we return the + // accumulated batch of messages. + // + // Default value depends on the receive mode: + // - 20ms when the receiver is in ReceiveModePeekLock + // - 1s when the receiver is in ReceiveModeReceiveAndDelete + TimeAfterFirstMessage time.Duration } // ReceiveMessages receives a fixed number of messages, up to numMessages. @@ -400,7 +397,15 @@ func (r *Receiver) receiveMessagesImpl(ctx context.Context, maxMessages int, opt r.amqpLinks.Writef(EventReceiver, "Have %d credits, no new credits needed", currentReceiverCredits) } - result := r.fetchMessages(ctx, linksWithID.Receiver, maxMessages, r.defaultTimeAfterFirstMsg) + timeAfterFirstMessage := 20 * time.Millisecond + + if options != nil && options.TimeAfterFirstMessage > 0 { + timeAfterFirstMessage = options.TimeAfterFirstMessage + } else if r.receiveMode == ReceiveModeReceiveAndDelete { + timeAfterFirstMessage = time.Second + } + + result := r.fetchMessages(ctx, linksWithID.Receiver, maxMessages, timeAfterFirstMessage) r.amqpLinks.Writef(EventReceiver, "Received %d/%d messages", len(result.Messages), maxMessages) diff --git a/sdk/messaging/azservicebus/receiver_test.go b/sdk/messaging/azservicebus/receiver_test.go index a51b410b1d9a..bbeecb0b710e 100644 --- a/sdk/messaging/azservicebus/receiver_test.go +++ b/sdk/messaging/azservicebus/receiver_test.go @@ -5,6 +5,7 @@ package azservicebus import ( "context" + "errors" "fmt" "regexp" "sort" @@ -941,6 +942,77 @@ func TestReceiveAndSendAndReceive(t *testing.T) { require.Equal(t, msgs[0].Message(), rereceivedMsgs[0].Message(), "all sendable fields are preserved when resending") } +func TestReceiveWithDifferentWaitTime(t *testing.T) { + setup := func(t *testing.T, timeAfterFirstMessage *time.Duration) int { + serviceBusClient, cleanup, queueName := setupLiveTest(t, nil) + defer cleanup() + + sender, err := serviceBusClient.NewSender(queueName, nil) + require.NoError(t, err) + defer sender.Close(context.Background()) + + batch, err := sender.NewMessageBatch(context.Background(), nil) + require.NoError(t, err) + + bigBody := make([]byte, 1000) + + // send a bunch of messages + for i := 0; i < 1000; i++ { + err := batch.AddMessage(&Message{ + Body: bigBody, + }, nil) + + if errors.Is(err, ErrMessageTooLarge) { + err = sender.SendMessageBatch(context.Background(), batch, nil) + require.NoError(t, err) + + batch, err = sender.NewMessageBatch(context.Background(), nil) + require.NoError(t, err) + + i-- + } + } + + if batch.NumMessages() > 0 { + err = sender.SendMessageBatch(context.Background(), batch, nil) + require.NoError(t, err) + } + + receiver, err := serviceBusClient.NewReceiverForQueue(queueName, nil) + require.NoError(t, err) + + var opts *ReceiveMessagesOptions + + if timeAfterFirstMessage != nil { + opts = &ReceiveMessagesOptions{ + TimeAfterFirstMessage: *timeAfterFirstMessage, + } + + t.Logf("Setting time after first message: %s", *timeAfterFirstMessage) + } else { + t.Log("Using default time after first message") + } + + messages, err := receiver.ReceiveMessages(context.Background(), 1000, opts) + require.NoError(t, err) + + return len(messages) + } + + base := setup(t, nil) + require.NotZero(t, base) + t.Logf("Base case: %d messages", base) + + base2 := setup(t, to.Ptr[time.Duration](0)) + require.NotZero(t, base2) + t.Logf("Base case2: %d messages", base2) + + bigger := setup(t, to.Ptr(20*time.Second)) + t.Logf("Bigger: %d messages", bigger) + require.Greater(t, bigger, base) + require.Greater(t, bigger, base2) +} + type receivedMessageSlice []*ReceivedMessage func (messages receivedMessageSlice) Len() int { diff --git a/sdk/messaging/azservicebus/receiver_unit_test.go b/sdk/messaging/azservicebus/receiver_unit_test.go index 0b131f4ee9ca..af705cd1bb42 100644 --- a/sdk/messaging/azservicebus/receiver_unit_test.go +++ b/sdk/messaging/azservicebus/receiver_unit_test.go @@ -82,7 +82,6 @@ func TestReceiverCancellationUnitTests(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) r := &Receiver{ - defaultTimeAfterFirstMsg: time.Second, amqpLinks: &internal.FakeAMQPLinks{ Receiver: &internal.FakeAMQPReceiver{ ReceiveFn: func(ctx context.Context) (*amqp.Message, error) { From 2e04440e0fd22992244163c462893c088be2b329 Mon Sep 17 00:00:00 2001 From: Richard Park Date: Mon, 8 Jan 2024 17:34:01 -0800 Subject: [PATCH 2/2] Adding in proper changelog entry, prepped for release. --- sdk/messaging/azservicebus/CHANGELOG.md | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/sdk/messaging/azservicebus/CHANGELOG.md b/sdk/messaging/azservicebus/CHANGELOG.md index a92a3642ba94..13aba620af71 100644 --- a/sdk/messaging/azservicebus/CHANGELOG.md +++ b/sdk/messaging/azservicebus/CHANGELOG.md @@ -1,14 +1,10 @@ # Release History -## 1.5.1 (Unreleased) +## 1.5.1 (2024-01-16) ### Features Added -### Breaking Changes - -### Bugs Fixed - -### Other Changes +- ReceiverOptions.TimeAfterFirstMessage lets you configure the amount of time, after the first message in a batch is received, before we return messages. (PR#22154) ## 1.5.0 (2023-10-10)