Skip to content

Commit

Permalink
Refactor subscription intercepter to pass subscription info
Browse files Browse the repository at this point in the history
  • Loading branch information
k-yomo committed May 14, 2021
1 parent 4fec514 commit bc4b312
Show file tree
Hide file tree
Showing 11 changed files with 125 additions and 43 deletions.
11 changes: 7 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@ go 1.16

require (
cloud.google.com/go v0.81.0 // indirect
cloud.google.com/go/pubsub v1.10.2
cloud.google.com/go/pubsub v1.10.3
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect
github.com/pkg/errors v0.9.1
go.uber.org/multierr v1.7.0 // indirect
go.uber.org/zap v1.16.0 // indirect
golang.org/x/mod v0.4.2 // indirect
golang.org/x/net v0.0.0-20210331212208-0fccb6fa2b5c // indirect
golang.org/x/oauth2 v0.0.0-20210402161424-2e8d93401602 // indirect
golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57 // indirect
golang.org/x/net v0.0.0-20210510120150-4163338589ed // indirect
golang.org/x/sys v0.0.0-20210511113859-b0526f3d8744 // indirect
golang.org/x/text v0.3.6 // indirect
google.golang.org/genproto v0.0.0-20210513213006-bf773b8c8384 // indirect
google.golang.org/grpc v1.37.1 // indirect
)
46 changes: 46 additions & 0 deletions go.sum

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion middleware/pm_autoack/pm_autoack.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

