refactor: divide lazy conditional refresh
proost committed Sep 21, 2024
1 parent 09a9403 commit fa0b58a
Showing 1 changed file with 128 additions and 1 deletion.
cluster.go
@@ -129,6 +129,10 @@ func (c *clusterClient) lazyRefresh() {
c.sc.LazyDo(time.Second, c._refresh)
}

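// lazyConditionalRefresh mirrors lazyRefresh above but schedules
// conditionalRefresh instead, reusing the same one-second LazyDo window to
// coalesce repeated refresh requests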
func (c *clusterClient) lazyConditionalRefresh() {
c.sc.LazyDo(time.Second, c.conditionalRefresh)
}

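// clusterslots pairs a node address with its CLUSTER SLOTS/SHARDS reply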
type clusterslots struct {
addr string
reply RedisResult
@@ -181,6 +185,129 @@ func (c *clusterClient) _refresh() (err error) {
}
pending = nil

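// build the desired connection map from the parsed topology: the first node
// of each group is the master and the rest are replicas, dialed with the
// dedicated replica option rOpt when one is configured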
groups := result.parse(c.opt.TLSConfig != nil)
conns := make(map[string]connrole, len(groups))
for master, g := range groups {
conns[master] = connrole{conn: c.connFn(master, c.opt), replica: false}
for _, addr := range g.nodes[1:] {
if c.rOpt != nil {
conns[addr] = connrole{conn: c.connFn(addr, c.rOpt), replica: true}
} else {
conns[addr] = connrole{conn: c.connFn(addr, c.opt), replica: true}
}
}
}
// make sure InitAddress is always present
for _, addr := range c.opt.InitAddress {
if _, ok := conns[addr]; !ok {
conns[addr] = connrole{
conn: c.connFn(addr, c.opt),
}
}
}

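// carry over existing connections whose role is unchanged; connections that
// dropped out of the topology (or changed role while replica routing is
// enabled) are collected for removal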
var removes []conn

c.mu.RLock()
for addr, cc := range c.conns {
fresh, ok := conns[addr]
if ok && (cc.replica == fresh.replica || c.rOpt == nil) {
conns[addr] = connrole{
conn: cc.conn,
replica: fresh.replica,
}
} else {
removes = append(removes, cc.conn)
}
}
c.mu.RUnlock()

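// map all 16384 cluster slots to connections: pslots is the primary routing
// table (pointed at replicas when ReplicaOnly is set), while rslots is built
// lazily for replica reads when a replica option is configured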
pslots := [16384]conn{}
var rslots []conn
for master, g := range groups {
switch {
case c.opt.ReplicaOnly && len(g.nodes) > 1:
nodesCount := len(g.nodes)
for _, slot := range g.slots {
for i := slot[0]; i <= slot[1]; i++ {
pslots[i] = conns[g.nodes[1+util.FastRand(nodesCount-1)]].conn
}
}
case c.rOpt != nil: // implies c.opt.SendToReplicas != nil
if len(rslots) == 0 { // lazy init
rslots = make([]conn, 16384)
}
if len(g.nodes) > 1 {
for _, slot := range g.slots {
for i := slot[0]; i <= slot[1]; i++ {
pslots[i] = conns[master].conn
rslots[i] = conns[g.nodes[1+util.FastRand(len(g.nodes)-1)]].conn
}
}
} else {
for _, slot := range g.slots {
for i := slot[0]; i <= slot[1]; i++ {
pslots[i] = conns[master].conn
rslots[i] = conns[master].conn
}
}
}
default:
for _, slot := range g.slots {
for i := slot[0]; i <= slot[1]; i++ {
pslots[i] = conns[master].conn
}
}
}
}

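// atomically swap in the new routing tables and connection map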
c.mu.Lock()
c.pslots = pslots
c.rslots = rslots
c.conns = conns
c.mu.Unlock()

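// close connections that fell out of the topology after a short grace period
// so requests already in flight on them are not cut off abruptly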
if len(removes) > 0 {
go func(removes []conn) {
time.Sleep(time.Second * 5)
for _, cc := range removes {
cc.Close()
}
}(removes)
}

return nil
}

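// conditionalRefresh collects CLUSTER SLOTS replies the same way _refresh
// does, then, as the comment below notes, compares the parsed topology with
// the current one before applying it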
func (c *clusterClient) conditionalRefresh() (err error) {
c.mu.RLock()
results := make(chan clusterslots, len(c.conns))
pending := make([]conn, 0, len(c.conns))
for _, cc := range c.conns {
pending = append(pending, cc.conn)
}
c.mu.RUnlock()

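// query the pending connections in batches of four and stop at the first
// reply that actually contains slot information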
var result clusterslots
for i := 0; i < cap(results); i++ {
if i&3 == 0 { // i&3 == 0 is i%4 == 0: launch the next batch of 4 CLUSTER SLOTS/CLUSTER SHARDS queries
for j := i; j < i+4 && j < len(pending); j++ {
go func(c conn, timeout time.Duration) {
results <- getClusterSlots(c, timeout)
}(pending[j], c.opt.ConnWriteTimeout)
}
}
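// consume one reply per launched query, keeping the first non-empty topology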
result = <-results
err = result.reply.Error()
if len(result.reply.val.values) != 0 {
break
}
}
if err != nil {
return err
}
pending = nil

groups := result.parse(c.opt.TLSConfig != nil)

// we need to check whether the new topology is different from the current one.
@@ -418,7 +545,7 @@ func (c *clusterClient) runClusterTopologyRefreshment() {
case <-c.stopCh:
return
case <-ticker.C:
- c.lazyRefresh()
+ c.lazyConditionalRefresh()
}
}
}