From 39292863899b943973837e383f078525316d3215 Mon Sep 17 00:00:00 2001 From: Dimitris Halatsis Date: Fri, 9 Aug 2024 20:25:50 +0300 Subject: [PATCH] pubsub: tidy and cleanup (#3462) --- pubsub/awssnssqs/awssnssqs.go | 6 +++--- pubsub/pubsub.go | 20 +++++++------------- 2 files changed, 10 insertions(+), 16 deletions(-) diff --git a/pubsub/awssnssqs/awssnssqs.go b/pubsub/awssnssqs/awssnssqs.go index b9bfa2a4ad..e31e1cc195 100644 --- a/pubsub/awssnssqs/awssnssqs.go +++ b/pubsub/awssnssqs/awssnssqs.go @@ -1010,8 +1010,8 @@ func errorCode(err error) gcerrors.ErrorCode { var ae smithy.APIError if errors.As(err, &ae) { code = ae.ErrorCode() - } else if ae, ok := err.(awserr.Error); ok { - code = ae.Code() + } else if awsErr, ok := err.(awserr.Error); ok { + code = awsErr.Code() } else { return gcerrors.Unknown } @@ -1434,7 +1434,7 @@ func (s *subscription) SendNacks(ctx context.Context, ids []driver.AckID) error } numFailed++ } - if numFailed > 0 { + if numFailed > 0 && firstFail != nil { return awserr.New(aws.StringValue(firstFail.Code), fmt.Sprintf("sqs.ChangeMessageVisibilityBatch failed for %d message(s): %s", numFailed, aws.StringValue(firstFail.Message)), nil) } return nil diff --git a/pubsub/pubsub.go b/pubsub/pubsub.go index 077637fb4e..9569747cfe 100644 --- a/pubsub/pubsub.go +++ b/pubsub/pubsub.go @@ -66,6 +66,7 @@ package pubsub // import "gocloud.dev/pubsub" import ( "context" + "errors" "fmt" "log" "math" @@ -225,11 +226,6 @@ type Topic struct { cancel func() } -type msgErrChan struct { - msg *Message - errChan chan error -} - // Send publishes a message. It only returns after the message has been // sent, or failed to be sent. Send can be called from multiple goroutines // at once. @@ -276,7 +272,7 @@ func (t *Topic) Shutdown(ctx context.Context) (err error) { defer func() { t.tracer.End(ctx, err) }() t.mu.Lock() - if t.err == errTopicShutdown { + if errors.Is(t.err, errTopicShutdown) { defer t.mu.Unlock() return t.err } @@ -319,7 +315,6 @@ var NewTopic = newTopic // newSendBatcher creates a batcher for topics, for use with NewTopic. func newSendBatcher(ctx context.Context, t *Topic, dt driver.Topic, opts *batcher.Options) *batcher.Batcher { - const maxHandlers = 1 handler := func(items interface{}) error { dms := items.([]*driver.Message) err := retry.Call(ctx, gax.Backoff{}, dt.IsRetryable, func() (err error) { @@ -486,10 +481,10 @@ func (s *Subscription) updateBatchSize() int { // We first combine the previous value and the new value, with weighting // based on decay, and then cap the growth/shrinkage. newBatchSize := s.runningBatchSize*(1-decay) + idealBatchSize*decay - if max := s.runningBatchSize * maxGrowthFactor; newBatchSize > max { - s.runningBatchSize = max - } else if min := s.runningBatchSize * maxShrinkFactor; newBatchSize < min { - s.runningBatchSize = min + if maxSize := s.runningBatchSize * maxGrowthFactor; newBatchSize > maxSize { + s.runningBatchSize = maxSize + } else if minSize := s.runningBatchSize * maxShrinkFactor; newBatchSize < minSize { + s.runningBatchSize = minSize } else { s.runningBatchSize = newBatchSize } @@ -699,7 +694,7 @@ func (s *Subscription) Shutdown(ctx context.Context) (err error) { defer func() { s.tracer.End(ctx, err) }() s.mu.Lock() - if s.err == errSubscriptionShutdown { + if errors.Is(s.err, errSubscriptionShutdown) { // Already Shutdown. defer s.mu.Unlock() return s.err @@ -774,7 +769,6 @@ func newSubscription(ds driver.Subscription, recvBatchOpts, ackBatcherOpts *batc } func newAckBatcher(ctx context.Context, s *Subscription, ds driver.Subscription, opts *batcher.Options) *batcher.Batcher { - const maxHandlers = 1 handler := func(items interface{}) error { var acks, nacks []driver.AckID for _, a := range items.([]*driver.AckInfo) {