// SubscriptionInterceptor automatically ack / nack subscription based on the returned error.
func SubscriptionInterceptor() pm.SubscriptionInterceptor {
return func(next pm.MessageHandler) pm.MessageHandler {
return func(_ *pm.SubscriptionInfo, next pm.MessageHandler) pm.MessageHandler {
return func(ctx context.Context, m *pubsub.Message) error {
err := next(ctx, m)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion middleware/pm_recovery/pm_recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func SubscriptionInterceptor(opt ...Option) pm.SubscriptionInterceptor {
for _, o := range opt {
o(&opts)
}
return func(next pm.MessageHandler) pm.MessageHandler {
return func(_ *pm.SubscriptionInfo, next pm.MessageHandler) pm.MessageHandler {
return func(ctx context.Context, m *pubsub.Message) (err error) {
defer func() {
if r := recover(); r != nil {
Expand Down
7 changes: 4 additions & 3 deletions middleware/pm_recovery/pm_recovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package pm_recovery
import (
"cloud.google.com/go/pubsub"
"context"
"github.com/k-yomo/pm"
"testing"
)

Expand All @@ -16,7 +17,7 @@ func TestSubscriptionInterceptor(t *testing.T) {
t.Run("recovers with default recovery handler", func(t *testing.T) {
t.Parallel()
interceptor := SubscriptionInterceptor()
_ = interceptor(next)(context.Background(), &pubsub.Message{})
_ = interceptor(&pm.SubscriptionInfo{}, next)(context.Background(), &pubsub.Message{})
})

t.Run("recovers with default recovery handler", func(t *testing.T) {
Expand All @@ -27,7 +28,7 @@ func TestSubscriptionInterceptor(t *testing.T) {
called = true
})}
interceptor := SubscriptionInterceptor(opts...)
_ = interceptor(next)(context.Background(), &pubsub.Message{})
_ = interceptor(&pm.SubscriptionInfo{}, next)(context.Background(), &pubsub.Message{})
if !called {
t.Error("The custom recovery handler is not called")
}
Expand All @@ -38,6 +39,6 @@ func TestSubscriptionInterceptor(t *testing.T) {

opts := []Option{WithDebugRecoveryHandler()}
interceptor := SubscriptionInterceptor(opts...)
_ = interceptor(next)(context.Background(), &pubsub.Message{})
_ = interceptor(&pm.SubscriptionInfo{}, next)(context.Background(), &pubsub.Message{})
})
}
4 changes: 0 additions & 4 deletions publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,6 @@ type MessagePublisher = func(ctx context.Context, topic *pubsub.Topic, m *pubsub
// PublishInterceptor provides a hook to intercept the execution of a publishment.
type PublishInterceptor = func(next MessagePublisher) MessagePublisher

type publisherOptions struct {
publishInterceptors []PublishInterceptor
}

// Publisher represents a wrapper of Pub/Sub client focusing on publishment.
type Publisher struct {
opts *publisherOptions
Expand Down
4 changes: 4 additions & 0 deletions publisher_option.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package pm

type publisherOptions struct {
publishInterceptors []PublishInterceptor
}

// PublisherOption is a option to change publisher configuration.
type PublisherOption interface {
apply(*publisherOptions)
Expand Down
53 changes: 30 additions & 23 deletions subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,28 @@ package pm
import (
"cloud.google.com/go/pubsub"
"context"
"github.com/pkg/errors"
"fmt"
"log"
)

// MessageHandler defines the message handler invoked by SubscriptionInterceptor to complete the normal
// message handling.
type MessageHandler = func(ctx context.Context, m *pubsub.Message) error

// SubscriptionInterceptor provides a hook to intercept the execution of a message handling.
type SubscriptionInterceptor = func(next MessageHandler) MessageHandler

type subscriberOptions struct {
subscriptionInterceptors []SubscriptionInterceptor
}

// Subscriber represents a wrapper of Pub/Sub client mainly focusing on pull subscription.
type Subscriber struct {
opts *subscriberOptions
pubsubClient *pubsub.Client
subscriptionHandlers map[string]MessageHandler
subscriptionHandlers map[string]*subscriptionHandler
cancel context.CancelFunc
}

type subscriptionHandler struct {
topicID string
subscription *pubsub.Subscription
handleFunc MessageHandler
}

// MessageHandler defines the message handler invoked by SubscriptionInterceptor to complete the normal
// message handling.
type MessageHandler = func(ctx context.Context, m *pubsub.Message) error

// NewSubscriber initializes new Subscriber.
func NewSubscriber(pubsubClient *pubsub.Client, opt ...SubscriberOption) *Subscriber {
opts := subscriberOptions{}
Expand All @@ -35,22 +34,26 @@ func NewSubscriber(pubsubClient *pubsub.Client, opt ...SubscriberOption) *Subscr
return &Subscriber{
opts: &opts,
pubsubClient: pubsubClient,
subscriptionHandlers: map[string]MessageHandler{},
subscriptionHandlers: map[string]*subscriptionHandler{},
}
}

// HandleSubscriptionFunc registers subscription handler for the given id's subscription.
// If subscription does not exist, it will return error.
func (s *Subscriber) HandleSubscriptionFunc(subscriptionID string, f MessageHandler) error {
if _, ok := s.subscriptionHandlers[subscriptionID]; ok {
return fmt.Errorf("handler for subscription '%s' is already registered", subscriptionID)
}
sub := s.pubsubClient.Subscription(subscriptionID)
ok, err := sub.Exists(context.Background())
cfg, err := sub.Config(context.Background())
if err != nil {
return err
}
if !ok {
return errors.Errorf("pubsub subscription '%s' does not exist", subscriptionID)
s.subscriptionHandlers[subscriptionID] = &subscriptionHandler{
topicID: cfg.Topic.ID(),
subscription: sub,
handleFunc: f,
}
s.subscriptionHandlers[subscriptionID] = f

return nil
}
Expand All @@ -60,19 +63,23 @@ func (s *Subscriber) Run(ctx context.Context) {
ctx, cancel := context.WithCancel(ctx)
s.cancel = cancel

for subscriptionID, f := range s.subscriptionHandlers {
for subscriptionID, handler := range s.subscriptionHandlers {
sub := s.pubsubClient.Subscription(subscriptionID)
f := f
h := handler
subscriptionInfo := SubscriptionInfo{
TopicID: h.topicID,
SubscriptionID: h.subscription.ID(),
}
go func() {
last := f
last := h.handleFunc
for i := len(s.opts.subscriptionInterceptors) - 1; i >= 0; i-- {
last = s.opts.subscriptionInterceptors[i](last)
last = s.opts.subscriptionInterceptors[i](&subscriptionInfo, last)
}
err := sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
_ = last(ctx, m)
})
if err != nil {
log.Printf("%v\n", err)
log.Printf("%+v\n", err)
}
}()
}
Expand Down
4 changes: 4 additions & 0 deletions subscriber_option.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package pm

type subscriberOptions struct {
subscriptionInterceptors []SubscriptionInterceptor
}

// SubscriberOption is a option to change subscriber configuration.
type SubscriberOption interface {
apply(*subscriberOptions)
Expand Down
25 changes: 18 additions & 7 deletions subscriber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestNewSubscriber(t *testing.T) {
want: &Subscriber{
opts: &subscriberOptions{subscriptionInterceptors: []SubscriptionInterceptor{nil}},
pubsubClient: &pubsub.Client{},
subscriptionHandlers: map[string]MessageHandler{},
subscriptionHandlers: map[string]*subscriptionHandler{},
cancel: nil,
},
},
Expand Down Expand Up @@ -89,13 +89,24 @@ func TestSubscriber_HandleSubscriptionFunc(t *testing.T) {
subscriber: NewSubscriber(pubsubClient),
args: args{
subscriptionID: sub.ID(),
f: func(ctx context.Context, m *pubsub.Message) error {
return nil
},
f: func(ctx context.Context, m *pubsub.Message) error { return nil },
},
},
{
name: "if the subscription does not exist, it returns error",
name: "when a handler is already registered for the give subscription id, it returns error",
subscriber: func() *Subscriber {
s := NewSubscriber(pubsubClient)
_ = s.HandleSubscriptionFunc(sub.ID(), func(ctx context.Context, m *pubsub.Message) error { return nil })
return s
}(),
args: args{
subscriptionID: sub.ID(),
f: func(ctx context.Context, m *pubsub.Message) error { return nil },
},
wantErr: true,
},
{
name: "when the subscription does not exist, it returns error",
subscriber: NewSubscriber(pubsubClient),
args: args{
subscriptionID: "invalid",
Expand Down Expand Up @@ -142,12 +153,12 @@ func TestSubscriber_Run(t *testing.T) {

subscriber := NewSubscriber(
pubsubClient,
WithSubscriptionInterceptor(func(next MessageHandler) MessageHandler {
WithSubscriptionInterceptor(func(_ *SubscriptionInfo, next MessageHandler) MessageHandler {
return func(ctx context.Context, m *pubsub.Message) error {
m.Attributes = map[string]string{"intercepted": "abc"}
return next(ctx, m)
}
}, func(next MessageHandler) MessageHandler {
}, func(_ *SubscriptionInfo, next MessageHandler) MessageHandler {
return func(ctx context.Context, m *pubsub.Message) error {
// this will overwrite the first interceptor
m.Attributes = map[string]string{"intercepted": "true"}
Expand Down
10 changes: 10 additions & 0 deletions subscription_intercepter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package pm

// SubscriptionInfo contains various info about the subscription.
type SubscriptionInfo struct {
TopicID string
SubscriptionID string
}

// SubscriptionInterceptor provides a hook to intercept the execution of a message handling.
type SubscriptionInterceptor = func(info *SubscriptionInfo, next MessageHandler) MessageHandler

0 comments on commit bc4b312

Please sign in to comment.