Skip to content

Commit

Permalink
Rename config to opts
Browse files Browse the repository at this point in the history
  • Loading branch information
k-yomo committed Apr 5, 2021
1 parent fb0c35b commit 7f0ea42
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 23 deletions.
11 changes: 5 additions & 6 deletions publiser.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,19 @@ type MessagePublisher = func(ctx context.Context, topic *pubsub.Topic, m *pubsub
// PublishInterceptor provides a hook to intercept the execution of a publishment.
type PublishInterceptor = func(next MessagePublisher) MessagePublisher

// PublisherConfig represents a configuration of Publisher.
type PublisherConfig struct {
type publisherOptions struct {
publishInterceptors []PublishInterceptor
}

// Publisher represents a wrapper of Pub/Sub client focusing on publishment.
type Publisher struct {
config *PublisherConfig
opts *publisherOptions
*pubsub.Client
}

// NewPublisher initializes new Publisher.
func NewPublisher(pubsubClient *pubsub.Client, opts ...PublisherOption) *Publisher {
c := PublisherConfig{}
c := publisherOptions{}
for _, o := range opts {
o.apply(&c)
}
Expand All @@ -38,8 +37,8 @@ func NewPublisher(pubsubClient *pubsub.Client, opts ...PublisherOption) *Publish
// Publish publishes Pub/Sub message with applying middlewares
func (p *Publisher) Publish(ctx context.Context, topic *pubsub.Topic, m *pubsub.Message) *pubsub.PublishResult {
last := publish
for i := len(p.config.publishInterceptors) - 1; i >= 0; i-- {
last = p.config.publishInterceptors[i](last)
for i := len(p.opts.publishInterceptors) - 1; i >= 0; i-- {
last = p.opts.publishInterceptors[i](last)
}
return last(ctx, topic, m)
}
Expand Down
10 changes: 5 additions & 5 deletions publisher_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,26 @@ package pm

// PublisherOption is a option to change publisher configuration.
type PublisherOption interface {
apply(*PublisherConfig)
apply(*publisherOptions)
}

type publisherOptionFunc struct {
f func(config *PublisherConfig)
f func(config *publisherOptions)
}

func (p *publisherOptionFunc) apply(pc *PublisherConfig) {
func (p *publisherOptionFunc) apply(pc *publisherOptions) {
p.f(pc)
}

func newPublisherOptionFunc(f func(pc *PublisherConfig)) *publisherOptionFunc {
func newPublisherOptionFunc(f func(pc *publisherOptions)) *publisherOptionFunc {
return &publisherOptionFunc{
f: f,
}
}

// WithPublishInterceptor sets publish interceptors.
func WithPublishInterceptor(interceptors ...PublishInterceptor) PublisherOption {
return newPublisherOptionFunc(func(sc *PublisherConfig) {
return newPublisherOptionFunc(func(sc *publisherOptions) {
sc.publishInterceptors = interceptors
})
}
13 changes: 6 additions & 7 deletions subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,26 @@ type MessageHandler = func(ctx context.Context, m *pubsub.Message) error
// SubscriptionInterceptor provides a hook to intercept the execution of a message handling.
type SubscriptionInterceptor = func(next MessageHandler) MessageHandler

// SubscriberConfig represents a configuration of Subscriber.
type SubscriberConfig struct {
type subscriberOptions struct {
subscriptionInterceptors []SubscriptionInterceptor
}

// Subscriber represents a wrapper of Pub/Sub client mainly focusing on pull subscription.
type Subscriber struct {
config *SubscriberConfig
opts *subscriberOptions
pubsubClient *pubsub.Client
subscriptionHandlers map[string]MessageHandler
cancel context.CancelFunc
}

// NewSubscriber initializes new Subscriber.
func NewSubscriber(pubsubClient *pubsub.Client, opts ...SubscriberOption) *Subscriber {
c := SubscriberConfig{}
c := subscriberOptions{}
for _, o := range opts {
o.apply(&c)
}
return &Subscriber{
config: &c,
opts: &c,
pubsubClient: pubsubClient,
subscriptionHandlers: map[string]MessageHandler{},
}
Expand Down Expand Up @@ -65,8 +64,8 @@ func (p *Subscriber) Run() {
f := f
go func() {
last := f
for i := len(p.config.subscriptionInterceptors) - 1; i >= 0; i-- {
last = p.config.subscriptionInterceptors[i](last)
for i := len(p.opts.subscriptionInterceptors) - 1; i >= 0; i-- {
last = p.opts.subscriptionInterceptors[i](last)
}
err := sub.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
_ = last(ctx, m)
Expand Down
10 changes: 5 additions & 5 deletions subscriber_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,26 @@ package pm

// SubscriberOption is a option to change subscriber configuration.
type SubscriberOption interface {
apply(*SubscriberConfig)
apply(*subscriberOptions)
}

type subscriberOptionFunc struct {
f func(config *SubscriberConfig)
f func(config *subscriberOptions)
}

func (s *subscriberOptionFunc) apply(sc *SubscriberConfig) {
func (s *subscriberOptionFunc) apply(sc *subscriberOptions) {
s.f(sc)
}

func newSubscriberOptionFunc(f func(sc *SubscriberConfig)) *subscriberOptionFunc {
func newSubscriberOptionFunc(f func(sc *subscriberOptions)) *subscriberOptionFunc {
return &subscriberOptionFunc{
f: f,
}
}

// WithSubscriptionInterceptor sets subscription interceptors.
func WithSubscriptionInterceptor(interceptors ...SubscriptionInterceptor) SubscriberOption {
return newSubscriberOptionFunc(func(sc *SubscriberConfig) {
return newSubscriberOptionFunc(func(sc *subscriberOptions) {
sc.subscriptionInterceptors = interceptors
})
}

0 comments on commit 7f0ea42

Please sign in to comment.