Skip to content

Commit

Permalink
Test ACKing deleted messages
Browse files Browse the repository at this point in the history
Test that explicitly ACKs messages that have been already removed from a stream
  • Loading branch information
mprimi committed Aug 16, 2024
1 parent b083868 commit 0824883
Showing 1 changed file with 132 additions and 0 deletions.
132 changes: 132 additions & 0 deletions server/jetstream_cluster_4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2872,3 +2872,135 @@ func TestJetStreamClusterConsumeWithStartSequence(t *testing.T) {
)
}
}

func TestJetStreamClusterAckDeleted(t *testing.T) {

const (
NumMessages = 10
StreamName = "TEST"
StreamSubject = "ORDERS.*"
StreamSubjectPrefix = "ORDERS."
)

for _, ClusterSize := range []int{
1, // Single server
3, // 3-node cluster
} {
R := ClusterSize
t.Run(
fmt.Sprintf("Nodes:%d,Replicas:%d", ClusterSize, R),
func(t *testing.T) {
// Setup: start server or cluster, connect client
var server *Server
if ClusterSize == 1 {
server = RunBasicJetStreamServer(t)
defer server.Shutdown()
} else {
c := createJetStreamCluster(t, jsClusterTempl, "HUB", _EMPTY_, ClusterSize, 22020, true)
defer c.shutdown()
server = c.randomServer()
}

// Setup: connect
var nc *nats.Conn
var js nats.JetStreamContext
nc, js = jsClientConnect(t, server)
defer nc.Close()

// Setup: create stream
_, err := js.AddStream(&nats.StreamConfig{
Replicas: R,
Name: StreamName,
Subjects: []string{StreamSubject},
Retention: nats.LimitsPolicy,
Discard: nats.DiscardOld,
MaxMsgs: 1, // Only keep the latest message
})
require_NoError(t, err)

// Setup: create durable consumer and subscription
const Durable = "dlc"
c, err := js.AddConsumer(StreamName, &nats.ConsumerConfig{
Durable: Durable,
Replicas: R,
AckPolicy: nats.AckExplicitPolicy,
MaxAckPending: NumMessages,
})
require_NoError(t, err)
defer func() {
require_NoError(t, js.DeleteConsumer(c.Stream, c.Name))
}()

// Setup: create durable consumer subscription
sub, err := js.PullSubscribe(
"",
"",
nats.Bind(StreamName, Durable),
)
require_NoError(t, err)
defer func() {
require_NoError(t, sub.Unsubscribe())
}()

// Collect received and non-ACKed messages
receivedMessages := make([]*nats.Msg, 0, NumMessages)

buf := make([]byte, 100)
for i := uint64(1); i <= NumMessages; i++ {
// Publish one message
msgId := nuid.Next()
pubAck, err := js.Publish(
StreamSubjectPrefix+strconv.Itoa(int(i)),
buf,
nats.MsgId(msgId),
)
require_NoError(t, err)
require_Equal(t, pubAck.Sequence, i)

// Consume message
msgs, err := sub.Fetch(1)
require_NoError(t, err)
require_Equal(t, len(msgs), 1)

// Validate message
msg := msgs[0]
require_Equal(t, msgs[0].Header.Get(nats.MsgIdHdr), msgId)

// Validate message metadata
msgMeta, err := msg.Metadata()
require_NoError(t, err)
// Check sequence number
require_Equal(t, msgMeta.Sequence.Stream, i)

// Save for ACK later
receivedMessages = append(receivedMessages, msg)
}

// Verify stream state, expecting a single message due to limits
streamInfo, err := js.StreamInfo(StreamName)
require_NoError(t, err)
require_Equal(t, streamInfo.State.Msgs, 1)

// Verify consumer state, expecting ack floor corresponding to messages dropped
consumerInfo, err := js.ConsumerInfo(StreamName, Durable)
require_NoError(t, err)
require_Equal(t, consumerInfo.NumAckPending, 1)
require_Equal(t, consumerInfo.AckFloor.Stream, 9)
require_Equal(t, consumerInfo.AckFloor.Consumer, 9)

// ACK all messages (all except last have been dropped from the stream)
for _, message := range receivedMessages {
err := message.AckSync()
require_NoError(t, err)
}

// Verify consumer state, all messages ACKed
consumerInfo, err = js.ConsumerInfo(StreamName, Durable)
require_NoError(t, err)
require_Equal(t, consumerInfo.NumAckPending, 0)
require_Equal(t, consumerInfo.AckFloor.Stream, 10)
require_Equal(t, consumerInfo.AckFloor.Consumer, 10)
},
)
}
}

0 comments on commit 0824883

Please sign in to comment.