-
Notifications
You must be signed in to change notification settings - Fork 512
/
store_gateway_client.go
120 lines (101 loc) · 4.29 KB
/
store_gateway_client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
// SPDX-License-Identifier: AGPL-3.0-only
// Provenance-includes-location: https://github.com/cortexproject/cortex/blob/master/pkg/querier/store_gateway_client.go
// Provenance-includes-license: Apache-2.0
// Provenance-includes-copyright: The Cortex Authors.
package querier
import (
"flag"
"time"
"github.com/go-kit/log"
"github.com/grafana/dskit/crypto/tls"
"github.com/grafana/dskit/grpcclient"
"github.com/grafana/dskit/ring/client"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"
"github.com/grafana/mimir/pkg/storegateway/storegatewaypb"
)
// newStoreGatewayClientFactory returns the factory used by the client pool to
// create a store-gateway client for a given address.
//
// A single request-duration histogram is registered on reg when the factory is
// built and is shared by every client the factory subsequently creates.
// clientCfg supplies the gRPC dial settings applied to each connection.
func newStoreGatewayClientFactory(clientCfg grpcclient.Config, reg prometheus.Registerer) client.PoolFactory {
	requestDuration := promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
		Namespace:   "cortex",
		Name:        "storegateway_client_request_duration_seconds",
		Help:        "Time spent executing requests to the store-gateway.",
		Buckets:     prometheus.ExponentialBuckets(0.008, 4, 7),
		ConstLabels: prometheus.Labels{"client": "querier"},
	}, []string{"operation", "status_code"})

	return func(addr string) (client.PoolClient, error) {
		c, err := dialStoreGatewayClient(clientCfg, addr, requestDuration)
		if err != nil {
			// Return a literal nil rather than forwarding the nil
			// *storeGatewayClient: a typed nil pointer stored in the
			// client.PoolClient interface would compare as non-nil.
			return nil, err
		}
		return c, nil
	}
}
// dialStoreGatewayClient opens a gRPC connection to the store-gateway at addr
// and wraps it in a storeGatewayClient.
//
// The dial options are derived from clientCfg, instrumented with
// requestDuration. An error building the options is returned unchanged; an
// error from dialing is wrapped with the target address.
func dialStoreGatewayClient(clientCfg grpcclient.Config, addr string, requestDuration *prometheus.HistogramVec) (*storeGatewayClient, error) {
	dialOpts, optsErr := clientCfg.DialOption(grpcclient.Instrument(requestDuration))
	if optsErr != nil {
		return nil, optsErr
	}

	grpcConn, dialErr := grpc.Dial(addr, dialOpts...)
	if dialErr != nil {
		return nil, errors.Wrapf(dialErr, "failed to dial store-gateway %s", addr)
	}

	// Both the store-gateway API client and the health client share the
	// same underlying connection.
	sgClient := new(storeGatewayClient)
	sgClient.StoreGatewayClient = storegatewaypb.NewStoreGatewayClient(grpcConn)
	sgClient.HealthClient = grpc_health_v1.NewHealthClient(grpcConn)
	sgClient.conn = grpcConn
	return sgClient, nil
}
// storeGatewayClient bundles the store-gateway gRPC client and the gRPC health
// client together with the connection they were created on.
type storeGatewayClient struct {
// Embedded store-gateway API client; its methods are promoted onto this type.
storegatewaypb.StoreGatewayClient
// Embedded gRPC health-checking client; its methods are promoted onto this type.
grpc_health_v1.HealthClient
// conn is the underlying connection, kept so it can be closed and its
// target reported.
conn *grpc.ClientConn
}
// Close closes the underlying gRPC connection and returns any error reported
// by it.
func (c *storeGatewayClient) Close() error {
	closeErr := c.conn.Close()
	return closeErr
}
// String returns the client's remote address, so the client prints as the
// target it is connected to.
func (c *storeGatewayClient) String() string {
	remote := c.RemoteAddress()
	return remote
}
// RemoteAddress returns the target of the underlying gRPC connection.
func (c *storeGatewayClient) RemoteAddress() string {
	target := c.conn.Target()
	return target
}
// newStoreGatewayClientPool creates the pool of store-gateway clients named
// "store-gateway".
//
// discovery is handed to the pool as its service discovery, clientConfig
// supplies the TLS settings and the pool timings, logger is passed to the pool,
// and reg receives the pool-size gauge as well as the metrics registered by the
// client factory.
func newStoreGatewayClientPool(discovery client.PoolServiceDiscovery, clientConfig ClientConfig, logger log.Logger, reg prometheus.Registerer) *client.Pool {
	// We prefer sane defaults instead of exposing further config options.
	// Compression, rate limit, rate-limit burst and backoff-on-rate-limit
	// are intentionally left at their zero values ("", 0, 0, false).
	var grpcCfg grpcclient.Config
	grpcCfg.MaxRecvMsgSize = 100 << 20
	grpcCfg.MaxSendMsgSize = 16 << 20
	grpcCfg.TLSEnabled = clientConfig.TLSEnabled
	grpcCfg.TLS = clientConfig.TLS

	// Health checking is always on; its cadence and timeout come from the
	// user-facing pool settings.
	timings := clientConfig.PoolConfig
	poolSettings := client.PoolConfig{
		HealthCheckEnabled: true,
		HealthCheckTimeout: timings.RemoteTimeout,
		CheckInterval:      timings.CleanupPeriod,
	}

	poolSize := promauto.With(reg).NewGauge(prometheus.GaugeOpts{
		Namespace:   "cortex",
		Name:        "storegateway_clients",
		Help:        "The current number of store-gateway clients in the pool.",
		ConstLabels: prometheus.Labels{"client": "querier"},
	})

	factory := newStoreGatewayClientFactory(grpcCfg, reg)
	return client.NewPool("store-gateway", poolSettings, discovery, factory, poolSize, logger)
}
// PoolConfig is config for creating a Pool of gRPC clients.
type PoolConfig struct {
// CleanupPeriod is how frequently to clean up clients for store-gateways
// that have gone away.
CleanupPeriod time.Duration `yaml:"cleanup_period" category:"advanced"`
// RemoteTimeout is the timeout for detecting a store-gateway that has gone
// away.
RemoteTimeout time.Duration `yaml:"remote_timeout" category:"advanced"`
}
// ClientConfig is the configuration of the gRPC client used to connect to
// store-gateways.
type ClientConfig struct {
// TLSEnabled enables TLS for the gRPC client connecting to store-gateway.
TLSEnabled bool `yaml:"tls_enabled" category:"advanced"`
// TLS holds the TLS client settings; its YAML fields are inlined.
TLS tls.ClientConfig `yaml:",inline"`
// PoolConfig holds the client pool timings; its YAML fields are inlined.
PoolConfig PoolConfig `yaml:",inline"`
}
// RegisterFlagsWithPrefix registers the store-gateway client flags on f, each
// flag name being prefix followed by a fixed suffix.
//
// The TLS toggle defaults to the current value of cfg.TLSEnabled, the cleanup
// period to 5s and the remote timeout to 1s. The TLS settings register their
// own flags under the same prefix.
func (cfg *ClientConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
	const (
		defaultCleanupPeriod = 5 * time.Second
		defaultRemoteTimeout = time.Second
	)
	pool := &cfg.PoolConfig

	f.BoolVar(&cfg.TLSEnabled, prefix+".tls-enabled", cfg.TLSEnabled, "Enable TLS for gRPC client connecting to store-gateway.")
	f.DurationVar(&pool.CleanupPeriod, prefix+".cleanup-period", defaultCleanupPeriod, "How frequently to clean up clients for store-gateways that have gone away.")
	f.DurationVar(&pool.RemoteTimeout, prefix+".remote-timeout", defaultRemoteTimeout, "Timeout for detecting a store-gateway that has gone away.")

	cfg.TLS.RegisterFlagsWithPrefix(prefix, f)
}