-
Notifications
You must be signed in to change notification settings - Fork 0
/
server.go
151 lines (127 loc) · 4.2 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
package goutube
import (
"context"
"encoding/binary"
streaming_api "github.com/Brijeshlakkad/goutube/api/streaming/v1"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_auth "github.com/grpc-ecosystem/go-grpc-middleware/auth"
"go.opencensus.io/plugin/ocgrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"
)
// enc is the package-level handle on the big-endian byte order
// (binary.BigEndian).
var enc = binary.BigEndian
// ServerConfig bundles the inputs NewServer uses to assemble a gRPC server.
type ServerConfig struct {
	// StreamingConfig is handed to NewStreamingServer to build the streaming
	// service.
	StreamingConfig *StreamingConfig

	// ResolverHelperConfig, when non-nil, makes NewServer register the
	// resolver helper services.
	ResolverHelperConfig *ResolverHelperConfig

	// Rule is passed to shouldImplementFollowerResolver to decide whether the
	// follower resolver helper is registered.
	Rule ParticipationRule
}
// ResolverHelperConfig carries the lookup backends that the resolver helper
// services delegate to.
type ResolverHelperConfig struct {
	// GetServerer answers GetServers calls made through LBResolverHelper.
	GetServerer GetServerer

	// GetFollowerer answers GetFollowers calls made through
	// FollowerResolverHelper.
	GetFollowerer GetFollowerer
}
// START: Load Balancer Resolver Helper
// GetServerer is the lookup backend that LBResolverHelper delegates to.
type GetServerer interface {
	// GetServers resolves req into a list of servers, or reports an error.
	GetServers(req *streaming_api.GetServersRequest) ([]*streaming_api.Server, error)
}
// NewLBResolverHelper wraps config in an LBResolverHelper. The returned error
// is always nil.
func NewLBResolverHelper(config *ResolverHelperConfig) (*LBResolverHelper, error) {
	helper := new(LBResolverHelper)
	helper.ResolverHelperConfig = config
	return helper, nil
}
// LBResolverHelper is the server-side implementation of the LBResolverHelper
// gRPC service. It embeds the generated Unimplemented server and the
// ResolverHelperConfig whose GetServerer backs the GetServers RPC.
type LBResolverHelper struct {
	streaming_api.UnimplementedLBResolverHelperServer

	*ResolverHelperConfig
}
// GetServers handles the GetServers RPC by delegating to the configured
// GetServerer and wrapping its result in a response message.
//
// When no GetServerer has been configured the call fails with
// codes.Unimplemented instead of panicking on a nil dereference. Errors
// reported by the backend are returned unchanged.
func (r *LBResolverHelper) GetServers(ctx context.Context, req *streaming_api.GetServersRequest) (*streaming_api.GetServersResponse, error) {
	// NewServer registers this service whenever a ResolverHelperConfig is
	// present, so the GetServerer field itself may still be unset.
	if r.ResolverHelperConfig == nil || r.GetServerer == nil {
		return nil, status.Error(codes.Unimplemented, "load balancer resolver is not configured")
	}

	servers, err := r.GetServerer.GetServers(req)
	if err != nil {
		return nil, err
	}
	return &streaming_api.GetServersResponse{Servers: servers}, nil
}
// END: Load Balancer Resolver Helper
// START: Follower Resolver Helper
// GetFollowerer is the lookup backend that FollowerResolverHelper delegates to.
type GetFollowerer interface {
	// GetFollowers resolves req into a list of servers, or reports an error.
	GetFollowers(req *streaming_api.GetFollowersRequest) ([]*streaming_api.Server, error)
}
// NewFollowerResolverHelper wraps config in a FollowerResolverHelper. The
// returned error is always nil.
func NewFollowerResolverHelper(config *ResolverHelperConfig) (*FollowerResolverHelper, error) {
	helper := new(FollowerResolverHelper)
	helper.ResolverHelperConfig = config
	return helper, nil
}
// FollowerResolverHelper is the server-side implementation of the
// FollowerResolverHelper gRPC service. It embeds the generated Unimplemented
// server and the ResolverHelperConfig whose GetFollowerer backs the
// GetFollowers RPC.
type FollowerResolverHelper struct {
	streaming_api.UnimplementedFollowerResolverHelperServer

	*ResolverHelperConfig
}
// GetFollowers handles the GetFollowers RPC by delegating to the configured
// GetFollowerer and wrapping its result in a response message.
//
// When no GetFollowerer has been configured the call fails with
// codes.Unimplemented instead of panicking on a nil dereference. Errors
// reported by the backend are returned unchanged.
func (r *FollowerResolverHelper) GetFollowers(ctx context.Context, req *streaming_api.GetFollowersRequest) (*streaming_api.GetFollowersResponse, error) {
	// The config is shared with the LB resolver helper, so it can be non-nil
	// while its GetFollowerer field is still unset.
	if r.ResolverHelperConfig == nil || r.GetFollowerer == nil {
		return nil, status.Error(codes.Unimplemented, "follower resolver is not configured")
	}

	servers, err := r.GetFollowerer.GetFollowers(req)
	if err != nil {
		return nil, err
	}
	return &streaming_api.GetFollowersResponse{Servers: servers}, nil
}
// END: Follower Resolver Helper
// NewServer builds a gRPC server exposing the streaming service and, depending
// on config, the resolver helper services.
//
// Caller-supplied opts are applied first, followed by the authentication
// interceptors (stream and unary) and the ocgrpc stats handler. The LB
// resolver helper is registered whenever config.ResolverHelperConfig is set;
// the follower resolver helper is additionally registered when
// shouldImplementFollowerResolver(config.Rule) reports true.
//
// It returns an error when config is nil or when one of the services cannot
// be constructed.
func NewServer(config *ServerConfig, opts ...grpc.ServerOption) (*grpc.Server, error) {
	if config == nil {
		return nil, status.Error(codes.InvalidArgument, "server config must not be nil")
	}

	sm, err := NewStreamingServer(config.StreamingConfig)
	if err != nil {
		return nil, err
	}

	// Build a fresh slice: appending to opts directly could overwrite
	// elements of the caller's backing array when it has spare capacity.
	serverOpts := make([]grpc.ServerOption, 0, len(opts)+3)
	serverOpts = append(serverOpts, opts...)
	serverOpts = append(serverOpts,
		grpc.StreamInterceptor(
			grpc_middleware.ChainStreamServer(
				grpc_auth.StreamServerInterceptor(authenticate),
			)),
		grpc.UnaryInterceptor(
			grpc_middleware.ChainUnaryServer(
				grpc_auth.UnaryServerInterceptor(authenticate),
			)),
		grpc.StatsHandler(&ocgrpc.ServerHandler{}),
	)

	gRPCServer := grpc.NewServer(serverOpts...)
	streaming_api.RegisterStreamingServer(gRPCServer, sm)

	if helperConfig := config.ResolverHelperConfig; helperConfig != nil {
		// Register LoadBalancer Resolver Helper
		// Any member type server can get the load balancers of the ring.
		lbm, err := NewLBResolverHelper(helperConfig)
		if err != nil {
			return nil, err
		}
		streaming_api.RegisterLBResolverHelperServer(gRPCServer, lbm)

		// Register Follower Resolver Helper
		// Only ShardMember members are implementing the GetFollowers method.
		if shouldImplementFollowerResolver(config.Rule) {
			fm, err := NewFollowerResolverHelper(helperConfig)
			if err != nil {
				return nil, err
			}
			streaming_api.RegisterFollowerResolverHelperServer(gRPCServer, fm)
		}
	}

	return gRPCServer, nil
}
// authenticate is the auth function installed on the server's interceptors. It
// reads the subject (common name) out of the client's verified TLS certificate
// and writes it to the RPC's context under subjectContextKey.
//
// It returns codes.Unknown when ctx carries no peer information and
// codes.Unauthenticated when the peer's AuthInfo is not credentials.TLSInfo.
// Peers without AuthInfo, or without a verified client certificate chain, get
// an empty subject.
func authenticate(ctx context.Context) (context.Context, error) {
	p, ok := peer.FromContext(ctx)
	if !ok {
		return ctx, status.New(
			codes.Unknown,
			"couldn't find peer info",
		).Err()
	}

	if p.AuthInfo == nil {
		return context.WithValue(ctx, subjectContextKey{}, ""), nil
	}

	// Checked assertion: a bare one panics for any non-TLS transport
	// credentials.
	tlsInfo, ok := p.AuthInfo.(credentials.TLSInfo)
	if !ok {
		return ctx, status.Error(codes.Unauthenticated, "unexpected peer transport credentials")
	}

	// VerifiedChains is empty when the client presented no certificate or it
	// was not verified; indexing it blindly would panic.
	name := ""
	if chains := tlsInfo.State.VerifiedChains; len(chains) > 0 && len(chains[0]) > 0 {
		name = chains[0][0].Subject.CommonName
	}
	return context.WithValue(ctx, subjectContextKey{}, name), nil
}
// subject returns the subject stored in ctx under subjectContextKey, or ""
// when ctx carries none.
func subject(ctx context.Context) string {
	// Comma-ok assertion: ctx.Value yields nil when the key is absent, and a
	// bare .(string) assertion on nil panics.
	name, _ := ctx.Value(subjectContextKey{}).(string)
	return name
}
// subjectContextKey is the context key under which authenticate stores the
// caller's subject and from which subject reads it back. Using an unexported
// struct type keeps the key distinct from keys defined by other packages.
type subjectContextKey struct{}