Skip to content

Commit

Permalink
Add test of imports failing when clients reconnect too fast
Browse files Browse the repository at this point in the history
Signed-off-by: Waldemar Quevedo <wally@nats.io>
  • Loading branch information
wallyqs committed Jun 24, 2024
1 parent c8090e7 commit 58642d3
Showing 1 changed file with 191 additions and 0 deletions.
191 changes: 191 additions & 0 deletions server/jwt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package server

import (
"bufio"
"context"
"encoding/base64"
"encoding/json"
"errors"
Expand Down Expand Up @@ -6842,3 +6843,193 @@ func TestJWTAccountNATSResolverWrongCreds(t *testing.T) {
t.Fatalf("Expected auth error: %v", err)
}
}

// TestJWTImportsOnServerRestartAndClientsReconnect checks that stream imports
// keep delivering messages after the server is restarted several times while
// clients are reconnecting aggressively.
//
// Setup: one "main" account exports "city.>" and a publisher user keeps
// publishing on it. Each of the secondary accounts imports a slice of that
// stream and has a subscriber connected with a very short reconnect wait.
// After every restart at least half of the subscribers must receive a message.
//
// Issue 5480: https://github.com/nats-io/nats-server/issues/5480
func TestJWTImportsOnServerRestartAndClientsReconnect(t *testing.T) {
	type namedCreds struct {
		name  string
		creds nats.Option
	}
	preload := make(map[string]string)
	users := make(map[string]*namedCreds)

	// sys account
	_, sysAcc, sysAccClaim := NewJwtAccountClaim("sys")
	sysAccJWT, err := sysAccClaim.Encode(oKp)
	require_NoError(t, err)
	preload[sysAcc] = sysAccJWT

	// main account, other accounts will import from this.
	mainAccKP, mainAcc, mainAccClaim := NewJwtAccountClaim("main")
	mainAccClaim.Exports.Add(&jwt.Export{
		Type:    jwt.Stream,
		Subject: "city.>",
	})

	// main account user
	mainUserClaim := jwt.NewUserClaims("publisher")
	mainUserClaim.Permissions = jwt.Permissions{
		Pub: jwt.Permission{
			Allow: []string{"city.>"},
		},
	}
	mainCreds := createUserCredsEx(t, mainUserClaim, mainAccKP)

	// The main account will be importing from all other accounts.
	maxAccounts := 100
	for i := 0; i < maxAccounts; i++ {
		name := fmt.Sprintf("secondary-%d", i)
		accKP, acc, accClaim := NewJwtAccountClaim(name)

		accClaim.Exports.Add(&jwt.Export{
			Type:    jwt.Stream,
			Subject: "internal.*",
		})
		accClaim.Imports.Add(&jwt.Import{
			Type:    jwt.Stream,
			Subject: jwt.Subject(fmt.Sprintf("city.%d-1.*", i)),
			Account: mainAcc,
		})

		// main account imports from the secondary accounts
		mainAccClaim.Imports.Add(&jwt.Import{
			Type:    jwt.Stream,
			Subject: jwt.Subject(fmt.Sprintf("internal.%d", i)),
			Account: acc,
		})

		accJWT, err := accClaim.Encode(oKp)
		require_NoError(t, err)
		preload[acc] = accJWT

		userClaim := jwt.NewUserClaims("subscriber")
		userClaim.Permissions = jwt.Permissions{
			Sub: jwt.Permission{
				Allow: []string{"city.>", "internal.*"},
			},
			Pub: jwt.Permission{
				Allow: []string{"internal.*"},
			},
		}
		userCreds := createUserCredsEx(t, userClaim, accKP)
		users[acc] = &namedCreds{name, userCreds}
	}
	mainAccJWT, err := mainAccClaim.Encode(oKp)
	require_NoError(t, err)
	preload[mainAcc] = mainAccJWT

	// Start the server with the preload. The client port is fixed so that the
	// restarted server listens on the address the clients reconnect to.
	resolverPreload, err := json.Marshal(preload)
	require_NoError(t, err)
	conf := createConfFile(t, []byte(fmt.Sprintf(`
listen: 127.0.0.1:4747
http: 127.0.0.1:8222
operator: %s
system_account: %s
resolver: MEM
resolver_preload: %s
`, ojwt, sysAcc, string(resolverPreload))))
	s, _ := RunServerWithConfig(conf)
	// s is replaced on every restart; the closure shuts down whichever server
	// is current when the test ends, including on a fatal failure.
	defer func() { s.Shutdown() }()

	// Have a connection ready for each one of the accounts.
	type namedSub struct {
		name string
		sub  *nats.Subscription
	}
	subs := make(map[string]*namedSub)
	for acc, user := range users {
		subNC := natsConnect(t, s.ClientURL(), user.creds,
			// Make the clients attempt to reconnect too fast,
			// changing this to be above ~200ms mitigates the issue.
			nats.ReconnectWait(15*time.Millisecond),
			nats.Name(user.name),
			nats.MaxReconnects(-1),
		)
		// Intentionally deferred to the end of the test: every subscriber
		// connection must stay open across all restarts.
		defer subNC.Close()

		sub, err := subNC.SubscribeSync("city.>")
		require_NoError(t, err)
		subs[acc] = &namedSub{user.name, sub}
	}

	nc := natsConnect(t, s.ClientURL(), mainCreds, nats.ReconnectWait(15*time.Millisecond), nats.MaxReconnects(-1))
	defer nc.Close()

	// send publishes one message per secondary account on the subject that
	// account imports. Errors are deliberately ignored: publishing is best
	// effort while the server is being restarted.
	send := func() {
		for i := 0; i < maxAccounts; i++ {
			nc.Publish(fmt.Sprintf("city.%d-1.A4BDB048-69DC-4F10-916C-2B998249DC11", i), []byte(fmt.Sprintf("test:%d", i)))
		}
		nc.Flush()
	}

	// Keep publishing in the background until the test finishes. The goroutine
	// exits on cancellation and is waited for, so it cannot outlive the test
	// or use the connection after it has been closed.
	ctx, cancel := context.WithCancel(context.Background())
	senderDone := make(chan struct{})
	go func() {
		defer close(senderDone)
		ticker := time.NewTicker(200 * time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				send()
			}
		}
	}()
	// Registered after nc so it runs before nc.Close().
	defer func() {
		cancel()
		<-senderDone
	}()

	// receive expects every subscriber to get a fresh message and fails the
	// test when fewer than half of them do.
	receive := func(t *testing.T) {
		t.Helper()
		received := 0
		for _, nsub := range subs {
			// Drain first any pending messages; errors here are irrelevant
			// since only the next fresh message matters.
			pendingMsgs, _, _ := nsub.sub.Pending()
			for i := 0; i < pendingMsgs; i++ {
				nsub.sub.NextMsg(500 * time.Millisecond)
			}

			if _, err := nsub.sub.NextMsg(500 * time.Millisecond); err != nil {
				t.Logf("WRN: Failed to receive message on account %q: %v", nsub.name, err)
				continue
			}
			received++
		}
		if received < (maxAccounts / 2) {
			t.Fatalf("Too many missed messages after restart. Received %d", received)
		}
	}
	receive(t)
	time.Sleep(1 * time.Second)

	// restart stops the current server, starts a new one from the same config
	// and waits until healthz reports 200. It fails the test instead of
	// hanging when the server does not become healthy within the timeout.
	restart := func(t *testing.T) {
		t.Helper()
		s.Shutdown()
		s.WaitForShutdown()
		s, _ = RunServerWithConfig(conf)

		hctx, hcancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer hcancel()
		ticker := time.NewTicker(2 * time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-hctx.Done():
				t.Fatalf("Timed out waiting for healthz from %s", s)
			case <-ticker.C:
				if status := s.healthz(nil); status.StatusCode == 200 {
					return
				}
			}
		}
	}

	// Takes a few restarts for issue to show up.
	for i := 0; i < 5; i++ {
		restart(t)
		time.Sleep(2 * time.Second)
		receive(t)
	}
}

0 comments on commit 58642d3

Please sign in to comment.