cfg.go
package notifierKafka

import (
	"flag"
	"fmt"
	"strconv"
	"strings"
	"time"

	"github.com/Shopify/sarama"
	"github.com/Shopify/sarama/tools/tls"
	"github.com/grafana/globalconf"
	"github.com/grafana/metrictank/kafka"
	"github.com/grafana/metrictank/stats"
	log "github.com/sirupsen/logrus"
)
// settings populated from the "kafka-cluster" flag section
var Enabled bool
var kafkaVersionStr string
var brokerStr string
var brokers []string
var topic string
var offsetStr string
var config *sarama.Config
var offsetDuration time.Duration
var partitionStr string
var partitions []int32
var backlogProcessTimeout time.Duration
var backlogProcessTimeoutStr string

// per-partition gauges, initialized in ConfigProcess
var partitionOffset map[int32]*stats.Gauge64
var partitionLogSize map[int32]*stats.Gauge64
var partitionLag map[int32]*stats.Gauge64

// TLS settings
var tlsEnabled bool
var tlsSkipVerify bool
var tlsClientCert string
var tlsClientKey string

var FlagSet *flag.FlagSet

// metric cluster.notifier.kafka.messages-published is a counter of messages published to the kafka cluster notifier
var messagesPublished = stats.NewCounter32("cluster.notifier.kafka.messages-published")

// metric cluster.notifier.kafka.message_size is the size of messages seen by the kafka cluster notifier
var messagesSize = stats.NewMeter32("cluster.notifier.kafka.message_size", false)

func init() {
	FlagSet = flag.NewFlagSet("kafka-cluster", flag.ExitOnError)
	FlagSet.BoolVar(&Enabled, "enabled", false, "")
	FlagSet.StringVar(&brokerStr, "brokers", "kafka:9092", "tcp address for kafka (may be given multiple times as a comma-separated list)")
	FlagSet.StringVar(&kafkaVersionStr, "kafka-version", "2.0.0", "Kafka version in semver format. All brokers must be this version or newer.")
	FlagSet.StringVar(&topic, "topic", "metricpersist", "kafka topic")
	FlagSet.StringVar(&partitionStr, "partitions", "*", "kafka partitions to consume. use '*' or a comma separated list of ids. This should match the partitions used for kafka-mdm-in")
	FlagSet.StringVar(&offsetStr, "offset", "newest", "Set the offset to start consuming from. Can be oldest, newest or a time duration")
	FlagSet.StringVar(&backlogProcessTimeoutStr, "backlog-process-timeout", "60s", "Maximum time backlog processing can block during metrictank startup. Setting to a low value may result in data loss")
	FlagSet.BoolVar(&tlsEnabled, "tls-enabled", false, "Whether to enable TLS")
	FlagSet.BoolVar(&tlsSkipVerify, "tls-skip-verify", false, "Whether to skip TLS server cert verification")
	FlagSet.StringVar(&tlsClientCert, "tls-client-cert", "", "Client cert for client authentication (use with -tls-enabled and -tls-client-key)")
	FlagSet.StringVar(&tlsClientKey, "tls-client-key", "", "Client key for client authentication (use with -tls-enabled and -tls-client-cert)")
	globalconf.Register("kafka-cluster", FlagSet, flag.ExitOnError)
}
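
// For reference, a sketch of how the flags registered above might appear as a
// "[kafka-cluster]" section in a globalconf-style ini file. Values mirror the
// defaults from this file, except that enabled is switched on and the broker
// list is a hypothetical multi-broker example:
//
//	[kafka-cluster]
//	enabled = true
//	brokers = kafka1:9092,kafka2:9092
//	kafka-version = 2.0.0
//	topic = metricpersist
//	partitions = *
//	offset = newest
//	backlog-process-timeout = 60s
//	tls-enabled = false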

func ConfigProcess(instance string) {
	if !Enabled {
		return
	}

	kafkaVersion, err := sarama.ParseKafkaVersion(kafkaVersionStr)
	if err != nil {
		log.Fatalf("kafka-cluster: invalid kafka-version. %s", err)
	}

	switch offsetStr {
	case "oldest":
	case "newest":
	default:
		offsetDuration, err = time.ParseDuration(offsetStr)
		if err != nil {
			log.Fatalf("kafka-cluster: invalid offset format. %s", err)
		}
	}

	brokers = strings.Split(brokerStr, ",")

	config = sarama.NewConfig()
	config.ClientID = instance + "-cluster"
	config.Version = kafkaVersion
	config.Producer.RequiredAcks = sarama.WaitForAll // Wait for all in-sync replicas to ack the message
	config.Producer.Retry.Max = 10                   // Retry up to 10 times to produce the message
	config.Producer.Compression = sarama.CompressionSnappy
	config.Producer.Return.Successes = true
	config.Producer.Partitioner = sarama.NewManualPartitioner

	if tlsEnabled {
		tlsConfig, err := tls.NewConfig(tlsClientCert, tlsClientKey)
		if err != nil {
			log.Fatalf("Failed to create TLS config: %s", err)
		}
		config.Net.TLS.Enable = true
		config.Net.TLS.Config = tlsConfig
		config.Net.TLS.Config.InsecureSkipVerify = tlsSkipVerify
	}

	err = config.Validate()
	if err != nil {
		log.Fatalf("kafka-cluster: invalid consumer config: %s", err)
	}

	backlogProcessTimeout, err = time.ParseDuration(backlogProcessTimeoutStr)
	if err != nil {
		log.Fatalf("kafka-cluster: unable to parse backlog-process-timeout. %s", err)
	}

	if partitionStr != "*" {
		parts := strings.Split(partitionStr, ",")
		for _, part := range parts {
			i, err := strconv.Atoi(part)
			if err != nil {
				log.Fatalf("kafka-cluster: could not parse partition %q. partitions must be '*' or a comma separated list of ids", part)
			}
			partitions = append(partitions, int32(i))
		}
	}

	// validate our partitions
	client, err := sarama.NewClient(brokers, config)
	if err != nil {
		log.Fatalf("kafka-cluster: failed to create client. %s", err)
	}
	defer client.Close()

	availParts, err := kafka.GetPartitions(client, []string{topic})
	if err != nil {
		log.Fatalf("kafka-cluster: %s", err.Error())
	}
	if partitionStr == "*" {
		partitions = availParts
	} else {
		missing := kafka.DiffPartitions(partitions, availParts)
		if len(missing) > 0 {
			log.Fatalf("kafka-cluster: configured partitions not in list of available partitions. missing %v", missing)
		}
	}

	// initialize our offset metrics
	partitionOffset = make(map[int32]*stats.Gauge64)
	partitionLogSize = make(map[int32]*stats.Gauge64)
	partitionLag = make(map[int32]*stats.Gauge64)
	for _, part := range partitions {
		// metric cluster.notifier.kafka.partition.%d.offset is the current offset for the partition (%d) that we have consumed
		partitionOffset[part] = stats.NewGauge64(fmt.Sprintf("cluster.notifier.kafka.partition.%d.offset", part))
		// metric cluster.notifier.kafka.partition.%d.log_size is the size of the kafka partition (%d), aka the newest available offset.
		partitionLogSize[part] = stats.NewGauge64(fmt.Sprintf("cluster.notifier.kafka.partition.%d.log_size", part))
		// metric cluster.notifier.kafka.partition.%d.lag is how many messages (metrics) there are in the kafka
		// partition (%d) that we have not yet consumed.
		partitionLag[part] = stats.NewGauge64(fmt.Sprintf("cluster.notifier.kafka.partition.%d.lag", part))
	}

	log.Infof("kafka-cluster: consuming from partitions %v", partitions)
}
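
// A minimal sketch of how a caller might wire this package up, assuming a
// globalconf-based setup similar to the one metrictank uses: the ini file path
// and the instance name below are placeholder values for illustration only.
//
//	func main() {
//		conf, err := globalconf.NewWithOptions(&globalconf.Options{
//			Filename: "/etc/metrictank/metrictank.ini", // placeholder path
//		})
//		if err != nil {
//			log.Fatalf("error with config file: %s", err)
//		}
//		// parse every FlagSet registered via globalconf.Register, including "kafka-cluster"
//		conf.ParseAll()
//		notifierKafka.ConfigProcess("instance-0") // placeholder instance name
//	}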