From 665d668f058192b3e29fff5337d7ad144fb4f20b Mon Sep 17 00:00:00 2001 From: Ivan Buymov Date: Mon, 27 Jul 2020 22:18:43 +0300 Subject: [PATCH] update gocql version for cassandra physical backend (#9602) --- go.mod | 2 +- go.sum | 5 +- vendor/github.com/gocql/gocql/.travis.yml | 16 +- vendor/github.com/gocql/gocql/AUTHORS | 4 + vendor/github.com/gocql/gocql/README.md | 20 +- vendor/github.com/gocql/gocql/cluster.go | 9 + vendor/github.com/gocql/gocql/conn.go | 305 +++++++------- .../github.com/gocql/gocql/connectionpool.go | 3 +- vendor/github.com/gocql/gocql/control.go | 29 +- vendor/github.com/gocql/gocql/frame.go | 3 + vendor/github.com/gocql/gocql/go.mod | 6 + vendor/github.com/gocql/gocql/go.modverify | 3 - vendor/github.com/gocql/gocql/go.sum | 22 + vendor/github.com/gocql/gocql/helpers.go | 6 + vendor/github.com/gocql/gocql/host_source.go | 41 +- vendor/github.com/gocql/gocql/integration.sh | 7 +- .../gocql/internal/murmur/murmur_appengine.go | 6 +- .../gocql/internal/murmur/murmur_unsafe.go | 1 + vendor/github.com/gocql/gocql/marshal.go | 168 ++++++-- vendor/github.com/gocql/gocql/metadata.go | 134 ++++++- vendor/github.com/gocql/gocql/policies.go | 338 ++++++++++------ .../github.com/gocql/gocql/prepared_cache.go | 27 +- vendor/github.com/gocql/gocql/session.go | 376 +++++++++++------- vendor/github.com/gocql/gocql/token.go | 30 +- vendor/github.com/gocql/gocql/topology.go | 143 +++++-- vendor/github.com/gocql/gocql/uuid.go | 51 ++- vendor/modules.txt | 2 +- 27 files changed, 1193 insertions(+), 564 deletions(-) delete mode 100644 vendor/github.com/gocql/gocql/go.modverify create mode 100644 vendor/github.com/gocql/gocql/go.sum diff --git a/go.mod b/go.mod index ae7847cc6172..b5a3b71ec5c8 100644 --- a/go.mod +++ b/go.mod @@ -45,7 +45,7 @@ require ( github.com/go-ole/go-ole v1.2.4 // indirect github.com/go-sql-driver/mysql v1.5.0 github.com/go-test/deep v1.0.3 - github.com/gocql/gocql v0.0.0-20190402132108-0e1d5de854df + github.com/gocql/gocql v0.0.0-20200624222514-34081eda590e github.com/golang/protobuf v1.4.2 github.com/google/go-github v17.0.0+incompatible github.com/google/go-metrics-stackdriver v0.2.0 diff --git a/go.sum b/go.sum index fa8eeea9d3df..b022dab456cb 100644 --- a/go.sum +++ b/go.sum @@ -143,6 +143,7 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bgentry/speakeasy v0.1.0 h1:ByYyxL9InA1OWqxJqqp2A5pYHUrCiAL6K3J+LKSsQkY= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= +github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932/go.mod h1:NOuUCSz6Q9T7+igc/hlvDOUdtWKryOrtFyIVABv/p7k= github.com/bitly/go-hostpool v0.1.0 h1:XKmsF6k5el6xHG3WPJ8U0Ku/ye7njX7W81Ng7O2ioR0= github.com/bitly/go-hostpool v0.1.0/go.mod h1:4gOCgp6+NZnVqlKyZ/iBZFTAJKembaVENUpMkpg42fw= github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY= @@ -307,8 +308,8 @@ github.com/go-test/deep v1.0.3 h1:ZrJSEWsXzPOxaZnFteGEfooLba+ju3FYIbOrS+rQd68= github.com/go-test/deep v1.0.3/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA= github.com/go-yaml/yaml v2.1.0+incompatible h1:RYi2hDdss1u4YE7GwixGzWwVo47T8UQwnTLB6vQiq+o= github.com/go-yaml/yaml v2.1.0+incompatible/go.mod h1:w2MrLa16VYP0jy6N7M5kHaCkaLENm+P+Tv+MfurjSw0= -github.com/gocql/gocql v0.0.0-20190402132108-0e1d5de854df h1:fwXmhM0OqixzJDOGgTSyNH9eEDij9uGTXwsyWXvyR0A= -github.com/gocql/gocql 
v0.0.0-20190402132108-0e1d5de854df/go.mod h1:4Fw1eo5iaEhDUs8XyuhSVCVy52Jq3L+/3GJgYkwc+/0= +github.com/gocql/gocql v0.0.0-20200624222514-34081eda590e h1:SroDcndcOU9BVAduPf/PXihXoR2ZYTQYLXbupbqxAyQ= +github.com/gocql/gocql v0.0.0-20200624222514-34081eda590e/go.mod h1:DL0ekTmBSTdlNF25Orwt/JMzqIq3EJ4MVa/J/uK64OY= github.com/godbus/dbus v0.0.0-20190422162347-ade71ed3457e/go.mod h1:bBOAhwG1umN6/6ZUMtDFBMQR8jRg9O75tm9K00oMsK4= github.com/godbus/dbus/v5 v5.0.3/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= diff --git a/vendor/github.com/gocql/gocql/.travis.yml b/vendor/github.com/gocql/gocql/.travis.yml index f11cb137393d..e1e9efd349b0 100644 --- a/vendor/github.com/gocql/gocql/.travis.yml +++ b/vendor/github.com/gocql/gocql/.travis.yml @@ -13,24 +13,26 @@ matrix: branches: only: - - master + - master env: global: - GOMAXPROCS=2 matrix: - - CASS=2.2.13 + - CASS=2.1.21 AUTH=true - - CASS=2.2.13 + - CASS=2.2.14 + AUTH=true + - CASS=2.2.14 AUTH=false - - CASS=3.0.17 + - CASS=3.0.18 AUTH=false - - CASS=3.11.3 + - CASS=3.11.4 AUTH=false go: - - "1.10" - - "1.11" + - 1.13.x + - 1.14.x install: - ./install_test_deps.sh $TRAVIS_REPO_SLUG diff --git a/vendor/github.com/gocql/gocql/AUTHORS b/vendor/github.com/gocql/gocql/AUTHORS index 6799f58ead48..e908b94add22 100644 --- a/vendor/github.com/gocql/gocql/AUTHORS +++ b/vendor/github.com/gocql/gocql/AUTHORS @@ -111,3 +111,7 @@ Marco Cadetg Karl Matthias Thomas Meson Martin Sucha ; +Pavel Buchinchik +Rintaro Okamura +Yura Sokolov ; +Jorge Bay diff --git a/vendor/github.com/gocql/gocql/README.md b/vendor/github.com/gocql/gocql/README.md index 1b3fd03bad46..e5ebd3f4b9f5 100644 --- a/vendor/github.com/gocql/gocql/README.md +++ b/vendor/github.com/gocql/gocql/README.md @@ -19,8 +19,8 @@ The following matrix shows the versions of Go and Cassandra that are tested with Go/Cassandra | 2.1.x | 2.2.x | 3.x.x -------------| -------| ------| --------- -1.10 | yes | yes | yes -1.11 | yes | yes | yes +1.13 | yes | yes | yes +1.14 | yes | yes | yes Gocql has been tested in production against many different versions of Cassandra. Due to limits in our CI setup we only test against the latest 3 major releases, which coincide with the official support from the Apache project. @@ -166,6 +166,22 @@ func main() { } ``` + +Authentication +------- + +```go +cluster := gocql.NewCluster("192.168.1.1", "192.168.1.2", "192.168.1.3") +cluster.Authenticator = gocql.PasswordAuthenticator{ + Username: "user", + Password: "password" +} +cluster.Keyspace = "example" +cluster.Consistency = gocql.Quorum +session, _ := cluster.CreateSession() +defer session.Close() +``` + Data Binding ------------ diff --git a/vendor/github.com/gocql/gocql/cluster.go b/vendor/github.com/gocql/gocql/cluster.go index ab0ab8a0c930..2c29437ff2fe 100644 --- a/vendor/github.com/gocql/gocql/cluster.go +++ b/vendor/github.com/gocql/gocql/cluster.go @@ -5,6 +5,7 @@ package gocql import ( + "context" "errors" "net" "time" @@ -144,10 +145,18 @@ type ClusterConfig struct { // (default: 200 microseconds) WriteCoalesceWaitTime time.Duration + // Dialer will be used to establish all connections created for this Cluster. + // If not provided, a default dialer configured with ConnectTimeout will be used. 
+ Dialer Dialer + // internal config for testing disableControlConn bool } +type Dialer interface { + DialContext(ctx context.Context, network, addr string) (net.Conn, error) +} + // NewCluster generates a new config for the default cluster implementation. // // The supplied hosts are used to initially connect to the cluster then the rest of diff --git a/vendor/github.com/gocql/gocql/conn.go b/vendor/github.com/gocql/gocql/conn.go index f1bc78b54ad1..9e349c28192f 100644 --- a/vendor/github.com/gocql/gocql/conn.go +++ b/vendor/github.com/gocql/gocql/conn.go @@ -29,6 +29,9 @@ var ( "com.instaclustr.cassandra.auth.SharedSecretAuthenticator", "com.datastax.bdp.cassandra.auth.DseAuthenticator", "io.aiven.cassandra.auth.AivenAuthenticator", + "com.ericsson.bss.cassandra.ecaudit.auth.AuditPasswordAuthenticator", + "com.amazon.helenus.auth.HelenusAuthenticator", + "com.ericsson.bss.cassandra.ecaudit.auth.AuditAuthenticator", } ) @@ -97,6 +100,7 @@ type ConnConfig struct { CQLVersion string Timeout time.Duration ConnectTimeout time.Duration + Dialer Dialer Compressor Compressor Authenticator Authenticator AuthProvider func(h *HostInfo) (Authenticator, error) @@ -154,25 +158,26 @@ type Conn struct { session *Session closed int32 - quit chan struct{} + ctx context.Context + cancel context.CancelFunc timeouts int64 } // connect establishes a connection to a Cassandra node using session's connection config. -func (s *Session) connect(host *HostInfo, errorHandler ConnErrorHandler) (*Conn, error) { - return s.dial(host, s.connCfg, errorHandler) +func (s *Session) connect(ctx context.Context, host *HostInfo, errorHandler ConnErrorHandler) (*Conn, error) { + return s.dial(ctx, host, s.connCfg, errorHandler) } // dial establishes a connection to a Cassandra node and notifies the session's connectObserver. -func (s *Session) dial(host *HostInfo, connConfig *ConnConfig, errorHandler ConnErrorHandler) (*Conn, error) { +func (s *Session) dial(ctx context.Context, host *HostInfo, connConfig *ConnConfig, errorHandler ConnErrorHandler) (*Conn, error) { var obs ObservedConnect if s.connectObserver != nil { obs.Host = host obs.Start = time.Now() } - conn, err := s.dialWithoutObserver(host, connConfig, errorHandler) + conn, err := s.dialWithoutObserver(ctx, host, connConfig, errorHandler) if s.connectObserver != nil { obs.End = time.Now() @@ -186,44 +191,44 @@ func (s *Session) dial(host *HostInfo, connConfig *ConnConfig, errorHandler Conn // dialWithoutObserver establishes connection to a Cassandra node. // // dialWithoutObserver does not notify the connection observer, so you most probably want to call dial() instead. 
-func (s *Session) dialWithoutObserver(host *HostInfo, cfg *ConnConfig, errorHandler ConnErrorHandler) (*Conn, error) { +func (s *Session) dialWithoutObserver(ctx context.Context, host *HostInfo, cfg *ConnConfig, errorHandler ConnErrorHandler) (*Conn, error) { ip := host.ConnectAddress() port := host.port // TODO(zariel): remove these - if len(ip) == 0 || ip.IsUnspecified() { + if !validIpAddr(ip) { panic(fmt.Sprintf("host missing connect ip address: %v", ip)) } else if port == 0 { panic(fmt.Sprintf("host missing port: %v", port)) } - var ( - err error - conn net.Conn - ) - - dialer := &net.Dialer{ - Timeout: cfg.ConnectTimeout, - } - if cfg.Keepalive > 0 { - dialer.KeepAlive = cfg.Keepalive + dialer := cfg.Dialer + if dialer == nil { + d := &net.Dialer{ + Timeout: cfg.ConnectTimeout, + } + if cfg.Keepalive > 0 { + d.KeepAlive = cfg.Keepalive + } + dialer = d } - // TODO(zariel): handle ipv6 zone - addr := (&net.TCPAddr{IP: ip, Port: port}).String() - + conn, err := dialer.DialContext(ctx, "tcp", host.HostnameAndPort()) + if err != nil { + return nil, err + } if cfg.tlsConfig != nil { // the TLS config is safe to be reused by connections but it must not // be modified after being used. - conn, err = tls.DialWithDialer(dialer, "tcp", addr, cfg.tlsConfig) - } else { - conn, err = dialer.Dial("tcp", addr) - } - - if err != nil { - return nil, err + tconn := tls.Client(conn, cfg.tlsConfig) + if err := tconn.Handshake(); err != nil { + conn.Close() + return nil, err + } + conn = tconn } + ctx, cancel := context.WithCancel(ctx) c := &Conn{ conn: conn, r: bufio.NewReader(conn), @@ -233,7 +238,6 @@ func (s *Session) dialWithoutObserver(host *HostInfo, cfg *ConnConfig, errorHand addr: conn.RemoteAddr().String(), errorHandler: errorHandler, compressor: cfg.Compressor, - quit: make(chan struct{}), session: s, streams: streams.New(cfg.ProtoVersion), host: host, @@ -242,50 +246,51 @@ func (s *Session) dialWithoutObserver(host *HostInfo, cfg *ConnConfig, errorHand w: conn, timeout: cfg.Timeout, }, + ctx: ctx, + cancel: cancel, } - if cfg.AuthProvider != nil { - c.auth, err = cfg.AuthProvider(host) - if err != nil { - return nil, err - } - } else { - c.auth = cfg.Authenticator + if err := c.init(ctx); err != nil { + cancel() + c.Close() + return nil, err } - var ( - ctx context.Context - cancel func() - ) - if cfg.ConnectTimeout > 0 { - ctx, cancel = context.WithTimeout(context.TODO(), cfg.ConnectTimeout) + return c, nil +} + +func (c *Conn) init(ctx context.Context) error { + if c.session.cfg.AuthProvider != nil { + var err error + c.auth, err = c.cfg.AuthProvider(c.host) + if err != nil { + return err + } } else { - ctx, cancel = context.WithCancel(context.TODO()) + c.auth = c.cfg.Authenticator } - defer cancel() startup := &startupCoordinator{ frameTicker: make(chan struct{}), conn: c, } - c.timeout = cfg.ConnectTimeout + c.timeout = c.cfg.ConnectTimeout if err := startup.setupConn(ctx); err != nil { - c.close() - return nil, err + return err } - c.timeout = cfg.Timeout + c.timeout = c.cfg.Timeout // dont coalesce startup frames - if s.cfg.WriteCoalesceWaitTime > 0 && !cfg.disableCoalesce { - c.w = newWriteCoalescer(conn, c.timeout, s.cfg.WriteCoalesceWaitTime, c.quit) + if c.session.cfg.WriteCoalesceWaitTime > 0 && !c.cfg.disableCoalesce { + c.w = newWriteCoalescer(c.conn, c.timeout, c.session.cfg.WriteCoalesceWaitTime, ctx.Done()) } - go c.serve() - go c.heartBeat() + go c.serve(ctx) + go c.heartBeat(ctx) - return c, nil + return nil } func (c *Conn) Write(p []byte) (n int, err error) { @@ -321,10 
+326,18 @@ type startupCoordinator struct { } func (s *startupCoordinator) setupConn(ctx context.Context) error { + var cancel context.CancelFunc + if s.conn.timeout > 0 { + ctx, cancel = context.WithTimeout(ctx, s.conn.timeout) + } else { + ctx, cancel = context.WithCancel(ctx) + } + defer cancel() + startupErr := make(chan error) go func() { for range s.frameTicker { - err := s.conn.recv() + err := s.conn.recv(ctx) if err != nil { select { case startupErr <- err: @@ -484,7 +497,7 @@ func (c *Conn) closeWithError(err error) { } // if error was nil then unblock the quit channel - close(c.quit) + c.cancel() cerr := c.close() if err != nil { @@ -506,10 +519,10 @@ func (c *Conn) Close() { // Serve starts the stream multiplexer for this connection, which is required // to execute any queries. This method runs as long as the connection is // open and is therefore usually called in a separate goroutine. -func (c *Conn) serve() { +func (c *Conn) serve(ctx context.Context) { var err error for err == nil { - err = c.recv() + err = c.recv(ctx) } c.closeWithError(err) @@ -534,7 +547,7 @@ func (p *protocolError) Error() string { return fmt.Sprintf("gocql: received unexpected frame on stream %d: %v", p.frame.Header().stream, p.frame) } -func (c *Conn) heartBeat() { +func (c *Conn) heartBeat(ctx context.Context) { sleepTime := 1 * time.Second timer := time.NewTimer(sleepTime) defer timer.Stop() @@ -550,7 +563,7 @@ func (c *Conn) heartBeat() { timer.Reset(sleepTime) select { - case <-c.quit: + case <-ctx.Done(): return case <-timer.C: } @@ -581,7 +594,7 @@ func (c *Conn) heartBeat() { } } -func (c *Conn) recv() error { +func (c *Conn) recv(ctx context.Context) error { // not safe for concurrent reads // read a full header, ignore timeouts, as this is being ran in a loop @@ -607,6 +620,7 @@ func (c *Conn) recv() error { Length: int32(head.length), Start: headStartTime, End: headEndTime, + Host: c.host, }) } @@ -664,7 +678,7 @@ func (c *Conn) recv() error { case call.resp <- err: case <-call.timeout: c.releaseStream(call) - case <-c.quit: + case <-ctx.Done(): } return nil @@ -920,7 +934,7 @@ func (c *Conn) exec(ctx context.Context, req frameWriter, tracer Tracer) (*frame case <-ctxDone: close(call.timeout) return nil, ctx.Err() - case <-c.quit: + case <-c.ctx.Done(): return nil, ErrConnectionClosed } @@ -946,8 +960,8 @@ type preparedStatment struct { } type inflightPrepare struct { - wg sync.WaitGroup - err error + done chan struct{} + err error preparedStatment *preparedStatment } @@ -955,69 +969,76 @@ type inflightPrepare struct { func (c *Conn) prepareStatement(ctx context.Context, stmt string, tracer Tracer) (*preparedStatment, error) { stmtCacheKey := c.session.stmtsLRU.keyFor(c.addr, c.currentKeyspace, stmt) flight, ok := c.session.stmtsLRU.execIfMissing(stmtCacheKey, func(lru *lru.Cache) *inflightPrepare { - flight := new(inflightPrepare) - flight.wg.Add(1) + flight := &inflightPrepare{ + done: make(chan struct{}), + } lru.Add(stmtCacheKey, flight) return flight }) - if ok { - flight.wg.Wait() - return flight.preparedStatment, flight.err - } + if !ok { + go func() { + defer close(flight.done) - prep := &writePrepareFrame{ - statement: stmt, - } - if c.version > protoVersion4 { - prep.keyspace = c.currentKeyspace - } + prep := &writePrepareFrame{ + statement: stmt, + } + if c.version > protoVersion4 { + prep.keyspace = c.currentKeyspace + } - framer, err := c.exec(ctx, prep, tracer) - if err != nil { - flight.err = err - flight.wg.Done() - c.session.stmtsLRU.remove(stmtCacheKey) - return nil, err - } 
+ // we won the race to do the load, if our context is canceled we shouldnt + // stop the load as other callers are waiting for it but this caller should get + // their context cancelled error. + framer, err := c.exec(c.ctx, prep, tracer) + if err != nil { + flight.err = err + c.session.stmtsLRU.remove(stmtCacheKey) + return + } - frame, err := framer.parseFrame() - if err != nil { - flight.err = err - flight.wg.Done() - c.session.stmtsLRU.remove(stmtCacheKey) - return nil, err - } + frame, err := framer.parseFrame() + if err != nil { + flight.err = err + c.session.stmtsLRU.remove(stmtCacheKey) + return + } - // TODO(zariel): tidy this up, simplify handling of frame parsing so its not duplicated - // everytime we need to parse a frame. - if len(framer.traceID) > 0 && tracer != nil { - tracer.Trace(framer.traceID) - } + // TODO(zariel): tidy this up, simplify handling of frame parsing so its not duplicated + // everytime we need to parse a frame. + if len(framer.traceID) > 0 && tracer != nil { + tracer.Trace(framer.traceID) + } - switch x := frame.(type) { - case *resultPreparedFrame: - flight.preparedStatment = &preparedStatment{ - // defensively copy as we will recycle the underlying buffer after we - // return. - id: copyBytes(x.preparedID), - // the type info's should _not_ have a reference to the framers read buffer, - // therefore we can just copy them directly. - request: x.reqMeta, - response: x.respMeta, - } - case error: - flight.err = x - default: - flight.err = NewErrProtocol("Unknown type in response to prepare frame: %s", x) - } - flight.wg.Done() + switch x := frame.(type) { + case *resultPreparedFrame: + flight.preparedStatment = &preparedStatment{ + // defensively copy as we will recycle the underlying buffer after we + // return. + id: copyBytes(x.preparedID), + // the type info's should _not_ have a reference to the framers read buffer, + // therefore we can just copy them directly. + request: x.reqMeta, + response: x.respMeta, + } + case error: + flight.err = x + default: + flight.err = NewErrProtocol("Unknown type in response to prepare frame: %s", x) + } - if flight.err != nil { - c.session.stmtsLRU.remove(stmtCacheKey) + if flight.err != nil { + c.session.stmtsLRU.remove(stmtCacheKey) + } + }() } - return flight.preparedStatment, flight.err + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-flight.done: + return flight.preparedStatment, flight.err + } } func marshalQueryValue(typ TypeInfo, value interface{}, dst *queryValues) error { @@ -1065,7 +1086,7 @@ func (c *Conn) executeQuery(ctx context.Context, qry *Query) *Iter { info *preparedStatment ) - if qry.shouldPrepare() { + if !qry.skipPrepare && qry.shouldPrepare() { // Prepare all DML queries. Other queries can not be prepared. 
var err error info, err = c.prepareStatement(ctx, qry.stmt, qry.trace) @@ -1073,11 +1094,8 @@ func (c *Conn) executeQuery(ctx context.Context, qry *Query) *Iter { return &Iter{err: err} } - var values []interface{} - - if qry.binding == nil { - values = qry.values - } else { + values := qry.values + if qry.binding != nil { values, err = qry.binding(&QueryInfo{ Id: info.id, Args: info.request.columns, @@ -1181,11 +1199,8 @@ func (c *Conn) executeQuery(ctx context.Context, qry *Query) *Iter { return iter case *RequestErrUnprepared: stmtCacheKey := c.session.stmtsLRU.keyFor(c.addr, c.currentKeyspace, qry.stmt) - if c.session.stmtsLRU.remove(stmtCacheKey) { - return c.executeQuery(ctx, qry) - } - - return &Iter{err: x, framer: framer} + c.session.stmtsLRU.evictPreparedID(stmtCacheKey, x.StatementId) + return c.executeQuery(ctx, qry) case error: return &Iter{err: x, framer: framer} default: @@ -1217,9 +1232,9 @@ func (c *Conn) AvailableStreams() int { func (c *Conn) UseKeyspace(keyspace string) error { q := &writeQueryFrame{statement: `USE "` + keyspace + `"`} - q.params.consistency = Any + q.params.consistency = c.session.cons - framer, err := c.exec(context.Background(), q, nil) + framer, err := c.exec(c.ctx, q, nil) if err != nil { return err } @@ -1325,14 +1340,9 @@ func (c *Conn) executeBatch(ctx context.Context, batch *Batch) *Iter { stmt, found := stmts[string(x.StatementId)] if found { key := c.session.stmtsLRU.keyFor(c.addr, c.currentKeyspace, stmt) - c.session.stmtsLRU.remove(key) - } - - if found { - return c.executeBatch(ctx, batch) - } else { - return &Iter{err: x, framer: framer} + c.session.stmtsLRU.evictPreparedID(key, x.StatementId) } + return c.executeBatch(ctx, batch) case *resultRowsFrame: iter := &Iter{ meta: x.meta, @@ -1351,16 +1361,19 @@ func (c *Conn) executeBatch(ctx context.Context, batch *Batch) *Iter { func (c *Conn) query(ctx context.Context, statement string, values ...interface{}) (iter *Iter) { q := c.session.Query(statement, values...).Consistency(One) q.trace = nil + q.skipPrepare = true + q.disableSkipMetadata = true return c.executeQuery(ctx, q) } func (c *Conn) awaitSchemaAgreement(ctx context.Context) (err error) { const ( - peerSchemas = "SELECT schema_version, peer FROM system.peers" + peerSchemas = "SELECT * FROM system.peers" localSchemas = "SELECT schema_version FROM system.local WHERE key='local'" ) var versions map[string]struct{} + var schemaVersion string endDeadline := time.Now().Add(c.session.cfg.MaxWaitSchemaAgreement) for time.Now().Before(endDeadline) { @@ -1368,16 +1381,22 @@ func (c *Conn) awaitSchemaAgreement(ctx context.Context) (err error) { versions = make(map[string]struct{}) - var schemaVersion string - var peer string - for iter.Scan(&schemaVersion, &peer) { - if schemaVersion == "" { - Logger.Printf("skipping peer entry with empty schema_version: peer=%q", peer) + rows, err := iter.SliceMap() + if err != nil { + goto cont + } + + for _, row := range rows { + host, err := c.session.hostInfoFromMap(row, &HostInfo{connectAddress: c.host.ConnectAddress(), port: c.session.cfg.Port}) + if err != nil { + goto cont + } + if !isValidPeer(host) || host.schemaVersion == "" { + Logger.Printf("invalid peer or peer with empty schema_version: peer=%q", host) continue } - versions[schemaVersion] = struct{}{} - schemaVersion = "" + versions[host.schemaVersion] = struct{}{} } if err = iter.Close(); err != nil { @@ -1428,7 +1447,7 @@ func (c *Conn) localHostInfo(ctx context.Context) (*HostInfo, error) { port := c.conn.RemoteAddr().(*net.TCPAddr).Port 
// TODO(zariel): avoid doing this here - host, err := c.session.hostInfoFromMap(row, port) + host, err := c.session.hostInfoFromMap(row, &HostInfo{connectAddress: c.host.connectAddress, port: port}) if err != nil { return nil, err } diff --git a/vendor/github.com/gocql/gocql/connectionpool.go b/vendor/github.com/gocql/gocql/connectionpool.go index bf2388e1e71b..1d2419bf4d99 100644 --- a/vendor/github.com/gocql/gocql/connectionpool.go +++ b/vendor/github.com/gocql/gocql/connectionpool.go @@ -94,6 +94,7 @@ func connConfig(cfg *ClusterConfig) (*ConnConfig, error) { CQLVersion: cfg.CQLVersion, Timeout: cfg.Timeout, ConnectTimeout: cfg.ConnectTimeout, + Dialer: cfg.Dialer, Compressor: cfg.Compressor, Authenticator: cfg.Authenticator, AuthProvider: cfg.AuthProvider, @@ -506,7 +507,7 @@ func (pool *hostConnPool) connect() (err error) { var conn *Conn reconnectionPolicy := pool.session.cfg.ReconnectionPolicy for i := 0; i < reconnectionPolicy.GetMaxRetries(); i++ { - conn, err = pool.session.connect(pool.host, pool) + conn, err = pool.session.connect(pool.session.ctx, pool.host, pool) if err == nil { break } diff --git a/vendor/github.com/gocql/gocql/control.go b/vendor/github.com/gocql/gocql/control.go index 4321decb839d..aa5cf3570a52 100644 --- a/vendor/github.com/gocql/gocql/control.go +++ b/vendor/github.com/gocql/gocql/control.go @@ -116,7 +116,7 @@ func hostInfo(addr string, defaultPort int) ([]*HostInfo, error) { // Check if host is a literal IP address if ip := net.ParseIP(host); ip != nil { - hosts = append(hosts, &HostInfo{connectAddress: ip, port: port}) + hosts = append(hosts, &HostInfo{hostname: host, connectAddress: ip, port: port}) return hosts, nil } @@ -142,21 +142,21 @@ func hostInfo(addr string, defaultPort int) ([]*HostInfo, error) { } for _, ip := range ips { - hosts = append(hosts, &HostInfo{connectAddress: ip, port: port}) + hosts = append(hosts, &HostInfo{hostname: host, connectAddress: ip, port: port}) } return hosts, nil } func shuffleHosts(hosts []*HostInfo) []*HostInfo { - mutRandr.Lock() - perm := randr.Perm(len(hosts)) - mutRandr.Unlock() shuffled := make([]*HostInfo, len(hosts)) + copy(shuffled, hosts) - for i, host := range hosts { - shuffled[perm[i]] = host - } + mutRandr.Lock() + randr.Shuffle(len(hosts), func(i, j int) { + shuffled[i], shuffled[j] = shuffled[j], shuffled[i] + }) + mutRandr.Unlock() return shuffled } @@ -172,7 +172,7 @@ func (c *controlConn) shuffleDial(endpoints []*HostInfo) (*Conn, error) { var err error for _, host := range shuffled { var conn *Conn - conn, err = c.session.dial(host, &cfg, c) + conn, err = c.session.dial(c.session.ctx, host, &cfg, c) if err == nil { return conn, nil } @@ -221,7 +221,7 @@ func (c *controlConn) discoverProtocol(hosts []*HostInfo) (int, error) { var err error for _, host := range hosts { var conn *Conn - conn, err = c.session.dial(host, &connCfg, handler) + conn, err = c.session.dial(c.session.ctx, host, &connCfg, handler) if conn != nil { conn.Close() } @@ -343,7 +343,7 @@ func (c *controlConn) reconnect(refreshring bool) { var newConn *Conn if host != nil { // try to connect to the old host - conn, err := c.session.connect(host, c) + conn, err := c.session.connect(c.session.ctx, host, c) if err != nil { // host is dead // TODO: this is replicated in a few places @@ -365,7 +365,7 @@ func (c *controlConn) reconnect(refreshring bool) { } var err error - newConn, err = c.session.connect(host, c) + newConn, err = c.session.connect(c.session.ctx, host, c) if err != nil { // TODO: add log handler for things like this 
return @@ -389,7 +389,10 @@ func (c *controlConn) HandleError(conn *Conn, err error, closed bool) { } oldConn := c.getConn() - if oldConn.conn != conn { + + // If connection has long gone, and not been attempted for awhile, + // it's possible to have oldConn as nil here (#1297). + if oldConn != nil && oldConn.conn != conn { return } diff --git a/vendor/github.com/gocql/gocql/frame.go b/vendor/github.com/gocql/gocql/frame.go index d959b2c10180..5fc948895d2f 100644 --- a/vendor/github.com/gocql/gocql/frame.go +++ b/vendor/github.com/gocql/gocql/frame.go @@ -361,6 +361,9 @@ type ObservedFrameHeader struct { Start time.Time // EndHeader is the time we finished reading the frame header off the network connection. End time.Time + + // Host is Host of the connection the frame header was read from. + Host *HostInfo } func (f ObservedFrameHeader) String() string { diff --git a/vendor/github.com/gocql/gocql/go.mod b/vendor/github.com/gocql/gocql/go.mod index a3c38054eafe..70e98c86cfba 100644 --- a/vendor/github.com/gocql/gocql/go.mod +++ b/vendor/github.com/gocql/gocql/go.mod @@ -1,7 +1,13 @@ module github.com/gocql/gocql require ( + github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932 // indirect + github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 // indirect github.com/golang/snappy v0.0.0-20170215233205-553a64147049 github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed + github.com/kr/pretty v0.1.0 // indirect + github.com/stretchr/testify v1.3.0 // indirect gopkg.in/inf.v0 v0.9.1 ) + +go 1.13 diff --git a/vendor/github.com/gocql/gocql/go.modverify b/vendor/github.com/gocql/gocql/go.modverify deleted file mode 100644 index fa9fc60188ab..000000000000 --- a/vendor/github.com/gocql/gocql/go.modverify +++ /dev/null @@ -1,3 +0,0 @@ -github.com/golang/snappy v0.0.0-20170215233205-553a64147049 h1:K9KHZbXKpGydfDN0aZrsoHpLJlZsBrGMFWbgLDGnPZk= -github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed h1:5upAirOpQc1Q53c0bnx2ufif5kANL7bfZWcc6VJWJd8= -gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= diff --git a/vendor/github.com/gocql/gocql/go.sum b/vendor/github.com/gocql/gocql/go.sum new file mode 100644 index 000000000000..a2bcaf78fcba --- /dev/null +++ b/vendor/github.com/gocql/gocql/go.sum @@ -0,0 +1,22 @@ +github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932 h1:mXoPYz/Ul5HYEDvkta6I8/rnYM5gSdSV2tJ6XbZuEtY= +github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932/go.mod h1:NOuUCSz6Q9T7+igc/hlvDOUdtWKryOrtFyIVABv/p7k= +github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY= +github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/golang/snappy v0.0.0-20170215233205-553a64147049 h1:K9KHZbXKpGydfDN0aZrsoHpLJlZsBrGMFWbgLDGnPZk= +github.com/golang/snappy v0.0.0-20170215233205-553a64147049/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed h1:5upAirOpQc1Q53c0bnx2ufif5kANL7bfZWcc6VJWJd8= +github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed/go.mod h1:tMWxXQ9wFIaZeTI9F+hmhFiGpFmhOHzyShyFUhRm0H4= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= +github.com/kr/pretty v0.1.0/go.mod 
h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= +gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= diff --git a/vendor/github.com/gocql/gocql/helpers.go b/vendor/github.com/gocql/gocql/helpers.go index 0eb30d07e3f0..eb07f4f6974e 100644 --- a/vendor/github.com/gocql/gocql/helpers.go +++ b/vendor/github.com/gocql/gocql/helpers.go @@ -85,14 +85,20 @@ func getCassandraBaseType(name string) Type { return TypeBoolean case "counter": return TypeCounter + case "date": + return TypeDate case "decimal": return TypeDecimal case "double": return TypeDouble + case "duration": + return TypeDuration case "float": return TypeFloat case "int": return TypeInt + case "smallint": + return TypeSmallInt case "tinyint": return TypeTinyInt case "time": diff --git a/vendor/github.com/gocql/gocql/host_source.go b/vendor/github.com/gocql/gocql/host_source.go index 97f9455b73b7..f8ab3c109c67 100644 --- a/vendor/github.com/gocql/gocql/host_source.go +++ b/vendor/github.com/gocql/gocql/host_source.go @@ -110,6 +110,7 @@ type HostInfo struct { // TODO(zariel): reduce locking maybe, not all values will change, but to ensure // that we are thread safe use a mutex to access all fields. 
mu sync.RWMutex + hostname string peer net.IP broadcastAddress net.IP listenAddress net.IP @@ -127,6 +128,7 @@ type HostInfo struct { clusterName string version cassVersion state nodeState + schemaVersion string tokens []string } @@ -226,8 +228,9 @@ func (h *HostInfo) PreferredIP() net.IP { func (h *HostInfo) DataCenter() string { h.mu.RLock() - defer h.mu.RUnlock() - return h.dataCenter + dc := h.dataCenter + h.mu.RUnlock() + return dc } func (h *HostInfo) setDataCenter(dataCenter string) *HostInfo { @@ -239,8 +242,9 @@ func (h *HostInfo) setDataCenter(dataCenter string) *HostInfo { func (h *HostInfo) Rack() string { h.mu.RLock() - defer h.mu.RUnlock() - return h.rack + rack := h.rack + h.mu.RUnlock() + return rack } func (h *HostInfo) setRack(rack string) *HostInfo { @@ -411,15 +415,22 @@ func (h *HostInfo) IsUp() bool { return h != nil && h.State() == NodeUp } +func (h *HostInfo) HostnameAndPort() string { + if h.hostname == "" { + h.hostname = h.ConnectAddress().String() + } + return net.JoinHostPort(h.hostname, strconv.Itoa(h.port)) +} + func (h *HostInfo) String() string { h.mu.RLock() defer h.mu.RUnlock() connectAddr, source := h.connectAddressLocked() - return fmt.Sprintf("[HostInfo connectAddress=%q peer=%q rpc_address=%q broadcast_address=%q "+ + return fmt.Sprintf("[HostInfo hostname=%q connectAddress=%q peer=%q rpc_address=%q broadcast_address=%q "+ "preferred_ip=%q connect_addr=%q connect_addr_source=%q "+ "port=%d data_centre=%q rack=%q host_id=%q version=%q state=%s num_tokens=%d]", - h.connectAddress, h.peer, h.rpcAddress, h.broadcastAddress, h.preferredIP, + h.hostname, h.connectAddress, h.peer, h.rpcAddress, h.broadcastAddress, h.preferredIP, connectAddr, source, h.port, h.dataCenter, h.rack, h.hostId, h.version, h.state, len(h.tokens)) } @@ -450,15 +461,11 @@ func checkSystemSchema(control *controlConn) (bool, error) { // Given a map that represents a row from either system.local or system.peers // return as much information as we can in *HostInfo -func (s *Session) hostInfoFromMap(row map[string]interface{}, port int) (*HostInfo, error) { +func (s *Session) hostInfoFromMap(row map[string]interface{}, host *HostInfo) (*HostInfo, error) { const assertErrorMsg = "Assertion failed for %s" var ok bool // Default to our connected port if the cluster doesn't have port information - host := HostInfo{ - port: port, - } - for key, value := range row { switch key { case "data_center": @@ -543,6 +550,12 @@ func (s *Session) hostInfoFromMap(row map[string]interface{}, port int) (*HostIn if !ok { return nil, fmt.Errorf(assertErrorMsg, "dse_version") } + case "schema_version": + schemaVersion, ok := value.(UUID) + if !ok { + return nil, fmt.Errorf(assertErrorMsg, "schema_version") + } + host.schemaVersion = schemaVersion.String() } // TODO(thrawn01): Add 'port'? 
once CASSANDRA-7544 is complete // Not sure what the port field will be called until the JIRA issue is complete @@ -552,7 +565,7 @@ func (s *Session) hostInfoFromMap(row map[string]interface{}, port int) (*HostIn host.connectAddress = ip host.port = port - return &host, nil + return host, nil } // Ask the control node for host info on all it's known peers @@ -575,7 +588,7 @@ func (r *ringDescriber) getClusterPeerInfo() ([]*HostInfo, error) { for _, row := range rows { // extract all available info about the peer - host, err := r.session.hostInfoFromMap(row, r.session.cfg.Port) + host, err := r.session.hostInfoFromMap(row, &HostInfo{port: r.session.cfg.Port}) if err != nil { return nil, err } else if !isValidPeer(host) { @@ -637,7 +650,7 @@ func (r *ringDescriber) getHostInfo(ip net.IP, port int) (*HostInfo, error) { } for _, row := range rows { - h, err := r.session.hostInfoFromMap(row, port) + h, err := r.session.hostInfoFromMap(row, &HostInfo{port: port}) if err != nil { return nil, err } diff --git a/vendor/github.com/gocql/gocql/integration.sh b/vendor/github.com/gocql/gocql/integration.sh index 6e49f84451cd..1c6311b29231 100644 --- a/vendor/github.com/gocql/gocql/integration.sh +++ b/vendor/github.com/gocql/gocql/integration.sh @@ -75,10 +75,15 @@ function run_tests() { else sleep 1s go test -tags "cassandra gocql_debug" -timeout=5m -race $args + + ccm clear + ccm start --wait-for-binary-proto + sleep 1s + go test -tags "integration gocql_debug" -timeout=5m -race $args ccm clear - ccm start + ccm start --wait-for-binary-proto sleep 1s go test -tags "ccm gocql_debug" -timeout=5m -race $args diff --git a/vendor/github.com/gocql/gocql/internal/murmur/murmur_appengine.go b/vendor/github.com/gocql/gocql/internal/murmur/murmur_appengine.go index fd9ab5c14c32..63c3eb2ec867 100644 --- a/vendor/github.com/gocql/gocql/internal/murmur/murmur_appengine.go +++ b/vendor/github.com/gocql/gocql/internal/murmur/murmur_appengine.go @@ -1,11 +1,11 @@ -// +build appengine +// +build appengine s390x package murmur import "encoding/binary" func getBlock(data []byte, n int) (int64, int64) { - k1 := binary.LittleEndian.Int64(data[n*16:]) - k2 := binary.LittleEndian.Int64(data[(n*16)+8:]) + k1 := int64(binary.LittleEndian.Uint64(data[n*16:])) + k2 := int64(binary.LittleEndian.Uint64(data[(n*16)+8:])) return k1, k2 } diff --git a/vendor/github.com/gocql/gocql/internal/murmur/murmur_unsafe.go b/vendor/github.com/gocql/gocql/internal/murmur/murmur_unsafe.go index 501537c77e27..8fc950cfbb31 100644 --- a/vendor/github.com/gocql/gocql/internal/murmur/murmur_unsafe.go +++ b/vendor/github.com/gocql/gocql/internal/murmur/murmur_unsafe.go @@ -1,4 +1,5 @@ // +build !appengine +// +build !s390x package murmur diff --git a/vendor/github.com/gocql/gocql/marshal.go b/vendor/github.com/gocql/gocql/marshal.go index de6af1a3640d..e95c1c8f9ded 100644 --- a/vendor/github.com/gocql/gocql/marshal.go +++ b/vendor/github.com/gocql/gocql/marshal.go @@ -713,9 +713,6 @@ func unmarshalIntlike(info TypeInfo, int64Val int64, data []byte, value interfac return nil case *uint: unitVal := uint64(int64Val) - if ^uint(0) == math.MaxUint32 && unitVal > math.MaxUint32 { - return unmarshalErrorf("unmarshal int: value %d out of range for %T", unitVal, *v) - } switch info.Type() { case TypeInt: *v = uint(unitVal) & 0xFFFFFFFF @@ -724,6 +721,9 @@ func unmarshalIntlike(info TypeInfo, int64Val int64, data []byte, value interfac case TypeTinyInt: *v = uint(unitVal) & 0xFF default: + if ^uint(0) == math.MaxUint32 && (int64Val < 0 || int64Val > 
math.MaxUint32) { + return unmarshalErrorf("unmarshal int: value %d out of range for %T", unitVal, *v) + } *v = uint(unitVal) } return nil @@ -749,15 +749,17 @@ func unmarshalIntlike(info TypeInfo, int64Val int64, data []byte, value interfac *v = int32(int64Val) return nil case *uint32: - if int64Val > math.MaxUint32 { - return unmarshalErrorf("unmarshal int: value %d out of range for %T", int64Val, *v) - } switch info.Type() { + case TypeInt: + *v = uint32(int64Val) & 0xFFFFFFFF case TypeSmallInt: *v = uint32(int64Val) & 0xFFFF case TypeTinyInt: *v = uint32(int64Val) & 0xFF default: + if int64Val < 0 || int64Val > math.MaxUint32 { + return unmarshalErrorf("unmarshal int: value %d out of range for %T", int64Val, *v) + } *v = uint32(int64Val) & 0xFFFFFFFF } return nil @@ -768,13 +770,15 @@ func unmarshalIntlike(info TypeInfo, int64Val int64, data []byte, value interfac *v = int16(int64Val) return nil case *uint16: - if int64Val > math.MaxUint16 { - return unmarshalErrorf("unmarshal int: value %d out of range for %T", int64Val, *v) - } switch info.Type() { + case TypeSmallInt: + *v = uint16(int64Val) & 0xFFFF case TypeTinyInt: *v = uint16(int64Val) & 0xFF default: + if int64Val < 0 || int64Val > math.MaxUint16 { + return unmarshalErrorf("unmarshal int: value %d out of range for %T", int64Val, *v) + } *v = uint16(int64Val) & 0xFFFF } return nil @@ -785,7 +789,7 @@ func unmarshalIntlike(info TypeInfo, int64Val int64, data []byte, value interfac *v = int8(int64Val) return nil case *uint8: - if int64Val > math.MaxUint8 { + if info.Type() != TypeTinyInt && (int64Val < 0 || int64Val > math.MaxUint8) { return unmarshalErrorf("unmarshal int: value %d out of range for %T", int64Val, *v) } *v = uint8(int64Val) & 0xFF @@ -833,34 +837,69 @@ func unmarshalIntlike(info TypeInfo, int64Val int64, data []byte, value interfac rv.SetInt(int64Val) return nil case reflect.Uint: - if int64Val < 0 || (^uint(0) == math.MaxUint32 && int64Val > math.MaxUint32) { - return unmarshalErrorf("unmarshal int: value %d out of range", int64Val) + unitVal := uint64(int64Val) + switch info.Type() { + case TypeInt: + rv.SetUint(unitVal & 0xFFFFFFFF) + case TypeSmallInt: + rv.SetUint(unitVal & 0xFFFF) + case TypeTinyInt: + rv.SetUint(unitVal & 0xFF) + default: + if ^uint(0) == math.MaxUint32 && (int64Val < 0 || int64Val > math.MaxUint32) { + return unmarshalErrorf("unmarshal int: value %d out of range for %s", unitVal, rv.Type()) + } + rv.SetUint(unitVal) } - rv.SetUint(uint64(int64Val)) return nil case reflect.Uint64: - if int64Val < 0 { - return unmarshalErrorf("unmarshal int: value %d out of range", int64Val) + unitVal := uint64(int64Val) + switch info.Type() { + case TypeInt: + rv.SetUint(unitVal & 0xFFFFFFFF) + case TypeSmallInt: + rv.SetUint(unitVal & 0xFFFF) + case TypeTinyInt: + rv.SetUint(unitVal & 0xFF) + default: + rv.SetUint(unitVal) } - rv.SetUint(uint64(int64Val)) return nil case reflect.Uint32: - if int64Val < 0 || int64Val > math.MaxUint32 { - return unmarshalErrorf("unmarshal int: value %d out of range", int64Val) + unitVal := uint64(int64Val) + switch info.Type() { + case TypeInt: + rv.SetUint(unitVal & 0xFFFFFFFF) + case TypeSmallInt: + rv.SetUint(unitVal & 0xFFFF) + case TypeTinyInt: + rv.SetUint(unitVal & 0xFF) + default: + if int64Val < 0 || int64Val > math.MaxUint32 { + return unmarshalErrorf("unmarshal int: value %d out of range for %s", int64Val, rv.Type()) + } + rv.SetUint(unitVal & 0xFFFFFFFF) } - rv.SetUint(uint64(int64Val)) return nil case reflect.Uint16: - if int64Val < 0 || int64Val > math.MaxUint16 
{ - return unmarshalErrorf("unmarshal int: value %d out of range", int64Val) + unitVal := uint64(int64Val) + switch info.Type() { + case TypeSmallInt: + rv.SetUint(unitVal & 0xFFFF) + case TypeTinyInt: + rv.SetUint(unitVal & 0xFF) + default: + if int64Val < 0 || int64Val > math.MaxUint16 { + return unmarshalErrorf("unmarshal int: value %d out of range for %s", int64Val, rv.Type()) + } + rv.SetUint(unitVal & 0xFFFF) } - rv.SetUint(uint64(int64Val)) return nil case reflect.Uint8: - if int64Val < 0 || int64Val > math.MaxUint8 { - return unmarshalErrorf("unmarshal int: value %d out of range", int64Val) + if info.Type() != TypeTinyInt && (int64Val < 0 || int64Val > math.MaxUint8) { + return unmarshalErrorf("unmarshal int: value %d out of range for %s", int64Val, rv.Type()) } - rv.SetUint(uint64(int64Val)) + rv.SetUint(uint64(int64Val) & 0xff) return nil } return unmarshalErrorf("can not unmarshal %s into %T", info, value) @@ -1261,6 +1300,16 @@ func unmarshalDate(info TypeInfo, data []byte, value interface{}) error { timestamp := (int64(current) - int64(origin)) * 86400000 *v = time.Unix(0, timestamp*int64(time.Millisecond)).In(time.UTC) return nil + case *string: + if len(data) == 0 { + *v = "" + return nil + } + var origin uint32 = 1 << 31 + var current uint32 = binary.BigEndian.Uint32(data) + timestamp := (int64(current) - int64(origin)) * 86400000 + *v = time.Unix(0, timestamp*int64(time.Millisecond)).In(time.UTC).Format("2006-01-02") + return nil } return unmarshalErrorf("can not unmarshal %s into %T", info, value) } @@ -1510,6 +1559,9 @@ func unmarshalList(info TypeInfo, data []byte, value interface{}) error { return err } data = data[p:] + if len(data) < m { + return unmarshalErrorf("unmarshal list: unexpected eof") + } if err := Unmarshal(listInfo.Elem, data[:m], rv.Index(i).Addr().Interface()); err != nil { return err } @@ -1604,6 +1656,9 @@ func unmarshalMap(info TypeInfo, data []byte, value interface{}) error { return err } data = data[p:] + if len(data) < m { + return unmarshalErrorf("unmarshal map: unexpected eof") + } key := reflect.New(t.Key()) if err := Unmarshal(mapInfo.Key, data[:m], key.Interface()); err != nil { return err @@ -1615,6 +1670,9 @@ func unmarshalMap(info TypeInfo, data []byte, value interface{}) error { return err } data = data[p:] + if len(data) < m { + return unmarshalErrorf("unmarshal map: unexpected eof") + } val := reflect.New(t.Elem()) if err := Unmarshal(mapInfo.Elem, data[:m], val.Interface()); err != nil { return err @@ -1781,6 +1839,11 @@ func marshalTuple(info TypeInfo, value interface{}) ([]byte, error) { var buf []byte for i, elem := range v { + if elem == nil { + buf = appendInt(buf, int32(-1)) + continue + } + data, err := Marshal(tuple.Elems[i], elem) if err != nil { return nil, err @@ -1806,7 +1869,14 @@ func marshalTuple(info TypeInfo, value interface{}) ([]byte, error) { var buf []byte for i, elem := range tuple.Elems { - data, err := Marshal(elem, rv.Field(i).Interface()) + field := rv.Field(i) + + if field.Kind() == reflect.Ptr && field.IsNil() { + buf = appendInt(buf, int32(-1)) + continue + } + + data, err := Marshal(elem, field.Interface()) if err != nil { return nil, err } @@ -1825,7 +1895,14 @@ func marshalTuple(info TypeInfo, value interface{}) ([]byte, error) { var buf []byte for i, elem := range tuple.Elems { - data, err := Marshal(elem, rv.Index(i).Interface()) + item := rv.Index(i) + + if item.Kind() == reflect.Ptr && item.IsNil() { + buf = appendInt(buf, int32(-1)) + continue + } + + data, err := Marshal(elem, item.Interface()) if 
err != nil { return nil, err } @@ -1865,8 +1942,9 @@ func unmarshalTuple(info TypeInfo, data []byte, value interface{}) error { for i, elem := range tuple.Elems { // each element inside data is a [bytes] var p []byte - p, data = readBytes(data) - + if len(data) > 4 { + p, data = readBytes(data) + } err := Unmarshal(elem, p, v[i]) if err != nil { return err @@ -1892,16 +1970,22 @@ func unmarshalTuple(info TypeInfo, data []byte, value interface{}) error { } for i, elem := range tuple.Elems { - m := readInt(data) - data = data[4:] + var p []byte + if len(data) > 4 { + p, data = readBytes(data) + } v := elem.New() - if err := Unmarshal(elem, data[:m], v); err != nil { + if err := Unmarshal(elem, p, v); err != nil { return err } - rv.Field(i).Set(reflect.ValueOf(v).Elem()) - data = data[m:] + switch rv.Field(i).Kind() { + case reflect.Ptr: + rv.Field(i).Set(reflect.ValueOf(v)) + default: + rv.Field(i).Set(reflect.ValueOf(v).Elem()) + } } return nil @@ -1916,16 +2000,22 @@ func unmarshalTuple(info TypeInfo, data []byte, value interface{}) error { } for i, elem := range tuple.Elems { - m := readInt(data) - data = data[4:] + var p []byte + if len(data) > 4 { + p, data = readBytes(data) + } v := elem.New() - if err := Unmarshal(elem, data[:m], v); err != nil { + if err := Unmarshal(elem, p, v); err != nil { return err } - rv.Index(i).Set(reflect.ValueOf(v).Elem()) - data = data[m:] + switch rv.Index(i).Kind() { + case reflect.Ptr: + rv.Index(i).Set(reflect.ValueOf(v)) + default: + rv.Index(i).Set(reflect.ValueOf(v).Elem()) + } } return nil diff --git a/vendor/github.com/gocql/gocql/metadata.go b/vendor/github.com/gocql/gocql/metadata.go index 5a17559b0f18..e586dd48f23a 100644 --- a/vendor/github.com/gocql/gocql/metadata.go +++ b/vendor/github.com/gocql/gocql/metadata.go @@ -22,7 +22,10 @@ type KeyspaceMetadata struct { Tables map[string]*TableMetadata Functions map[string]*FunctionMetadata Aggregates map[string]*AggregateMetadata - Views map[string]*ViewMetadata + // Deprecated: use the MaterializedViews field for views and UserTypes field for udts instead. + Views map[string]*ViewMetadata + MaterializedViews map[string]*MaterializedViewMetadata + UserTypes map[string]*UserTypeMetadata } // schema metadata for a table (a.k.a. column family) @@ -83,6 +86,7 @@ type AggregateMetadata struct { } // ViewMetadata holds the metadata for views. +// Deprecated: this is kept for backwards compatibility issues. Use MaterializedViewMetadata. type ViewMetadata struct { Keyspace string Name string @@ -90,6 +94,40 @@ type ViewMetadata struct { FieldTypes []TypeInfo } +// MaterializedViewMetadata holds the metadata for materialized views. 
+type MaterializedViewMetadata struct { + Keyspace string + Name string + BaseTableId UUID + BaseTable *TableMetadata + BloomFilterFpChance float64 + Caching map[string]string + Comment string + Compaction map[string]string + Compression map[string]string + CrcCheckChance float64 + DcLocalReadRepairChance float64 + DefaultTimeToLive int + Extensions map[string]string + GcGraceSeconds int + Id UUID + IncludeAllColumns bool + MaxIndexInterval int + MemtableFlushPeriodInMs int + MinIndexInterval int + ReadRepairChance float64 + SpeculativeRetry string + + baseTableName string +} + +type UserTypeMetadata struct { + Keyspace string + Name string + FieldNames []string + FieldTypes []TypeInfo +} + // the ordering of the column with regard to its comparator type ColumnOrder bool @@ -246,9 +284,13 @@ func (s *schemaDescriber) refreshSchema(keyspaceName string) error { if err != nil { return err } + materializedViews, err := getMaterializedViewsMetadata(s.session, keyspaceName) + if err != nil { + return err + } // organize the schema data - compileMetadata(s.session.cfg.ProtoVersion, keyspace, tables, columns, functions, aggregates, views) + compileMetadata(s.session.cfg.ProtoVersion, keyspace, tables, columns, functions, aggregates, views, materializedViews) // update the cache s.cache[keyspaceName] = keyspace @@ -269,6 +311,7 @@ func compileMetadata( functions []FunctionMetadata, aggregates []AggregateMetadata, views []ViewMetadata, + materializedViews []MaterializedViewMetadata, ) { keyspace.Tables = make(map[string]*TableMetadata) for i := range tables { @@ -290,6 +333,24 @@ func compileMetadata( for i := range views { keyspace.Views[views[i].Name] = &views[i] } + // Views currently holds the types and hasn't been deleted for backward compatibility issues. + // That's why it's ok to copy Views into Types in this case. For the real Views use MaterializedViews. 
+ types := make([]UserTypeMetadata, len(views)) + for i := range views { + types[i].Keyspace = views[i].Keyspace + types[i].Name = views[i].Name + types[i].FieldNames = views[i].FieldNames + types[i].FieldTypes = views[i].FieldTypes + } + keyspace.UserTypes = make(map[string]*UserTypeMetadata, len(views)) + for i := range types { + keyspace.UserTypes[types[i].Name] = &types[i] + } + keyspace.MaterializedViews = make(map[string]*MaterializedViewMetadata, len(materializedViews)) + for _, materializedView := range materializedViews { + materializedView.BaseTable = keyspace.Tables[materializedView.baseTableName] + keyspace.MaterializedViews[materializedView.Name] = &materializedView + } // add columns from the schema data for i := range columns { @@ -912,6 +973,75 @@ func getViewsMetadata(session *Session, keyspaceName string) ([]ViewMetadata, er return views, nil } +func getMaterializedViewsMetadata(session *Session, keyspaceName string) ([]MaterializedViewMetadata, error) { + if !session.useSystemSchema { + return nil, nil + } + var tableName = "system_schema.views" + stmt := fmt.Sprintf(` + SELECT + view_name, + base_table_id, + base_table_name, + bloom_filter_fp_chance, + caching, + comment, + compaction, + compression, + crc_check_chance, + dclocal_read_repair_chance, + default_time_to_live, + extensions, + gc_grace_seconds, + id, + include_all_columns, + max_index_interval, + memtable_flush_period_in_ms, + min_index_interval, + read_repair_chance, + speculative_retry + FROM %s + WHERE keyspace_name = ?`, tableName) + + var materializedViews []MaterializedViewMetadata + + rows := session.control.query(stmt, keyspaceName).Scanner() + for rows.Next() { + materializedView := MaterializedViewMetadata{Keyspace: keyspaceName} + err := rows.Scan(&materializedView.Name, + &materializedView.BaseTableId, + &materializedView.baseTableName, + &materializedView.BloomFilterFpChance, + &materializedView.Caching, + &materializedView.Comment, + &materializedView.Compaction, + &materializedView.Compression, + &materializedView.CrcCheckChance, + &materializedView.DcLocalReadRepairChance, + &materializedView.DefaultTimeToLive, + &materializedView.Extensions, + &materializedView.GcGraceSeconds, + &materializedView.Id, + &materializedView.IncludeAllColumns, + &materializedView.MaxIndexInterval, + &materializedView.MemtableFlushPeriodInMs, + &materializedView.MinIndexInterval, + &materializedView.ReadRepairChance, + &materializedView.SpeculativeRetry, + ) + if err != nil { + return nil, err + } + materializedViews = append(materializedViews, materializedView) + } + + if err := rows.Err(); err != nil { + return nil, err + } + + return materializedViews, nil +} + func getFunctionsMetadata(session *Session, keyspaceName string) ([]FunctionMetadata, error) { if session.cfg.ProtoVersion == protoVersion1 || !session.hasAggregatesAndFunctions { return nil, nil diff --git a/vendor/github.com/gocql/gocql/policies.go b/vendor/github.com/gocql/gocql/policies.go index 276157352251..62d809c865da 100644 --- a/vendor/github.com/gocql/gocql/policies.go +++ b/vendor/github.com/gocql/gocql/policies.go @@ -333,9 +333,8 @@ func RoundRobinHostPolicy() HostSelectionPolicy { } type roundRobinHostPolicy struct { - hosts cowHostList - pos uint32 - mu sync.RWMutex + hosts cowHostList + lastUsedHostIdx uint64 } func (r *roundRobinHostPolicy) IsLocal(*HostInfo) bool { return true } @@ -344,25 +343,8 @@ func (r *roundRobinHostPolicy) SetPartitioner(partitioner string) {} func (r *roundRobinHostPolicy) Init(*Session) {} func (r 
*roundRobinHostPolicy) Pick(qry ExecutableQuery) NextHost { - // i is used to limit the number of attempts to find a host - // to the number of hosts known to this policy - var i int - return func() SelectedHost { - hosts := r.hosts.get() - if len(hosts) == 0 { - return nil - } - - // always increment pos to evenly distribute traffic in case of - // failures - pos := atomic.AddUint32(&r.pos, 1) - 1 - if i >= len(hosts) { - return nil - } - host := hosts[(pos)%uint32(len(hosts))] - i++ - return (*selectedHost)(host) - } + nextStartOffset := atomic.AddUint64(&r.lastUsedHostIdx, 1) + return roundRobbin(int(nextStartOffset), r.hosts.get()) } func (r *roundRobinHostPolicy) AddHost(host *HostInfo) { @@ -387,6 +369,18 @@ func ShuffleReplicas() func(*tokenAwareHostPolicy) { } } +// NonLocalReplicasFallback enables fallback to replicas that are not considered local. +// +// TokenAwareHostPolicy used with DCAwareHostPolicy fallback first selects replicas by partition key in local DC, then +// falls back to other nodes in the local DC. Enabling NonLocalReplicasFallback causes TokenAwareHostPolicy +// to first select replicas by partition key in local DC, then replicas by partition key in remote DCs and fall back +// to other nodes in local DC. +func NonLocalReplicasFallback() func(policy *tokenAwareHostPolicy) { + return func(t *tokenAwareHostPolicy) { + t.nonLocalReplicasFallback = true + } +} + // TokenAwareHostPolicy is a token aware host selection policy, where hosts are // selected based on the partition key, so queries are sent to the host which // owns the partition. Fallback is used when routing information is not available. @@ -398,25 +392,35 @@ func TokenAwareHostPolicy(fallback HostSelectionPolicy, opts ...func(*tokenAware return p } -type keyspaceMeta struct { - replicas map[string]map[token][]*HostInfo +// clusterMeta holds metadata about cluster topology. +// It is used inside atomic.Value and shallow copies are used when replacing it, +// so fields should not be modified in-place. Instead, to modify a field a copy of the field should be made +// and the pointer in clusterMeta updated to point to the new value. +type clusterMeta struct { + // replicas is map[keyspace]map[token]hosts + replicas map[string]tokenRingReplicas + tokenRing *tokenRing } type tokenAwareHostPolicy struct { - hosts cowHostList - mu sync.RWMutex - partitioner string - fallback HostSelectionPolicy - session *Session + fallback HostSelectionPolicy + getKeyspaceMetadata func(keyspace string) (*KeyspaceMetadata, error) + getKeyspaceName func() string - tokenRing atomic.Value // *tokenRing - keyspaces atomic.Value // *keyspaceMeta + shuffleReplicas bool + nonLocalReplicasFallback bool - shuffleReplicas bool + // mu protects writes to hosts, partitioner, metadata. + // reads can be unlocked as long as they are not used for updating state later. 
+ mu sync.Mutex + hosts cowHostList + partitioner string + metadata atomic.Value // *clusterMeta } func (t *tokenAwareHostPolicy) Init(s *Session) { - t.session = s + t.getKeyspaceMetadata = s.KeyspaceMetadata + t.getKeyspaceName = func() string { return s.cfg.Keyspace } } func (t *tokenAwareHostPolicy) IsLocal(host *HostInfo) bool { @@ -424,40 +428,36 @@ func (t *tokenAwareHostPolicy) IsLocal(host *HostInfo) bool { } func (t *tokenAwareHostPolicy) KeyspaceChanged(update KeyspaceUpdateEvent) { - t.updateKeyspaceMetadata(update.Keyspace) + t.mu.Lock() + defer t.mu.Unlock() + meta := t.getMetadataForUpdate() + t.updateReplicas(meta, update.Keyspace) + t.metadata.Store(meta) } -func (t *tokenAwareHostPolicy) updateKeyspaceMetadata(keyspace string) { - meta, _ := t.keyspaces.Load().(*keyspaceMeta) - var size = 1 - if meta != nil { - size = len(meta.replicas) - } - - newMeta := &keyspaceMeta{ - replicas: make(map[string]map[token][]*HostInfo, size), - } +// updateReplicas updates replicas in clusterMeta. +// It must be called with t.mu mutex locked. +// meta must not be nil and it's replicas field will be updated. +func (t *tokenAwareHostPolicy) updateReplicas(meta *clusterMeta, keyspace string) { + newReplicas := make(map[string]tokenRingReplicas, len(meta.replicas)) - ks, err := t.session.KeyspaceMetadata(keyspace) + ks, err := t.getKeyspaceMetadata(keyspace) if err == nil { strat := getStrategy(ks) if strat != nil { - tr, _ := t.tokenRing.Load().(*tokenRing) - if tr != nil { - newMeta.replicas[keyspace] = strat.replicaMap(t.hosts.get(), tr.tokens) + if meta != nil && meta.tokenRing != nil { + newReplicas[keyspace] = strat.replicaMap(meta.tokenRing) } } } - if meta != nil { - for ks, replicas := range meta.replicas { - if ks != keyspace { - newMeta.replicas[ks] = replicas - } + for ks, replicas := range meta.replicas { + if ks != keyspace { + newReplicas[ks] = replicas } } - t.keyspaces.Store(newMeta) + meta.replicas = newReplicas } func (t *tokenAwareHostPolicy) SetPartitioner(partitioner string) { @@ -467,53 +467,96 @@ func (t *tokenAwareHostPolicy) SetPartitioner(partitioner string) { if t.partitioner != partitioner { t.fallback.SetPartitioner(partitioner) t.partitioner = partitioner - - t.resetTokenRing(partitioner) + meta := t.getMetadataForUpdate() + meta.resetTokenRing(t.partitioner, t.hosts.get()) + t.updateReplicas(meta, t.getKeyspaceName()) + t.metadata.Store(meta) } } func (t *tokenAwareHostPolicy) AddHost(host *HostInfo) { - t.HostUp(host) - if t.session != nil { // disable for unit tests - t.updateKeyspaceMetadata(t.session.cfg.Keyspace) + t.mu.Lock() + if t.hosts.add(host) { + meta := t.getMetadataForUpdate() + meta.resetTokenRing(t.partitioner, t.hosts.get()) + t.updateReplicas(meta, t.getKeyspaceName()) + t.metadata.Store(meta) + } + t.mu.Unlock() + + t.fallback.AddHost(host) +} + +func (t *tokenAwareHostPolicy) AddHosts(hosts []*HostInfo) { + t.mu.Lock() + + for _, host := range hosts { + t.hosts.add(host) + } + + meta := t.getMetadataForUpdate() + meta.resetTokenRing(t.partitioner, t.hosts.get()) + t.updateReplicas(meta, t.getKeyspaceName()) + t.metadata.Store(meta) + + t.mu.Unlock() + + for _, host := range hosts { + t.fallback.AddHost(host) } } func (t *tokenAwareHostPolicy) RemoveHost(host *HostInfo) { - t.HostDown(host) - if t.session != nil { // disable for unit tests - t.updateKeyspaceMetadata(t.session.cfg.Keyspace) + t.mu.Lock() + if t.hosts.remove(host.ConnectAddress()) { + meta := t.getMetadataForUpdate() + meta.resetTokenRing(t.partitioner, t.hosts.get()) + 
t.updateReplicas(meta, t.getKeyspaceName()) + t.metadata.Store(meta) } + t.mu.Unlock() + + t.fallback.RemoveHost(host) } func (t *tokenAwareHostPolicy) HostUp(host *HostInfo) { - t.hosts.add(host) - t.fallback.AddHost(host) - - t.mu.RLock() - partitioner := t.partitioner - t.mu.RUnlock() - t.resetTokenRing(partitioner) + t.fallback.HostUp(host) } func (t *tokenAwareHostPolicy) HostDown(host *HostInfo) { - t.hosts.remove(host.ConnectAddress()) - t.fallback.RemoveHost(host) + t.fallback.HostDown(host) +} - t.mu.RLock() - partitioner := t.partitioner - t.mu.RUnlock() - t.resetTokenRing(partitioner) +// getMetadataReadOnly returns current cluster metadata. +// Metadata uses copy on write, so the returned value should be only used for reading. +// To obtain a copy that could be updated, use getMetadataForUpdate instead. +func (t *tokenAwareHostPolicy) getMetadataReadOnly() *clusterMeta { + meta, _ := t.metadata.Load().(*clusterMeta) + return meta +} + +// getMetadataForUpdate returns clusterMeta suitable for updating. +// It is a SHALLOW copy of current metadata in case it was already set or new empty clusterMeta otherwise. +// This function should be called with t.mu mutex locked and the mutex should not be released before +// storing the new metadata. +func (t *tokenAwareHostPolicy) getMetadataForUpdate() *clusterMeta { + metaReadOnly := t.getMetadataReadOnly() + meta := new(clusterMeta) + if metaReadOnly != nil { + *meta = *metaReadOnly + } + return meta } -func (t *tokenAwareHostPolicy) resetTokenRing(partitioner string) { +// resetTokenRing creates a new tokenRing. +// It must be called with t.mu locked. +func (m *clusterMeta) resetTokenRing(partitioner string, hosts []*HostInfo) { if partitioner == "" { // partitioner not yet set return } // create a new token ring - hosts := t.hosts.get() tokenRing, err := newTokenRing(partitioner, hosts) if err != nil { Logger.Printf("Unable to update the token ring due to error: %s", err) @@ -521,16 +564,7 @@ func (t *tokenAwareHostPolicy) resetTokenRing(partitioner string) { } // replace the token ring - t.tokenRing.Store(tokenRing) -} - -func (t *tokenAwareHostPolicy) getReplicas(keyspace string, token token) ([]*HostInfo, bool) { - meta, _ := t.keyspaces.Load().(*keyspaceMeta) - if meta == nil { - return nil, false - } - replicas, ok := meta.replicas[keyspace][token] - return replicas, ok + m.tokenRing = tokenRing } func (t *tokenAwareHostPolicy) Pick(qry ExecutableQuery) NextHost { @@ -545,26 +579,29 @@ func (t *tokenAwareHostPolicy) Pick(qry ExecutableQuery) NextHost { return t.fallback.Pick(qry) } - tr, _ := t.tokenRing.Load().(*tokenRing) - if tr == nil { + meta := t.getMetadataReadOnly() + if meta == nil || meta.tokenRing == nil { return t.fallback.Pick(qry) } - primaryEndpoint, token := tr.GetHostForPartitionKey(routingKey) - if primaryEndpoint == nil || token == nil { - return t.fallback.Pick(qry) - } + token := meta.tokenRing.partitioner.Hash(routingKey) + ht := meta.replicas[qry.Keyspace()].replicasFor(token) - replicas, ok := t.getReplicas(qry.Keyspace(), token) - if !ok { - replicas = []*HostInfo{primaryEndpoint} - } else if t.shuffleReplicas { - replicas = shuffleHosts(replicas) + var replicas []*HostInfo + if ht == nil { + host, _ := meta.tokenRing.GetHostForToken(token) + replicas = []*HostInfo{host} + } else { + replicas = ht.hosts + if t.shuffleReplicas { + replicas = shuffleHosts(replicas) + } } var ( fallbackIter NextHost - i int + i, j int + remote []*HostInfo ) used := make(map[*HostInfo]bool, len(replicas)) @@ -573,12 +610,29 @@ 
func (t *tokenAwareHostPolicy) Pick(qry ExecutableQuery) NextHost { h := replicas[i] i++ - if h.IsUp() && t.fallback.IsLocal(h) { + if !t.fallback.IsLocal(h) { + remote = append(remote, h) + continue + } + + if h.IsUp() { used[h] = true return (*selectedHost)(h) } } + if t.nonLocalReplicasFallback { + for j < len(remote) { + h := remote[j] + j++ + + if h.IsUp() { + used[h] = true + return (*selectedHost)(h) + } + } + } + if fallbackIter == nil { // fallback fallbackIter = t.fallback.Pick(qry) @@ -587,9 +641,11 @@ func (t *tokenAwareHostPolicy) Pick(qry ExecutableQuery) NextHost { // filter the token aware selected hosts from the fallback hosts for fallbackHost := fallbackIter(); fallbackHost != nil; fallbackHost = fallbackIter() { if !used[fallbackHost.Info()] { + used[fallbackHost.Info()] = true return fallbackHost } } + return nil } } @@ -737,11 +793,10 @@ func (host selectedHostPoolHost) Mark(err error) { } type dcAwareRR struct { - local string - pos uint32 - mu sync.RWMutex - localHosts cowHostList - remoteHosts cowHostList + local string + localHosts cowHostList + remoteHosts cowHostList + lastUsedHostIdx uint64 } // DCAwareRoundRobinPolicy is a host selection policies which will prioritize and @@ -760,7 +815,7 @@ func (d *dcAwareRR) IsLocal(host *HostInfo) bool { } func (d *dcAwareRR) AddHost(host *HostInfo) { - if host.DataCenter() == d.local { + if d.IsLocal(host) { d.localHosts.add(host) } else { d.remoteHosts.add(host) @@ -768,7 +823,7 @@ func (d *dcAwareRR) AddHost(host *HostInfo) { } func (d *dcAwareRR) RemoveHost(host *HostInfo) { - if host.DataCenter() == d.local { + if d.IsLocal(host) { d.localHosts.remove(host.ConnectAddress()) } else { d.remoteHosts.remove(host.ConnectAddress()) @@ -778,33 +833,53 @@ func (d *dcAwareRR) RemoveHost(host *HostInfo) { func (d *dcAwareRR) HostUp(host *HostInfo) { d.AddHost(host) } func (d *dcAwareRR) HostDown(host *HostInfo) { d.RemoveHost(host) } -func (d *dcAwareRR) Pick(q ExecutableQuery) NextHost { - var i int +// This function is supposed to be called in a fashion +// roundRobbin(offset, hostsPriority1, hostsPriority2, hostsPriority3 ... ) +// +// E.g. 
for DC-naive strategy: +// roundRobbin(offset, allHosts) +// +// For tiered and DC-aware strategy: +// roundRobbin(offset, localHosts, remoteHosts) +func roundRobbin(shift int, hosts ...[]*HostInfo) NextHost { + currentLayer := 0 + currentlyObserved := 0 + return func() SelectedHost { - var hosts []*HostInfo - localHosts := d.localHosts.get() - remoteHosts := d.remoteHosts.get() - if len(localHosts) != 0 { - hosts = localHosts - } else { - hosts = remoteHosts - } - if len(hosts) == 0 { - return nil - } - // always increment pos to evenly distribute traffic in case of - // failures - pos := atomic.AddUint32(&d.pos, 1) - 1 - if i >= len(localHosts)+len(remoteHosts) { - return nil + // iterate over layers + for { + if currentLayer == len(hosts) { + return nil + } + + currentLayerSize := len(hosts[currentLayer]) + + // iterate over hosts within a layer + for { + currentlyObserved++ + if currentlyObserved > currentLayerSize { + currentLayer++ + currentlyObserved = 0 + break + } + + h := hosts[currentLayer][(shift+currentlyObserved)%currentLayerSize] + + if h.IsUp() { + return (*selectedHost)(h) + } + + } } - host := hosts[(pos)%uint32(len(hosts))] - i++ - return (*selectedHost)(host) } } +func (d *dcAwareRR) Pick(q ExecutableQuery) NextHost { + nextStartOffset := atomic.AddUint64(&d.lastUsedHostIdx, 1) + return roundRobbin(int(nextStartOffset), d.localHosts.get(), d.remoteHosts.get()) +} + // ConvictionPolicy interface is used by gocql to determine if a host should be // marked as DOWN based on the error and host info type ConvictionPolicy interface { @@ -857,10 +932,15 @@ func (c *ConstantReconnectionPolicy) GetMaxRetries() int { type ExponentialReconnectionPolicy struct { MaxRetries int InitialInterval time.Duration + MaxInterval time.Duration } func (e *ExponentialReconnectionPolicy) GetInterval(currentRetry int) time.Duration { - return getExponentialTime(e.InitialInterval, math.MaxInt16*time.Second, e.GetMaxRetries()) + max := e.MaxInterval + if max < e.InitialInterval { + max = math.MaxInt16 * time.Second + } + return getExponentialTime(e.InitialInterval, max, currentRetry) } func (e *ExponentialReconnectionPolicy) GetMaxRetries() int { diff --git a/vendor/github.com/gocql/gocql/prepared_cache.go b/vendor/github.com/gocql/gocql/prepared_cache.go index 3c012a4bbc0c..3abeada21707 100644 --- a/vendor/github.com/gocql/gocql/prepared_cache.go +++ b/vendor/github.com/gocql/gocql/prepared_cache.go @@ -1,6 +1,7 @@ package gocql import ( + "bytes" "github.com/gocql/gocql/internal/lru" "sync" ) @@ -59,6 +60,30 @@ func (p *preparedLRU) execIfMissing(key string, fn func(lru *lru.Cache) *infligh } func (p *preparedLRU) keyFor(addr, keyspace, statement string) string { - // TODO: maybe use []byte for keys? 
+ // TODO: we should just use a struct for the key in the map return addr + keyspace + statement } + +func (p *preparedLRU) evictPreparedID(key string, id []byte) { + p.mu.Lock() + defer p.mu.Unlock() + + val, ok := p.lru.Get(key) + if !ok { + return + } + + ifp, ok := val.(*inflightPrepare) + if !ok { + return + } + + select { + case <-ifp.done: + if bytes.Equal(id, ifp.preparedStatment.id) { + p.lru.Remove(key) + } + default: + } + +} diff --git a/vendor/github.com/gocql/gocql/session.go b/vendor/github.com/gocql/gocql/session.go index 92261cc62e32..c627e4bbd968 100644 --- a/vendor/github.com/gocql/gocql/session.go +++ b/vendor/github.com/gocql/gocql/session.go @@ -68,7 +68,8 @@ type Session struct { cfg ClusterConfig - quit chan struct{} + ctx context.Context + cancel context.CancelFunc closeMu sync.RWMutex isClosed bool @@ -113,14 +114,18 @@ func NewSession(cfg ClusterConfig) (*Session, error) { return nil, errors.New("Can't use both Authenticator and AuthProvider in cluster config.") } + // TODO: we should take a context in here at some point + ctx, cancel := context.WithCancel(context.TODO()) + s := &Session{ cons: cfg.Consistency, prefetch: 0.25, cfg: cfg, pageSize: cfg.PageSize, stmtsLRU: &preparedLRU{lru: lru.New(cfg.MaxPreparedStmts)}, - quit: make(chan struct{}), connectObserver: cfg.ConnectObserver, + ctx: ctx, + cancel: cancel, } s.schemaDescriber = newSchemaDescriber(s) @@ -221,9 +226,28 @@ func (s *Session) init() error { hostMap[host.ConnectAddress().String()] = host } + hosts = hosts[:0] for _, host := range hostMap { host = s.ring.addOrUpdate(host) - s.addNewNode(host) + if s.cfg.filterHost(host) { + continue + } + + host.setState(NodeUp) + s.pool.addHost(host) + + hosts = append(hosts, host) + } + + type bulkAddHosts interface { + AddHosts([]*HostInfo) + } + if v, ok := s.policy.(bulkAddHosts); ok { + v.AddHosts(hosts) + } else { + for _, host := range hosts { + s.policy.AddHost(host) + } } // TODO(zariel): we probably dont need this any more as we verify that we @@ -259,6 +283,21 @@ func (s *Session) init() error { return nil } +// AwaitSchemaAgreement will wait until schema versions across all nodes in the +// cluster are the same (as seen from the point of view of the control connection). +// The maximum amount of time this takes is governed +// by the MaxWaitSchemaAgreement setting in the configuration (default: 60s). +// AwaitSchemaAgreement returns an error in case schema versions are not the same +// after the timeout specified in MaxWaitSchemaAgreement elapses. 
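// A short usage sketch for AwaitSchemaAgreement, declared just below: after a
// schema change, wait until every node reports the same schema version before
// issuing further DDL. The keyspace/table name and the 5s context timeout are
// illustrative assumptions. Assumes: import ("context"; "time";
// "github.com/gocql/gocql").
func createTableAndWait(session *gocql.Session) error {
	if err := session.Query(`CREATE TABLE IF NOT EXISTS example.events (id timeuuid PRIMARY KEY)`).Exec(); err != nil {
		return err
	}
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	// Waiting is bounded by MaxWaitSchemaAgreement and by the supplied ctx.
	return session.AwaitSchemaAgreement(ctx)
}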
+func (s *Session) AwaitSchemaAgreement(ctx context.Context) error { + if s.cfg.disableControlConn { + return errNoControl + } + return s.control.withConn(func(conn *Conn) *Iter { + return &Iter{err: conn.awaitSchemaAgreement(ctx)} + }).err +} + func (s *Session) reconnectDownedHosts(intv time.Duration) { reconnectTicker := time.NewTicker(intv) defer reconnectTicker.Stop() @@ -283,7 +322,7 @@ func (s *Session) reconnectDownedHosts(intv time.Duration) { } s.handleNodeUp(h.ConnectAddress(), h.Port(), true) } - case <-s.quit: + case <-s.ctx.Done(): return } } @@ -386,8 +425,8 @@ func (s *Session) Close() { s.schemaEvents.stop() } - if s.quit != nil { - close(s.quit) + if s.cancel != nil { + s.cancel() } } @@ -616,7 +655,7 @@ func (s *Session) ExecuteBatch(batch *Batch) error { } // ExecuteBatchCAS executes a batch operation and returns true if successful and -// an iterator (to scan aditional rows if more than one conditional statement) +// an iterator (to scan additional rows if more than one conditional statement) // was sent. // Further scans on the interator must also remember to include // the applied boolean as the first argument to *Iter.Scan @@ -657,13 +696,102 @@ func (s *Session) MapExecuteBatchCAS(batch *Batch, dest map[string]interface{}) } type hostMetrics struct { - Attempts int + // Attempts is count of how many times this query has been attempted for this host. + // An attempt is either a retry or fetching next page of results. + Attempts int + + // TotalLatency is the sum of attempt latencies for this host in nanoseconds. TotalLatency int64 } type queryMetrics struct { l sync.RWMutex m map[string]*hostMetrics + // totalAttempts is total number of attempts. + // Equal to sum of all hostMetrics' Attempts. + totalAttempts int +} + +// preFilledQueryMetrics initializes new queryMetrics based on per-host supplied data. +func preFilledQueryMetrics(m map[string]*hostMetrics) *queryMetrics { + qm := &queryMetrics{m: m} + for _, hm := range qm.m { + qm.totalAttempts += hm.Attempts + } + return qm +} + +// hostMetrics returns a snapshot of metrics for given host. +// If the metrics for host don't exist, they are created. +func (qm *queryMetrics) hostMetrics(host *HostInfo) *hostMetrics { + qm.l.Lock() + metrics := qm.hostMetricsLocked(host) + copied := new(hostMetrics) + *copied = *metrics + qm.l.Unlock() + return copied +} + +// hostMetricsLocked gets or creates host metrics for given host. +// It must be called only while holding qm.l lock. +func (qm *queryMetrics) hostMetricsLocked(host *HostInfo) *hostMetrics { + metrics, exists := qm.m[host.ConnectAddress().String()] + if !exists { + // if the host is not in the map, it means it's been accessed for the first time + metrics = &hostMetrics{} + qm.m[host.ConnectAddress().String()] = metrics + } + + return metrics +} + +// attempts returns the number of times the query was executed. +func (qm *queryMetrics) attempts() int { + qm.l.Lock() + attempts := qm.totalAttempts + qm.l.Unlock() + return attempts +} + +func (qm *queryMetrics) latency() int64 { + qm.l.Lock() + var ( + attempts int + latency int64 + ) + for _, metric := range qm.m { + attempts += metric.Attempts + latency += metric.TotalLatency + } + qm.l.Unlock() + if attempts > 0 { + return latency / int64(attempts) + } + return 0 +} + +// attempt adds given number of attempts and latency for given host. +// It returns previous total attempts. +// If needsHostMetrics is true, a copy of updated hostMetrics is returned. 
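// A small sketch of how the per-query metrics maintained by the bookkeeping
// below surface to callers: Attempts counts retries and page fetches, Latency
// is the mean per-attempt latency in nanoseconds. The statement and log output
// are illustrative assumptions. Assumes: import ("log"; "github.com/gocql/gocql").
func execAndReport(session *gocql.Session) {
	q := session.Query(`SELECT release_version FROM system.local`)
	if err := q.Exec(); err != nil {
		log.Printf("query failed: %v", err)
	}
	log.Printf("attempts=%d avg_latency_ns=%d", q.Attempts(), q.Latency())
}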
+func (qm *queryMetrics) attempt(addAttempts int, addLatency time.Duration, + host *HostInfo, needsHostMetrics bool) (int, *hostMetrics) { + qm.l.Lock() + + totalAttempts := qm.totalAttempts + qm.totalAttempts += addAttempts + + updateHostMetrics := qm.hostMetricsLocked(host) + updateHostMetrics.Attempts += addAttempts + updateHostMetrics.TotalLatency += addLatency.Nanoseconds() + + var hostMetricsCopy *hostMetrics + if needsHostMetrics { + hostMetricsCopy = new(hostMetrics) + *hostMetricsCopy = *updateHostMetrics + } + + qm.l.Unlock() + return totalAttempts, hostMetricsCopy } // Query represents a CQL statement that can be executed. @@ -673,7 +801,6 @@ type Query struct { cons Consistency pageSize int routingKey []byte - routingKeyBuffer []byte pageState []byte prefetch float64 trace Tracer @@ -692,6 +819,13 @@ type Query struct { metrics *queryMetrics disableAutoPage bool + + // getKeyspace is field so that it can be overriden in tests + getKeyspace func() string + + // used by control conn queries to prevent triggering a write to systems + // tables in AWS MCS see + skipPrepare bool } func (q *Query) defaultsFromSession() { @@ -713,19 +847,6 @@ func (q *Query) defaultsFromSession() { s.mu.RUnlock() } -func (q *Query) getHostMetrics(host *HostInfo) *hostMetrics { - q.metrics.l.Lock() - metrics, exists := q.metrics.m[host.ConnectAddress().String()] - if !exists { - // if the host is not in the map, it means it's been accessed for the first time - metrics = &hostMetrics{} - q.metrics.m[host.ConnectAddress().String()] = metrics - } - q.metrics.l.Unlock() - - return metrics -} - // Statement returns the statement that was used to generate this query. func (q Query) Statement() string { return q.stmt @@ -738,43 +859,20 @@ func (q Query) String() string { //Attempts returns the number of times the query was executed. func (q *Query) Attempts() int { - q.metrics.l.Lock() - var attempts int - for _, metric := range q.metrics.m { - attempts += metric.Attempts - } - q.metrics.l.Unlock() - return attempts + return q.metrics.attempts() } func (q *Query) AddAttempts(i int, host *HostInfo) { - hostMetric := q.getHostMetrics(host) - q.metrics.l.Lock() - hostMetric.Attempts += i - q.metrics.l.Unlock() + q.metrics.attempt(i, 0, host, false) } //Latency returns the average amount of nanoseconds per attempt of the query. func (q *Query) Latency() int64 { - q.metrics.l.Lock() - var attempts int - var latency int64 - for _, metric := range q.metrics.m { - attempts += metric.Attempts - latency += metric.TotalLatency - } - q.metrics.l.Unlock() - if attempts > 0 { - return latency / int64(attempts) - } - return 0 + return q.metrics.latency() } func (q *Query) AddLatency(l int64, host *HostInfo) { - hostMetric := q.getHostMetrics(host) - q.metrics.l.Lock() - hostMetric.TotalLatency += l - q.metrics.l.Unlock() + q.metrics.attempt(0, time.Duration(l)*time.Nanosecond, host, false) } // Consistency sets the consistency level for this query. If no consistency @@ -846,7 +944,7 @@ func (q *Query) DefaultTimestamp(enable bool) *Query { // WithTimestamp will enable the with default timestamp flag on the query // like DefaultTimestamp does. But also allows to define value for timestamp. // It works the same way as USING TIMESTAMP in the query itself, but -// should not break prepared query optimization +// should not break prepared query optimization. 
// // Only available on protocol >= 3 func (q *Query) WithTimestamp(timestamp int64) *Query { @@ -889,8 +987,8 @@ func (q *Query) execute(ctx context.Context, conn *Conn) *Iter { } func (q *Query) attempt(keyspace string, end, start time.Time, iter *Iter, host *HostInfo) { - q.AddAttempts(1, host) - q.AddLatency(end.Sub(start).Nanoseconds(), host) + latency := end.Sub(start) + attempt, metricsForHost := q.metrics.attempt(1, latency, host, q.observer != nil) if q.observer != nil { q.observer.ObserveQuery(q.Context(), ObservedQuery{ @@ -900,8 +998,9 @@ func (q *Query) attempt(keyspace string, end, start time.Time, iter *Iter, host End: end, Rows: iter.numRows, Host: host, - Metrics: q.getHostMetrics(host), + Metrics: metricsForHost, Err: iter.err, + Attempt: attempt, }) } } @@ -912,6 +1011,9 @@ func (q *Query) retryPolicy() RetryPolicy { // Keyspace returns the keyspace the query will be executed against. func (q *Query) Keyspace() string { + if q.getKeyspace != nil { + return q.getKeyspace() + } if q.session == nil { return "" } @@ -942,46 +1044,7 @@ func (q *Query) GetRoutingKey() ([]byte, error) { return nil, err } - if routingKeyInfo == nil { - return nil, nil - } - - if len(routingKeyInfo.indexes) == 1 { - // single column routing key - routingKey, err := Marshal( - routingKeyInfo.types[0], - q.values[routingKeyInfo.indexes[0]], - ) - if err != nil { - return nil, err - } - return routingKey, nil - } - - // We allocate that buffer only once, so that further re-bind/exec of the - // same query don't allocate more memory. - if q.routingKeyBuffer == nil { - q.routingKeyBuffer = make([]byte, 0, 256) - } - - // composite routing key - buf := bytes.NewBuffer(q.routingKeyBuffer) - for i := range routingKeyInfo.indexes { - encoded, err := Marshal( - routingKeyInfo.types[i], - q.values[routingKeyInfo.indexes[i]], - ) - if err != nil { - return nil, err - } - lenBuf := []byte{0x00, 0x00} - binary.BigEndian.PutUint16(lenBuf, uint16(len(encoded))) - buf.Write(lenBuf) - buf.Write(encoded) - buf.WriteByte(0x00) - } - routingKey := buf.Bytes() - return routingKey, nil + return createRoutingKey(routingKeyInfo, q.values) } func (q *Query) shouldPrepare() bool { @@ -1046,6 +1109,7 @@ func (q *Query) Idempotent(value bool) *Query { // to an existing query instance. func (q *Query) Bind(v ...interface{}) *Query { q.values = v + q.pageState = nil return q } @@ -1071,7 +1135,7 @@ func (q *Query) PageState(state []byte) *Query { // NoSkipMetadata will override the internal result metadata cache so that the driver does not // send skip_metadata for queries, this means that the result will always contain // the metadata to parse the rows and will not reuse the metadata from the prepared -// staement. This should only be used to work around cassandra bugs, such as when using +// statement. This should only be used to work around cassandra bugs, such as when using // CAS operations which do not end in Cas. 
// // See https://issues.apache.org/jira/browse/CASSANDRA-11099 @@ -1471,10 +1535,13 @@ type Batch struct { Type BatchType Entries []BatchEntry Cons Consistency + routingKey []byte + routingKeyBuffer []byte CustomPayload map[string][]byte rt RetryPolicy spec SpeculativeExecutionPolicy observer BatchObserver + session *Session serialCons SerialConsistency defaultTimestamp bool defaultTimestampValue int64 @@ -1503,6 +1570,7 @@ func (s *Session) NewBatch(typ BatchType) *Batch { rt: s.cfg.RetryPolicy, serialCons: s.cfg.SerialConsistency, observer: s.batchObserver, + session: s, Cons: s.cons, defaultTimestamp: s.cfg.DefaultTimestamp, keyspace: s.cfg.Keyspace, @@ -1514,19 +1582,6 @@ func (s *Session) NewBatch(typ BatchType) *Batch { return batch } -func (b *Batch) getHostMetrics(host *HostInfo) *hostMetrics { - b.metrics.l.Lock() - metrics, exists := b.metrics.m[host.ConnectAddress().String()] - if !exists { - // if the host is not in the map, it means it's been accessed for the first time - metrics = &hostMetrics{} - b.metrics.m[host.ConnectAddress().String()] = metrics - } - b.metrics.l.Unlock() - - return metrics -} - // Observer enables batch-level observer on this batch. // The provided observer will be called every time this batched query is executed. func (b *Batch) Observer(observer BatchObserver) *Batch { @@ -1540,47 +1595,20 @@ func (b *Batch) Keyspace() string { // Attempts returns the number of attempts made to execute the batch. func (b *Batch) Attempts() int { - b.metrics.l.Lock() - defer b.metrics.l.Unlock() - - var attempts int - for _, metric := range b.metrics.m { - attempts += metric.Attempts - } - return attempts + return b.metrics.attempts() } func (b *Batch) AddAttempts(i int, host *HostInfo) { - hostMetric := b.getHostMetrics(host) - b.metrics.l.Lock() - hostMetric.Attempts += i - b.metrics.l.Unlock() + b.metrics.attempt(i, 0, host, false) } //Latency returns the average number of nanoseconds to execute a single attempt of the batch. func (b *Batch) Latency() int64 { - b.metrics.l.Lock() - defer b.metrics.l.Unlock() - - var ( - attempts int - latency int64 - ) - for _, metric := range b.metrics.m { - attempts += metric.Attempts - latency += metric.TotalLatency - } - if attempts > 0 { - return latency / int64(attempts) - } - return 0 + return b.metrics.latency() } func (b *Batch) AddLatency(l int64, host *HostInfo) { - hostMetric := b.getHostMetrics(host) - b.metrics.l.Lock() - hostMetric.TotalLatency += l - b.metrics.l.Unlock() + b.metrics.attempt(0, time.Duration(l)*time.Nanosecond, host, false) } // GetConsistency returns the currently configured consistency level for the batch @@ -1694,7 +1722,7 @@ func (b *Batch) DefaultTimestamp(enable bool) *Batch { // WithTimestamp will enable the with default timestamp flag on the query // like DefaultTimestamp does. But also allows to define value for timestamp. // It works the same way as USING TIMESTAMP in the query itself, but -// should not break prepared query optimization +// should not break prepared query optimization. 
// // Only available on protocol >= 3 func (b *Batch) WithTimestamp(timestamp int64) *Batch { @@ -1704,8 +1732,8 @@ func (b *Batch) WithTimestamp(timestamp int64) *Batch { } func (b *Batch) attempt(keyspace string, end, start time.Time, iter *Iter, host *HostInfo) { - b.AddAttempts(1, host) - b.AddLatency(end.Sub(start).Nanoseconds(), host) + latency := end.Sub(start) + _, metricsForHost := b.metrics.attempt(1, latency, host, b.observer != nil) if b.observer == nil { return @@ -1723,14 +1751,69 @@ func (b *Batch) attempt(keyspace string, end, start time.Time, iter *Iter, host End: end, // Rows not used in batch observations // TODO - might be able to support it when using BatchCAS Host: host, - Metrics: b.getHostMetrics(host), + Metrics: metricsForHost, Err: iter.err, }) } func (b *Batch) GetRoutingKey() ([]byte, error) { - // TODO: use the first statement in the batch as the routing key? - return nil, nil + if b.routingKey != nil { + return b.routingKey, nil + } + + if len(b.Entries) == 0 { + return nil, nil + } + + entry := b.Entries[0] + if entry.binding != nil { + // bindings do not have the values let's skip it like Query does. + return nil, nil + } + // try to determine the routing key + routingKeyInfo, err := b.session.routingKeyInfo(b.Context(), entry.Stmt) + if err != nil { + return nil, err + } + + return createRoutingKey(routingKeyInfo, entry.Args) +} + +func createRoutingKey(routingKeyInfo *routingKeyInfo, values []interface{}) ([]byte, error) { + if routingKeyInfo == nil { + return nil, nil + } + + if len(routingKeyInfo.indexes) == 1 { + // single column routing key + routingKey, err := Marshal( + routingKeyInfo.types[0], + values[routingKeyInfo.indexes[0]], + ) + if err != nil { + return nil, err + } + return routingKey, nil + } + + // composite routing key + buf := bytes.NewBuffer(make([]byte, 0, 256)) + for i := range routingKeyInfo.indexes { + encoded, err := Marshal( + routingKeyInfo.types[i], + values[routingKeyInfo.indexes[i]], + ) + if err != nil { + return nil, err + } + lenBuf := []byte{0x00, 0x00} + binary.BigEndian.PutUint16(lenBuf, uint16(len(encoded))) + buf.Write(lenBuf) + buf.Write(encoded) + buf.WriteByte(0x00) + } + routingKey := buf.Bytes() + return routingKey, nil } type BatchType byte @@ -1883,6 +1966,11 @@ type ObservedQuery struct { // Err is the error in the query. // It only tracks network errors or errors of bad cassandra syntax, in particular selects with no match return nil error Err error + + // Attempt is the index of attempt at executing this query. + // An attempt might be either retry or fetching next page of a query. + // The first attempt is number zero and any retries have non-zero attempt number. + Attempt int } // QueryObserver is the interface implemented by query observers / stat collectors. diff --git a/vendor/github.com/gocql/gocql/token.go b/vendor/github.com/gocql/gocql/token.go index e32cea7e1559..9ae69b67adcf 100644 --- a/vendor/github.com/gocql/gocql/token.go +++ b/vendor/github.com/gocql/gocql/token.go @@ -130,11 +130,21 @@ func (ht hostToken) String() string { // a data structure for organizing the relationship between tokens and hosts type tokenRing struct { partitioner partitioner - tokens []hostToken + + // tokens map token range to primary replica. + // The elements in tokens are sorted by token ascending. + // The range for a given item in tokens starts after preceding range and ends with the token specified in + // token. The end token is part of the range. + // The lowest (i.e. 
index 0) range wraps around the ring (its preceding range is the one with largest index). + tokens []hostToken + + hosts []*HostInfo } func newTokenRing(partitioner string, hosts []*HostInfo) (*tokenRing, error) { - tokenRing := &tokenRing{} + tokenRing := &tokenRing{ + hosts: hosts, + } if strings.HasSuffix(partitioner, "Murmur3Partitioner") { tokenRing.partitioner = murmur3Partitioner{} @@ -192,29 +202,21 @@ func (t *tokenRing) String() string { return string(buf.Bytes()) } -func (t *tokenRing) GetHostForPartitionKey(partitionKey []byte) (host *HostInfo, endToken token) { - if t == nil { - return nil, nil - } - - return t.GetHostForToken(t.partitioner.Hash(partitionKey)) -} - func (t *tokenRing) GetHostForToken(token token) (host *HostInfo, endToken token) { if t == nil || len(t.tokens) == 0 { return nil, nil } // find the primary replica - ringIndex := sort.Search(len(t.tokens), func(i int) bool { + p := sort.Search(len(t.tokens), func(i int) bool { return !t.tokens[i].token.Less(token) }) - if ringIndex == len(t.tokens) { + if p == len(t.tokens) { // wrap around to the first in the ring - ringIndex = 0 + p = 0 } - v := t.tokens[ringIndex] + v := t.tokens[p] return v.host, v.token } diff --git a/vendor/github.com/gocql/gocql/topology.go b/vendor/github.com/gocql/gocql/topology.go index 59d737e9055d..3f8a75e8cff1 100644 --- a/vendor/github.com/gocql/gocql/topology.go +++ b/vendor/github.com/gocql/gocql/topology.go @@ -2,12 +2,47 @@ package gocql import ( "fmt" + "sort" "strconv" "strings" ) +type hostTokens struct { + // token is end (inclusive) of token range these hosts belong to + token token + hosts []*HostInfo +} + +// tokenRingReplicas maps token ranges to list of replicas. +// The elements in tokenRingReplicas are sorted by token ascending. +// The range for a given item in tokenRingReplicas starts after preceding range and ends with the token specified in +// token. The end token is part of the range. +// The lowest (i.e. index 0) range wraps around the ring (its preceding range is the one with largest index). 
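// The wrap-around lookup described above, illustrated with plain ints standing
// in for tokens. This standalone sketch mirrors the sort.Search plus rollover
// logic used by replicasFor and GetHostForToken; it is not gocql code itself.
package main

import (
	"fmt"
	"sort"
)

// ownerIndex returns the index of the range whose inclusive end token owns t.
func ownerIndex(endTokens []int, t int) int {
	p := sort.Search(len(endTokens), func(i int) bool { return endTokens[i] >= t })
	if p == len(endTokens) {
		// t is greater than every end token, so it falls into the lowest range,
		// which wraps around the ring.
		p = 0
	}
	return p
}

func main() {
	ends := []int{10, 20, 30}         // sorted end tokens of three ranges
	fmt.Println(ownerIndex(ends, 15)) // 1 -> range (10, 20]
	fmt.Println(ownerIndex(ends, 30)) // 2 -> the end token belongs to its range
	fmt.Println(ownerIndex(ends, 35)) // 0 -> wraps around to the first range
}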
+type tokenRingReplicas []hostTokens + +func (h tokenRingReplicas) Less(i, j int) bool { return h[i].token.Less(h[j].token) } +func (h tokenRingReplicas) Len() int { return len(h) } +func (h tokenRingReplicas) Swap(i, j int) { h[i], h[j] = h[j], h[i] } + +func (h tokenRingReplicas) replicasFor(t token) *hostTokens { + if len(h) == 0 { + return nil + } + + p := sort.Search(len(h), func(i int) bool { + return !h[i].token.Less(t) + }) + + if p >= len(h) { + // rollover + p = 0 + } + + return &h[p] +} + type placementStrategy interface { - replicaMap(hosts []*HostInfo, tokens []hostToken) map[token][]*HostInfo + replicaMap(tokenRing *tokenRing) tokenRingReplicas replicationFactor(dc string) int } @@ -63,20 +98,28 @@ func (s *simpleStrategy) replicationFactor(dc string) int { return s.rf } -func (s *simpleStrategy) replicaMap(_ []*HostInfo, tokens []hostToken) map[token][]*HostInfo { - tokenRing := make(map[token][]*HostInfo, len(tokens)) +func (s *simpleStrategy) replicaMap(tokenRing *tokenRing) tokenRingReplicas { + tokens := tokenRing.tokens + ring := make(tokenRingReplicas, len(tokens)) for i, th := range tokens { replicas := make([]*HostInfo, 0, s.rf) + seen := make(map[*HostInfo]bool) + for j := 0; j < len(tokens) && len(replicas) < s.rf; j++ { - // TODO: need to ensure we dont add the same hosts twice h := tokens[(i+j)%len(tokens)] - replicas = append(replicas, h.host) + if !seen[h.host] { + replicas = append(replicas, h.host) + seen[h.host] = true + } } - tokenRing[th.token] = replicas + + ring[i] = hostTokens{th.token, replicas} } - return tokenRing + sort.Sort(ring) + + return ring } type networkTopology struct { @@ -101,10 +144,16 @@ func (n *networkTopology) haveRF(replicaCounts map[string]int) bool { return true } -func (n *networkTopology) replicaMap(hosts []*HostInfo, tokens []hostToken) map[token][]*HostInfo { - dcRacks := make(map[string]map[string]struct{}) +func (n *networkTopology) replicaMap(tokenRing *tokenRing) tokenRingReplicas { + dcRacks := make(map[string]map[string]struct{}, len(n.dcs)) + // skipped hosts in a dc + skipped := make(map[string][]*HostInfo, len(n.dcs)) + // number of replicas per dc + replicasInDC := make(map[string]int, len(n.dcs)) + // dc -> racks + seenDCRacks := make(map[string]map[string]struct{}, len(n.dcs)) - for _, h := range hosts { + for _, h := range tokenRing.hosts { dc := h.DataCenter() rack := h.Rack() @@ -116,33 +165,51 @@ func (n *networkTopology) replicaMap(hosts []*HostInfo, tokens []hostToken) map[ racks[rack] = struct{}{} } - tokenRing := make(map[token][]*HostInfo, len(tokens)) + for dc, racks := range dcRacks { + replicasInDC[dc] = 0 + seenDCRacks[dc] = make(map[string]struct{}, len(racks)) + } + + tokens := tokenRing.tokens + replicaRing := make(tokenRingReplicas, 0, len(tokens)) var totalRF int for _, rf := range n.dcs { totalRF += rf } - for i, th := range tokens { - // number of replicas per dc - // TODO: recycle these - replicasInDC := make(map[string]int, len(n.dcs)) - // dc -> racks - seenDCRacks := make(map[string]map[string]struct{}, len(n.dcs)) - // skipped hosts in a dc - skipped := make(map[string][]*HostInfo, len(n.dcs)) + for i, th := range tokenRing.tokens { + if rf := n.dcs[th.host.DataCenter()]; rf == 0 { + // skip this token since no replica in this datacenter. 
+ continue + } + + for k, v := range skipped { + skipped[k] = v[:0] + } + + for dc := range n.dcs { + replicasInDC[dc] = 0 + for rack := range seenDCRacks[dc] { + delete(seenDCRacks[dc], rack) + } + } replicas := make([]*HostInfo, 0, totalRF) - for j := 0; j < len(tokens) && !n.haveRF(replicasInDC); j++ { + for j := 0; j < len(tokens) && (len(replicas) < totalRF && !n.haveRF(replicasInDC)); j++ { // TODO: ensure we dont add the same host twice - h := tokens[(i+j)%len(tokens)].host + p := i + j + if p >= len(tokens) { + p -= len(tokens) + } + h := tokens[p].host dc := h.DataCenter() rack := h.Rack() - rf, ok := n.dcs[dc] - if !ok { - // skip this DC, dont know about it + rf := n.dcs[dc] + if rf == 0 { + // skip this DC, dont know about it or replication factor is zero continue } else if replicasInDC[dc] >= rf { if replicasInDC[dc] > rf { @@ -154,13 +221,6 @@ func (n *networkTopology) replicaMap(hosts []*HostInfo, tokens []hostToken) map[ } else if _, ok := dcRacks[dc][rack]; !ok { // dont know about this rack continue - } else if len(replicas) >= totalRF { - if replicasInDC[dc] > rf { - panic(fmt.Sprintf("replica overflow. total rf=%d have=%d", totalRF, len(replicas))) - } - - // we now have enough replicas - break } racks := seenDCRacks[dc] @@ -177,7 +237,7 @@ func (n *networkTopology) replicaMap(hosts []*HostInfo, tokens []hostToken) map[ // new rack racks[rack] = struct{}{} replicas = append(replicas, h) - replicasInDC[dc]++ + r := replicasInDC[dc] + 1 if len(racks) == len(dcRacks[dc]) { // if we have been through all the racks, drain the rest of the skipped @@ -185,13 +245,14 @@ func (n *networkTopology) replicaMap(hosts []*HostInfo, tokens []hostToken) map[ // above skippedHosts := skipped[dc] var k int - for ; k < len(skippedHosts) && replicasInDC[dc] < rf; k++ { + for ; k < len(skippedHosts) && r+k < rf; k++ { sh := skippedHosts[k] replicas = append(replicas, sh) - replicasInDC[dc]++ } + r += k skipped[dc] = skippedHosts[k:] } + replicasInDC[dc] = r } else { // already seen this rack, keep hold of this host incase // we dont get enough for rf @@ -199,16 +260,18 @@ func (n *networkTopology) replicaMap(hosts []*HostInfo, tokens []hostToken) map[ } } - if len(replicas) == 0 || replicas[0] != th.host { - panic("first replica is not the primary replica for the token") + if len(replicas) == 0 { + panic(fmt.Sprintf("no replicas for token: %v", th.token)) + } else if !replicas[0].Equal(th.host) { + panic(fmt.Sprintf("first replica is not the primary replica for the token: expected %v got %v", replicas[0].ConnectAddress(), th.host.ConnectAddress())) } - tokenRing[th.token] = replicas + replicaRing = append(replicaRing, hostTokens{th.token, replicas}) } - if len(tokenRing) != len(tokens) { - panic(fmt.Sprintf("token map different size to token ring: got %d expected %d", len(tokenRing), len(tokens))) + if len(n.dcs) == len(dcRacks) && len(replicaRing) != len(tokens) { + panic(fmt.Sprintf("token map different size to token ring: got %d expected %d", len(replicaRing), len(tokens))) } - return tokenRing + return replicaRing } diff --git a/vendor/github.com/gocql/gocql/uuid.go b/vendor/github.com/gocql/gocql/uuid.go index 7ca4c087a6f6..13ad38379692 100644 --- a/vendor/github.com/gocql/gocql/uuid.go +++ b/vendor/github.com/gocql/gocql/uuid.go @@ -112,21 +112,64 @@ func RandomUUID() (UUID, error) { var timeBase = time.Date(1582, time.October, 15, 0, 0, 0, 0, time.UTC).Unix() +// getTimestamp converts time to UUID (version 1) timestamp. +// It must be an interval of 100-nanoseconds since timeBase. 
+func getTimestamp(t time.Time) int64 { + utcTime := t.In(time.UTC) + ts := int64(utcTime.Unix()-timeBase)*10000000 + int64(utcTime.Nanosecond()/100) + + return ts +} + // TimeUUID generates a new time based UUID (version 1) using the current // time as the timestamp. func TimeUUID() UUID { return UUIDFromTime(time.Now()) } +// The min and max clock values for a UUID. +// +// Cassandra's TimeUUIDType compares the lsb parts as signed byte arrays. +// Thus, the min value for each byte is -128 and the max is +127. +const ( + minClock = 0x8080 + maxClock = 0x7f7f +) + +// The min and max node values for a UUID. +// +// See explanation about Cassandra's TimeUUIDType comparison logic above. +var ( + minNode = []byte{0x80, 0x80, 0x80, 0x80, 0x80, 0x80} + maxNode = []byte{0x7f, 0x7f, 0x7f, 0x7f, 0x7f, 0x7f} +) + +// MinTimeUUID generates a "fake" time based UUID (version 1) which will be +// the smallest possible UUID generated for the provided timestamp. +// +// UUIDs generated by this function are not unique and are mostly suitable only +// in queries to select a time range of a Cassandra's TimeUUID column. +func MinTimeUUID(t time.Time) UUID { + return TimeUUIDWith(getTimestamp(t), minClock, minNode) +} + +// MaxTimeUUID generates a "fake" time based UUID (version 1) which will be +// the biggest possible UUID generated for the provided timestamp. +// +// UUIDs generated by this function are not unique and are mostly suitable only +// in queries to select a time range of a Cassandra's TimeUUID column. +func MaxTimeUUID(t time.Time) UUID { + return TimeUUIDWith(getTimestamp(t), maxClock, maxNode) +} + // UUIDFromTime generates a new time based UUID (version 1) as described in // RFC 4122. This UUID contains the MAC address of the node that generated // the UUID, the given timestamp and a sequence number. -func UUIDFromTime(aTime time.Time) UUID { - utcTime := aTime.In(time.UTC) - t := int64(utcTime.Unix()-timeBase)*10000000 + int64(utcTime.Nanosecond()/100) +func UUIDFromTime(t time.Time) UUID { + ts := getTimestamp(t) clock := atomic.AddUint32(&clockSeq, 1) - return TimeUUIDWith(t, clock, hardwareAddr) + return TimeUUIDWith(ts, clock, hardwareAddr) } // TimeUUIDWith generates a new time based UUID (version 1) as described in diff --git a/vendor/modules.txt b/vendor/modules.txt index 3cbe15e7ab7f..902d4cc2df1f 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -303,7 +303,7 @@ github.com/go-stack/stack github.com/go-test/deep # github.com/go-yaml/yaml v2.1.0+incompatible github.com/go-yaml/yaml -# github.com/gocql/gocql v0.0.0-20190402132108-0e1d5de854df +# github.com/gocql/gocql v0.0.0-20200624222514-34081eda590e github.com/gocql/gocql github.com/gocql/gocql/internal/lru github.com/gocql/gocql/internal/murmur
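// A usage sketch for the MinTimeUUID/MaxTimeUUID helpers added in uuid.go above:
// selecting a time window over a timeuuid clustering column. The keyspace, table
// and column names are illustrative assumptions; the gocql calls are the ones
// introduced by this patch. Assumes: import ("time"; "github.com/gocql/gocql").
func eventsBetween(session *gocql.Session, day string, from, to time.Time) ([]string, error) {
	var (
		body   string
		bodies []string
	)
	iter := session.Query(
		`SELECT body FROM example.events WHERE day = ? AND created > ? AND created < ?`,
		day,
		gocql.MinTimeUUID(from), // smallest timeuuid that can exist for `from`
		gocql.MaxTimeUUID(to),   // largest timeuuid that can exist for `to`
	).Iter()
	for iter.Scan(&body) {
		bodies = append(bodies, body)
	}
	return bodies, iter.Close()
}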