From c726042d80d1a37218ca4add9ef037268bb12ac1 Mon Sep 17 00:00:00 2001 From: Poojita Raj Date: Thu, 31 Aug 2023 11:09:01 -0700 Subject: [PATCH 1/9] add node stats for segrep Signed-off-by: Poojita Raj --- .../replication/SegmentReplicationBaseIT.java | 2 +- .../replication/SegmentReplicationIT.java | 87 +++++++++++++++++ .../indices/stats/IndexStatsIT.java | 37 ++++++- .../cluster/stats/ClusterStatsIndices.java | 12 +++ .../admin/indices/stats/CommonStats.java | 32 ++++++- .../admin/indices/stats/CommonStatsFlags.java | 3 +- .../indices/stats/IndicesStatsRequest.java | 9 ++ .../stats/IndicesStatsRequestBuilder.java | 5 + .../opensearch/index/ReplicationStats.java | 96 +++++++++++++++++++ .../SegmentReplicationPressureService.java | 2 +- .../index/SegmentReplicationStatsTracker.java | 2 +- .../opensearch/index/shard/IndexShard.java | 17 +++- .../opensearch/indices/NodeIndicesStats.java | 6 ++ ...egmentReplicationPressureServiceTests.java | 10 +- .../SegmentReplicationIndexShardTests.java | 6 +- 15 files changed, 311 insertions(+), 15 deletions(-) create mode 100644 server/src/main/java/org/opensearch/index/ReplicationStats.java diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java index 9539044bf75b0..a16167c8ad0c4 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java @@ -241,7 +241,7 @@ protected Releasable blockReplication(List nodes, CountDownLatch latch) protected void assertReplicaCheckpointUpdated(IndexShard primaryShard) throws Exception { assertBusy(() -> { - Set groupStats = primaryShard.getReplicationStats(); + Set groupStats = primaryShard.getReplicationStatsForTrackedReplicas(); assertEquals(primaryShard.indexSettings().getNumberOfReplicas(), groupStats.size()); for (SegmentReplicationShardStats shardStat : groupStats) { assertEquals(0, shardStat.getCheckpointsBehindCount()); diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 5855ed7470559..8a922da57a761 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -21,8 +21,11 @@ import org.apache.lucene.index.StandardDirectoryReader; import org.apache.lucene.tests.util.TestUtil; import org.apache.lucene.util.BytesRef; +import org.opensearch.action.admin.cluster.node.stats.NodeStats; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.opensearch.action.admin.indices.alias.Alias; import org.opensearch.action.admin.indices.flush.FlushRequest; +import org.opensearch.action.admin.indices.stats.CommonStatsFlags; import org.opensearch.action.admin.indices.stats.IndicesStatsRequest; import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; import org.opensearch.action.get.GetResponse; @@ -1622,4 +1625,88 @@ public void testRealtimeMultiGetRequestsUnsuccessful() { assertTrue(mgetResponse.getResponses()[1].isFailed()); } + + public void testSegmentReplicationNodeAndIndexStats() throws Exception { + logger.info("--> start primary node"); + final String primaryNode = internalCluster().startNode(); + + logger.info("--> create index on node: {}", primaryNode); + assertAcked( + prepareCreate( + INDEX_NAME, + Settings.builder() + .put(indexSettings()) + // .put("number_of_shards", 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2) + // .put("index.replication.type", ReplicationType.SEGMENT) + ) + ); + + ensureYellow(); + logger.info("--> start first replica node"); + final String replicaNode1 = internalCluster().startNode(); + + logger.info("--> start second replica node"); + final String replicaNode2 = internalCluster().startNode(); + + ensureGreen(); + CountDownLatch latch = new CountDownLatch(1); + // block replication + try (final Releasable ignored = blockReplication(List.of(replicaNode1, replicaNode2), latch)) { + // index another doc while blocked, this would not get replicated to the replicas. + Thread indexingThread = new Thread(() -> { + client().prepareIndex(INDEX_NAME).setId("2").setSource("foo2", randomInt()).get(); + refresh(INDEX_NAME); + }); + + indexingThread.start(); + indexingThread.join(); + latch.await(); + + NodesStatsResponse nodesStatsResponse = client().admin() + .cluster() + .prepareNodesStats() + .clear() + .setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.SegmentReplication)) + .get(); + + long maxBytesBehind; + long totalBytesBehind; + long maxReplicationLag; + for (NodeStats nodeStats : nodesStatsResponse.getNodes()) { + if (nodeStats.getNode().isDataNode()) { + maxBytesBehind = nodeStats.getIndices().getReplicationStats().getMaxBytesBehind(); + totalBytesBehind = nodeStats.getIndices().getReplicationStats().getTotalBytesBehind(); + maxReplicationLag = nodeStats.getIndices().getReplicationStats().getMaxReplicationLag(); + // primary node - should hold replication statistics + if (nodeStats.getNode().getName().equals(primaryNode)) { + assertTrue(maxBytesBehind > 0); + assertTrue(totalBytesBehind > 0); + assertTrue(maxReplicationLag > 0); + // 2 replicas so total bytes should be double of max + assertEquals(maxBytesBehind * 2, totalBytesBehind); + } + // replica nodes + if (nodeStats.getNode().getName().equals(replicaNode1) || nodeStats.getNode().getName().equals(replicaNode2)) { + assertEquals(0, maxBytesBehind); + assertEquals(0, totalBytesBehind); + assertEquals(0, maxReplicationLag); + } + } + } + // get replication statistics at index level + IndicesStatsResponse stats = client().admin().indices().prepareStats().execute().actionGet(); + // stats should be of non-zero value when aggregated at index level + assertNotNull(stats.getIndex(INDEX_NAME).getTotal().getReplicationStats()); + maxBytesBehind = stats.getIndex(INDEX_NAME).getTotal().getReplicationStats().getMaxBytesBehind(); + totalBytesBehind = stats.getIndex(INDEX_NAME).getTotal().getReplicationStats().getTotalBytesBehind(); + maxReplicationLag = stats.getIndex(INDEX_NAME).getTotal().getReplicationStats().getMaxReplicationLag(); + assertTrue(maxBytesBehind > 0); + assertTrue(totalBytesBehind > 0); + assertTrue(maxReplicationLag > 0); + assertEquals(2 * maxBytesBehind, totalBytesBehind); + } + + } + } diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/stats/IndexStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/stats/IndexStatsIT.java index 1a131a2a7eb3d..caa3357a21a4e 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/stats/IndexStatsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/stats/IndexStatsIT.java @@ -75,6 +75,7 @@ import org.opensearch.indices.IndicesQueryCache; import org.opensearch.indices.IndicesRequestCache; import org.opensearch.indices.IndicesService; +import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.plugins.Plugin; import org.opensearch.search.sort.SortOrder; import org.opensearch.test.InternalSettingsPlugin; @@ -960,7 +961,8 @@ public void testFlagOrdinalOrder() { Flag.Segments, Flag.Translog, Flag.RequestCache, - Flag.Recovery }; + Flag.Recovery, + Flag.SegmentReplication }; assertThat(flags.length, equalTo(Flag.values().length)); for (int i = 0; i < flags.length; i++) { @@ -1140,6 +1142,8 @@ private static void set(Flag flag, IndicesStatsRequestBuilder builder, boolean s case Recovery: builder.setRecovery(set); break; + case SegmentReplication: + builder.setSegmentReplication(set); default: fail("new flag? " + flag); break; @@ -1180,6 +1184,8 @@ private static boolean isSet(Flag flag, CommonStats response) { return response.getRequestCache() != null; case Recovery: return response.getRecoveryStats() != null; + case SegmentReplication: + return response.getReplicationStats() != null; default: fail("new flag? " + flag); return false; @@ -1477,4 +1483,33 @@ private void persistGlobalCheckpoint(String index) throws Exception { } } } + + public void testSegmentReplicationStats() { + String indexName = "test-index"; + createIndex(indexName, Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 1).build()); + ensureGreen(indexName); + + IndicesStatsRequestBuilder builder = client().admin().indices().prepareStats(); + IndicesStatsResponse stats = builder.execute().actionGet(); + + // document replication enabled index should not return segment replication stats + assertNull(stats.getIndex(indexName).getTotal().getReplicationStats()); + + indexName = "test-index2"; + createIndex( + indexName, + Settings.builder() + .put("index.number_of_shards", 1) + .put("index.number_of_replicas", 1) + .put("index.replication.type", ReplicationType.SEGMENT) + .build() + ); + ensureGreen(indexName); + + builder = client().admin().indices().prepareStats(); + stats = builder.execute().actionGet(); + + // segment replication enabled index should return segment replication stats + assertNotNull(stats.getIndex(indexName).getTotal().getReplicationStats()); + } } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/stats/ClusterStatsIndices.java b/server/src/main/java/org/opensearch/action/admin/cluster/stats/ClusterStatsIndices.java index 63ac76ae65783..ef92225d71f12 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/stats/ClusterStatsIndices.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/stats/ClusterStatsIndices.java @@ -35,6 +35,7 @@ import org.opensearch.action.admin.indices.stats.CommonStats; import org.opensearch.core.xcontent.ToXContentFragment; import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.index.ReplicationStats; import org.opensearch.index.cache.query.QueryCacheStats; import org.opensearch.index.engine.SegmentsStats; import org.opensearch.index.fielddata.FieldDataStats; @@ -65,6 +66,8 @@ public class ClusterStatsIndices implements ToXContentFragment { private AnalysisStats analysis; private MappingStats mappings; + private ReplicationStats replicationStats; + public ClusterStatsIndices(List nodeResponses, MappingStats mappingStats, AnalysisStats analysisStats) { Map countsPerIndex = new HashMap<>(); @@ -74,6 +77,7 @@ public ClusterStatsIndices(List nodeResponses, Mapping this.queryCache = new QueryCacheStats(); this.completion = new CompletionStats(); this.segments = new SegmentsStats(); + this.replicationStats = new ReplicationStats(); for (ClusterStatsNodeResponse r : nodeResponses) { for (org.opensearch.action.admin.indices.stats.ShardStats shardStats : r.shardsStats()) { @@ -96,6 +100,7 @@ public ClusterStatsIndices(List nodeResponses, Mapping queryCache.add(shardCommonStats.queryCache); completion.add(shardCommonStats.completion); segments.add(shardCommonStats.segments); + replicationStats.add(shardCommonStats.replicationStats); } } @@ -149,6 +154,10 @@ public AnalysisStats getAnalysis() { return analysis; } + public ReplicationStats getReplicationStats() { + return replicationStats; + } + /** * Inner Fields used for creating XContent and parsing * @@ -174,6 +183,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (analysis != null) { analysis.toXContent(builder, params); } + if (replicationStats != null) { + replicationStats.toXContent(builder, params); + } return builder; } diff --git a/server/src/main/java/org/opensearch/action/admin/indices/stats/CommonStats.java b/server/src/main/java/org/opensearch/action/admin/indices/stats/CommonStats.java index e4abaef4ddfa8..16f4e229216ee 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/stats/CommonStats.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/stats/CommonStats.java @@ -33,6 +33,7 @@ package org.opensearch.action.admin.indices.stats; import org.apache.lucene.store.AlreadyClosedException; +import org.opensearch.Version; import org.opensearch.common.Nullable; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; @@ -41,6 +42,7 @@ import org.opensearch.core.xcontent.ToXContent; import org.opensearch.core.xcontent.ToXContentFragment; import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.index.ReplicationStats; import org.opensearch.index.cache.query.QueryCacheStats; import org.opensearch.index.cache.request.RequestCacheStats; import org.opensearch.index.engine.SegmentsStats; @@ -120,6 +122,9 @@ public class CommonStats implements Writeable, ToXContentFragment { @Nullable public RecoveryStats recoveryStats; + @Nullable + public ReplicationStats replicationStats; + public CommonStats() { this(CommonStatsFlags.NONE); } @@ -177,6 +182,8 @@ public CommonStats(CommonStatsFlags flags) { case Recovery: recoveryStats = new RecoveryStats(); break; + case SegmentReplication: + break; default: throw new IllegalStateException("Unknown Flag: " + flag); } @@ -236,6 +243,9 @@ public CommonStats(IndicesQueryCache indicesQueryCache, IndexShard indexShard, C case Recovery: recoveryStats = indexShard.recoveryStats(); break; + case SegmentReplication: + replicationStats = indexShard.getReplicationStats(); + break; default: throw new IllegalStateException("Unknown Flag: " + flag); } @@ -262,6 +272,9 @@ public CommonStats(StreamInput in) throws IOException { translog = in.readOptionalWriteable(TranslogStats::new); requestCache = in.readOptionalWriteable(RequestCacheStats::new); recoveryStats = in.readOptionalWriteable(RecoveryStats::new); + if (in.getVersion().onOrAfter(Version.V_2_10_0)) { + replicationStats = in.readOptionalWriteable(ReplicationStats::new); + } } @Override @@ -282,6 +295,9 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalWriteable(translog); out.writeOptionalWriteable(requestCache); out.writeOptionalWriteable(recoveryStats); + if (out.getVersion().onOrAfter(Version.V_2_10_0)) { + out.writeOptionalWriteable(replicationStats); + } } public void add(CommonStats stats) { @@ -414,6 +430,14 @@ public void add(CommonStats stats) { } else { recoveryStats.add(stats.getRecoveryStats()); } + if (replicationStats == null) { + if (stats.getReplicationStats() != null) { + replicationStats = new ReplicationStats(); + replicationStats.add(stats.getReplicationStats()); + } + } else { + replicationStats.add(stats.getReplicationStats()); + } } @Nullable @@ -496,6 +520,11 @@ public RecoveryStats getRecoveryStats() { return recoveryStats; } + @Nullable + public ReplicationStats getReplicationStats() { + return replicationStats; + } + /** * Utility method which computes total memory by adding * FieldData, PercolatorCache, Segments (index writer, version map) @@ -535,7 +564,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws segments, translog, requestCache, - recoveryStats } + recoveryStats, + replicationStats } ).filter(Objects::nonNull); for (ToXContent toXContent : ((Iterable) stream::iterator)) { toXContent.toXContent(builder, params); diff --git a/server/src/main/java/org/opensearch/action/admin/indices/stats/CommonStatsFlags.java b/server/src/main/java/org/opensearch/action/admin/indices/stats/CommonStatsFlags.java index 7503020d1c8ef..84ac36383a107 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/stats/CommonStatsFlags.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/stats/CommonStatsFlags.java @@ -274,7 +274,8 @@ public enum Flag { Translog("translog", 13), // 14 was previously used for Suggest RequestCache("request_cache", 15), - Recovery("recovery", 16); + Recovery("recovery", 16), + SegmentReplication("segment_replication", 17); private final String restName; private final int index; diff --git a/server/src/main/java/org/opensearch/action/admin/indices/stats/IndicesStatsRequest.java b/server/src/main/java/org/opensearch/action/admin/indices/stats/IndicesStatsRequest.java index 54f3e9b7d1a24..0180d513c1218 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/stats/IndicesStatsRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/stats/IndicesStatsRequest.java @@ -282,6 +282,15 @@ public IndicesStatsRequest includeUnloadedSegments(boolean includeUnloadedSegmen return this; } + public IndicesStatsRequest segmentReplication(boolean segmentReplication) { + flags.set(CommonStatsFlags.Flag.SegmentReplication, segmentReplication); + return this; + } + + public boolean segmentReplication() { + return flags.isSet(CommonStatsFlags.Flag.SegmentReplication); + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); diff --git a/server/src/main/java/org/opensearch/action/admin/indices/stats/IndicesStatsRequestBuilder.java b/server/src/main/java/org/opensearch/action/admin/indices/stats/IndicesStatsRequestBuilder.java index c211812b32c48..0f6f11a003014 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/stats/IndicesStatsRequestBuilder.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/stats/IndicesStatsRequestBuilder.java @@ -179,4 +179,9 @@ public IndicesStatsRequestBuilder setIncludeSegmentFileSizes(boolean includeSegm request.includeSegmentFileSizes(includeSegmentFileSizes); return this; } + + public IndicesStatsRequestBuilder setSegmentReplication(boolean segmentReplication) { + request.segmentReplication(segmentReplication); + return this; + } } diff --git a/server/src/main/java/org/opensearch/index/ReplicationStats.java b/server/src/main/java/org/opensearch/index/ReplicationStats.java new file mode 100644 index 0000000000000..3157aebf52c4c --- /dev/null +++ b/server/src/main/java/org/opensearch/index/ReplicationStats.java @@ -0,0 +1,96 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index; + +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.common.unit.ByteSizeValue; +import org.opensearch.core.xcontent.ToXContentFragment; +import org.opensearch.core.xcontent.XContentBuilder; + +import java.io.IOException; + +/** + * ReplicationStats is used to provide segment replication statistics at an index, + * node and cluster level on a segment replication enabled cluster. + */ +public class ReplicationStats implements ToXContentFragment, Writeable { + + public long maxBytesBehind; + public long maxReplicationLag; + public long totalBytesBehind; + + public ReplicationStats(long maxBytesBehind, long totalBytesBehind, long maxReplicationLag) { + this.maxBytesBehind = maxBytesBehind; + this.totalBytesBehind = totalBytesBehind; + this.maxReplicationLag = maxReplicationLag; + } + + public ReplicationStats(StreamInput in) throws IOException { + this.maxBytesBehind = in.readVLong(); + this.totalBytesBehind = in.readVLong(); + this.maxReplicationLag = in.readVLong(); + } + + public ReplicationStats() { + + } + + public void add(ReplicationStats other) { + if (other == null) { + return; + } + maxBytesBehind = Math.max(other.maxBytesBehind, maxBytesBehind); + totalBytesBehind += other.totalBytesBehind; + maxReplicationLag = Math.max(other.maxReplicationLag, maxReplicationLag); + } + + public long getMaxBytesBehind() { + return this.maxBytesBehind; + } + + public long getTotalBytesBehind() { + return this.totalBytesBehind; + } + + public long getMaxReplicationLag() { + return this.maxReplicationLag; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVLong(maxBytesBehind); + out.writeVLong(totalBytesBehind); + out.writeVLong(maxReplicationLag); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(Fields.REPLICATION); + builder.field(Fields.MAX_BYTES_BEHIND, new ByteSizeValue(maxBytesBehind).toString()); + builder.field(Fields.TOTAL_BYTES_BEHIND, new ByteSizeValue(totalBytesBehind).toString()); + builder.field(Fields.MAX_REPLICATION_LAG, new TimeValue(maxReplicationLag)); + builder.endObject(); + return builder; + } + + /** + * Fields for segment replication statistics + * + * @opensearch.internal + */ + static final class Fields { + static final String REPLICATION = "replication"; + static final String MAX_BYTES_BEHIND = "max_bytes_behind"; + static final String TOTAL_BYTES_BEHIND = "total_bytes_behind"; + static final String MAX_REPLICATION_LAG = "max_replication_lag"; + } +} diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java b/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java index 0c4fe26eeead5..4284daf9ffef4 100644 --- a/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java +++ b/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java @@ -151,7 +151,7 @@ public void isSegrepLimitBreached(ShardId shardId) { } private void validateReplicationGroup(IndexShard shard) { - final Set replicaStats = shard.getReplicationStats(); + final Set replicaStats = shard.getReplicationStatsForTrackedReplicas(); final Set staleReplicas = getStaleReplicas(replicaStats); if (staleReplicas.isEmpty() == false) { // inSyncIds always considers the primary id, so filter it out. diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationStatsTracker.java b/server/src/main/java/org/opensearch/index/SegmentReplicationStatsTracker.java index 2255bb17d364f..6d5c00c08caff 100644 --- a/server/src/main/java/org/opensearch/index/SegmentReplicationStatsTracker.java +++ b/server/src/main/java/org/opensearch/index/SegmentReplicationStatsTracker.java @@ -59,7 +59,7 @@ public void incrementRejectionCount(ShardId shardId) { public SegmentReplicationPerGroupStats getStatsForShard(IndexShard indexShard) { return new SegmentReplicationPerGroupStats( indexShard.shardId(), - indexShard.getReplicationStats(), + indexShard.getReplicationStatsForTrackedReplicas(), Optional.ofNullable(rejectionCount.get(indexShard.shardId())).map(AtomicInteger::get).orElse(0) ); } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 352876e54547e..90b2d038973f3 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -110,6 +110,7 @@ import org.opensearch.index.IndexNotFoundException; import org.opensearch.index.IndexService; import org.opensearch.index.IndexSettings; +import org.opensearch.index.ReplicationStats; import org.opensearch.index.SegmentReplicationShardStats; import org.opensearch.index.VersionType; import org.opensearch.index.cache.IndexCache; @@ -2935,10 +2936,24 @@ public void updateVisibleCheckpointForShard(final String allocationId, final Rep * @return {@link Tuple} V1 - TimeValue in ms - mean replication lag for this primary to its entire group, * V2 - Set of {@link SegmentReplicationShardStats} per shard in this primary's replication group. */ - public Set getReplicationStats() { + public Set getReplicationStatsForTrackedReplicas() { return replicationTracker.getSegmentReplicationStats(); } + public ReplicationStats getReplicationStats() { + if (indexSettings.isSegRepEnabled()) { + final Set stats = getReplicationStatsForTrackedReplicas(); + long maxBytesBehind = stats.stream().mapToLong(SegmentReplicationShardStats::getBytesBehindCount).max().orElse(0L); + long totalBytesBehind = stats.stream().mapToLong(SegmentReplicationShardStats::getBytesBehindCount).sum(); + long maxReplicationLag = stats.stream() + .mapToLong(SegmentReplicationShardStats::getCurrentReplicationTimeMillis) + .max() + .orElse(0L); + return new ReplicationStats(maxBytesBehind, totalBytesBehind, maxReplicationLag); + } + return null; + } + /** * Add a global checkpoint listener. If the global checkpoint is equal to or above the global checkpoint the listener is waiting for, * then the listener will be notified immediately via an executor (so possibly not on the current thread). If the specified timeout diff --git a/server/src/main/java/org/opensearch/indices/NodeIndicesStats.java b/server/src/main/java/org/opensearch/indices/NodeIndicesStats.java index cc3d8193dfa6b..ced633dcee9aa 100644 --- a/server/src/main/java/org/opensearch/indices/NodeIndicesStats.java +++ b/server/src/main/java/org/opensearch/indices/NodeIndicesStats.java @@ -42,6 +42,7 @@ import org.opensearch.core.index.Index; import org.opensearch.core.xcontent.ToXContentFragment; import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.index.ReplicationStats; import org.opensearch.index.cache.query.QueryCacheStats; import org.opensearch.index.cache.request.RequestCacheStats; import org.opensearch.index.engine.SegmentsStats; @@ -187,6 +188,11 @@ public RecoveryStats getRecoveryStats() { return stats.getRecoveryStats(); } + @Nullable + public ReplicationStats getReplicationStats() { + return stats.getReplicationStats(); + } + @Override public void writeTo(StreamOutput out) throws IOException { stats.writeTo(out); diff --git a/server/src/test/java/org/opensearch/index/SegmentReplicationPressureServiceTests.java b/server/src/test/java/org/opensearch/index/SegmentReplicationPressureServiceTests.java index 3a08a2c143a89..34fa13f0ba62c 100644 --- a/server/src/test/java/org/opensearch/index/SegmentReplicationPressureServiceTests.java +++ b/server/src/test/java/org/opensearch/index/SegmentReplicationPressureServiceTests.java @@ -114,7 +114,7 @@ public void testIsSegrepLimitBreached_onlyCheckpointLimitBreached() throws Excep indexInBatches(5, shards, primaryShard); - Set replicationStats = primaryShard.getReplicationStats(); + Set replicationStats = primaryShard.getReplicationStatsForTrackedReplicas(); assertEquals(1, replicationStats.size()); SegmentReplicationShardStats shardStats = replicationStats.stream().findFirst().get(); assertEquals(5, shardStats.getCheckpointsBehindCount()); @@ -142,7 +142,7 @@ public void testIsSegrepLimitBreached_onlyTimeLimitBreached() throws Exception { indexInBatches(1, shards, primaryShard); assertBusy(() -> { - Set replicationStats = primaryShard.getReplicationStats(); + Set replicationStats = primaryShard.getReplicationStatsForTrackedReplicas(); assertEquals(1, replicationStats.size()); SegmentReplicationShardStats shardStats = replicationStats.stream().findFirst().get(); assertTrue(shardStats.getCurrentReplicationTimeMillis() > TimeValue.timeValueSeconds(5).millis()); @@ -164,7 +164,7 @@ public void testIsSegrepLimitBreached_underStaleNodeLimit() throws Exception { SegmentReplicationPressureService service = buildPressureService(settings, primaryShard); assertBusy(() -> { - Set replicationStats = primaryShard.getReplicationStats(); + Set replicationStats = primaryShard.getReplicationStatsForTrackedReplicas(); assertEquals(3, replicationStats.size()); SegmentReplicationShardStats shardStats = replicationStats.stream().findFirst().get(); assertTrue(shardStats.getCurrentReplicationTimeMillis() > TimeValue.timeValueSeconds(5).millis()); @@ -211,7 +211,7 @@ public void testFailStaleReplicaTask() throws Exception { indexInBatches(5, shards, primaryShard); // assert that replica shard is few checkpoints behind primary - Set replicationStats = primaryShard.getReplicationStats(); + Set replicationStats = primaryShard.getReplicationStatsForTrackedReplicas(); assertEquals(1, replicationStats.size()); SegmentReplicationShardStats shardStats = replicationStats.stream().findFirst().get(); assertEquals(5, shardStats.getCheckpointsBehindCount()); @@ -243,7 +243,7 @@ public void testFailStaleReplicaTaskDisabled() throws Exception { indexInBatches(5, shards, primaryShard); // assert that replica shard is few checkpoints behind primary - Set replicationStats = primaryShard.getReplicationStats(); + Set replicationStats = primaryShard.getReplicationStatsForTrackedReplicas(); assertEquals(1, replicationStats.size()); SegmentReplicationShardStats shardStats = replicationStats.stream().findFirst().get(); assertEquals(5, shardStats.getCheckpointsBehindCount()); diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index b7972810dddb9..66d184c7ff86f 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -828,7 +828,7 @@ public void testSegmentReplicationStats() throws Exception { .mapToLong(StoreFileMetadata::length) .sum(); - Set postRefreshStats = primaryShard.getReplicationStats(); + Set postRefreshStats = primaryShard.getReplicationStatsForTrackedReplicas(); SegmentReplicationShardStats shardStats = postRefreshStats.stream().findFirst().get(); assertEquals(1, shardStats.getCheckpointsBehindCount()); assertEquals(initialCheckpointSize, shardStats.getBytesBehindCount()); @@ -848,7 +848,7 @@ public void testSegmentReplicationStats() throws Exception { final Store.RecoveryDiff diff = Store.segmentReplicationDiff(segmentMetadataMap, replicaShard.getSegmentMetadataMap()); final long sizeAfterDeleteAndCommit = diff.missing.stream().mapToLong(StoreFileMetadata::length).sum(); - final Set statsAfterFlush = primaryShard.getReplicationStats(); + final Set statsAfterFlush = primaryShard.getReplicationStatsForTrackedReplicas(); shardStats = statsAfterFlush.stream().findFirst().get(); assertEquals(sizeAfterDeleteAndCommit, shardStats.getBytesBehindCount()); assertEquals(1, shardStats.getCheckpointsBehindCount()); @@ -860,7 +860,7 @@ public void testSegmentReplicationStats() throws Exception { } private void assertReplicaCaughtUp(IndexShard primaryShard) { - Set initialStats = primaryShard.getReplicationStats(); + Set initialStats = primaryShard.getReplicationStatsForTrackedReplicas(); assertEquals(initialStats.size(), 1); SegmentReplicationShardStats shardStats = initialStats.stream().findFirst().get(); assertEquals(0, shardStats.getCheckpointsBehindCount()); From 054342ca950a870c3e042931d91298f3f0c77c2b Mon Sep 17 00:00:00 2001 From: Poojita Raj Date: Fri, 1 Sep 2023 15:32:43 -0700 Subject: [PATCH 2/9] comment removal Signed-off-by: Poojita Raj --- .../opensearch/indices/replication/SegmentReplicationIT.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 8a922da57a761..761a2f3ecae70 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -1636,9 +1636,7 @@ public void testSegmentReplicationNodeAndIndexStats() throws Exception { INDEX_NAME, Settings.builder() .put(indexSettings()) - // .put("number_of_shards", 1) .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2) - // .put("index.replication.type", ReplicationType.SEGMENT) ) ); From df2b2a6d850a26ae90b7f845aff9d274a3f9e37c Mon Sep 17 00:00:00 2001 From: Poojita Raj Date: Sun, 3 Sep 2023 00:46:18 -0700 Subject: [PATCH 3/9] changelog Signed-off-by: Poojita Raj --- CHANGELOG.md | 1 + .../replication/SegmentReplicationIT.java | 84 ------------------- 2 files changed, 1 insertion(+), 84 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 529278c188f46..f167c28a12bf4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -100,6 +100,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - APIs for performing async blob reads and async downloads from the repository using multiple streams ([#9592](https://github.com/opensearch-project/OpenSearch/issues/9592)) - Introduce cluster default remote translog buffer interval setting ([#9584](https://github.com/opensearch-project/OpenSearch/pull/9584)) - Add average concurrency metric for concurrent segment search ([#9670](https://github.com/opensearch-project/OpenSearch/issues/9670)) +- [Segment Replication] Adding segment replication statistics rolled up at index, node and cluster level ([#9709](https://github.com/opensearch-project/OpenSearch/pull/9709)) ### Dependencies - Bump `org.apache.logging.log4j:log4j-core` from 2.17.1 to 2.20.0 ([#8307](https://github.com/opensearch-project/OpenSearch/pull/8307)) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 761a2f3ecae70..2d8a13c89d541 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -21,11 +21,8 @@ import org.apache.lucene.index.StandardDirectoryReader; import org.apache.lucene.tests.util.TestUtil; import org.apache.lucene.util.BytesRef; -import org.opensearch.action.admin.cluster.node.stats.NodeStats; -import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.opensearch.action.admin.indices.alias.Alias; import org.opensearch.action.admin.indices.flush.FlushRequest; -import org.opensearch.action.admin.indices.stats.CommonStatsFlags; import org.opensearch.action.admin.indices.stats.IndicesStatsRequest; import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; import org.opensearch.action.get.GetResponse; @@ -1626,85 +1623,4 @@ public void testRealtimeMultiGetRequestsUnsuccessful() { } - public void testSegmentReplicationNodeAndIndexStats() throws Exception { - logger.info("--> start primary node"); - final String primaryNode = internalCluster().startNode(); - - logger.info("--> create index on node: {}", primaryNode); - assertAcked( - prepareCreate( - INDEX_NAME, - Settings.builder() - .put(indexSettings()) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2) - ) - ); - - ensureYellow(); - logger.info("--> start first replica node"); - final String replicaNode1 = internalCluster().startNode(); - - logger.info("--> start second replica node"); - final String replicaNode2 = internalCluster().startNode(); - - ensureGreen(); - CountDownLatch latch = new CountDownLatch(1); - // block replication - try (final Releasable ignored = blockReplication(List.of(replicaNode1, replicaNode2), latch)) { - // index another doc while blocked, this would not get replicated to the replicas. - Thread indexingThread = new Thread(() -> { - client().prepareIndex(INDEX_NAME).setId("2").setSource("foo2", randomInt()).get(); - refresh(INDEX_NAME); - }); - - indexingThread.start(); - indexingThread.join(); - latch.await(); - - NodesStatsResponse nodesStatsResponse = client().admin() - .cluster() - .prepareNodesStats() - .clear() - .setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.SegmentReplication)) - .get(); - - long maxBytesBehind; - long totalBytesBehind; - long maxReplicationLag; - for (NodeStats nodeStats : nodesStatsResponse.getNodes()) { - if (nodeStats.getNode().isDataNode()) { - maxBytesBehind = nodeStats.getIndices().getReplicationStats().getMaxBytesBehind(); - totalBytesBehind = nodeStats.getIndices().getReplicationStats().getTotalBytesBehind(); - maxReplicationLag = nodeStats.getIndices().getReplicationStats().getMaxReplicationLag(); - // primary node - should hold replication statistics - if (nodeStats.getNode().getName().equals(primaryNode)) { - assertTrue(maxBytesBehind > 0); - assertTrue(totalBytesBehind > 0); - assertTrue(maxReplicationLag > 0); - // 2 replicas so total bytes should be double of max - assertEquals(maxBytesBehind * 2, totalBytesBehind); - } - // replica nodes - if (nodeStats.getNode().getName().equals(replicaNode1) || nodeStats.getNode().getName().equals(replicaNode2)) { - assertEquals(0, maxBytesBehind); - assertEquals(0, totalBytesBehind); - assertEquals(0, maxReplicationLag); - } - } - } - // get replication statistics at index level - IndicesStatsResponse stats = client().admin().indices().prepareStats().execute().actionGet(); - // stats should be of non-zero value when aggregated at index level - assertNotNull(stats.getIndex(INDEX_NAME).getTotal().getReplicationStats()); - maxBytesBehind = stats.getIndex(INDEX_NAME).getTotal().getReplicationStats().getMaxBytesBehind(); - totalBytesBehind = stats.getIndex(INDEX_NAME).getTotal().getReplicationStats().getTotalBytesBehind(); - maxReplicationLag = stats.getIndex(INDEX_NAME).getTotal().getReplicationStats().getMaxReplicationLag(); - assertTrue(maxBytesBehind > 0); - assertTrue(totalBytesBehind > 0); - assertTrue(maxReplicationLag > 0); - assertEquals(2 * maxBytesBehind, totalBytesBehind); - } - - } - } From eca18d2cdf2a6e6ef2781ff74cee4105be36d202 Mon Sep 17 00:00:00 2001 From: Poojita Raj Date: Fri, 1 Sep 2023 16:36:40 -0700 Subject: [PATCH 4/9] address review comments Signed-off-by: Poojita Raj --- .../replication/SegmentReplicationIT.java | 1 - .../SegmentReplicationStatsIT.java | 71 +++++++++++++++++++ .../indices/stats/IndexStatsIT.java | 13 ++-- .../opensearch/index/ReplicationStats.java | 6 +- .../opensearch/index/shard/IndexShard.java | 2 +- 5 files changed, 85 insertions(+), 8 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 2d8a13c89d541..5855ed7470559 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -1622,5 +1622,4 @@ public void testRealtimeMultiGetRequestsUnsuccessful() { assertTrue(mgetResponse.getResponses()[1].isFailed()); } - } diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationStatsIT.java index 159de1a681f53..a957ff5f4cfc3 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationStatsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationStatsIT.java @@ -8,9 +8,15 @@ package org.opensearch.indices.replication; +import org.opensearch.action.admin.cluster.node.stats.NodeStats; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse; +import org.opensearch.action.admin.indices.stats.CommonStatsFlags; +import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.lease.Releasable; import org.opensearch.common.settings.Settings; +import org.opensearch.index.ReplicationStats; import org.opensearch.index.SegmentReplicationPerGroupStats; import org.opensearch.index.SegmentReplicationShardStats; import org.opensearch.index.shard.IndexShard; @@ -357,4 +363,69 @@ public void testQueryAgainstDocRepIndex() { .actionGet(); assertTrue(segmentReplicationStatsResponse.getReplicationStats().isEmpty()); } + + public void testSegmentReplicationNodeAndIndexStats() throws Exception { + logger.info("--> start primary node"); + final String primaryNode = internalCluster().startNode(); + + logger.info("--> create index on node: {}", primaryNode); + assertAcked(prepareCreate(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2))); + + ensureYellow(); + logger.info("--> start first replica node"); + final String replicaNode1 = internalCluster().startNode(); + + logger.info("--> start second replica node"); + final String replicaNode2 = internalCluster().startNode(); + + ensureGreen(); + CountDownLatch latch = new CountDownLatch(1); + // block replication + try (final Releasable ignored = blockReplication(List.of(replicaNode1, replicaNode2), latch)) { + // index another doc while blocked, this would not get replicated to the replicas. + Thread indexingThread = new Thread(() -> { + client().prepareIndex(INDEX_NAME).setId("2").setSource("foo2", randomInt()).get(); + refresh(INDEX_NAME); + }); + + indexingThread.start(); + indexingThread.join(); + latch.await(); + + NodesStatsResponse nodesStatsResponse = client().admin() + .cluster() + .prepareNodesStats() + .clear() + .setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.SegmentReplication)) + .get(); + + for (NodeStats nodeStats : nodesStatsResponse.getNodes()) { + ReplicationStats replicationStats = nodeStats.getIndices().getReplicationStats(); + // primary node - should hold replication statistics + if (nodeStats.getNode().getName().equals(primaryNode)) { + assertTrue(replicationStats.getMaxBytesBehind() > 0); + assertTrue(replicationStats.getTotalBytesBehind() > 0); + assertTrue(replicationStats.getMaxReplicationLag() > 0); + // 2 replicas so total bytes should be double of max + assertEquals(replicationStats.getMaxBytesBehind() * 2, replicationStats.getTotalBytesBehind()); + } + // replica nodes will not return stats + if (nodeStats.getNode().getName().equals(replicaNode1) || nodeStats.getNode().getName().equals(replicaNode2)) { + assertNull(replicationStats); + } + } + // get replication statistics at index level + IndicesStatsResponse stats = client().admin().indices().prepareStats().execute().actionGet(); + + // stats should be of non-zero value when aggregated at index level + ReplicationStats indexReplicationStats = stats.getIndex(INDEX_NAME).getTotal().getReplicationStats(); + assertNotNull(indexReplicationStats); + assertTrue(indexReplicationStats.getMaxBytesBehind() > 0); + assertTrue(indexReplicationStats.getTotalBytesBehind() > 0); + assertTrue(indexReplicationStats.getMaxReplicationLag() > 0); + assertEquals(2 * indexReplicationStats.getMaxBytesBehind(), indexReplicationStats.getTotalBytesBehind()); + } + + } + } diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/stats/IndexStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/stats/IndexStatsIT.java index caa3357a21a4e..c9ea1abafbd3d 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/stats/IndexStatsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/stats/IndexStatsIT.java @@ -1144,6 +1144,7 @@ private static void set(Flag flag, IndicesStatsRequestBuilder builder, boolean s break; case SegmentReplication: builder.setSegmentReplication(set); + break; default: fail("new flag? " + flag); break; @@ -1486,7 +1487,11 @@ private void persistGlobalCheckpoint(String index) throws Exception { public void testSegmentReplicationStats() { String indexName = "test-index"; - createIndex(indexName, Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 1).build()); + createIndex( + indexName, + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 1).build() + ); + ensureGreen(indexName); IndicesStatsRequestBuilder builder = client().admin().indices().prepareStats(); @@ -1499,9 +1504,9 @@ public void testSegmentReplicationStats() { createIndex( indexName, Settings.builder() - .put("index.number_of_shards", 1) - .put("index.number_of_replicas", 1) - .put("index.replication.type", ReplicationType.SEGMENT) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) .build() ); ensureGreen(indexName); diff --git a/server/src/main/java/org/opensearch/index/ReplicationStats.java b/server/src/main/java/org/opensearch/index/ReplicationStats.java index 3157aebf52c4c..15c5837f1d043 100644 --- a/server/src/main/java/org/opensearch/index/ReplicationStats.java +++ b/server/src/main/java/org/opensearch/index/ReplicationStats.java @@ -21,6 +21,8 @@ /** * ReplicationStats is used to provide segment replication statistics at an index, * node and cluster level on a segment replication enabled cluster. + * + * @opensearch.internal */ public class ReplicationStats implements ToXContentFragment, Writeable { @@ -74,7 +76,7 @@ public void writeTo(StreamOutput out) throws IOException { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(Fields.REPLICATION); + builder.startObject(Fields.SEGMENT_REPLICATION); builder.field(Fields.MAX_BYTES_BEHIND, new ByteSizeValue(maxBytesBehind).toString()); builder.field(Fields.TOTAL_BYTES_BEHIND, new ByteSizeValue(totalBytesBehind).toString()); builder.field(Fields.MAX_REPLICATION_LAG, new TimeValue(maxReplicationLag)); @@ -88,7 +90,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws * @opensearch.internal */ static final class Fields { - static final String REPLICATION = "replication"; + static final String SEGMENT_REPLICATION = "segment_replication"; static final String MAX_BYTES_BEHIND = "max_bytes_behind"; static final String TOTAL_BYTES_BEHIND = "total_bytes_behind"; static final String MAX_REPLICATION_LAG = "max_replication_lag"; diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 90b2d038973f3..1056f3fc63bb4 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -2941,7 +2941,7 @@ public Set getReplicationStatsForTrackedReplicas() } public ReplicationStats getReplicationStats() { - if (indexSettings.isSegRepEnabled()) { + if (indexSettings.isSegRepEnabled() && isPrimaryMode()) { final Set stats = getReplicationStatsForTrackedReplicas(); long maxBytesBehind = stats.stream().mapToLong(SegmentReplicationShardStats::getBytesBehindCount).max().orElse(0L); long totalBytesBehind = stats.stream().mapToLong(SegmentReplicationShardStats::getBytesBehindCount).sum(); From 8efeccfa776235ca8d4dd28b4f999bd4abfeb904 Mon Sep 17 00:00:00 2001 From: Poojita Raj Date: Fri, 1 Sep 2023 18:58:24 -0700 Subject: [PATCH 5/9] move segrep stats to segments Signed-off-by: Poojita Raj --- .../SegmentReplicationStatsIT.java | 6 ++-- .../indices/stats/IndexStatsIT.java | 12 ++----- .../cluster/stats/ClusterStatsIndices.java | 12 ------- .../admin/indices/stats/CommonStats.java | 32 +------------------ .../admin/indices/stats/CommonStatsFlags.java | 3 +- .../indices/stats/IndicesStatsRequest.java | 9 ------ .../stats/IndicesStatsRequestBuilder.java | 5 --- .../index/engine/SegmentsStats.java | 18 +++++++++++ .../opensearch/index/shard/IndexShard.java | 3 ++ .../opensearch/indices/NodeIndicesStats.java | 6 ---- 10 files changed, 29 insertions(+), 77 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationStatsIT.java index a957ff5f4cfc3..6c8f148c19886 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationStatsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationStatsIT.java @@ -396,11 +396,11 @@ public void testSegmentReplicationNodeAndIndexStats() throws Exception { .cluster() .prepareNodesStats() .clear() - .setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.SegmentReplication)) + .setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Segments)) .get(); for (NodeStats nodeStats : nodesStatsResponse.getNodes()) { - ReplicationStats replicationStats = nodeStats.getIndices().getReplicationStats(); + ReplicationStats replicationStats = nodeStats.getIndices().getSegments().getReplicationStats(); // primary node - should hold replication statistics if (nodeStats.getNode().getName().equals(primaryNode)) { assertTrue(replicationStats.getMaxBytesBehind() > 0); @@ -418,7 +418,7 @@ public void testSegmentReplicationNodeAndIndexStats() throws Exception { IndicesStatsResponse stats = client().admin().indices().prepareStats().execute().actionGet(); // stats should be of non-zero value when aggregated at index level - ReplicationStats indexReplicationStats = stats.getIndex(INDEX_NAME).getTotal().getReplicationStats(); + ReplicationStats indexReplicationStats = stats.getIndex(INDEX_NAME).getTotal().getSegments().getReplicationStats(); assertNotNull(indexReplicationStats); assertTrue(indexReplicationStats.getMaxBytesBehind() > 0); assertTrue(indexReplicationStats.getTotalBytesBehind() > 0); diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/stats/IndexStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/stats/IndexStatsIT.java index c9ea1abafbd3d..6fcc4d760be4c 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/stats/IndexStatsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/stats/IndexStatsIT.java @@ -961,8 +961,7 @@ public void testFlagOrdinalOrder() { Flag.Segments, Flag.Translog, Flag.RequestCache, - Flag.Recovery, - Flag.SegmentReplication }; + Flag.Recovery }; assertThat(flags.length, equalTo(Flag.values().length)); for (int i = 0; i < flags.length; i++) { @@ -1142,9 +1141,6 @@ private static void set(Flag flag, IndicesStatsRequestBuilder builder, boolean s case Recovery: builder.setRecovery(set); break; - case SegmentReplication: - builder.setSegmentReplication(set); - break; default: fail("new flag? " + flag); break; @@ -1185,8 +1181,6 @@ private static boolean isSet(Flag flag, CommonStats response) { return response.getRequestCache() != null; case Recovery: return response.getRecoveryStats() != null; - case SegmentReplication: - return response.getReplicationStats() != null; default: fail("new flag? " + flag); return false; @@ -1498,7 +1492,7 @@ public void testSegmentReplicationStats() { IndicesStatsResponse stats = builder.execute().actionGet(); // document replication enabled index should not return segment replication stats - assertNull(stats.getIndex(indexName).getTotal().getReplicationStats()); + assertNull(stats.getIndex(indexName).getTotal().getSegments().getReplicationStats()); indexName = "test-index2"; createIndex( @@ -1515,6 +1509,6 @@ public void testSegmentReplicationStats() { stats = builder.execute().actionGet(); // segment replication enabled index should return segment replication stats - assertNotNull(stats.getIndex(indexName).getTotal().getReplicationStats()); + assertNotNull(stats.getIndex(indexName).getTotal().getSegments().getReplicationStats()); } } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/stats/ClusterStatsIndices.java b/server/src/main/java/org/opensearch/action/admin/cluster/stats/ClusterStatsIndices.java index ef92225d71f12..63ac76ae65783 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/stats/ClusterStatsIndices.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/stats/ClusterStatsIndices.java @@ -35,7 +35,6 @@ import org.opensearch.action.admin.indices.stats.CommonStats; import org.opensearch.core.xcontent.ToXContentFragment; import org.opensearch.core.xcontent.XContentBuilder; -import org.opensearch.index.ReplicationStats; import org.opensearch.index.cache.query.QueryCacheStats; import org.opensearch.index.engine.SegmentsStats; import org.opensearch.index.fielddata.FieldDataStats; @@ -66,8 +65,6 @@ public class ClusterStatsIndices implements ToXContentFragment { private AnalysisStats analysis; private MappingStats mappings; - private ReplicationStats replicationStats; - public ClusterStatsIndices(List nodeResponses, MappingStats mappingStats, AnalysisStats analysisStats) { Map countsPerIndex = new HashMap<>(); @@ -77,7 +74,6 @@ public ClusterStatsIndices(List nodeResponses, Mapping this.queryCache = new QueryCacheStats(); this.completion = new CompletionStats(); this.segments = new SegmentsStats(); - this.replicationStats = new ReplicationStats(); for (ClusterStatsNodeResponse r : nodeResponses) { for (org.opensearch.action.admin.indices.stats.ShardStats shardStats : r.shardsStats()) { @@ -100,7 +96,6 @@ public ClusterStatsIndices(List nodeResponses, Mapping queryCache.add(shardCommonStats.queryCache); completion.add(shardCommonStats.completion); segments.add(shardCommonStats.segments); - replicationStats.add(shardCommonStats.replicationStats); } } @@ -154,10 +149,6 @@ public AnalysisStats getAnalysis() { return analysis; } - public ReplicationStats getReplicationStats() { - return replicationStats; - } - /** * Inner Fields used for creating XContent and parsing * @@ -183,9 +174,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (analysis != null) { analysis.toXContent(builder, params); } - if (replicationStats != null) { - replicationStats.toXContent(builder, params); - } return builder; } diff --git a/server/src/main/java/org/opensearch/action/admin/indices/stats/CommonStats.java b/server/src/main/java/org/opensearch/action/admin/indices/stats/CommonStats.java index 16f4e229216ee..e4abaef4ddfa8 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/stats/CommonStats.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/stats/CommonStats.java @@ -33,7 +33,6 @@ package org.opensearch.action.admin.indices.stats; import org.apache.lucene.store.AlreadyClosedException; -import org.opensearch.Version; import org.opensearch.common.Nullable; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; @@ -42,7 +41,6 @@ import org.opensearch.core.xcontent.ToXContent; import org.opensearch.core.xcontent.ToXContentFragment; import org.opensearch.core.xcontent.XContentBuilder; -import org.opensearch.index.ReplicationStats; import org.opensearch.index.cache.query.QueryCacheStats; import org.opensearch.index.cache.request.RequestCacheStats; import org.opensearch.index.engine.SegmentsStats; @@ -122,9 +120,6 @@ public class CommonStats implements Writeable, ToXContentFragment { @Nullable public RecoveryStats recoveryStats; - @Nullable - public ReplicationStats replicationStats; - public CommonStats() { this(CommonStatsFlags.NONE); } @@ -182,8 +177,6 @@ public CommonStats(CommonStatsFlags flags) { case Recovery: recoveryStats = new RecoveryStats(); break; - case SegmentReplication: - break; default: throw new IllegalStateException("Unknown Flag: " + flag); } @@ -243,9 +236,6 @@ public CommonStats(IndicesQueryCache indicesQueryCache, IndexShard indexShard, C case Recovery: recoveryStats = indexShard.recoveryStats(); break; - case SegmentReplication: - replicationStats = indexShard.getReplicationStats(); - break; default: throw new IllegalStateException("Unknown Flag: " + flag); } @@ -272,9 +262,6 @@ public CommonStats(StreamInput in) throws IOException { translog = in.readOptionalWriteable(TranslogStats::new); requestCache = in.readOptionalWriteable(RequestCacheStats::new); recoveryStats = in.readOptionalWriteable(RecoveryStats::new); - if (in.getVersion().onOrAfter(Version.V_2_10_0)) { - replicationStats = in.readOptionalWriteable(ReplicationStats::new); - } } @Override @@ -295,9 +282,6 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalWriteable(translog); out.writeOptionalWriteable(requestCache); out.writeOptionalWriteable(recoveryStats); - if (out.getVersion().onOrAfter(Version.V_2_10_0)) { - out.writeOptionalWriteable(replicationStats); - } } public void add(CommonStats stats) { @@ -430,14 +414,6 @@ public void add(CommonStats stats) { } else { recoveryStats.add(stats.getRecoveryStats()); } - if (replicationStats == null) { - if (stats.getReplicationStats() != null) { - replicationStats = new ReplicationStats(); - replicationStats.add(stats.getReplicationStats()); - } - } else { - replicationStats.add(stats.getReplicationStats()); - } } @Nullable @@ -520,11 +496,6 @@ public RecoveryStats getRecoveryStats() { return recoveryStats; } - @Nullable - public ReplicationStats getReplicationStats() { - return replicationStats; - } - /** * Utility method which computes total memory by adding * FieldData, PercolatorCache, Segments (index writer, version map) @@ -564,8 +535,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws segments, translog, requestCache, - recoveryStats, - replicationStats } + recoveryStats } ).filter(Objects::nonNull); for (ToXContent toXContent : ((Iterable) stream::iterator)) { toXContent.toXContent(builder, params); diff --git a/server/src/main/java/org/opensearch/action/admin/indices/stats/CommonStatsFlags.java b/server/src/main/java/org/opensearch/action/admin/indices/stats/CommonStatsFlags.java index 84ac36383a107..7503020d1c8ef 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/stats/CommonStatsFlags.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/stats/CommonStatsFlags.java @@ -274,8 +274,7 @@ public enum Flag { Translog("translog", 13), // 14 was previously used for Suggest RequestCache("request_cache", 15), - Recovery("recovery", 16), - SegmentReplication("segment_replication", 17); + Recovery("recovery", 16); private final String restName; private final int index; diff --git a/server/src/main/java/org/opensearch/action/admin/indices/stats/IndicesStatsRequest.java b/server/src/main/java/org/opensearch/action/admin/indices/stats/IndicesStatsRequest.java index 0180d513c1218..54f3e9b7d1a24 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/stats/IndicesStatsRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/stats/IndicesStatsRequest.java @@ -282,15 +282,6 @@ public IndicesStatsRequest includeUnloadedSegments(boolean includeUnloadedSegmen return this; } - public IndicesStatsRequest segmentReplication(boolean segmentReplication) { - flags.set(CommonStatsFlags.Flag.SegmentReplication, segmentReplication); - return this; - } - - public boolean segmentReplication() { - return flags.isSet(CommonStatsFlags.Flag.SegmentReplication); - } - @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); diff --git a/server/src/main/java/org/opensearch/action/admin/indices/stats/IndicesStatsRequestBuilder.java b/server/src/main/java/org/opensearch/action/admin/indices/stats/IndicesStatsRequestBuilder.java index 0f6f11a003014..c211812b32c48 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/stats/IndicesStatsRequestBuilder.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/stats/IndicesStatsRequestBuilder.java @@ -179,9 +179,4 @@ public IndicesStatsRequestBuilder setIncludeSegmentFileSizes(boolean includeSegm request.includeSegmentFileSizes(includeSegmentFileSizes); return this; } - - public IndicesStatsRequestBuilder setSegmentReplication(boolean segmentReplication) { - request.segmentReplication(segmentReplication); - return this; - } } diff --git a/server/src/main/java/org/opensearch/index/engine/SegmentsStats.java b/server/src/main/java/org/opensearch/index/engine/SegmentsStats.java index d18579e662710..9f415281ba5c2 100644 --- a/server/src/main/java/org/opensearch/index/engine/SegmentsStats.java +++ b/server/src/main/java/org/opensearch/index/engine/SegmentsStats.java @@ -39,6 +39,7 @@ import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.core.xcontent.ToXContentFragment; import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.index.ReplicationStats; import org.opensearch.index.remote.RemoteSegmentStats; import java.io.IOException; @@ -62,6 +63,11 @@ public class SegmentsStats implements Writeable, ToXContentFragment { private final RemoteSegmentStats remoteSegmentStats; private static final ByteSizeValue ZERO_BYTE_SIZE_VALUE = new ByteSizeValue(0L); + /** + * Segment replication statistics. + */ + private ReplicationStats replicationStats; + /* * A map to provide a best-effort approach describing Lucene index files. * @@ -115,6 +121,7 @@ public SegmentsStats(StreamInput in) throws IOException { fileSizes = in.readMap(StreamInput::readString, StreamInput::readLong); if (in.getVersion().onOrAfter(Version.V_2_10_0)) { remoteSegmentStats = in.readOptionalWriteable(RemoteSegmentStats::new); + replicationStats = in.readOptionalWriteable(ReplicationStats::new); } else { remoteSegmentStats = new RemoteSegmentStats(); } @@ -144,6 +151,10 @@ public void addRemoteSegmentStats(RemoteSegmentStats remoteSegmentStats) { this.remoteSegmentStats.add(remoteSegmentStats); } + public void addReplicationStats(ReplicationStats replicationStats) { + this.replicationStats.add(replicationStats); + } + public void addFileSizes(final Map newFileSizes) { newFileSizes.forEach((k, v) -> this.fileSizes.merge(k, v, (a, b) -> { assert a != null; @@ -163,6 +174,7 @@ public void add(SegmentsStats mergeStats) { addBitsetMemoryInBytes(mergeStats.bitsetMemoryInBytes); addFileSizes(mergeStats.fileSizes); addRemoteSegmentStats(mergeStats.remoteSegmentStats); + addReplicationStats(mergeStats.replicationStats); } /** @@ -215,6 +227,10 @@ public RemoteSegmentStats getRemoteSegmentStats() { return remoteSegmentStats; } + public ReplicationStats getReplicationStats() { + return replicationStats; + } + /** * Returns the max timestamp that is used to de-optimize documents with auto-generated IDs in the engine. * This is used to ensure we don't add duplicate documents when we assume an append only case based on auto-generated IDs @@ -239,6 +255,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.humanReadableField(Fields.FIXED_BIT_SET_MEMORY_IN_BYTES, Fields.FIXED_BIT_SET, getBitsetMemory()); builder.field(Fields.MAX_UNSAFE_AUTO_ID_TIMESTAMP, maxUnsafeAutoIdTimestamp); remoteSegmentStats.toXContent(builder, params); + replicationStats.toXContent(builder, params); builder.startObject(Fields.FILE_SIZES); for (Map.Entry entry : fileSizes.entrySet()) { builder.startObject(entry.getKey()); @@ -307,6 +324,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeMap(this.fileSizes, StreamOutput::writeString, StreamOutput::writeLong); if (out.getVersion().onOrAfter(Version.V_2_10_0)) { out.writeOptionalWriteable(remoteSegmentStats); + out.writeOptionalWriteable(replicationStats); } } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 1056f3fc63bb4..50f9f1e03bae9 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -1391,6 +1391,9 @@ public SegmentsStats segmentStats(boolean includeSegmentFileSizes, boolean inclu new RemoteSegmentStats(remoteStoreStatsTrackerFactory.getRemoteSegmentTransferTracker(shardId).stats()) ); } + if (indexSettings.isSegRepEnabled()) { + segmentsStats.addReplicationStats(getReplicationStats()); + } return segmentsStats; } diff --git a/server/src/main/java/org/opensearch/indices/NodeIndicesStats.java b/server/src/main/java/org/opensearch/indices/NodeIndicesStats.java index ced633dcee9aa..cc3d8193dfa6b 100644 --- a/server/src/main/java/org/opensearch/indices/NodeIndicesStats.java +++ b/server/src/main/java/org/opensearch/indices/NodeIndicesStats.java @@ -42,7 +42,6 @@ import org.opensearch.core.index.Index; import org.opensearch.core.xcontent.ToXContentFragment; import org.opensearch.core.xcontent.XContentBuilder; -import org.opensearch.index.ReplicationStats; import org.opensearch.index.cache.query.QueryCacheStats; import org.opensearch.index.cache.request.RequestCacheStats; import org.opensearch.index.engine.SegmentsStats; @@ -188,11 +187,6 @@ public RecoveryStats getRecoveryStats() { return stats.getRecoveryStats(); } - @Nullable - public ReplicationStats getReplicationStats() { - return stats.getReplicationStats(); - } - @Override public void writeTo(StreamOutput out) throws IOException { stats.writeTo(out); From 7d428c4c8f4f1c6441a39b81d5d50dc78400c6e1 Mon Sep 17 00:00:00 2001 From: Poojita Raj Date: Fri, 1 Sep 2023 19:40:26 -0700 Subject: [PATCH 6/9] fix failures Signed-off-by: Poojita Raj --- .../org/opensearch/index/engine/SegmentsStats.java | 11 ++++++++++- .../java/org/opensearch/index/shard/IndexShard.java | 2 +- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/engine/SegmentsStats.java b/server/src/main/java/org/opensearch/index/engine/SegmentsStats.java index 9f415281ba5c2..b7edba195a353 100644 --- a/server/src/main/java/org/opensearch/index/engine/SegmentsStats.java +++ b/server/src/main/java/org/opensearch/index/engine/SegmentsStats.java @@ -99,6 +99,7 @@ public class SegmentsStats implements Writeable, ToXContentFragment { public SegmentsStats() { fileSizes = new HashMap<>(); remoteSegmentStats = new RemoteSegmentStats(); + // replicationStats = new ReplicationStats(); } public SegmentsStats(StreamInput in) throws IOException { @@ -124,6 +125,7 @@ public SegmentsStats(StreamInput in) throws IOException { replicationStats = in.readOptionalWriteable(ReplicationStats::new); } else { remoteSegmentStats = new RemoteSegmentStats(); + // replicationStats = new ReplicationStats(); } } @@ -152,7 +154,14 @@ public void addRemoteSegmentStats(RemoteSegmentStats remoteSegmentStats) { } public void addReplicationStats(ReplicationStats replicationStats) { - this.replicationStats.add(replicationStats); + if (this.replicationStats == null) { + if (replicationStats != null) { + this.replicationStats = new ReplicationStats(); + this.replicationStats.add(replicationStats); + } + } else { + this.replicationStats.add(replicationStats); + } } public void addFileSizes(final Map newFileSizes) { diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 50f9f1e03bae9..09bd42e049d1f 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -1391,7 +1391,7 @@ public SegmentsStats segmentStats(boolean includeSegmentFileSizes, boolean inclu new RemoteSegmentStats(remoteStoreStatsTrackerFactory.getRemoteSegmentTransferTracker(shardId).stats()) ); } - if (indexSettings.isSegRepEnabled()) { + if (indexSettings.isSegRepEnabled() && isPrimaryMode()) { segmentsStats.addReplicationStats(getReplicationStats()); } return segmentsStats; From 04323bb2e04dc60a6da45b131388541a0d84786e Mon Sep 17 00:00:00 2001 From: Poojita Raj Date: Sun, 3 Sep 2023 00:39:46 -0700 Subject: [PATCH 7/9] uniform response for docrep and segrep segment stats Signed-off-by: Poojita Raj --- .../replication/SegmentReplicationStatsIT.java | 6 ++++-- .../org/opensearch/indices/stats/IndexStatsIT.java | 4 ++-- .../org/opensearch/index/engine/SegmentsStats.java | 13 +++---------- 3 files changed, 9 insertions(+), 14 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationStatsIT.java index 6c8f148c19886..766471fdc0756 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationStatsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationStatsIT.java @@ -409,9 +409,11 @@ public void testSegmentReplicationNodeAndIndexStats() throws Exception { // 2 replicas so total bytes should be double of max assertEquals(replicationStats.getMaxBytesBehind() * 2, replicationStats.getTotalBytesBehind()); } - // replica nodes will not return stats + // replica nodes - should hold empty replication statistics if (nodeStats.getNode().getName().equals(replicaNode1) || nodeStats.getNode().getName().equals(replicaNode2)) { - assertNull(replicationStats); + assertEquals(0, replicationStats.getMaxBytesBehind()); + assertEquals(0, replicationStats.getTotalBytesBehind()); + assertEquals(0, replicationStats.getMaxReplicationLag()); } } // get replication statistics at index level diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/stats/IndexStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/stats/IndexStatsIT.java index 6fcc4d760be4c..3f432060e13fb 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/stats/IndexStatsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/stats/IndexStatsIT.java @@ -1491,8 +1491,8 @@ public void testSegmentReplicationStats() { IndicesStatsRequestBuilder builder = client().admin().indices().prepareStats(); IndicesStatsResponse stats = builder.execute().actionGet(); - // document replication enabled index should not return segment replication stats - assertNull(stats.getIndex(indexName).getTotal().getSegments().getReplicationStats()); + // document replication enabled index should return empty segment replication stats + assertNotNull(stats.getIndex(indexName).getTotal().getSegments().getReplicationStats()); indexName = "test-index2"; createIndex( diff --git a/server/src/main/java/org/opensearch/index/engine/SegmentsStats.java b/server/src/main/java/org/opensearch/index/engine/SegmentsStats.java index b7edba195a353..a729368a909ee 100644 --- a/server/src/main/java/org/opensearch/index/engine/SegmentsStats.java +++ b/server/src/main/java/org/opensearch/index/engine/SegmentsStats.java @@ -99,7 +99,7 @@ public class SegmentsStats implements Writeable, ToXContentFragment { public SegmentsStats() { fileSizes = new HashMap<>(); remoteSegmentStats = new RemoteSegmentStats(); - // replicationStats = new ReplicationStats(); + replicationStats = new ReplicationStats(); } public SegmentsStats(StreamInput in) throws IOException { @@ -125,7 +125,7 @@ public SegmentsStats(StreamInput in) throws IOException { replicationStats = in.readOptionalWriteable(ReplicationStats::new); } else { remoteSegmentStats = new RemoteSegmentStats(); - // replicationStats = new ReplicationStats(); + replicationStats = new ReplicationStats(); } } @@ -154,14 +154,7 @@ public void addRemoteSegmentStats(RemoteSegmentStats remoteSegmentStats) { } public void addReplicationStats(ReplicationStats replicationStats) { - if (this.replicationStats == null) { - if (replicationStats != null) { - this.replicationStats = new ReplicationStats(); - this.replicationStats.add(replicationStats); - } - } else { - this.replicationStats.add(replicationStats); - } + this.replicationStats.add(replicationStats); } public void addFileSizes(final Map newFileSizes) { From 01d4dfed321a91d75e1f80076ae0160c4f589fb1 Mon Sep 17 00:00:00 2001 From: Poojita Raj Date: Sun, 3 Sep 2023 09:18:45 -0700 Subject: [PATCH 8/9] remove pri check Signed-off-by: Poojita Raj --- .../main/java/org/opensearch/index/ReplicationStats.java | 9 ++++----- .../java/org/opensearch/index/engine/SegmentsStats.java | 2 +- .../main/java/org/opensearch/index/shard/IndexShard.java | 4 ++-- .../action/admin/cluster/node/stats/NodeStatsTests.java | 7 +++++++ 4 files changed, 14 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/ReplicationStats.java b/server/src/main/java/org/opensearch/index/ReplicationStats.java index 15c5837f1d043..9cc6685c75f80 100644 --- a/server/src/main/java/org/opensearch/index/ReplicationStats.java +++ b/server/src/main/java/org/opensearch/index/ReplicationStats.java @@ -47,12 +47,11 @@ public ReplicationStats() { } public void add(ReplicationStats other) { - if (other == null) { - return; + if (other != null) { + maxBytesBehind = Math.max(other.maxBytesBehind, maxBytesBehind); + totalBytesBehind += other.totalBytesBehind; + maxReplicationLag = Math.max(other.maxReplicationLag, maxReplicationLag); } - maxBytesBehind = Math.max(other.maxBytesBehind, maxBytesBehind); - totalBytesBehind += other.totalBytesBehind; - maxReplicationLag = Math.max(other.maxReplicationLag, maxReplicationLag); } public long getMaxBytesBehind() { diff --git a/server/src/main/java/org/opensearch/index/engine/SegmentsStats.java b/server/src/main/java/org/opensearch/index/engine/SegmentsStats.java index a729368a909ee..f4fd2490c7abe 100644 --- a/server/src/main/java/org/opensearch/index/engine/SegmentsStats.java +++ b/server/src/main/java/org/opensearch/index/engine/SegmentsStats.java @@ -66,7 +66,7 @@ public class SegmentsStats implements Writeable, ToXContentFragment { /** * Segment replication statistics. */ - private ReplicationStats replicationStats; + private final ReplicationStats replicationStats; /* * A map to provide a best-effort approach describing Lucene index files. diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 09bd42e049d1f..7ddef8cb9f3d9 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -1391,7 +1391,7 @@ public SegmentsStats segmentStats(boolean includeSegmentFileSizes, boolean inclu new RemoteSegmentStats(remoteStoreStatsTrackerFactory.getRemoteSegmentTransferTracker(shardId).stats()) ); } - if (indexSettings.isSegRepEnabled() && isPrimaryMode()) { + if (indexSettings.isSegRepEnabled()) { segmentsStats.addReplicationStats(getReplicationStats()); } return segmentsStats; @@ -2944,7 +2944,7 @@ public Set getReplicationStatsForTrackedReplicas() } public ReplicationStats getReplicationStats() { - if (indexSettings.isSegRepEnabled() && isPrimaryMode()) { + if (indexSettings.isSegRepEnabled()) { final Set stats = getReplicationStatsForTrackedReplicas(); long maxBytesBehind = stats.stream().mapToLong(SegmentReplicationShardStats::getBytesBehindCount).max().orElse(0L); long totalBytesBehind = stats.stream().mapToLong(SegmentReplicationShardStats::getBytesBehindCount).sum(); diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java index 8a450b99904cf..df056c75d66f0 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java @@ -46,6 +46,7 @@ import org.opensearch.core.indices.breaker.CircuitBreakerStats; import org.opensearch.discovery.DiscoveryStats; import org.opensearch.http.HttpStats; +import org.opensearch.index.ReplicationStats; import org.opensearch.index.remote.RemoteSegmentStats; import org.opensearch.indices.NodeIndicesStats; import org.opensearch.ingest.IngestStats; @@ -462,6 +463,12 @@ public void testSerialization() throws IOException { assertEquals(remoteSegmentStats.getTotalRefreshBytesLag(), deserializedRemoteSegmentStats.getTotalRefreshBytesLag()); assertEquals(remoteSegmentStats.getTotalUploadTime(), deserializedRemoteSegmentStats.getTotalUploadTime()); assertEquals(remoteSegmentStats.getTotalDownloadTime(), deserializedRemoteSegmentStats.getTotalDownloadTime()); + ReplicationStats replicationStats = nodeIndicesStats.getSegments().getReplicationStats(); + + ReplicationStats deserializedReplicationStats = deserializedNodeIndicesStats.getSegments().getReplicationStats(); + assertEquals(replicationStats.getMaxBytesBehind(), deserializedReplicationStats.getMaxBytesBehind()); + assertEquals(replicationStats.getTotalBytesBehind(), deserializedReplicationStats.getTotalBytesBehind()); + assertEquals(replicationStats.getMaxReplicationLag(), deserializedReplicationStats.getMaxReplicationLag()); } } } From 0e6447118f9a98ca3f1c21f7c92c7c6e2650c775 Mon Sep 17 00:00:00 2001 From: Poojita Raj Date: Sun, 3 Sep 2023 22:53:44 -0700 Subject: [PATCH 9/9] version 3.0 Signed-off-by: Poojita Raj --- .../java/org/opensearch/index/engine/SegmentsStats.java | 7 ++++++- .../main/java/org/opensearch/index/shard/IndexShard.java | 4 ++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/engine/SegmentsStats.java b/server/src/main/java/org/opensearch/index/engine/SegmentsStats.java index f4fd2490c7abe..0ff64bb43ad5c 100644 --- a/server/src/main/java/org/opensearch/index/engine/SegmentsStats.java +++ b/server/src/main/java/org/opensearch/index/engine/SegmentsStats.java @@ -122,9 +122,12 @@ public SegmentsStats(StreamInput in) throws IOException { fileSizes = in.readMap(StreamInput::readString, StreamInput::readLong); if (in.getVersion().onOrAfter(Version.V_2_10_0)) { remoteSegmentStats = in.readOptionalWriteable(RemoteSegmentStats::new); - replicationStats = in.readOptionalWriteable(ReplicationStats::new); } else { remoteSegmentStats = new RemoteSegmentStats(); + } + if (in.getVersion().onOrAfter(Version.V_3_0_0)) { + replicationStats = in.readOptionalWriteable(ReplicationStats::new); + } else { replicationStats = new ReplicationStats(); } } @@ -326,6 +329,8 @@ public void writeTo(StreamOutput out) throws IOException { out.writeMap(this.fileSizes, StreamOutput::writeString, StreamOutput::writeLong); if (out.getVersion().onOrAfter(Version.V_2_10_0)) { out.writeOptionalWriteable(remoteSegmentStats); + } + if (out.getVersion().onOrAfter(Version.V_3_0_0)) { out.writeOptionalWriteable(replicationStats); } } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 7ddef8cb9f3d9..250061c53bda9 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -2944,7 +2944,7 @@ public Set getReplicationStatsForTrackedReplicas() } public ReplicationStats getReplicationStats() { - if (indexSettings.isSegRepEnabled()) { + if (indexSettings.isSegRepEnabled() && routingEntry().primary()) { final Set stats = getReplicationStatsForTrackedReplicas(); long maxBytesBehind = stats.stream().mapToLong(SegmentReplicationShardStats::getBytesBehindCount).max().orElse(0L); long totalBytesBehind = stats.stream().mapToLong(SegmentReplicationShardStats::getBytesBehindCount).sum(); @@ -2954,7 +2954,7 @@ public ReplicationStats getReplicationStats() { .orElse(0L); return new ReplicationStats(maxBytesBehind, totalBytesBehind, maxReplicationLag); } - return null; + return new ReplicationStats(); } /**