Skip to content

Commit

Permalink
Make cassandra reconnect down hosts.
Browse files Browse the repository at this point in the history
* Fix #767 by enabling gocql setting `ReconnectInterval` to reconnect to
down Cassandra hosts at a regular interval.

Signed-off-by: Brendan Shaklovitz <nyanshak@users.noreply.github.com>
  • Loading branch information
nyanshak committed Jul 13, 2018
1 parent a0dc40e commit d8e6faa
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 18 deletions.
5 changes: 5 additions & 0 deletions pkg/cassandra/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type Configuration struct {
Keyspace string `validate:"nonzero"`
ConnectionsPerHost int `validate:"min=1" yaml:"connections_per_host"`
Timeout time.Duration `validate:"min=500"`
ReconnectInterval time.Duration `validate:"min=500" yaml:"reconnect_interval"`
SocketKeepAlive time.Duration `validate:"min=0" yaml:"socket_keep_alive"`
MaxRetryAttempts int `validate:"min=0" yaml:"max_retry_attempt"`
ProtoVersion int `yaml:"proto_version"`
Expand Down Expand Up @@ -74,6 +75,9 @@ func (c *Configuration) ApplyDefaults(source *Configuration) {
if c.Timeout == 0 {
c.Timeout = source.Timeout
}
if c.ReconnectInterval == 0 {
c.ReconnectInterval = source.ReconnectInterval
}
if c.Port == 0 {
c.Port = source.Port
}
Expand Down Expand Up @@ -109,6 +113,7 @@ func (c *Configuration) NewCluster() *gocql.ClusterConfig {
cluster.Keyspace = c.Keyspace
cluster.NumConns = c.ConnectionsPerHost
cluster.Timeout = c.Timeout
cluster.ReconnectInterval = c.ReconnectInterval
cluster.SocketKeepalive = c.SocketKeepAlive
if c.ProtoVersion > 0 {
cluster.ProtoVersion = c.ProtoVersion
Expand Down
43 changes: 25 additions & 18 deletions plugin/storage/cassandra/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,24 +26,25 @@ import (

const (
// session settings
suffixEnabled = ".enabled"
suffixConnPerHost = ".connections-per-host"
suffixMaxRetryAttempts = ".max-retry-attempts"
suffixTimeout = ".timeout"
suffixServers = ".servers"
suffixPort = ".port"
suffixKeyspace = ".keyspace"
suffixConsistency = ".consistency"
suffixProtoVer = ".proto-version"
suffixSocketKeepAlive = ".socket-keep-alive"
suffixUsername = ".username"
suffixPassword = ".password"
suffixTLS = ".tls"
suffixCert = ".tls.cert"
suffixKey = ".tls.key"
suffixCA = ".tls.ca"
suffixServerName = ".tls.server-name"
suffixVerifyHost = ".tls.verify-host"
suffixEnabled = ".enabled"
suffixConnPerHost = ".connections-per-host"
suffixMaxRetryAttempts = ".max-retry-attempts"
suffixTimeout = ".timeout"
suffixReconnectInterval = ".reconnect-interval"
suffixServers = ".servers"
suffixPort = ".port"
suffixKeyspace = ".keyspace"
suffixConsistency = ".consistency"
suffixProtoVer = ".proto-version"
suffixSocketKeepAlive = ".socket-keep-alive"
suffixUsername = ".username"
suffixPassword = ".password"
suffixTLS = ".tls"
suffixCert = ".tls.cert"
suffixKey = ".tls.key"
suffixCA = ".tls.ca"
suffixServerName = ".tls.server-name"
suffixVerifyHost = ".tls.verify-host"

// common storage settings
suffixSpanStoreWriteCacheTTL = ".span-store-write-cache-ttl"
Expand Down Expand Up @@ -83,6 +84,7 @@ func NewOptions(primaryNamespace string, otherNamespaces ...string) *Options {
Keyspace: "jaeger_v1_test",
ProtoVersion: 4,
ConnectionsPerHost: 2,
ReconnectInterval: 60 * time.Second,
},
servers: "127.0.0.1",
namespace: primaryNamespace,
Expand Down Expand Up @@ -130,6 +132,10 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) {
nsConfig.namespace+suffixTimeout,
nsConfig.Timeout,
"Timeout used for queries")
flagSet.Duration(
nsConfig.namespace+suffixReconnectInterval,
nsConfig.ReconnectInterval,
"Reconnect interval to retry connecting to downed hosts")
flagSet.String(
nsConfig.namespace+suffixServers,
nsConfig.servers,
Expand Down Expand Up @@ -204,6 +210,7 @@ func (cfg *namespaceConfig) initFromViper(v *viper.Viper) {
cfg.ConnectionsPerHost = v.GetInt(cfg.namespace + suffixConnPerHost)
cfg.MaxRetryAttempts = v.GetInt(cfg.namespace + suffixMaxRetryAttempts)
cfg.Timeout = v.GetDuration(cfg.namespace + suffixTimeout)
cfg.ReconnectInterval = v.GetDuration(cfg.namespace + suffixReconnectInterval)
cfg.servers = v.GetString(cfg.namespace + suffixServers)
cfg.Port = v.GetInt(cfg.namespace + suffixPort)
cfg.Keyspace = v.GetString(cfg.namespace + suffixKeyspace)
Expand Down

0 comments on commit d8e6faa

Please sign in to comment.