diff --git a/CHANGELOG.md b/CHANGELOG.md
index 53ff23e0a1a78..22191e739e601 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -101,6 +101,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - Introduce cluster default remote translog buffer interval setting ([#9584](https://github.com/opensearch-project/OpenSearch/pull/9584))
 - Add average concurrency metric for concurrent segment search ([#9670](https://github.com/opensearch-project/OpenSearch/issues/9670))
 - [Remote state] Integrate remote cluster state in publish/commit flow ([#9665](https://github.com/opensearch-project/OpenSearch/pull/9665))
+- [Segment Replication] Adding segment replication statistics rolled up at index, node and cluster level ([#9709](https://github.com/opensearch-project/OpenSearch/pull/9709))
 
 ### Dependencies
 - Bump `org.apache.logging.log4j:log4j-core` from 2.17.1 to 2.20.0 ([#8307](https://github.com/opensearch-project/OpenSearch/pull/8307))
diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java
index 9539044bf75b0..a16167c8ad0c4 100644
--- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java
@@ -241,7 +241,7 @@ protected Releasable blockReplication(List<String> nodes, CountDownLatch latch)
 
     protected void assertReplicaCheckpointUpdated(IndexShard primaryShard) throws Exception {
         assertBusy(() -> {
-            Set<SegmentReplicationShardStats> groupStats = primaryShard.getReplicationStats();
+            Set<SegmentReplicationShardStats> groupStats = primaryShard.getReplicationStatsForTrackedReplicas();
             assertEquals(primaryShard.indexSettings().getNumberOfReplicas(), groupStats.size());
             for (SegmentReplicationShardStats shardStat : groupStats) {
                 assertEquals(0, shardStat.getCheckpointsBehindCount());
diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationStatsIT.java
index 159de1a681f53..766471fdc0756 100644
--- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationStatsIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationStatsIT.java
@@ -8,9 +8,15 @@
 
 package org.opensearch.indices.replication;
 
+import org.opensearch.action.admin.cluster.node.stats.NodeStats;
+import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
 import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse;
+import org.opensearch.action.admin.indices.stats.CommonStatsFlags;
+import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
 import org.opensearch.cluster.metadata.IndexMetadata;
+import org.opensearch.common.lease.Releasable;
 import org.opensearch.common.settings.Settings;
+import org.opensearch.index.ReplicationStats;
 import org.opensearch.index.SegmentReplicationPerGroupStats;
 import org.opensearch.index.SegmentReplicationShardStats;
 import org.opensearch.index.shard.IndexShard;
@@ -357,4 +363,71 @@ public void testQueryAgainstDocRepIndex() {
             .actionGet();
         assertTrue(segmentReplicationStatsResponse.getReplicationStats().isEmpty());
     }
+
+    public void testSegmentReplicationNodeAndIndexStats() throws Exception {
+        logger.info("--> start primary node");
+        final String primaryNode = internalCluster().startNode();
+
+        logger.info("--> create index on node: {}", primaryNode);
+        assertAcked(prepareCreate(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2)));
+
+        ensureYellow();
+        logger.info("--> start first replica node");
+        final String replicaNode1 = internalCluster().startNode();
+
+        logger.info("--> start second replica node");
+        final String replicaNode2 = internalCluster().startNode();
+
+        ensureGreen();
+        CountDownLatch latch = new CountDownLatch(1);
+        // block replication
+        try (final Releasable ignored = blockReplication(List.of(replicaNode1, replicaNode2), latch)) {
+            // index another doc while blocked, this would not get replicated to the replicas.
+            Thread indexingThread = new Thread(() -> {
+                client().prepareIndex(INDEX_NAME).setId("2").setSource("foo2", randomInt()).get();
+                refresh(INDEX_NAME);
+            });
+
+            indexingThread.start();
+            indexingThread.join();
+            latch.await();
+
+            NodesStatsResponse nodesStatsResponse = client().admin()
+                .cluster()
+                .prepareNodesStats()
+                .clear()
+                .setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Segments))
+                .get();
+
+            for (NodeStats nodeStats : nodesStatsResponse.getNodes()) {
+                ReplicationStats replicationStats = nodeStats.getIndices().getSegments().getReplicationStats();
+                // primary node - should hold replication statistics
+                if (nodeStats.getNode().getName().equals(primaryNode)) {
+                    assertTrue(replicationStats.getMaxBytesBehind() > 0);
+                    assertTrue(replicationStats.getTotalBytesBehind() > 0);
+                    assertTrue(replicationStats.getMaxReplicationLag() > 0);
+                    // 2 replicas so total bytes should be double of max
+                    assertEquals(replicationStats.getMaxBytesBehind() * 2, replicationStats.getTotalBytesBehind());
+                }
+                // replica nodes - should hold empty replication statistics
+                if (nodeStats.getNode().getName().equals(replicaNode1) || nodeStats.getNode().getName().equals(replicaNode2)) {
+                    assertEquals(0, replicationStats.getMaxBytesBehind());
+                    assertEquals(0, replicationStats.getTotalBytesBehind());
+                    assertEquals(0, replicationStats.getMaxReplicationLag());
+                }
+            }
+            // get replication statistics at index level
+            IndicesStatsResponse stats = client().admin().indices().prepareStats().execute().actionGet();
+
+            // stats should be of non-zero value when aggregated at index level
+            ReplicationStats indexReplicationStats = stats.getIndex(INDEX_NAME).getTotal().getSegments().getReplicationStats();
+            assertNotNull(indexReplicationStats);
+            assertTrue(indexReplicationStats.getMaxBytesBehind() > 0);
+            assertTrue(indexReplicationStats.getTotalBytesBehind() > 0);
+            assertTrue(indexReplicationStats.getMaxReplicationLag() > 0);
+            assertEquals(2 * indexReplicationStats.getMaxBytesBehind(), indexReplicationStats.getTotalBytesBehind());
+        }
+
+    }
+
 }
diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/stats/IndexStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/stats/IndexStatsIT.java
index 1a131a2a7eb3d..3f432060e13fb 100644
--- a/server/src/internalClusterTest/java/org/opensearch/indices/stats/IndexStatsIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/indices/stats/IndexStatsIT.java
@@ -75,6 +75,7 @@
 import org.opensearch.indices.IndicesQueryCache;
 import org.opensearch.indices.IndicesRequestCache;
 import org.opensearch.indices.IndicesService;
+import org.opensearch.indices.replication.common.ReplicationType;
 import org.opensearch.plugins.Plugin;
 import org.opensearch.search.sort.SortOrder;
 import org.opensearch.test.InternalSettingsPlugin;
@@ -1477,4 +1478,37 @@ private void persistGlobalCheckpoint(String index) throws Exception {
             }
         }
     }
+
+    public void testSegmentReplicationStats() {
+        String indexName = "test-index";
+        createIndex(
+            indexName,
+            Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 1).build()
+        );
+
+        ensureGreen(indexName);
+
+        IndicesStatsRequestBuilder builder = client().admin().indices().prepareStats();
+        IndicesStatsResponse stats = builder.execute().actionGet();
+
+        // document replication enabled index should return empty segment replication stats
+        assertNotNull(stats.getIndex(indexName).getTotal().getSegments().getReplicationStats());
+
+        indexName = "test-index2";
+        createIndex(
+            indexName,
+            Settings.builder()
+                .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
+                .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
+                .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
+                .build()
+        );
+        ensureGreen(indexName);
+
+        builder = client().admin().indices().prepareStats();
+        stats = builder.execute().actionGet();
+
+        // segment replication enabled index should return segment replication stats
+        assertNotNull(stats.getIndex(indexName).getTotal().getSegments().getReplicationStats());
+    }
 }
diff --git a/server/src/main/java/org/opensearch/index/ReplicationStats.java b/server/src/main/java/org/opensearch/index/ReplicationStats.java
new file mode 100644
index 0000000000000..9cc6685c75f80
--- /dev/null
+++ b/server/src/main/java/org/opensearch/index/ReplicationStats.java
@@ -0,0 +1,97 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.index;
+
+import org.opensearch.common.unit.TimeValue;
+import org.opensearch.core.common.io.stream.StreamInput;
+import org.opensearch.core.common.io.stream.StreamOutput;
+import org.opensearch.core.common.io.stream.Writeable;
+import org.opensearch.core.common.unit.ByteSizeValue;
+import org.opensearch.core.xcontent.ToXContentFragment;
+import org.opensearch.core.xcontent.XContentBuilder;
+
+import java.io.IOException;
+
+/**
+ * ReplicationStats is used to provide segment replication statistics at an index,
+ * node and cluster level on a segment replication enabled cluster.
+ *
+ * @opensearch.internal
+ */
+public class ReplicationStats implements ToXContentFragment, Writeable {
+
+    public long maxBytesBehind;
+    public long maxReplicationLag;
+    public long totalBytesBehind;
+
+    public ReplicationStats(long maxBytesBehind, long totalBytesBehind, long maxReplicationLag) {
+        this.maxBytesBehind = maxBytesBehind;
+        this.totalBytesBehind = totalBytesBehind;
+        this.maxReplicationLag = maxReplicationLag;
+    }
+
+    public ReplicationStats(StreamInput in) throws IOException {
+        this.maxBytesBehind = in.readVLong();
+        this.totalBytesBehind = in.readVLong();
+        this.maxReplicationLag = in.readVLong();
+    }
+
+    public ReplicationStats() {
+
+    }
+
+    public void add(ReplicationStats other) {
+        if (other != null) {
+            maxBytesBehind = Math.max(other.maxBytesBehind, maxBytesBehind);
+            totalBytesBehind += other.totalBytesBehind;
+            maxReplicationLag = Math.max(other.maxReplicationLag, maxReplicationLag);
+        }
+    }
+
+    public long getMaxBytesBehind() {
+        return this.maxBytesBehind;
+    }
+
+    public long getTotalBytesBehind() {
+        return this.totalBytesBehind;
+    }
+
+    public long getMaxReplicationLag() {
+        return this.maxReplicationLag;
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        out.writeVLong(maxBytesBehind);
+        out.writeVLong(totalBytesBehind);
+        out.writeVLong(maxReplicationLag);
+    }
+
+    @Override
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        builder.startObject(Fields.SEGMENT_REPLICATION);
+        builder.field(Fields.MAX_BYTES_BEHIND, new ByteSizeValue(maxBytesBehind).toString());
+        builder.field(Fields.TOTAL_BYTES_BEHIND, new ByteSizeValue(totalBytesBehind).toString());
+        builder.field(Fields.MAX_REPLICATION_LAG, new TimeValue(maxReplicationLag));
+        builder.endObject();
+        return builder;
+    }
+
+    /**
+     * Fields for segment replication statistics
+     *
+     * @opensearch.internal
+     */
+    static final class Fields {
+        static final String SEGMENT_REPLICATION = "segment_replication";
+        static final String MAX_BYTES_BEHIND = "max_bytes_behind";
+        static final String TOTAL_BYTES_BEHIND = "total_bytes_behind";
+        static final String MAX_REPLICATION_LAG = "max_replication_lag";
+    }
+}
diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java b/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java
index 0c4fe26eeead5..4284daf9ffef4 100644
--- a/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java
+++ b/server/src/main/java/org/opensearch/index/SegmentReplicationPressureService.java
@@ -151,7 +151,7 @@ public void isSegrepLimitBreached(ShardId shardId) {
     }
 
     private void validateReplicationGroup(IndexShard shard) {
-        final Set<SegmentReplicationShardStats> replicaStats = shard.getReplicationStats();
+        final Set<SegmentReplicationShardStats> replicaStats = shard.getReplicationStatsForTrackedReplicas();
         final Set<SegmentReplicationShardStats> staleReplicas = getStaleReplicas(replicaStats);
         if (staleReplicas.isEmpty() == false) {
             // inSyncIds always considers the primary id, so filter it out.
diff --git a/server/src/main/java/org/opensearch/index/SegmentReplicationStatsTracker.java b/server/src/main/java/org/opensearch/index/SegmentReplicationStatsTracker.java
index 2255bb17d364f..6d5c00c08caff 100644
--- a/server/src/main/java/org/opensearch/index/SegmentReplicationStatsTracker.java
+++ b/server/src/main/java/org/opensearch/index/SegmentReplicationStatsTracker.java
@@ -59,7 +59,7 @@ public void incrementRejectionCount(ShardId shardId) {
     public SegmentReplicationPerGroupStats getStatsForShard(IndexShard indexShard) {
         return new SegmentReplicationPerGroupStats(
             indexShard.shardId(),
-            indexShard.getReplicationStats(),
+            indexShard.getReplicationStatsForTrackedReplicas(),
             Optional.ofNullable(rejectionCount.get(indexShard.shardId())).map(AtomicInteger::get).orElse(0)
         );
     }
diff --git a/server/src/main/java/org/opensearch/index/engine/SegmentsStats.java b/server/src/main/java/org/opensearch/index/engine/SegmentsStats.java
index d18579e662710..0ff64bb43ad5c 100644
--- a/server/src/main/java/org/opensearch/index/engine/SegmentsStats.java
+++ b/server/src/main/java/org/opensearch/index/engine/SegmentsStats.java
@@ -39,6 +39,7 @@
 import org.opensearch.core.common.unit.ByteSizeValue;
 import org.opensearch.core.xcontent.ToXContentFragment;
 import org.opensearch.core.xcontent.XContentBuilder;
+import org.opensearch.index.ReplicationStats;
 import org.opensearch.index.remote.RemoteSegmentStats;
 
 import java.io.IOException;
@@ -62,6 +63,11 @@ public class SegmentsStats implements Writeable, ToXContentFragment {
     private final RemoteSegmentStats remoteSegmentStats;
     private static final ByteSizeValue ZERO_BYTE_SIZE_VALUE = new ByteSizeValue(0L);
 
+    /**
+     * Segment replication statistics.
+     */
+    private final ReplicationStats replicationStats;
+
     /*
      * A map to provide a best-effort approach describing Lucene index files.
      *
@@ -93,6 +99,7 @@ public class SegmentsStats implements Writeable, ToXContentFragment {
     public SegmentsStats() {
         fileSizes = new HashMap<>();
         remoteSegmentStats = new RemoteSegmentStats();
+        replicationStats = new ReplicationStats();
     }
 
     public SegmentsStats(StreamInput in) throws IOException {
@@ -118,6 +125,11 @@ public SegmentsStats(StreamInput in) throws IOException {
         } else {
             remoteSegmentStats = new RemoteSegmentStats();
         }
+        if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
+            replicationStats = in.readOptionalWriteable(ReplicationStats::new);
+        } else {
+            replicationStats = new ReplicationStats();
+        }
     }
 
     public void add(long count) {
@@ -144,6 +156,10 @@ public void addRemoteSegmentStats(RemoteSegmentStats remoteSegmentStats) {
         this.remoteSegmentStats.add(remoteSegmentStats);
     }
 
+    public void addReplicationStats(ReplicationStats replicationStats) {
+        this.replicationStats.add(replicationStats);
+    }
+
     public void addFileSizes(final Map<String, Long> newFileSizes) {
         newFileSizes.forEach((k, v) -> this.fileSizes.merge(k, v, (a, b) -> {
             assert a != null;
@@ -163,6 +179,7 @@ public void add(SegmentsStats mergeStats) {
         addBitsetMemoryInBytes(mergeStats.bitsetMemoryInBytes);
         addFileSizes(mergeStats.fileSizes);
         addRemoteSegmentStats(mergeStats.remoteSegmentStats);
+        addReplicationStats(mergeStats.replicationStats);
     }
 
     /**
@@ -215,6 +232,10 @@ public RemoteSegmentStats getRemoteSegmentStats() {
         return remoteSegmentStats;
     }
 
+    public ReplicationStats getReplicationStats() {
+        return replicationStats;
+    }
+
     /**
      * Returns the max timestamp that is used to de-optimize documents with auto-generated IDs in the engine.
      * This is used to ensure we don't add duplicate documents when we assume an append only case based on auto-generated IDs
@@ -239,6 +260,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
         builder.humanReadableField(Fields.FIXED_BIT_SET_MEMORY_IN_BYTES, Fields.FIXED_BIT_SET, getBitsetMemory());
         builder.field(Fields.MAX_UNSAFE_AUTO_ID_TIMESTAMP, maxUnsafeAutoIdTimestamp);
         remoteSegmentStats.toXContent(builder, params);
+        replicationStats.toXContent(builder, params);
         builder.startObject(Fields.FILE_SIZES);
         for (Map.Entry<String, Long> entry : fileSizes.entrySet()) {
             builder.startObject(entry.getKey());
@@ -308,6 +330,9 @@ public void writeTo(StreamOutput out) throws IOException {
         if (out.getVersion().onOrAfter(Version.V_2_10_0)) {
             out.writeOptionalWriteable(remoteSegmentStats);
         }
+        if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
+            out.writeOptionalWriteable(replicationStats);
+        }
     }
 
     public void clearFileSizes() {
diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java
index 352876e54547e..250061c53bda9 100644
--- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -110,6 +110,7 @@
 import org.opensearch.index.IndexNotFoundException;
 import org.opensearch.index.IndexService;
 import org.opensearch.index.IndexSettings;
+import org.opensearch.index.ReplicationStats;
 import org.opensearch.index.SegmentReplicationShardStats;
 import org.opensearch.index.VersionType;
 import org.opensearch.index.cache.IndexCache;
@@ -1390,6 +1391,9 @@ public SegmentsStats segmentStats(boolean includeSegmentFileSizes, boolean inclu
                 new RemoteSegmentStats(remoteStoreStatsTrackerFactory.getRemoteSegmentTransferTracker(shardId).stats())
             );
         }
+        if (indexSettings.isSegRepEnabled()) {
+            segmentsStats.addReplicationStats(getReplicationStats());
+        }
         return segmentsStats;
     }
 
@@ -2935,10 +2939,24 @@ public void updateVisibleCheckpointForShard(final String allocationId, final Rep
      * @return {@link Tuple} V1 - TimeValue in ms - mean replication lag for this primary to its entire group,
      * V2 - Set of {@link SegmentReplicationShardStats} per shard in this primary's replication group.
      */
-    public Set<SegmentReplicationShardStats> getReplicationStats() {
+    public Set<SegmentReplicationShardStats> getReplicationStatsForTrackedReplicas() {
         return replicationTracker.getSegmentReplicationStats();
     }
 
+    public ReplicationStats getReplicationStats() {
+        if (indexSettings.isSegRepEnabled() && routingEntry().primary()) {
+            final Set<SegmentReplicationShardStats> stats = getReplicationStatsForTrackedReplicas();
+            long maxBytesBehind = stats.stream().mapToLong(SegmentReplicationShardStats::getBytesBehindCount).max().orElse(0L);
+            long totalBytesBehind = stats.stream().mapToLong(SegmentReplicationShardStats::getBytesBehindCount).sum();
+            long maxReplicationLag = stats.stream()
+                .mapToLong(SegmentReplicationShardStats::getCurrentReplicationTimeMillis)
+                .max()
+                .orElse(0L);
+            return new ReplicationStats(maxBytesBehind, totalBytesBehind, maxReplicationLag);
+        }
+        return new ReplicationStats();
+    }
+
     /**
      * Add a global checkpoint listener. If the global checkpoint is equal to or above the global checkpoint the listener is waiting for,
      * then the listener will be notified immediately via an executor (so possibly not on the current thread). If the specified timeout
diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java
index 8a450b99904cf..df056c75d66f0 100644
--- a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java
+++ b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java
@@ -46,6 +46,7 @@
 import org.opensearch.core.indices.breaker.CircuitBreakerStats;
 import org.opensearch.discovery.DiscoveryStats;
 import org.opensearch.http.HttpStats;
+import org.opensearch.index.ReplicationStats;
 import org.opensearch.index.remote.RemoteSegmentStats;
 import org.opensearch.indices.NodeIndicesStats;
 import org.opensearch.ingest.IngestStats;
@@ -462,6 +463,12 @@ public void testSerialization() throws IOException {
                 assertEquals(remoteSegmentStats.getTotalRefreshBytesLag(), deserializedRemoteSegmentStats.getTotalRefreshBytesLag());
                 assertEquals(remoteSegmentStats.getTotalUploadTime(), deserializedRemoteSegmentStats.getTotalUploadTime());
                 assertEquals(remoteSegmentStats.getTotalDownloadTime(), deserializedRemoteSegmentStats.getTotalDownloadTime());
+                ReplicationStats replicationStats = nodeIndicesStats.getSegments().getReplicationStats();
+
+                ReplicationStats deserializedReplicationStats = deserializedNodeIndicesStats.getSegments().getReplicationStats();
+                assertEquals(replicationStats.getMaxBytesBehind(), deserializedReplicationStats.getMaxBytesBehind());
+                assertEquals(replicationStats.getTotalBytesBehind(), deserializedReplicationStats.getTotalBytesBehind());
+                assertEquals(replicationStats.getMaxReplicationLag(), deserializedReplicationStats.getMaxReplicationLag());
             }
         }
     }
diff --git a/server/src/test/java/org/opensearch/index/SegmentReplicationPressureServiceTests.java b/server/src/test/java/org/opensearch/index/SegmentReplicationPressureServiceTests.java
index 3a08a2c143a89..34fa13f0ba62c 100644
--- a/server/src/test/java/org/opensearch/index/SegmentReplicationPressureServiceTests.java
+++ b/server/src/test/java/org/opensearch/index/SegmentReplicationPressureServiceTests.java
@@ -114,7 +114,7 @@ public void testIsSegrepLimitBreached_onlyCheckpointLimitBreached() throws Excep
 
         indexInBatches(5, shards, primaryShard);
 
-        Set<SegmentReplicationShardStats> replicationStats = primaryShard.getReplicationStats();
+        Set<SegmentReplicationShardStats> replicationStats = primaryShard.getReplicationStatsForTrackedReplicas();
         assertEquals(1, replicationStats.size());
         SegmentReplicationShardStats shardStats = replicationStats.stream().findFirst().get();
         assertEquals(5, shardStats.getCheckpointsBehindCount());
@@ -142,7 +142,7 @@ public void testIsSegrepLimitBreached_onlyTimeLimitBreached() throws Exception {
 
         indexInBatches(1, shards, primaryShard);
         assertBusy(() -> {
-            Set<SegmentReplicationShardStats> replicationStats = primaryShard.getReplicationStats();
+            Set<SegmentReplicationShardStats> replicationStats = primaryShard.getReplicationStatsForTrackedReplicas();
             assertEquals(1, replicationStats.size());
             SegmentReplicationShardStats shardStats = replicationStats.stream().findFirst().get();
             assertTrue(shardStats.getCurrentReplicationTimeMillis() > TimeValue.timeValueSeconds(5).millis());
@@ -164,7 +164,7 @@ public void testIsSegrepLimitBreached_underStaleNodeLimit() throws Exception {
         SegmentReplicationPressureService service = buildPressureService(settings, primaryShard);
 
         assertBusy(() -> {
-            Set<SegmentReplicationShardStats> replicationStats = primaryShard.getReplicationStats();
+            Set<SegmentReplicationShardStats> replicationStats = primaryShard.getReplicationStatsForTrackedReplicas();
             assertEquals(3, replicationStats.size());
             SegmentReplicationShardStats shardStats = replicationStats.stream().findFirst().get();
             assertTrue(shardStats.getCurrentReplicationTimeMillis() > TimeValue.timeValueSeconds(5).millis());
@@ -211,7 +211,7 @@ public void testFailStaleReplicaTask() throws Exception {
         indexInBatches(5, shards, primaryShard);
 
         // assert that replica shard is few checkpoints behind primary
-        Set<SegmentReplicationShardStats> replicationStats = primaryShard.getReplicationStats();
+        Set<SegmentReplicationShardStats> replicationStats = primaryShard.getReplicationStatsForTrackedReplicas();
         assertEquals(1, replicationStats.size());
         SegmentReplicationShardStats shardStats = replicationStats.stream().findFirst().get();
         assertEquals(5, shardStats.getCheckpointsBehindCount());
@@ -243,7 +243,7 @@ public void testFailStaleReplicaTaskDisabled() throws Exception {
         indexInBatches(5, shards, primaryShard);
 
         // assert that replica shard is few checkpoints behind primary
-        Set<SegmentReplicationShardStats> replicationStats = primaryShard.getReplicationStats();
+        Set<SegmentReplicationShardStats> replicationStats = primaryShard.getReplicationStatsForTrackedReplicas();
         assertEquals(1, replicationStats.size());
         SegmentReplicationShardStats shardStats = replicationStats.stream().findFirst().get();
         assertEquals(5, shardStats.getCheckpointsBehindCount());
diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java
index d5ad48e80400c..528402d48658a 100644
--- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java
+++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java
@@ -843,7 +843,7 @@ public void testSegmentReplicationStats() throws Exception {
             .mapToLong(StoreFileMetadata::length)
             .sum();
 
-        Set<SegmentReplicationShardStats> postRefreshStats = primaryShard.getReplicationStats();
+        Set<SegmentReplicationShardStats> postRefreshStats = primaryShard.getReplicationStatsForTrackedReplicas();
         SegmentReplicationShardStats shardStats = postRefreshStats.stream().findFirst().get();
         assertEquals(1, shardStats.getCheckpointsBehindCount());
         assertEquals(initialCheckpointSize, shardStats.getBytesBehindCount());
@@ -863,7 +863,7 @@ public void testSegmentReplicationStats() throws Exception {
         final Store.RecoveryDiff diff = Store.segmentReplicationDiff(segmentMetadataMap, replicaShard.getSegmentMetadataMap());
         final long sizeAfterDeleteAndCommit = diff.missing.stream().mapToLong(StoreFileMetadata::length).sum();
 
-        final Set<SegmentReplicationShardStats> statsAfterFlush = primaryShard.getReplicationStats();
+        final Set<SegmentReplicationShardStats> statsAfterFlush = primaryShard.getReplicationStatsForTrackedReplicas();
         shardStats = statsAfterFlush.stream().findFirst().get();
         assertEquals(sizeAfterDeleteAndCommit, shardStats.getBytesBehindCount());
         assertEquals(1, shardStats.getCheckpointsBehindCount());
@@ -966,7 +966,7 @@ private ClusterState addSnapshotIndex(
     }
 
     private void assertReplicaCaughtUp(IndexShard primaryShard) {
-        Set<SegmentReplicationShardStats> initialStats = primaryShard.getReplicationStats();
+        Set<SegmentReplicationShardStats> initialStats = primaryShard.getReplicationStatsForTrackedReplicas();
         assertEquals(initialStats.size(), 1);
         SegmentReplicationShardStats shardStats = initialStats.stream().findFirst().get();
         assertEquals(0, shardStats.getCheckpointsBehindCount());
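
For illustration: the roll-up performed by the new IndexShard#getReplicationStats() above reduces the per-replica SegmentReplicationShardStats of a primary to three values - the largest bytes-behind count, the sum of bytes-behind counts, and the largest current replication time. The standalone sketch below (plain Java; the ReplicaLag record and the sample numbers are illustrative stand-ins, not part of this change) mirrors that aggregation:

import java.util.List;

public class ReplicationStatsRollupSketch {

    // Illustrative stand-in for SegmentReplicationShardStats: bytes behind and replication time per replica.
    record ReplicaLag(long bytesBehind, long replicationTimeMillis) {}

    public static void main(String[] args) {
        // Two tracked replicas of one primary, each lagging by some bytes and time.
        List<ReplicaLag> replicas = List.of(new ReplicaLag(1024, 1500), new ReplicaLag(512, 900));

        // Same reductions as IndexShard#getReplicationStats(): max, sum, max.
        long maxBytesBehind = replicas.stream().mapToLong(ReplicaLag::bytesBehind).max().orElse(0L);
        long totalBytesBehind = replicas.stream().mapToLong(ReplicaLag::bytesBehind).sum();
        long maxReplicationLag = replicas.stream().mapToLong(ReplicaLag::replicationTimeMillis).max().orElse(0L);

        // Prints: max_bytes_behind=1024 total_bytes_behind=1536 max_replication_lag=1500
        System.out.printf("max_bytes_behind=%d total_bytes_behind=%d max_replication_lag=%d%n", maxBytesBehind, totalBytesBehind, maxReplicationLag);
    }
}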