Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Segment Replication] Adding segment replication statistics rolled up at index, node and cluster level #9709

Merged
merged 10 commits into from
Sep 4, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Introduce cluster default remote translog buffer interval setting ([#9584](https://github.com/opensearch-project/OpenSearch/pull/9584))
- Add average concurrency metric for concurrent segment search ([#9670](https://github.com/opensearch-project/OpenSearch/issues/9670))
- [Remote state] Integrate remote cluster state in publish/commit flow ([#9665](https://github.com/opensearch-project/OpenSearch/pull/9665))
- [Segment Replication] Adding segment replication statistics rolled up at index, node and cluster level ([#9709](https://github.com/opensearch-project/OpenSearch/pull/9709))

### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.17.1 to 2.20.0 ([#8307](https://github.com/opensearch-project/OpenSearch/pull/8307))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ protected Releasable blockReplication(List<String> nodes, CountDownLatch latch)

protected void assertReplicaCheckpointUpdated(IndexShard primaryShard) throws Exception {
assertBusy(() -> {
Set<SegmentReplicationShardStats> groupStats = primaryShard.getReplicationStats();
Set<SegmentReplicationShardStats> groupStats = primaryShard.getReplicationStatsForTrackedReplicas();
assertEquals(primaryShard.indexSettings().getNumberOfReplicas(), groupStats.size());
for (SegmentReplicationShardStats shardStat : groupStats) {
assertEquals(0, shardStat.getCheckpointsBehindCount());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,15 @@

package org.opensearch.indices.replication;

import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse;
import org.opensearch.action.admin.indices.stats.CommonStatsFlags;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.ReplicationStats;
import org.opensearch.index.SegmentReplicationPerGroupStats;
import org.opensearch.index.SegmentReplicationShardStats;
import org.opensearch.index.shard.IndexShard;
Expand Down Expand Up @@ -357,4 +363,71 @@ public void testQueryAgainstDocRepIndex() {
.actionGet();
assertTrue(segmentReplicationStatsResponse.getReplicationStats().isEmpty());
}

/**
 * Verifies segment replication statistics rolled up at the node and index level.
 * Replication to two replicas is blocked while a doc is indexed on the primary, so the
 * primary node should report non-zero bytes-behind/lag stats while both replica nodes
 * report zeroed stats, and the index-level aggregation should mirror the primary's view.
 */
public void testSegmentReplicationNodeAndIndexStats() throws Exception {
    logger.info("--> start primary node");
    final String primaryNode = internalCluster().startNode();

    logger.info("--> create index on node: {}", primaryNode);
    assertAcked(prepareCreate(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2)));

    // Only the primary is allocated at this point; replicas are unassigned.
    ensureYellow();
    logger.info("--> start first replica node");
    final String replicaNode1 = internalCluster().startNode();

    logger.info("--> start second replica node");
    final String replicaNode2 = internalCluster().startNode();

    ensureGreen();
    CountDownLatch latch = new CountDownLatch(1);
    // block replication
    try (final Releasable ignored = blockReplication(List.of(replicaNode1, replicaNode2), latch)) {
        // index another doc while blocked, this would not get replicated to the replicas.
        Thread indexingThread = new Thread(() -> {
            client().prepareIndex(INDEX_NAME).setId("2").setSource("foo2", randomInt()).get();
            refresh(INDEX_NAME);
        });

        indexingThread.start();
        indexingThread.join();
        // Wait until the blocked replication has actually been initiated before sampling stats.
        latch.await();

        // Node-level stats, narrowed to the segments section only.
        NodesStatsResponse nodesStatsResponse = client().admin()
            .cluster()
            .prepareNodesStats()
            .clear()
            .setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Segments))
            .get();

        for (NodeStats nodeStats : nodesStatsResponse.getNodes()) {
            ReplicationStats replicationStats = nodeStats.getIndices().getSegments().getReplicationStats();
            // primary node - should hold replication statistics
            if (nodeStats.getNode().getName().equals(primaryNode)) {
                assertTrue(replicationStats.getMaxBytesBehind() > 0);
                assertTrue(replicationStats.getTotalBytesBehind() > 0);
                assertTrue(replicationStats.getMaxReplicationLag() > 0);
                // 2 replicas so total bytes should be double of max
                assertEquals(replicationStats.getMaxBytesBehind() * 2, replicationStats.getTotalBytesBehind());
            }
            // replica nodes - should hold empty replication statistics
            if (nodeStats.getNode().getName().equals(replicaNode1) || nodeStats.getNode().getName().equals(replicaNode2)) {
                assertEquals(0, replicationStats.getMaxBytesBehind());
                assertEquals(0, replicationStats.getTotalBytesBehind());
                assertEquals(0, replicationStats.getMaxReplicationLag());
            }
        }
        // get replication statistics at index level
        IndicesStatsResponse stats = client().admin().indices().prepareStats().execute().actionGet();

        // stats should be of non-zero value when aggregated at index level
        ReplicationStats indexReplicationStats = stats.getIndex(INDEX_NAME).getTotal().getSegments().getReplicationStats();
        assertNotNull(indexReplicationStats);
        assertTrue(indexReplicationStats.getMaxBytesBehind() > 0);
        assertTrue(indexReplicationStats.getTotalBytesBehind() > 0);
        assertTrue(indexReplicationStats.getMaxReplicationLag() > 0);
        assertEquals(2 * indexReplicationStats.getMaxBytesBehind(), indexReplicationStats.getTotalBytesBehind());
    }

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import org.opensearch.indices.IndicesQueryCache;
import org.opensearch.indices.IndicesRequestCache;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.plugins.Plugin;
import org.opensearch.search.sort.SortOrder;
import org.opensearch.test.InternalSettingsPlugin;
Expand Down Expand Up @@ -1477,4 +1478,37 @@ private void persistGlobalCheckpoint(String index) throws Exception {
}
}
}

/**
 * Verifies that the segment replication stats section is present in the indices stats
 * response for both a document-replication (default) index and a segment-replication
 * enabled index.
 */
public void testSegmentReplicationStats() {
    String indexName = "test-index";
    // Consistently use the qualified IndexMetadata constants for both indices
    // (the original mixed a bare static import with the qualified form).
    createIndex(
        indexName,
        Settings.builder()
            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
            .build()
    );

    ensureGreen(indexName);

    IndicesStatsRequestBuilder builder = client().admin().indices().prepareStats();
    IndicesStatsResponse stats = builder.execute().actionGet();

    // document replication enabled index should return empty segment replication stats
    assertNotNull(stats.getIndex(indexName).getTotal().getSegments().getReplicationStats());

    indexName = "test-index2";
    createIndex(
        indexName,
        Settings.builder()
            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
            .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
            .build()
    );
    ensureGreen(indexName);

    builder = client().admin().indices().prepareStats();
    stats = builder.execute().actionGet();

    // segment replication enabled index should return segment replication stats
    assertNotNull(stats.getIndex(indexName).getTotal().getSegments().getReplicationStats());
}
}
97 changes: 97 additions & 0 deletions server/src/main/java/org/opensearch/index/ReplicationStats.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, didn't catch this earlier — let's move this to the indices.replication package?


import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;

import java.io.IOException;

/**
* ReplicationStats is used to provide segment replication statistics at an index,
* node and cluster level on a segment replication enabled cluster.
*
* @opensearch.internal
*/
mch2 marked this conversation as resolved.
Show resolved Hide resolved
public class ReplicationStats implements ToXContentFragment, Writeable {

public long maxBytesBehind;
public long maxReplicationLag;
public long totalBytesBehind;

public ReplicationStats(long maxBytesBehind, long totalBytesBehind, long maxReplicationLag) {
this.maxBytesBehind = maxBytesBehind;
this.totalBytesBehind = totalBytesBehind;
this.maxReplicationLag = maxReplicationLag;
}

Check warning on line 37 in server/src/main/java/org/opensearch/index/ReplicationStats.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/ReplicationStats.java#L33-L37

Added lines #L33 - L37 were not covered by tests

public ReplicationStats(StreamInput in) throws IOException {
this.maxBytesBehind = in.readVLong();
this.totalBytesBehind = in.readVLong();
this.maxReplicationLag = in.readVLong();
}

public ReplicationStats() {

}

public void add(ReplicationStats other) {
if (other != null) {
maxBytesBehind = Math.max(other.maxBytesBehind, maxBytesBehind);
totalBytesBehind += other.totalBytesBehind;
maxReplicationLag = Math.max(other.maxReplicationLag, maxReplicationLag);
}
}

public long getMaxBytesBehind() {
return this.maxBytesBehind;
}

public long getTotalBytesBehind() {
return this.totalBytesBehind;
}

public long getMaxReplicationLag() {
return this.maxReplicationLag;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(maxBytesBehind);
out.writeVLong(totalBytesBehind);
out.writeVLong(maxReplicationLag);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Fields.SEGMENT_REPLICATION);
builder.field(Fields.MAX_BYTES_BEHIND, new ByteSizeValue(maxBytesBehind).toString());
builder.field(Fields.TOTAL_BYTES_BEHIND, new ByteSizeValue(totalBytesBehind).toString());
builder.field(Fields.MAX_REPLICATION_LAG, new TimeValue(maxReplicationLag));
builder.endObject();
return builder;
}

/**
* Fields for segment replication statistics
*
* @opensearch.internal
*/
static final class Fields {

Check warning on line 91 in server/src/main/java/org/opensearch/index/ReplicationStats.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/ReplicationStats.java#L91

Added line #L91 was not covered by tests
static final String SEGMENT_REPLICATION = "segment_replication";
static final String MAX_BYTES_BEHIND = "max_bytes_behind";
static final String TOTAL_BYTES_BEHIND = "total_bytes_behind";
static final String MAX_REPLICATION_LAG = "max_replication_lag";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ public void isSegrepLimitBreached(ShardId shardId) {
}

private void validateReplicationGroup(IndexShard shard) {
final Set<SegmentReplicationShardStats> replicaStats = shard.getReplicationStats();
final Set<SegmentReplicationShardStats> replicaStats = shard.getReplicationStatsForTrackedReplicas();
final Set<SegmentReplicationShardStats> staleReplicas = getStaleReplicas(replicaStats);
if (staleReplicas.isEmpty() == false) {
// inSyncIds always considers the primary id, so filter it out.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public void incrementRejectionCount(ShardId shardId) {
/**
 * Builds per-replication-group segment replication stats for the given primary shard,
 * including the count of requests rejected against that shard (0 if none recorded).
 */
public SegmentReplicationPerGroupStats getStatsForShard(IndexShard indexShard) {
    final ShardId shardId = indexShard.shardId();
    final AtomicInteger rejections = rejectionCount.get(shardId);
    final int rejectedRequestCount = rejections == null ? 0 : rejections.get();
    return new SegmentReplicationPerGroupStats(shardId, indexShard.getReplicationStatsForTrackedReplicas(), rejectedRequestCount);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.ReplicationStats;
import org.opensearch.index.remote.RemoteSegmentStats;

import java.io.IOException;
Expand All @@ -62,6 +63,11 @@
private final RemoteSegmentStats remoteSegmentStats;
private static final ByteSizeValue ZERO_BYTE_SIZE_VALUE = new ByteSizeValue(0L);

/**
* Segment replication statistics.
*/
private final ReplicationStats replicationStats;

/*
* A map to provide a best-effort approach describing Lucene index files.
*
Expand Down Expand Up @@ -93,6 +99,7 @@
/**
 * Creates an empty stats instance: no file sizes recorded, and zeroed remote segment
 * and segment replication statistics.
 */
public SegmentsStats() {
    fileSizes = new HashMap<>();
    remoteSegmentStats = new RemoteSegmentStats();
    replicationStats = new ReplicationStats();
}

public SegmentsStats(StreamInput in) throws IOException {
Expand All @@ -118,6 +125,11 @@
} else {
remoteSegmentStats = new RemoteSegmentStats();
mch2 marked this conversation as resolved.
Show resolved Hide resolved
}
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
replicationStats = in.readOptionalWriteable(ReplicationStats::new);
} else {
replicationStats = new ReplicationStats();

Check warning on line 131 in server/src/main/java/org/opensearch/index/engine/SegmentsStats.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/engine/SegmentsStats.java#L131

Added line #L131 was not covered by tests
}
}

public void add(long count) {
Expand All @@ -144,6 +156,10 @@
this.remoteSegmentStats.add(remoteSegmentStats);
}

/**
 * Merges the given segment replication statistics into this instance's rolled-up
 * replication stats.
 *
 * @param replicationStats stats to fold in; a null value is ignored by the merge
 */
public void addReplicationStats(ReplicationStats replicationStats) {
    this.replicationStats.add(replicationStats);
}

public void addFileSizes(final Map<String, Long> newFileSizes) {
newFileSizes.forEach((k, v) -> this.fileSizes.merge(k, v, (a, b) -> {
assert a != null;
Expand All @@ -163,6 +179,7 @@
addBitsetMemoryInBytes(mergeStats.bitsetMemoryInBytes);
addFileSizes(mergeStats.fileSizes);
addRemoteSegmentStats(mergeStats.remoteSegmentStats);
addReplicationStats(mergeStats.replicationStats);
}

/**
Expand Down Expand Up @@ -215,6 +232,10 @@
return remoteSegmentStats;
}

/**
 * Returns the rolled-up segment replication statistics held by this instance.
 */
public ReplicationStats getReplicationStats() {
    return replicationStats;
}

/**
* Returns the max timestamp that is used to de-optimize documents with auto-generated IDs in the engine.
* This is used to ensure we don't add duplicate documents when we assume an append only case based on auto-generated IDs
Expand All @@ -239,6 +260,7 @@
builder.humanReadableField(Fields.FIXED_BIT_SET_MEMORY_IN_BYTES, Fields.FIXED_BIT_SET, getBitsetMemory());
builder.field(Fields.MAX_UNSAFE_AUTO_ID_TIMESTAMP, maxUnsafeAutoIdTimestamp);
remoteSegmentStats.toXContent(builder, params);
replicationStats.toXContent(builder, params);
builder.startObject(Fields.FILE_SIZES);
for (Map.Entry<String, Long> entry : fileSizes.entrySet()) {
builder.startObject(entry.getKey());
Expand Down Expand Up @@ -308,6 +330,9 @@
if (out.getVersion().onOrAfter(Version.V_2_10_0)) {
out.writeOptionalWriteable(remoteSegmentStats);
}
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeOptionalWriteable(replicationStats);
}
}

public void clearFileSizes() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.IndexService;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.ReplicationStats;
import org.opensearch.index.SegmentReplicationShardStats;
import org.opensearch.index.VersionType;
import org.opensearch.index.cache.IndexCache;
Expand Down Expand Up @@ -1390,6 +1391,9 @@
new RemoteSegmentStats(remoteStoreStatsTrackerFactory.getRemoteSegmentTransferTracker(shardId).stats())
);
}
if (indexSettings.isSegRepEnabled()) {
segmentsStats.addReplicationStats(getReplicationStats());
}
return segmentsStats;
}

Expand Down Expand Up @@ -2935,10 +2939,24 @@
* @return Set of {@link SegmentReplicationShardStats}, one per replica shard tracked in this
* primary's replication group.
*/
public Set<SegmentReplicationShardStats> getReplicationStats() {
public Set<SegmentReplicationShardStats> getReplicationStatsForTrackedReplicas() {
    // Delegates to the replication tracker, which maintains per-replica checkpoint state
    // for the shards in this primary's replication group.
    return replicationTracker.getSegmentReplicationStats();
}

public ReplicationStats getReplicationStats() {
if (indexSettings.isSegRepEnabled() && routingEntry().primary()) {
final Set<SegmentReplicationShardStats> stats = getReplicationStatsForTrackedReplicas();
long maxBytesBehind = stats.stream().mapToLong(SegmentReplicationShardStats::getBytesBehindCount).max().orElse(0L);
long totalBytesBehind = stats.stream().mapToLong(SegmentReplicationShardStats::getBytesBehindCount).sum();
long maxReplicationLag = stats.stream()
.mapToLong(SegmentReplicationShardStats::getCurrentReplicationTimeMillis)
.max()
.orElse(0L);
return new ReplicationStats(maxBytesBehind, totalBytesBehind, maxReplicationLag);

Check warning on line 2955 in server/src/main/java/org/opensearch/index/shard/IndexShard.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/index/shard/IndexShard.java#L2948-L2955

Added lines #L2948 - L2955 were not covered by tests
}
return new ReplicationStats();
}

/**
* Add a global checkpoint listener. If the global checkpoint is equal to or above the global checkpoint the listener is waiting for,
* then the listener will be notified immediately via an executor (so possibly not on the current thread). If the specified timeout
Expand Down
Loading
Loading