From 0824883a8e37d9415b78e487b6a1dd34ea331bd4 Mon Sep 17 00:00:00 2001 From: Marco Primi Date: Tue, 13 Aug 2024 13:08:09 -0700 Subject: [PATCH] Test ACKing deleted messages Test that explicitly ACKs messages that have been already removed from a stream --- server/jetstream_cluster_4_test.go | 132 +++++++++++++++++++++++++++++ 1 file changed, 132 insertions(+) diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index 6d3eaa9cc8f..04eac01d113 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -2872,3 +2872,135 @@ func TestJetStreamClusterConsumeWithStartSequence(t *testing.T) { ) } } + +func TestJetStreamClusterAckDeleted(t *testing.T) { + + const ( + NumMessages = 10 + StreamName = "TEST" + StreamSubject = "ORDERS.*" + StreamSubjectPrefix = "ORDERS." + ) + + for _, ClusterSize := range []int{ + 1, // Single server + 3, // 3-node cluster + } { + R := ClusterSize + t.Run( + fmt.Sprintf("Nodes:%d,Replicas:%d", ClusterSize, R), + func(t *testing.T) { + // Setup: start server or cluster, connect client + var server *Server + if ClusterSize == 1 { + server = RunBasicJetStreamServer(t) + defer server.Shutdown() + } else { + c := createJetStreamCluster(t, jsClusterTempl, "HUB", _EMPTY_, ClusterSize, 22020, true) + defer c.shutdown() + server = c.randomServer() + } + + // Setup: connect + var nc *nats.Conn + var js nats.JetStreamContext + nc, js = jsClientConnect(t, server) + defer nc.Close() + + // Setup: create stream + _, err := js.AddStream(&nats.StreamConfig{ + Replicas: R, + Name: StreamName, + Subjects: []string{StreamSubject}, + Retention: nats.LimitsPolicy, + Discard: nats.DiscardOld, + MaxMsgs: 1, // Only keep the latest message + }) + require_NoError(t, err) + + // Setup: create durable consumer and subscription + const Durable = "dlc" + c, err := js.AddConsumer(StreamName, &nats.ConsumerConfig{ + Durable: Durable, + Replicas: R, + AckPolicy: nats.AckExplicitPolicy, + MaxAckPending: NumMessages, + }) + require_NoError(t, err) + defer func() { + require_NoError(t, js.DeleteConsumer(c.Stream, c.Name)) + }() + + // Setup: create durable consumer subscription + sub, err := js.PullSubscribe( + "", + "", + nats.Bind(StreamName, Durable), + ) + require_NoError(t, err) + defer func() { + require_NoError(t, sub.Unsubscribe()) + }() + + // Collect received and non-ACKed messages + receivedMessages := make([]*nats.Msg, 0, NumMessages) + + buf := make([]byte, 100) + for i := uint64(1); i <= NumMessages; i++ { + // Publish one message + msgId := nuid.Next() + pubAck, err := js.Publish( + StreamSubjectPrefix+strconv.Itoa(int(i)), + buf, + nats.MsgId(msgId), + ) + require_NoError(t, err) + require_Equal(t, pubAck.Sequence, i) + + // Consume message + msgs, err := sub.Fetch(1) + require_NoError(t, err) + require_Equal(t, len(msgs), 1) + + // Validate message + msg := msgs[0] + require_Equal(t, msgs[0].Header.Get(nats.MsgIdHdr), msgId) + + // Validate message metadata + msgMeta, err := msg.Metadata() + require_NoError(t, err) + // Check sequence number + require_Equal(t, msgMeta.Sequence.Stream, i) + + // Save for ACK later + receivedMessages = append(receivedMessages, msg) + } + + // Verify stream state, expecting a single message due to limits + streamInfo, err := js.StreamInfo(StreamName) + require_NoError(t, err) + require_Equal(t, streamInfo.State.Msgs, 1) + + // Verify consumer state, expecting ack floor corresponding to messages dropped + consumerInfo, err := js.ConsumerInfo(StreamName, Durable) + require_NoError(t, err) + require_Equal(t, consumerInfo.NumAckPending, 1) + require_Equal(t, consumerInfo.AckFloor.Stream, 9) + require_Equal(t, consumerInfo.AckFloor.Consumer, 9) + + // ACK all messages (all except last have been dropped from the stream) + for _, message := range receivedMessages { + err := message.AckSync() + require_NoError(t, err) + } + + // Verify consumer state, all messages ACKed + consumerInfo, err = js.ConsumerInfo(StreamName, Durable) + require_NoError(t, err) + require_Equal(t, consumerInfo.NumAckPending, 0) + require_Equal(t, consumerInfo.AckFloor.Stream, 10) + require_Equal(t, consumerInfo.AckFloor.Consumer, 10) + }, + ) + } +}