Skip to content

Commit

Permalink
[Segment Replication] Adding segment replication statistics rolled up…
Browse files Browse the repository at this point in the history
… at index, node and cluster level (opensearch-project#9709)

* add node stats for segrep

Signed-off-by: Poojita Raj <poojiraj@amazon.com>

* comment removal

Signed-off-by: Poojita Raj <poojiraj@amazon.com>

* changelog

Signed-off-by: Poojita Raj <poojiraj@amazon.com>

* address review comments

Signed-off-by: Poojita Raj <poojiraj@amazon.com>

* move segrep stats to segments

Signed-off-by: Poojita Raj <poojiraj@amazon.com>

* fix failures

Signed-off-by: Poojita Raj <poojiraj@amazon.com>

* uniform response for docrep and segrep segment stats

Signed-off-by: Poojita Raj <poojiraj@amazon.com>

* remove pri check

Signed-off-by: Poojita Raj <poojiraj@amazon.com>

* version 3.0

Signed-off-by: Poojita Raj <poojiraj@amazon.com>

---------

Signed-off-by: Poojita Raj <poojiraj@amazon.com>
Signed-off-by: Rishikesh Pasham <62345295+Rishikesh1159@users.noreply.github.com>
Co-authored-by: Rishikesh Pasham <62345295+Rishikesh1159@users.noreply.github.com>
  • Loading branch information
2 people authored and dreamer-89 committed Sep 4, 2023
1 parent 42b30de commit 97b22e7
Show file tree
Hide file tree
Showing 12 changed files with 267 additions and 12 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- APIs for performing async blob reads and async downloads from the repository using multiple streams ([#9592](https://github.com/opensearch-project/OpenSearch/issues/9592))
- Introduce cluster default remote translog buffer interval setting ([#9584](https://github.com/opensearch-project/OpenSearch/pull/9584))
- Added encryption-sdk lib to provide encryption and decryption capabilities ([#8466](https://github.com/opensearch-project/OpenSearch/pull/8466) [#9289](https://github.com/opensearch-project/OpenSearch/pull/9289))
- [Segment Replication] Adding segment replication statistics rolled up at index, node and cluster level ([#9709](https://github.com/opensearch-project/OpenSearch/pull/9709))

### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.17.1 to 2.20.0 ([#8307](https://github.com/opensearch-project/OpenSearch/pull/8307))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ protected Releasable blockReplication(List<String> nodes, CountDownLatch latch)

protected void assertReplicaCheckpointUpdated(IndexShard primaryShard) throws Exception {
assertBusy(() -> {
Set<SegmentReplicationShardStats> groupStats = primaryShard.getReplicationStats();
Set<SegmentReplicationShardStats> groupStats = primaryShard.getReplicationStatsForTrackedReplicas();
assertEquals(primaryShard.indexSettings().getNumberOfReplicas(), groupStats.size());
for (SegmentReplicationShardStats shardStat : groupStats) {
assertEquals(0, shardStat.getCheckpointsBehindCount());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,15 @@

package org.opensearch.indices.replication;

import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.indices.replication.SegmentReplicationStatsResponse;
import org.opensearch.action.admin.indices.stats.CommonStatsFlags;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.ReplicationStats;
import org.opensearch.index.SegmentReplicationPerGroupStats;
import org.opensearch.index.SegmentReplicationShardStats;
import org.opensearch.index.shard.IndexShard;
Expand Down Expand Up @@ -357,4 +363,71 @@ public void testQueryAgainstDocRepIndex() {
.actionGet();
assertTrue(segmentReplicationStatsResponse.getReplicationStats().isEmpty());
}

/**
 * Verifies that segment replication stats roll up correctly into node-level and
 * index-level segment stats: while replication is blocked, the node hosting the
 * primary reports non-zero bytes-behind/lag, the replica nodes report zeroed
 * stats, and the index-level aggregation reflects the primary's view.
 */
public void testSegmentReplicationNodeAndIndexStats() throws Exception {
logger.info("--> start primary node");
final String primaryNode = internalCluster().startNode();

logger.info("--> create index on node: {}", primaryNode);
// Two replicas so totalBytesBehind can be checked against 2x maxBytesBehind below.
assertAcked(prepareCreate(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 2)));

ensureYellow();
logger.info("--> start first replica node");
final String replicaNode1 = internalCluster().startNode();

logger.info("--> start second replica node");
final String replicaNode2 = internalCluster().startNode();

ensureGreen();
CountDownLatch latch = new CountDownLatch(1);
// block replication
try (final Releasable ignored = blockReplication(List.of(replicaNode1, replicaNode2), latch)) {
// index another doc while blocked, this would not get replicated to the replicas.
Thread indexingThread = new Thread(() -> {
client().prepareIndex(INDEX_NAME).setId("2").setSource("foo2", randomInt()).get();
refresh(INDEX_NAME);
});

indexingThread.start();
indexingThread.join();
// Wait until blockReplication signals that replication has actually been intercepted,
// so the replicas are known to be behind before stats are sampled.
latch.await();

// Node-level stats, narrowed to the Segments section only.
NodesStatsResponse nodesStatsResponse = client().admin()
.cluster()
.prepareNodesStats()
.clear()
.setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Segments))
.get();

for (NodeStats nodeStats : nodesStatsResponse.getNodes()) {
ReplicationStats replicationStats = nodeStats.getIndices().getSegments().getReplicationStats();
// primary node - should hold replication statistics
if (nodeStats.getNode().getName().equals(primaryNode)) {
assertTrue(replicationStats.getMaxBytesBehind() > 0);
assertTrue(replicationStats.getTotalBytesBehind() > 0);
assertTrue(replicationStats.getMaxReplicationLag() > 0);
// 2 replicas so total bytes should be double of max
assertEquals(replicationStats.getMaxBytesBehind() * 2, replicationStats.getTotalBytesBehind());
}
// replica nodes - should hold empty replication statistics
if (nodeStats.getNode().getName().equals(replicaNode1) || nodeStats.getNode().getName().equals(replicaNode2)) {
assertEquals(0, replicationStats.getMaxBytesBehind());
assertEquals(0, replicationStats.getTotalBytesBehind());
assertEquals(0, replicationStats.getMaxReplicationLag());
}
}
// get replication statistics at index level
IndicesStatsResponse stats = client().admin().indices().prepareStats().execute().actionGet();

// stats should be of non-zero value when aggregated at index level
ReplicationStats indexReplicationStats = stats.getIndex(INDEX_NAME).getTotal().getSegments().getReplicationStats();
assertNotNull(indexReplicationStats);
assertTrue(indexReplicationStats.getMaxBytesBehind() > 0);
assertTrue(indexReplicationStats.getTotalBytesBehind() > 0);
assertTrue(indexReplicationStats.getMaxReplicationLag() > 0);
assertEquals(2 * indexReplicationStats.getMaxBytesBehind(), indexReplicationStats.getTotalBytesBehind());
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import org.opensearch.indices.IndicesQueryCache;
import org.opensearch.indices.IndicesRequestCache;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.plugins.Plugin;
import org.opensearch.search.sort.SortOrder;
import org.opensearch.test.InternalSettingsPlugin;
Expand Down Expand Up @@ -1477,4 +1478,37 @@ private void persistGlobalCheckpoint(String index) throws Exception {
}
}
}

/**
 * Checks that the segments section of the index stats response exposes a (non-null)
 * ReplicationStats object for both a document-replication index and a segment-replication
 * index, i.e. the response shape is uniform regardless of replication type.
 */
public void testSegmentReplicationStats() {
String indexName = "test-index";
// Default replication type (document replication) — no SETTING_REPLICATION_TYPE set.
createIndex(
indexName,
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 1).build()
);

ensureGreen(indexName);

IndicesStatsRequestBuilder builder = client().admin().indices().prepareStats();
IndicesStatsResponse stats = builder.execute().actionGet();

// document replication enabled index should return empty segment replication stats
assertNotNull(stats.getIndex(indexName).getTotal().getSegments().getReplicationStats());

indexName = "test-index2";
// Same index shape, but explicitly segment-replication enabled.
createIndex(
indexName,
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build()
);
ensureGreen(indexName);

builder = client().admin().indices().prepareStats();
stats = builder.execute().actionGet();

// segment replication enabled index should return segment replication stats
assertNotNull(stats.getIndex(indexName).getTotal().getSegments().getReplicationStats());
}
}
97 changes: 97 additions & 0 deletions server/src/main/java/org/opensearch/index/ReplicationStats.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index;

import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;

import java.io.IOException;

/**
 * ReplicationStats is used to provide segment replication statistics at an index,
 * node and cluster level on a segment replication enabled cluster. Instances are
 * aggregated via {@link #add(ReplicationStats)}: the max fields combine with
 * {@code Math.max} while {@code totalBytesBehind} is summed.
 *
 * @opensearch.internal
 */
public class ReplicationStats implements ToXContentFragment, Writeable {

    // Fields are declared in the same order as the constructor and the wire format
    // (see writeTo/StreamInput constructor): maxBytesBehind, totalBytesBehind, maxReplicationLag.

    /** Largest number of bytes any single replica is behind the primary. */
    public long maxBytesBehind;

    /** Sum of bytes behind across all replicas in the replication group. */
    public long totalBytesBehind;

    /** Longest current replication lag, in milliseconds, across replicas. */
    public long maxReplicationLag;

    public ReplicationStats(long maxBytesBehind, long totalBytesBehind, long maxReplicationLag) {
        this.maxBytesBehind = maxBytesBehind;
        this.totalBytesBehind = totalBytesBehind;
        this.maxReplicationLag = maxReplicationLag;
    }

    /**
     * Deserializes stats from the wire; the read order must match
     * {@link #writeTo(StreamOutput)}.
     */
    public ReplicationStats(StreamInput in) throws IOException {
        this.maxBytesBehind = in.readVLong();
        this.totalBytesBehind = in.readVLong();
        this.maxReplicationLag = in.readVLong();
    }

    /** Creates empty (all-zero) stats, e.g. for document replication indices. */
    public ReplicationStats() {}

    /**
     * Folds {@code other} into this instance: max fields take the maximum of the two,
     * totals are summed. A {@code null} argument is a no-op.
     */
    public void add(ReplicationStats other) {
        if (other != null) {
            maxBytesBehind = Math.max(other.maxBytesBehind, maxBytesBehind);
            totalBytesBehind += other.totalBytesBehind;
            maxReplicationLag = Math.max(other.maxReplicationLag, maxReplicationLag);
        }
    }

    public long getMaxBytesBehind() {
        return this.maxBytesBehind;
    }

    public long getTotalBytesBehind() {
        return this.totalBytesBehind;
    }

    public long getMaxReplicationLag() {
        return this.maxReplicationLag;
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        out.writeVLong(maxBytesBehind);
        out.writeVLong(totalBytesBehind);
        out.writeVLong(maxReplicationLag);
    }

    @Override
    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
        builder.startObject(Fields.SEGMENT_REPLICATION);
        // Byte counts are rendered human-readable; lag is rendered as a TimeValue.
        builder.field(Fields.MAX_BYTES_BEHIND, new ByteSizeValue(maxBytesBehind).toString());
        builder.field(Fields.TOTAL_BYTES_BEHIND, new ByteSizeValue(totalBytesBehind).toString());
        builder.field(Fields.MAX_REPLICATION_LAG, new TimeValue(maxReplicationLag));
        builder.endObject();
        return builder;
    }

    /**
     * Fields for segment replication statistics
     *
     * @opensearch.internal
     */
    static final class Fields {
        static final String SEGMENT_REPLICATION = "segment_replication";
        static final String MAX_BYTES_BEHIND = "max_bytes_behind";
        static final String TOTAL_BYTES_BEHIND = "total_bytes_behind";
        static final String MAX_REPLICATION_LAG = "max_replication_lag";
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ public void isSegrepLimitBreached(ShardId shardId) {
}

private void validateReplicationGroup(IndexShard shard) {
final Set<SegmentReplicationShardStats> replicaStats = shard.getReplicationStats();
final Set<SegmentReplicationShardStats> replicaStats = shard.getReplicationStatsForTrackedReplicas();
final Set<SegmentReplicationShardStats> staleReplicas = getStaleReplicas(replicaStats);
if (staleReplicas.isEmpty() == false) {
// inSyncIds always considers the primary id, so filter it out.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public void incrementRejectionCount(ShardId shardId) {
public SegmentReplicationPerGroupStats getStatsForShard(IndexShard indexShard) {
return new SegmentReplicationPerGroupStats(
indexShard.shardId(),
indexShard.getReplicationStats(),
indexShard.getReplicationStatsForTrackedReplicas(),
Optional.ofNullable(rejectionCount.get(indexShard.shardId())).map(AtomicInteger::get).orElse(0)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.ReplicationStats;
import org.opensearch.index.remote.RemoteSegmentStats;

import java.io.IOException;
Expand All @@ -62,6 +63,11 @@ public class SegmentsStats implements Writeable, ToXContentFragment {
private final RemoteSegmentStats remoteSegmentStats;
private static final ByteSizeValue ZERO_BYTE_SIZE_VALUE = new ByteSizeValue(0L);

/**
* Segment replication statistics.
*/
private final ReplicationStats replicationStats;

/*
* A map to provide a best-effort approach describing Lucene index files.
*
Expand Down Expand Up @@ -93,6 +99,7 @@ public class SegmentsStats implements Writeable, ToXContentFragment {
/**
 * Creates an empty stats holder; nested stat objects start zeroed and are
 * populated via the add*() methods.
 */
public SegmentsStats() {
fileSizes = new HashMap<>();
remoteSegmentStats = new RemoteSegmentStats();
replicationStats = new ReplicationStats();
}

public SegmentsStats(StreamInput in) throws IOException {
Expand All @@ -118,6 +125,11 @@ public SegmentsStats(StreamInput in) throws IOException {
} else {
remoteSegmentStats = new RemoteSegmentStats();
}
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
replicationStats = in.readOptionalWriteable(ReplicationStats::new);
} else {
replicationStats = new ReplicationStats();
}
}

public void add(long count) {
Expand All @@ -144,6 +156,10 @@ public void addRemoteSegmentStats(RemoteSegmentStats remoteSegmentStats) {
this.remoteSegmentStats.add(remoteSegmentStats);
}

/**
 * Folds another segment replication stats object into this accumulator
 * (max fields combine with max, totals are summed — see {@link ReplicationStats#add}).
 */
public void addReplicationStats(ReplicationStats replicationStats) {
this.replicationStats.add(replicationStats);
}

public void addFileSizes(final Map<String, Long> newFileSizes) {
newFileSizes.forEach((k, v) -> this.fileSizes.merge(k, v, (a, b) -> {
assert a != null;
Expand All @@ -163,6 +179,7 @@ public void add(SegmentsStats mergeStats) {
addBitsetMemoryInBytes(mergeStats.bitsetMemoryInBytes);
addFileSizes(mergeStats.fileSizes);
addRemoteSegmentStats(mergeStats.remoteSegmentStats);
addReplicationStats(mergeStats.replicationStats);
}

/**
Expand Down Expand Up @@ -215,6 +232,10 @@ public RemoteSegmentStats getRemoteSegmentStats() {
return remoteSegmentStats;
}

/**
 * Returns the accumulated segment replication statistics. For document replication
 * indices this is an empty (all-zero) instance rather than {@code null}.
 */
public ReplicationStats getReplicationStats() {
return replicationStats;
}

/**
* Returns the max timestamp that is used to de-optimize documents with auto-generated IDs in the engine.
* This is used to ensure we don't add duplicate documents when we assume an append only case based on auto-generated IDs
Expand All @@ -239,6 +260,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.humanReadableField(Fields.FIXED_BIT_SET_MEMORY_IN_BYTES, Fields.FIXED_BIT_SET, getBitsetMemory());
builder.field(Fields.MAX_UNSAFE_AUTO_ID_TIMESTAMP, maxUnsafeAutoIdTimestamp);
remoteSegmentStats.toXContent(builder, params);
replicationStats.toXContent(builder, params);
builder.startObject(Fields.FILE_SIZES);
for (Map.Entry<String, Long> entry : fileSizes.entrySet()) {
builder.startObject(entry.getKey());
Expand Down Expand Up @@ -308,6 +330,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_2_10_0)) {
out.writeOptionalWriteable(remoteSegmentStats);
}
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeOptionalWriteable(replicationStats);
}
}

public void clearFileSizes() {
Expand Down
20 changes: 19 additions & 1 deletion server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.IndexService;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.ReplicationStats;
import org.opensearch.index.SegmentReplicationShardStats;
import org.opensearch.index.VersionType;
import org.opensearch.index.cache.IndexCache;
Expand Down Expand Up @@ -1395,6 +1396,9 @@ public SegmentsStats segmentStats(boolean includeSegmentFileSizes, boolean inclu
new RemoteSegmentStats(remoteStoreStatsTrackerFactory.getRemoteSegmentTransferTracker(shardId).stats())
);
}
if (indexSettings.isSegRepEnabled()) {
segmentsStats.addReplicationStats(getReplicationStats());
}
return segmentsStats;
}

Expand Down Expand Up @@ -2941,10 +2945,24 @@ public void updateVisibleCheckpointForShard(final String allocationId, final Rep
* @return Set of {@link SegmentReplicationShardStats}, one entry per replica shard in this
* primary's replication group, describing how far that replica lags behind the primary.
*/
public Set<SegmentReplicationShardStats> getReplicationStats() {
public Set<SegmentReplicationShardStats> getReplicationStatsForTrackedReplicas() {
// Delegates to the shard's ReplicationTracker, which tracks the replication group.
return replicationTracker.getSegmentReplicationStats();
}

/**
 * Returns rolled-up segment replication statistics for this shard.
 * <p>
 * Only a primary of a segment-replication-enabled index aggregates stats over its
 * tracked replicas; replicas and document-replication shards return an empty
 * (all-zero) {@link ReplicationStats}, keeping the stats response shape uniform.
 *
 * @return aggregated stats: max bytes behind, total bytes behind, and max replication lag (ms)
 */
public ReplicationStats getReplicationStats() {
    if (indexSettings.isSegRepEnabled() && routingEntry().primary()) {
        final Set<SegmentReplicationShardStats> stats = getReplicationStatsForTrackedReplicas();
        long maxBytesBehind = 0L;
        long totalBytesBehind = 0L;
        long maxReplicationLag = 0L;
        // Single pass over the per-replica stats instead of three stream traversals;
        // an empty set yields all zeros, matching the previous orElse(0L) behavior.
        for (SegmentReplicationShardStats stat : stats) {
            final long bytesBehind = stat.getBytesBehindCount();
            maxBytesBehind = Math.max(maxBytesBehind, bytesBehind);
            totalBytesBehind += bytesBehind;
            maxReplicationLag = Math.max(maxReplicationLag, stat.getCurrentReplicationTimeMillis());
        }
        return new ReplicationStats(maxBytesBehind, totalBytesBehind, maxReplicationLag);
    }
    return new ReplicationStats();
}

/**
* Add a global checkpoint listener. If the global checkpoint is equal to or above the global checkpoint the listener is waiting for,
* then the listener will be notified immediately via an executor (so possibly not on the current thread). If the specified timeout
Expand Down
Loading

0 comments on commit 97b22e7

Please sign in to comment.