From 93d173c4bc31b8b8fcce3646feed7ae492586a8a Mon Sep 17 00:00:00 2001 From: Maurice van Veen Date: Wed, 11 Sep 2024 21:41:37 +0200 Subject: [PATCH] Fix data race when moving a stream (#5880) There was a data race that triggered in `TestJetStreamSuperClusterMoveCancel` due to the addition of JetStream asset version metadata. All JS API requests receive user-provided `StreamConfig`/`ConsumerConfig`, which ensures that any mutations done in `setStaticStreamMetadata` for example is safe. However, `jsLeaderServerStreamMoveRequest` and `jsLeaderServerStreamCancelMoveRequest` would use the `sa.Config` from the stream assignment and call into `jsClusteredStreamUpdateRequest`. This resulted in a reference to the same `Metadata` map being available and modified. To ensure any changes made to config defaults or metadata doesn't result in a data race, a `StreamConfig.clone` method is added. [data-race-TestJetStreamSuperClusterMoveCancel.txt](https://github.com/user-attachments/files/16968281/data-race-TestJetStreamSuperClusterMoveCancel.txt) Signed-off-by: Maurice van Veen --------- Signed-off-by: Maurice van Veen --- server/jetstream_api.go | 4 ++-- server/jetstream_test.go | 35 +++++++++++++++++++++++++++++++++++ server/stream.go | 36 ++++++++++++++++++++++++++++++++++++ 3 files changed, 73 insertions(+), 2 deletions(-) diff --git a/server/jetstream_api.go b/server/jetstream_api.go index 354dd9ee36..6bb29884fd 100644 --- a/server/jetstream_api.go +++ b/server/jetstream_api.go @@ -2493,7 +2493,7 @@ func (s *Server) jsLeaderServerStreamMoveRequest(sub *subscription, c *client, _ if ok { sa, ok := streams[streamName] if ok { - cfg = *sa.Config + cfg = *sa.Config.clone() streamFound = true currPeers = sa.Group.Peers currCluster = sa.Group.Cluster @@ -2635,7 +2635,7 @@ func (s *Server) jsLeaderServerStreamCancelMoveRequest(sub *subscription, c *cli if ok { sa, ok := streams[streamName] if ok { - cfg = *sa.Config + cfg = *sa.Config.clone() streamFound = true currPeers = sa.Group.Peers } diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 093e97deed..472cec3f45 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -24329,3 +24329,38 @@ func TestJetStreamRateLimitHighStreamIngestDefaults(t *testing.T) { require_Equal(t, stream.msgs.mlen, streamDefaultMaxQueueMsgs) require_Equal(t, stream.msgs.msz, streamDefaultMaxQueueBytes) } + +func TestJetStreamStreamConfigClone(t *testing.T) { + cfg := &StreamConfig{ + Name: "name", + Placement: &Placement{Cluster: "placement", Tags: []string{"tag"}}, + Mirror: &StreamSource{Name: "mirror"}, + Sources: []*StreamSource{&StreamSource{Name: "source"}}, + SubjectTransform: &SubjectTransformConfig{Source: "source", Destination: "dest"}, + RePublish: &RePublish{Source: "source", Destination: "dest", HeadersOnly: false}, + Metadata: make(map[string]string), + } + + // Copy should be complete. + clone := cfg.clone() + require_True(t, reflect.DeepEqual(cfg, clone)) + + // Changing fields should not update the original. + clone.Placement.Cluster = "diff" + require_False(t, reflect.DeepEqual(cfg.Placement, clone.Placement)) + + clone.Mirror.Name = "diff" + require_False(t, reflect.DeepEqual(cfg.Mirror, clone.Mirror)) + + clone.Sources[0].Name = "diff" + require_False(t, reflect.DeepEqual(cfg.Sources, clone.Sources)) + + clone.SubjectTransform.Source = "diff" + require_False(t, reflect.DeepEqual(cfg.SubjectTransform, clone.SubjectTransform)) + + clone.RePublish.Source = "diff" + require_False(t, reflect.DeepEqual(cfg.RePublish, clone.RePublish)) + + clone.Metadata["key"] = "value" + require_False(t, reflect.DeepEqual(cfg.Metadata, clone.Metadata)) +} diff --git a/server/stream.go b/server/stream.go index 85c550f620..aee4b34723 100644 --- a/server/stream.go +++ b/server/stream.go @@ -104,6 +104,42 @@ type StreamConfig struct { Metadata map[string]string `json:"metadata,omitempty"` } +// clone performs a deep copy of the StreamConfig struct, returning a new clone with +// all values copied. +func (cfg *StreamConfig) clone() *StreamConfig { + clone := *cfg + if cfg.Placement != nil { + placement := *cfg.Placement + clone.Placement = &placement + } + if cfg.Mirror != nil { + mirror := *cfg.Mirror + clone.Mirror = &mirror + } + if len(cfg.Sources) > 0 { + clone.Sources = make([]*StreamSource, len(cfg.Sources)) + for i, cfgSource := range cfg.Sources { + source := *cfgSource + clone.Sources[i] = &source + } + } + if cfg.SubjectTransform != nil { + transform := *cfg.SubjectTransform + clone.SubjectTransform = &transform + } + if cfg.RePublish != nil { + rePublish := *cfg.RePublish + clone.RePublish = &rePublish + } + if cfg.Metadata != nil { + clone.Metadata = make(map[string]string, len(cfg.Metadata)) + for k, v := range cfg.Metadata { + clone.Metadata[k] = v + } + } + return &clone +} + type StreamConsumerLimits struct { InactiveThreshold time.Duration `json:"inactive_threshold,omitempty"` MaxAckPending int `json:"max_ack_pending,omitempty"`