diff --git a/CHANGELOG.md b/CHANGELOG.md index 2f7ab876e92b0..2bdd1b9315c89 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -184,6 +184,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Remote Store] Add support for Remote Translog Store stats in `_remotestore/stats/` API ([#9263](https://github.com/opensearch-project/OpenSearch/pull/9263)) - Add support for query profiler with concurrent aggregation ([#9248](https://github.com/opensearch-project/OpenSearch/pull/9248)) - Cleanup Unreferenced file on segment merge failure ([#9503](https://github.com/opensearch-project/OpenSearch/pull/9503)) +- [Remote Store] Add support for Remote Translog Store upload stats in `_nodes/stats/` API ([#8908](https://github.com/opensearch-project/OpenSearch/pull/8908)) ### Deprecated diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/stats/IndexStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/stats/IndexStatsIT.java index 3f432060e13fb..ee904dbcb6924 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/stats/IndexStatsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/stats/IndexStatsIT.java @@ -71,6 +71,7 @@ import org.opensearch.index.query.QueryBuilders; import org.opensearch.index.remote.RemoteSegmentStats; import org.opensearch.index.shard.IndexShard; +import org.opensearch.index.translog.RemoteTranslogStats; import org.opensearch.index.translog.Translog; import org.opensearch.indices.IndicesQueryCache; import org.opensearch.indices.IndicesRequestCache; @@ -1436,9 +1437,12 @@ public void testZeroRemoteStoreStatsOnNonRemoteStoreIndex() { .get() .status() ); - ShardStats shard = client().admin().indices().prepareStats(indexName).setSegments(true).get().getShards()[0]; + ShardStats shard = client().admin().indices().prepareStats(indexName).setSegments(true).setTranslog(true).get().getShards()[0]; RemoteSegmentStats remoteSegmentStatsFromIndexStats = shard.getStats().getSegments().getRemoteSegmentStats(); assertZeroRemoteSegmentStats(remoteSegmentStatsFromIndexStats); + RemoteTranslogStats remoteTranslogStatsFromIndexStats = shard.getStats().getTranslog().getRemoteTranslogStats(); + assertZeroRemoteTranslogStats(remoteTranslogStatsFromIndexStats); + NodesStatsResponse nodesStatsResponse = client().admin().cluster().prepareNodesStats(primaryNodeName(indexName)).get(); RemoteSegmentStats remoteSegmentStatsFromNodesStats = nodesStatsResponse.getNodes() .get(0) @@ -1446,20 +1450,22 @@ public void testZeroRemoteStoreStatsOnNonRemoteStoreIndex() { .getSegments() .getRemoteSegmentStats(); assertZeroRemoteSegmentStats(remoteSegmentStatsFromNodesStats); + RemoteTranslogStats remoteTranslogStatsFromNodesStats = nodesStatsResponse.getNodes() + .get(0) + .getIndices() + .getTranslog() + .getRemoteTranslogStats(); + assertZeroRemoteTranslogStats(remoteTranslogStatsFromNodesStats); } private void assertZeroRemoteSegmentStats(RemoteSegmentStats remoteSegmentStats) { - assertEquals(0, remoteSegmentStats.getUploadBytesStarted()); - assertEquals(0, remoteSegmentStats.getUploadBytesSucceeded()); - assertEquals(0, remoteSegmentStats.getUploadBytesFailed()); - assertEquals(0, remoteSegmentStats.getDownloadBytesStarted()); - assertEquals(0, remoteSegmentStats.getDownloadBytesSucceeded()); - assertEquals(0, remoteSegmentStats.getDownloadBytesFailed()); - assertEquals(0, remoteSegmentStats.getTotalRefreshBytesLag()); - assertEquals(0, remoteSegmentStats.getMaxRefreshBytesLag()); - assertEquals(0, remoteSegmentStats.getMaxRefreshTimeLag()); - assertEquals(0, remoteSegmentStats.getTotalUploadTime()); - assertEquals(0, remoteSegmentStats.getTotalDownloadTime()); + // Compare with fresh object because all values default to 0 in default fresh object + assertEquals(new RemoteSegmentStats(), remoteSegmentStats); + } + + private void assertZeroRemoteTranslogStats(RemoteTranslogStats remoteTranslogStats) { + // Compare with fresh object because all values default to 0 in default fresh object + assertEquals(new RemoteTranslogStats(), remoteTranslogStats); } /** diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteSegmentStatsFromNodesStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteSegmentStatsFromNodesStatsIT.java deleted file mode 100644 index 32f3b1066aacd..0000000000000 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteSegmentStatsFromNodesStatsIT.java +++ /dev/null @@ -1,267 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.remotestore; - -import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; -import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsResponse; -import org.opensearch.action.admin.indices.stats.CommonStatsFlags; -import org.opensearch.common.settings.Settings; -import org.opensearch.index.remote.RemoteSegmentStats; -import org.opensearch.test.OpenSearchIntegTestCase; -import org.junit.Before; - -import java.util.concurrent.TimeUnit; - -@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) -public class RemoteSegmentStatsFromNodesStatsIT extends RemoteStoreBaseIntegTestCase { - private static final String INDEX_NAME = "remote-index-1"; - private static final int DATA_NODE_COUNT = 2; - private static final int CLUSTER_MANAGER_NODE_COUNT = 3; - - @Before - public void setup() { - setupCustomCluster(); - } - - private void setupCustomCluster() { - internalCluster().startClusterManagerOnlyNodes(CLUSTER_MANAGER_NODE_COUNT); - internalCluster().startDataOnlyNodes(DATA_NODE_COUNT); - ensureStableCluster(DATA_NODE_COUNT + CLUSTER_MANAGER_NODE_COUNT); - } - - /** - * - Creates two indices with single primary shard, pinned to a single node. - * - Index documents in both of them and forces a fresh for both - * - Polls the _remotestore/stats API for individual index level stats - * - Adds up requisite fields from the API output, repeats this for the 2nd index - * - Polls _nodes/stats and verifies that the total values at node level adds up - * to the values capture in the previous step - */ - public void testNodesStatsParityWithOnlyPrimaryShards() { - String[] dataNodes = internalCluster().getDataNodeNames().toArray(String[]::new); - String randomDataNode = dataNodes[randomIntBetween(0, dataNodes.length - 1)]; - String firstIndex = INDEX_NAME + "1"; - String secondIndex = INDEX_NAME + "2"; - - // Create first index - createIndex( - firstIndex, - Settings.builder().put(remoteStoreIndexSettings(0, 1)).put("index.routing.allocation.require._name", randomDataNode).build() - ); - ensureGreen(firstIndex); - indexSingleDoc(firstIndex, true); - - // Create second index - createIndex( - secondIndex, - Settings.builder().put(remoteStoreIndexSettings(0, 1)).put("index.routing.allocation.require._name", randomDataNode).build() - ); - ensureGreen(secondIndex); - indexSingleDoc(secondIndex, true); - - long cumulativeUploadsSucceeded = 0, cumulativeUploadsStarted = 0, cumulativeUploadsFailed = 0; - long totalBytesLag = 0, maxBytesLag = 0, maxTimeLag = 0; - long totalUploadTime = 0; - // Fetch upload stats - RemoteStoreStatsResponse remoteStoreStatsFirstIndex = client(randomDataNode).admin() - .cluster() - .prepareRemoteStoreStats(firstIndex, "0") - .setLocal(true) - .get(); - cumulativeUploadsSucceeded += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesSucceeded; - cumulativeUploadsStarted += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesStarted; - cumulativeUploadsFailed += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesFailed; - totalBytesLag += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag; - maxBytesLag = Math.max(maxBytesLag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag); - maxTimeLag = Math.max(maxTimeLag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs); - totalUploadTime += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().totalUploadTimeInMs; - - RemoteStoreStatsResponse remoteStoreStatsSecondIndex = client(randomDataNode).admin() - .cluster() - .prepareRemoteStoreStats(secondIndex, "0") - .setLocal(true) - .get(); - - cumulativeUploadsSucceeded += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesSucceeded; - cumulativeUploadsStarted += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesStarted; - cumulativeUploadsFailed += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesFailed; - totalBytesLag += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag; - maxBytesLag = Math.max(maxBytesLag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag); - maxTimeLag = Math.max(maxTimeLag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs); - totalUploadTime += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().totalUploadTimeInMs; - - // Fetch nodes stats - NodesStatsResponse nodesStatsResponse = client().admin() - .cluster() - .prepareNodesStats(randomDataNode) - .setIndices(new CommonStatsFlags().set(CommonStatsFlags.Flag.Segments, true)) - .get(); - RemoteSegmentStats remoteSegmentStats = nodesStatsResponse.getNodes().get(0).getIndices().getSegments().getRemoteSegmentStats(); - assertTrue(cumulativeUploadsSucceeded > 0 && cumulativeUploadsSucceeded == remoteSegmentStats.getUploadBytesSucceeded()); - assertTrue(cumulativeUploadsStarted > 0 && cumulativeUploadsStarted == remoteSegmentStats.getUploadBytesStarted()); - assertEquals(cumulativeUploadsFailed, remoteSegmentStats.getUploadBytesFailed()); - assertEquals(totalBytesLag, remoteSegmentStats.getTotalRefreshBytesLag()); - assertEquals(maxBytesLag, remoteSegmentStats.getMaxRefreshBytesLag()); - assertEquals(maxTimeLag, remoteSegmentStats.getMaxRefreshTimeLag()); - assertTrue(totalUploadTime > 0 && totalUploadTime == remoteSegmentStats.getTotalUploadTime()); - } - - /** - * - Creates two indices with single primary shard and single replica - * - Index documents in both of them and forces a fresh for both - * - Polls the _remotestore/stats API for individual index level stats - * - Adds up requisite fields from the API output for both indices - * - Polls _nodes/stats and verifies that the total values at node level adds up - * to the values capture in the previous step - * - Repeats the above 3 steps for the second node - */ - public void testNodesStatsParityWithReplicaShards() throws Exception { - String firstIndex = INDEX_NAME + "1"; - String secondIndex = INDEX_NAME + "2"; - - createIndex(firstIndex, Settings.builder().put(remoteStoreIndexSettings(1, 1)).build()); - ensureGreen(firstIndex); - indexSingleDoc(firstIndex, true); - - // Create second index - createIndex(secondIndex, Settings.builder().put(remoteStoreIndexSettings(1, 1)).build()); - ensureGreen(secondIndex); - indexSingleDoc(secondIndex, true); - - assertBusy(() -> assertNodeStatsParityAcrossNodes(firstIndex, secondIndex), 15, TimeUnit.SECONDS); - } - - /** - * Ensures that node stats shows 0 values for dedicated cluster manager nodes - * since cluster manager nodes does not participate in indexing - */ - public void testZeroRemoteStatsOnNodesStatsForClusterManager() { - createIndex(INDEX_NAME, remoteStoreIndexSettings(0)); - ensureGreen(INDEX_NAME); - indexSingleDoc(INDEX_NAME); - refresh(INDEX_NAME); - NodesStatsResponse nodesStatsResponseForClusterManager = client().admin() - .cluster() - .prepareNodesStats(internalCluster().getClusterManagerName()) - .setIndices(new CommonStatsFlags().set(CommonStatsFlags.Flag.Segments, true)) - .get(); - assertTrue( - nodesStatsResponseForClusterManager.getNodes().get(0).getNode().isClusterManagerNode() - && !nodesStatsResponseForClusterManager.getNodes().get(0).getNode().isDataNode() - ); - assertZeroRemoteSegmentStats( - nodesStatsResponseForClusterManager.getNodes().get(0).getIndices().getSegments().getRemoteSegmentStats() - ); - NodesStatsResponse nodesStatsResponseForDataNode = client().admin() - .cluster() - .prepareNodesStats(primaryNodeName(INDEX_NAME)) - .setIndices(new CommonStatsFlags().set(CommonStatsFlags.Flag.Segments, true)) - .get(); - assertTrue(nodesStatsResponseForDataNode.getNodes().get(0).getNode().isDataNode()); - RemoteSegmentStats remoteSegmentStats = nodesStatsResponseForDataNode.getNodes() - .get(0) - .getIndices() - .getSegments() - .getRemoteSegmentStats(); - assertTrue(remoteSegmentStats.getUploadBytesStarted() > 0); - assertTrue(remoteSegmentStats.getUploadBytesSucceeded() > 0); - } - - private void assertZeroRemoteSegmentStats(RemoteSegmentStats remoteSegmentStats) { - assertEquals(0, remoteSegmentStats.getUploadBytesStarted()); - assertEquals(0, remoteSegmentStats.getUploadBytesSucceeded()); - assertEquals(0, remoteSegmentStats.getUploadBytesFailed()); - assertEquals(0, remoteSegmentStats.getDownloadBytesStarted()); - assertEquals(0, remoteSegmentStats.getDownloadBytesSucceeded()); - assertEquals(0, remoteSegmentStats.getDownloadBytesFailed()); - assertEquals(0, remoteSegmentStats.getTotalRefreshBytesLag()); - assertEquals(0, remoteSegmentStats.getMaxRefreshBytesLag()); - assertEquals(0, remoteSegmentStats.getMaxRefreshTimeLag()); - assertEquals(0, remoteSegmentStats.getTotalUploadTime()); - assertEquals(0, remoteSegmentStats.getTotalDownloadTime()); - } - - private static void assertNodeStatsParityAcrossNodes(String firstIndex, String secondIndex) { - for (String dataNode : internalCluster().getDataNodeNames()) { - long cumulativeUploadsSucceeded = 0, cumulativeUploadsStarted = 0, cumulativeUploadsFailed = 0; - long cumulativeDownloadsSucceeded = 0, cumulativeDownloadsStarted = 0, cumulativeDownloadsFailed = 0; - long totalBytesLag = 0, maxBytesLag = 0, maxTimeLag = 0; - long totalUploadTime = 0, totalDownloadTime = 0; - // Fetch upload stats - RemoteStoreStatsResponse remoteStoreStatsFirstIndex = client(dataNode).admin() - .cluster() - .prepareRemoteStoreStats(firstIndex, "0") - .setLocal(true) - .get(); - cumulativeUploadsSucceeded += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesSucceeded; - cumulativeUploadsStarted += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesStarted; - cumulativeUploadsFailed += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesFailed; - cumulativeDownloadsSucceeded += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0] - .getSegmentStats().directoryFileTransferTrackerStats.transferredBytesSucceeded; - cumulativeDownloadsStarted += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0] - .getSegmentStats().directoryFileTransferTrackerStats.transferredBytesStarted; - cumulativeDownloadsFailed += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0] - .getSegmentStats().directoryFileTransferTrackerStats.transferredBytesFailed; - totalBytesLag += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag; - maxBytesLag = Math.max(maxBytesLag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag); - maxTimeLag = Math.max(maxTimeLag, remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs); - totalUploadTime += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0].getSegmentStats().totalUploadTimeInMs; - totalDownloadTime += remoteStoreStatsFirstIndex.getRemoteStoreStats()[0] - .getSegmentStats().directoryFileTransferTrackerStats.totalTransferTimeInMs; - - RemoteStoreStatsResponse remoteStoreStatsSecondIndex = client(dataNode).admin() - .cluster() - .prepareRemoteStoreStats(secondIndex, "0") - .setLocal(true) - .get(); - cumulativeUploadsSucceeded += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesSucceeded; - cumulativeUploadsStarted += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesStarted; - cumulativeUploadsFailed += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().uploadBytesFailed; - cumulativeDownloadsSucceeded += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0] - .getSegmentStats().directoryFileTransferTrackerStats.transferredBytesSucceeded; - cumulativeDownloadsStarted += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0] - .getSegmentStats().directoryFileTransferTrackerStats.transferredBytesStarted; - cumulativeDownloadsFailed += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0] - .getSegmentStats().directoryFileTransferTrackerStats.transferredBytesFailed; - totalBytesLag += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag; - maxBytesLag = Math.max(maxBytesLag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().bytesLag); - maxTimeLag = Math.max(maxTimeLag, remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().refreshTimeLagMs); - totalUploadTime += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0].getSegmentStats().totalUploadTimeInMs; - totalDownloadTime += remoteStoreStatsSecondIndex.getRemoteStoreStats()[0] - .getSegmentStats().directoryFileTransferTrackerStats.totalTransferTimeInMs; - - // Fetch nodes stats - NodesStatsResponse nodesStatsResponse = client().admin() - .cluster() - .prepareNodesStats(dataNode) - .setIndices(new CommonStatsFlags().set(CommonStatsFlags.Flag.Segments, true)) - .get(); - RemoteSegmentStats remoteSegmentStats = nodesStatsResponse.getNodes().get(0).getIndices().getSegments().getRemoteSegmentStats(); - assertEquals(cumulativeUploadsSucceeded, remoteSegmentStats.getUploadBytesSucceeded()); - assertEquals(cumulativeUploadsStarted, remoteSegmentStats.getUploadBytesStarted()); - assertEquals(cumulativeUploadsFailed, remoteSegmentStats.getUploadBytesFailed()); - assertEquals(cumulativeDownloadsSucceeded, remoteSegmentStats.getDownloadBytesSucceeded()); - assertEquals(cumulativeDownloadsStarted, remoteSegmentStats.getDownloadBytesStarted()); - assertEquals(cumulativeDownloadsFailed, remoteSegmentStats.getDownloadBytesFailed()); - assertEquals(totalBytesLag, remoteSegmentStats.getTotalRefreshBytesLag()); - assertEquals(maxBytesLag, remoteSegmentStats.getMaxRefreshBytesLag()); - assertEquals(maxTimeLag, remoteSegmentStats.getMaxRefreshTimeLag()); - // Ensure that total upload time has non-zero value if there has been segments uploaded from the node - if (cumulativeUploadsStarted > 0) { - assertTrue(totalUploadTime > 0); - } - assertEquals(totalUploadTime, remoteSegmentStats.getTotalUploadTime()); - // Ensure that total download time has non-zero value if there has been segments downloaded to the node - if (cumulativeDownloadsStarted > 0) { - assertTrue(totalDownloadTime > 0); - } - assertEquals(totalDownloadTime, remoteSegmentStats.getTotalDownloadTime()); - } - } -} diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsFromNodesStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsFromNodesStatsIT.java new file mode 100644 index 0000000000000..6e796bdae5a4a --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreStatsFromNodesStatsIT.java @@ -0,0 +1,209 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.remotestore; + +import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsResponse; +import org.opensearch.action.admin.indices.stats.CommonStatsFlags; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.remote.RemoteSegmentStats; +import org.opensearch.index.translog.RemoteTranslogStats; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.junit.Before; + +import java.util.concurrent.TimeUnit; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class RemoteStoreStatsFromNodesStatsIT extends RemoteStoreBaseIntegTestCase { + private static final String INDEX_NAME = "remote-index-1"; + private static final int DATA_NODE_COUNT = 2; + private static final int CLUSTER_MANAGER_NODE_COUNT = 3; + + @Before + public void setup() { + setupCustomCluster(); + } + + private void setupCustomCluster() { + internalCluster().startClusterManagerOnlyNodes(CLUSTER_MANAGER_NODE_COUNT); + internalCluster().startDataOnlyNodes(DATA_NODE_COUNT); + ensureStableCluster(DATA_NODE_COUNT + CLUSTER_MANAGER_NODE_COUNT); + } + + /** + * - Creates two indices with single primary shard, pinned to a single node. + * - Index documents in both of them and forces a fresh for both + * - Polls the _remotestore/stats API for individual index level stats + * - Adds up requisite fields from the API output, repeats this for the 2nd index + * - Polls _nodes/stats and verifies that the total values at node level adds up + * to the values capture in the previous step + */ + public void testNodesStatsParityWithOnlyPrimaryShards() { + String[] dataNodes = internalCluster().getDataNodeNames().toArray(String[]::new); + String randomDataNode = dataNodes[randomIntBetween(0, dataNodes.length - 1)]; + String firstIndex = INDEX_NAME + "1"; + String secondIndex = INDEX_NAME + "2"; + + // Create first index + createIndex( + firstIndex, + Settings.builder().put(remoteStoreIndexSettings(0, 1)).put("index.routing.allocation.require._name", randomDataNode).build() + ); + ensureGreen(firstIndex); + indexSingleDoc(firstIndex, true); + + // Create second index + createIndex( + secondIndex, + Settings.builder().put(remoteStoreIndexSettings(0, 1)).put("index.routing.allocation.require._name", randomDataNode).build() + ); + ensureGreen(secondIndex); + indexSingleDoc(secondIndex, true); + + assertNodeStatsParityOnNode(randomDataNode, firstIndex, secondIndex); + } + + /** + * - Creates two indices with single primary shard and single replica + * - Index documents in both of them and forces a fresh for both + * - Polls the _remotestore/stats API for individual index level stats + * - Adds up requisite fields from the API output for both indices + * - Polls _nodes/stats and verifies that the total values at node level adds up + * to the values capture in the previous step + * - Repeats the above 3 steps for the second node + */ + public void testNodesStatsParityWithReplicaShards() throws Exception { + String firstIndex = INDEX_NAME + "1"; + String secondIndex = INDEX_NAME + "2"; + + createIndex(firstIndex, Settings.builder().put(remoteStoreIndexSettings(1, 1)).build()); + ensureGreen(firstIndex); + indexSingleDoc(firstIndex, true); + + // Create second index + createIndex(secondIndex, Settings.builder().put(remoteStoreIndexSettings(1, 1)).build()); + ensureGreen(secondIndex); + indexSingleDoc(secondIndex, true); + + assertBusy(() -> assertNodeStatsParityAcrossNodes(firstIndex, secondIndex), 15, TimeUnit.SECONDS); + } + + /** + * Ensures that node stats shows 0 values for dedicated cluster manager nodes + * since cluster manager nodes does not participate in indexing + */ + public void testZeroRemoteStatsOnNodesStatsForClusterManager() { + createIndex(INDEX_NAME, remoteStoreIndexSettings(0)); + ensureGreen(INDEX_NAME); + indexSingleDoc(INDEX_NAME); + refresh(INDEX_NAME); + + NodesStatsResponse nodesStatsResponseForClusterManager = client().admin() + .cluster() + .prepareNodesStats(internalCluster().getClusterManagerName()) + .setIndices(new CommonStatsFlags().set(CommonStatsFlags.Flag.Segments, true).set(CommonStatsFlags.Flag.Translog, true)) + .get(); + + assertTrue( + nodesStatsResponseForClusterManager.getNodes().get(0).getNode().isClusterManagerNode() + && !nodesStatsResponseForClusterManager.getNodes().get(0).getNode().isDataNode() + ); + assertZeroRemoteSegmentStats( + nodesStatsResponseForClusterManager.getNodes().get(0).getIndices().getSegments().getRemoteSegmentStats() + ); + assertZeroRemoteTranslogStats( + nodesStatsResponseForClusterManager.getNodes().get(0).getIndices().getTranslog().getRemoteTranslogStats() + ); + + NodesStatsResponse nodesStatsResponseForDataNode = client().admin() + .cluster() + .prepareNodesStats(primaryNodeName(INDEX_NAME)) + .setIndices(new CommonStatsFlags().set(CommonStatsFlags.Flag.Segments, true).set(CommonStatsFlags.Flag.Translog, true)) + .get(); + + assertTrue(nodesStatsResponseForDataNode.getNodes().get(0).getNode().isDataNode()); + RemoteSegmentStats remoteSegmentStats = nodesStatsResponseForDataNode.getNodes() + .get(0) + .getIndices() + .getSegments() + .getRemoteSegmentStats(); + assertTrue(remoteSegmentStats.getUploadBytesStarted() > 0); + assertTrue(remoteSegmentStats.getUploadBytesSucceeded() > 0); + + RemoteTranslogStats remoteTranslogStats = nodesStatsResponseForDataNode.getNodes() + .get(0) + .getIndices() + .getTranslog() + .getRemoteTranslogStats(); + assertTrue(remoteTranslogStats.getUploadBytesStarted() > 0); + assertTrue(remoteTranslogStats.getUploadBytesSucceeded() > 0); + } + + private void assertZeroRemoteSegmentStats(RemoteSegmentStats remoteSegmentStats) { + // Compare with fresh object because all values default to 0 in default fresh object + assertEquals(new RemoteSegmentStats(), remoteSegmentStats); + } + + private void assertZeroRemoteTranslogStats(RemoteTranslogStats remoteTranslogStats) { + // Compare with fresh object because all values default to 0 in default fresh object + assertEquals(new RemoteTranslogStats(), remoteTranslogStats); + } + + private static void assertNodeStatsParityAcrossNodes(String... indices) { + for (String dataNode : internalCluster().getDataNodeNames()) { + assertNodeStatsParityOnNode(dataNode, indices); + } + } + + private static void assertNodeStatsParityOnNode(String dataNode, String... indices) { + RemoteSegmentStats remoteSegmentStatsCumulative = new RemoteSegmentStats(); + RemoteTranslogStats remoteTranslogStatsCumulative = new RemoteTranslogStats(); + for (String index : indices) { + // Fetch _remotestore/stats + RemoteStoreStatsResponse remoteStoreStats = client(dataNode).admin() + .cluster() + .prepareRemoteStoreStats(index, "0") + .setLocal(true) + .get(); + remoteSegmentStatsCumulative.add(new RemoteSegmentStats(remoteStoreStats.getRemoteStoreStats()[0].getSegmentStats())); + remoteTranslogStatsCumulative.add(new RemoteTranslogStats(remoteStoreStats.getRemoteStoreStats()[0].getTranslogStats())); + } + + // Fetch _nodes/stats + NodesStatsResponse nodesStatsResponse = client().admin() + .cluster() + .prepareNodesStats(dataNode) + .setIndices(new CommonStatsFlags().set(CommonStatsFlags.Flag.Segments, true).set(CommonStatsFlags.Flag.Translog, true)) + .get(); + + // assert segment stats + RemoteSegmentStats remoteSegmentStatsFromNodesStats = nodesStatsResponse.getNodes() + .get(0) + .getIndices() + .getSegments() + .getRemoteSegmentStats(); + assertEquals(remoteSegmentStatsCumulative, remoteSegmentStatsFromNodesStats); + // Ensure that total upload time has non-zero value if there has been segments uploaded from the node + if (remoteSegmentStatsCumulative.getUploadBytesStarted() > 0) { + assertTrue(remoteSegmentStatsCumulative.getTotalUploadTime() > 0); + } + // Ensure that total download time has non-zero value if there has been segments downloaded to the node + if (remoteSegmentStatsCumulative.getDownloadBytesStarted() > 0) { + assertTrue(remoteSegmentStatsCumulative.getTotalDownloadTime() > 0); + } + + // assert translog stats + RemoteTranslogStats remoteTranslogStatsFromNodesStats = nodesStatsResponse.getNodes() + .get(0) + .getIndices() + .getTranslog() + .getRemoteTranslogStats(); + assertEquals(remoteTranslogStatsCumulative, remoteTranslogStatsFromNodesStats); + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStats.java b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStats.java index 8f422bc11f876..f292fcec7ccac 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStats.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/remotestore/stats/RemoteStoreStats.java @@ -240,7 +240,7 @@ static final class RoutingFields { /** * Fields for remote store stats response */ - static final class UploadStatsFields { + public static final class UploadStatsFields { /** * Lag in terms of bytes b/w local and remote store */ @@ -294,12 +294,12 @@ static final class UploadStatsFields { /** * Count of files uploaded to remote store */ - static final String TOTAL_UPLOADS = "total_uploads"; + public static final String TOTAL_UPLOADS = "total_uploads"; /** * Represents the total uploads to remote store in bytes */ - static final String TOTAL_UPLOAD_SIZE = "total_upload_size"; + public static final String TOTAL_UPLOAD_SIZE = "total_upload_size"; /** * Total time spent on remote store uploads @@ -367,17 +367,17 @@ static final class DownloadStatsFields { /** * Reusable sub fields for {@link UploadStatsFields} and {@link DownloadStatsFields} */ - static final class SubFields { - static final String STARTED = "started"; - static final String SUCCEEDED = "succeeded"; - static final String FAILED = "failed"; + public static final class SubFields { + public static final String STARTED = "started"; + public static final String SUCCEEDED = "succeeded"; + public static final String FAILED = "failed"; - static final String STARTED_BYTES = "started_bytes"; - static final String SUCCEEDED_BYTES = "succeeded_bytes"; - static final String FAILED_BYTES = "failed_bytes"; + public static final String STARTED_BYTES = "started_bytes"; + public static final String SUCCEEDED_BYTES = "succeeded_bytes"; + public static final String FAILED_BYTES = "failed_bytes"; static final String DOWNLOAD = "download"; - static final String UPLOAD = "upload"; + public static final String UPLOAD = "upload"; /** * Moving avg over last N values stat diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteSegmentStats.java b/server/src/main/java/org/opensearch/index/remote/RemoteSegmentStats.java index b79ae7765c45f..c7863536adf20 100644 --- a/server/src/main/java/org/opensearch/index/remote/RemoteSegmentStats.java +++ b/server/src/main/java/org/opensearch/index/remote/RemoteSegmentStats.java @@ -19,6 +19,7 @@ import org.opensearch.index.shard.IndexShard; import java.io.IOException; +import java.util.Objects; /** * Tracks remote store segment download and upload stats @@ -300,4 +301,40 @@ static final class Fields { static final String TOTAL_TIME_SPENT = "total_time_spent"; static final String TOTAL_TIME_SPENT_IN_MILLIS = "total_time_spent_in_millis"; } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + RemoteSegmentStats that = (RemoteSegmentStats) o; + return uploadBytesStarted == that.uploadBytesStarted + && uploadBytesFailed == that.uploadBytesFailed + && uploadBytesSucceeded == that.uploadBytesSucceeded + && downloadBytesStarted == that.downloadBytesStarted + && downloadBytesFailed == that.downloadBytesFailed + && downloadBytesSucceeded == that.downloadBytesSucceeded + && maxRefreshTimeLag == that.maxRefreshTimeLag + && maxRefreshBytesLag == that.maxRefreshBytesLag + && totalRefreshBytesLag == that.totalRefreshBytesLag + && totalUploadTime == that.totalUploadTime + && totalDownloadTime == that.totalDownloadTime; + } + + @Override + public int hashCode() { + return Objects.hash( + uploadBytesStarted, + uploadBytesFailed, + uploadBytesSucceeded, + downloadBytesStarted, + downloadBytesFailed, + downloadBytesSucceeded, + maxRefreshTimeLag, + maxRefreshBytesLag, + totalRefreshBytesLag, + totalUploadTime, + totalDownloadTime + ); + } } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 250061c53bda9..0ad82d23e0cc4 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -168,6 +168,7 @@ import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; import org.opensearch.index.translog.RemoteBlobStoreInternalTranslogFactory; import org.opensearch.index.translog.RemoteFsTranslog; +import org.opensearch.index.translog.RemoteTranslogStats; import org.opensearch.index.translog.Translog; import org.opensearch.index.translog.TranslogConfig; import org.opensearch.index.translog.TranslogFactory; @@ -1406,7 +1407,15 @@ public FieldDataStats fieldDataStats(String... fields) { } public TranslogStats translogStats() { - return getEngine().translogManager().getTranslogStats(); + TranslogStats translogStats = getEngine().translogManager().getTranslogStats(); + // Populate remote_store stats only if the index is remote store backed + if (indexSettings.isRemoteStoreEnabled()) { + translogStats.addRemoteTranslogStats( + new RemoteTranslogStats(remoteStoreStatsTrackerFactory.getRemoteTranslogTransferTracker(shardId).stats()) + ); + } + + return translogStats; } public CompletionStats completionStats(String... fields) { diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteTranslogStats.java b/server/src/main/java/org/opensearch/index/translog/RemoteTranslogStats.java new file mode 100644 index 0000000000000..966f8ebc2875a --- /dev/null +++ b/server/src/main/java/org/opensearch/index/translog/RemoteTranslogStats.java @@ -0,0 +1,192 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog; + +import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStats; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.common.unit.ByteSizeValue; +import org.opensearch.core.xcontent.ToXContentFragment; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.index.remote.RemoteTranslogTransferTracker; + +import java.io.IOException; +import java.util.Objects; + +/** + * Encapsulates the stats related to Remote Translog Store operations + * + * @opensearch.internal + */ +public class RemoteTranslogStats implements ToXContentFragment, Writeable { + /** + * Total number of Remote Translog Store uploads that have been started + */ + private long totalUploadsStarted; + + /** + * Total number of Remote Translog Store uploads that have failed. + */ + private long totalUploadsFailed; + + /** + * Total number of Remote Translog Store uploads that have been successful. + */ + private long totalUploadsSucceeded; + + /** + * Total number of byte uploads to Remote Translog Store that have been started. + */ + private long uploadBytesStarted; + + /** + * Total number of byte uploads to Remote Translog Store that have failed. + */ + private long uploadBytesFailed; + + /** + * Total number of byte uploads to Remote Translog Store that have been successful. + */ + private long uploadBytesSucceeded; + + static final String REMOTE_STORE = "remote_store"; + + public RemoteTranslogStats() {} + + public RemoteTranslogStats(StreamInput in) throws IOException { + this.totalUploadsStarted = in.readVLong(); + this.totalUploadsFailed = in.readVLong(); + this.totalUploadsSucceeded = in.readVLong(); + this.uploadBytesStarted = in.readVLong(); + this.uploadBytesFailed = in.readVLong(); + this.uploadBytesSucceeded = in.readVLong(); + } + + public RemoteTranslogStats(RemoteTranslogTransferTracker.Stats transferTrackerStats) { + this.totalUploadsStarted = transferTrackerStats.totalUploadsStarted; + this.totalUploadsFailed = transferTrackerStats.totalUploadsFailed; + this.totalUploadsSucceeded = transferTrackerStats.totalUploadsSucceeded; + this.uploadBytesStarted = transferTrackerStats.uploadBytesStarted; + this.uploadBytesFailed = transferTrackerStats.uploadBytesFailed; + this.uploadBytesSucceeded = transferTrackerStats.uploadBytesSucceeded; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVLong(totalUploadsStarted); + out.writeVLong(totalUploadsFailed); + out.writeVLong(totalUploadsSucceeded); + out.writeVLong(uploadBytesStarted); + out.writeVLong(uploadBytesFailed); + out.writeVLong(uploadBytesSucceeded); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) return true; + if (obj == null || getClass() != obj.getClass()) return false; + RemoteTranslogStats other = (RemoteTranslogStats) obj; + + return this.totalUploadsStarted == other.totalUploadsStarted + && this.totalUploadsFailed == other.totalUploadsFailed + && this.totalUploadsSucceeded == other.totalUploadsSucceeded + && this.uploadBytesStarted == other.uploadBytesStarted + && this.uploadBytesFailed == other.uploadBytesFailed + && this.uploadBytesSucceeded == other.uploadBytesSucceeded; + } + + @Override + public int hashCode() { + return Objects.hash( + totalUploadsStarted, + totalUploadsFailed, + totalUploadsSucceeded, + uploadBytesStarted, + uploadBytesFailed, + uploadBytesSucceeded + ); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(REMOTE_STORE); + + builder.startObject(RemoteStoreStats.SubFields.UPLOAD); + addRemoteTranslogUploadStatsXContent(builder); + builder.endObject(); // translog.remote_store.upload + + builder.endObject(); // translog.remote_store + + return builder; + } + + public long getTotalUploadsStarted() { + return totalUploadsStarted; + } + + public long getTotalUploadsFailed() { + return totalUploadsFailed; + } + + public long getTotalUploadsSucceeded() { + return totalUploadsSucceeded; + } + + public long getUploadBytesStarted() { + return uploadBytesStarted; + } + + public long getUploadBytesFailed() { + return uploadBytesFailed; + } + + public long getUploadBytesSucceeded() { + return uploadBytesSucceeded; + } + + public void add(RemoteTranslogStats other) { + if (other == null) { + return; + } + + this.totalUploadsStarted += other.totalUploadsStarted; + this.totalUploadsFailed += other.totalUploadsFailed; + this.totalUploadsSucceeded += other.totalUploadsSucceeded; + this.uploadBytesStarted += other.uploadBytesStarted; + this.uploadBytesFailed += other.uploadBytesFailed; + this.uploadBytesSucceeded += other.uploadBytesSucceeded; + } + + void addRemoteTranslogUploadStatsXContent(XContentBuilder builder) throws IOException { + builder.startObject(RemoteStoreStats.UploadStatsFields.TOTAL_UPLOADS); + builder.field(RemoteStoreStats.SubFields.STARTED, totalUploadsStarted) + .field(RemoteStoreStats.SubFields.FAILED, totalUploadsFailed) + .field(RemoteStoreStats.SubFields.SUCCEEDED, totalUploadsSucceeded); + builder.endObject(); + + builder.startObject(RemoteStoreStats.UploadStatsFields.TOTAL_UPLOAD_SIZE); + builder.humanReadableField( + RemoteStoreStats.SubFields.STARTED_BYTES, + RemoteStoreStats.SubFields.STARTED, + new ByteSizeValue(uploadBytesStarted) + ); + builder.humanReadableField( + RemoteStoreStats.SubFields.FAILED_BYTES, + RemoteStoreStats.SubFields.FAILED, + new ByteSizeValue(uploadBytesFailed) + ); + builder.humanReadableField( + RemoteStoreStats.SubFields.SUCCEEDED_BYTES, + RemoteStoreStats.SubFields.SUCCEEDED, + new ByteSizeValue(uploadBytesSucceeded) + ); + builder.endObject(); + } +} diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogStats.java b/server/src/main/java/org/opensearch/index/translog/TranslogStats.java index cf279334c7557..25bf40e129626 100644 --- a/server/src/main/java/org/opensearch/index/translog/TranslogStats.java +++ b/server/src/main/java/org/opensearch/index/translog/TranslogStats.java @@ -31,6 +31,7 @@ package org.opensearch.index.translog; +import org.opensearch.Version; import org.opensearch.core.common.Strings; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; @@ -48,14 +49,21 @@ * @opensearch.internal */ public class TranslogStats implements Writeable, ToXContentFragment { - + private static final String TRANSLOG = "translog"; private long translogSizeInBytes; private int numberOfOperations; private long uncommittedSizeInBytes; private int uncommittedOperations; private long earliestLastModifiedAge; - public TranslogStats() {} + /** + * Stats related to the Remote Translog Store operations + */ + private final RemoteTranslogStats remoteTranslogStats; + + public TranslogStats() { + remoteTranslogStats = new RemoteTranslogStats(); + } public TranslogStats(StreamInput in) throws IOException { numberOfOperations = in.readVInt(); @@ -63,6 +71,11 @@ public TranslogStats(StreamInput in) throws IOException { uncommittedOperations = in.readVInt(); uncommittedSizeInBytes = in.readVLong(); earliestLastModifiedAge = in.readVLong(); + // TODO: remoteTranslogStats = in.getVersion().onOrAfter(Version.V_2_10_0) ? in.readOptionalWriteable(RemoteTranslogStats::new) : + // new RemoteTranslogStats(); + remoteTranslogStats = in.getVersion().onOrAfter(Version.CURRENT) + ? in.readOptionalWriteable(RemoteTranslogStats::new) + : new RemoteTranslogStats(); } public TranslogStats( @@ -87,27 +100,37 @@ public TranslogStats( if (earliestLastModifiedAge < 0) { throw new IllegalArgumentException("earliestLastModifiedAge must be >= 0"); } + this.numberOfOperations = numberOfOperations; this.translogSizeInBytes = translogSizeInBytes; this.uncommittedSizeInBytes = uncommittedSizeInBytes; this.uncommittedOperations = uncommittedOperations; this.earliestLastModifiedAge = earliestLastModifiedAge; + this.remoteTranslogStats = new RemoteTranslogStats(); + } + + public void addRemoteTranslogStats(RemoteTranslogStats remoteTranslogStats) { + if (this.remoteTranslogStats != null) { + this.remoteTranslogStats.add(remoteTranslogStats); + } } - public void add(TranslogStats translogStats) { - if (translogStats == null) { + public void add(TranslogStats other) { + if (other == null) { return; } - this.numberOfOperations += translogStats.numberOfOperations; - this.translogSizeInBytes += translogStats.translogSizeInBytes; - this.uncommittedOperations += translogStats.uncommittedOperations; - this.uncommittedSizeInBytes += translogStats.uncommittedSizeInBytes; + this.numberOfOperations += other.numberOfOperations; + this.translogSizeInBytes += other.translogSizeInBytes; + this.uncommittedOperations += other.uncommittedOperations; + this.uncommittedSizeInBytes += other.uncommittedSizeInBytes; if (this.earliestLastModifiedAge == 0) { - this.earliestLastModifiedAge = translogStats.earliestLastModifiedAge; + this.earliestLastModifiedAge = other.earliestLastModifiedAge; } else { - this.earliestLastModifiedAge = Math.min(this.earliestLastModifiedAge, translogStats.earliestLastModifiedAge); + this.earliestLastModifiedAge = Math.min(this.earliestLastModifiedAge, other.earliestLastModifiedAge); } + + addRemoteTranslogStats(other.remoteTranslogStats); } public long getTranslogSizeInBytes() { @@ -132,15 +155,20 @@ public long getEarliestLastModifiedAge() { return earliestLastModifiedAge; } + public RemoteTranslogStats getRemoteTranslogStats() { + return remoteTranslogStats; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject("translog"); - builder.field("operations", numberOfOperations); - builder.humanReadableField("size_in_bytes", "size", new ByteSizeValue(translogSizeInBytes)); - builder.field("uncommitted_operations", uncommittedOperations); - builder.humanReadableField("uncommitted_size_in_bytes", "uncommitted_size", new ByteSizeValue(uncommittedSizeInBytes)); - builder.field("earliest_last_modified_age", earliestLastModifiedAge); + builder.startObject(TRANSLOG); + addLocalTranslogStatsXContent(builder); + if (remoteTranslogStats != null) { + builder = remoteTranslogStats.toXContent(builder, params); + } + builder.endObject(); + return builder; } @@ -156,5 +184,17 @@ public void writeTo(StreamOutput out) throws IOException { out.writeVInt(uncommittedOperations); out.writeVLong(uncommittedSizeInBytes); out.writeVLong(earliestLastModifiedAge); + // TODO: if (out.getVersion().onOrAfter(Version.V_2_10_0)) { + if (out.getVersion().onOrAfter(Version.CURRENT)) { + out.writeOptionalWriteable(remoteTranslogStats); + } + } + + private void addLocalTranslogStatsXContent(XContentBuilder builder) throws IOException { + builder.field("operations", numberOfOperations); + builder.humanReadableField("size_in_bytes", "size", new ByteSizeValue(translogSizeInBytes)); + builder.field("uncommitted_operations", uncommittedOperations); + builder.humanReadableField("uncommitted_size_in_bytes", "uncommitted_size", new ByteSizeValue(uncommittedSizeInBytes)); + builder.field("earliest_last_modified_age", earliestLastModifiedAge); } } diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java index df056c75d66f0..b50ce17a3d391 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java @@ -42,12 +42,15 @@ import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.metrics.OperationStats; import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.indices.breaker.AllCircuitBreakerStats; import org.opensearch.core.indices.breaker.CircuitBreakerStats; import org.opensearch.discovery.DiscoveryStats; import org.opensearch.http.HttpStats; import org.opensearch.index.ReplicationStats; import org.opensearch.index.remote.RemoteSegmentStats; +import org.opensearch.index.remote.RemoteTranslogTransferTracker; +import org.opensearch.index.translog.RemoteTranslogStats; import org.opensearch.indices.NodeIndicesStats; import org.opensearch.ingest.IngestStats; import org.opensearch.monitor.fs.FsInfo; @@ -463,6 +466,12 @@ public void testSerialization() throws IOException { assertEquals(remoteSegmentStats.getTotalRefreshBytesLag(), deserializedRemoteSegmentStats.getTotalRefreshBytesLag()); assertEquals(remoteSegmentStats.getTotalUploadTime(), deserializedRemoteSegmentStats.getTotalUploadTime()); assertEquals(remoteSegmentStats.getTotalDownloadTime(), deserializedRemoteSegmentStats.getTotalDownloadTime()); + + RemoteTranslogStats remoteTranslogStats = nodeIndicesStats.getTranslog().getRemoteTranslogStats(); + RemoteTranslogStats deserializedRemoteTranslogStats = deserializedNodeIndicesStats.getTranslog() + .getRemoteTranslogStats(); + assertEquals(remoteTranslogStats, deserializedRemoteTranslogStats); + ReplicationStats replicationStats = nodeIndicesStats.getSegments().getReplicationStats(); ReplicationStats deserializedReplicationStats = deserializedNodeIndicesStats.getSegments().getReplicationStats(); @@ -792,6 +801,7 @@ private static NodeIndicesStats getNodeIndicesStats(boolean remoteStoreStats) { NodeIndicesStats indicesStats = null; if (remoteStoreStats) { indicesStats = new NodeIndicesStats(new CommonStats(CommonStatsFlags.ALL), new HashMap<>()); + RemoteSegmentStats remoteSegmentStats = indicesStats.getSegments().getRemoteSegmentStats(); remoteSegmentStats.addUploadBytesStarted(10L); remoteSegmentStats.addUploadBytesSucceeded(10L); @@ -804,10 +814,38 @@ private static NodeIndicesStats getNodeIndicesStats(boolean remoteStoreStats) { remoteSegmentStats.setMaxRefreshTimeLag(2L); remoteSegmentStats.addTotalUploadTime(20L); remoteSegmentStats.addTotalDownloadTime(20L); + + RemoteTranslogStats remoteTranslogStats = indicesStats.getTranslog().getRemoteTranslogStats(); + RemoteTranslogStats otherRemoteTranslogStats = new RemoteTranslogStats(getRandomRemoteTranslogTransferTrackerStats()); + remoteTranslogStats.add(otherRemoteTranslogStats); } return indicesStats; } + private static RemoteTranslogTransferTracker.Stats getRandomRemoteTranslogTransferTrackerStats() { + return new RemoteTranslogTransferTracker.Stats( + new ShardId("test-idx", "test-idx", randomIntBetween(1, 10)), + 0L, + randomLongBetween(100, 500), + randomLongBetween(50, 100), + randomLongBetween(100, 200), + randomLongBetween(10000, 50000), + randomLongBetween(5000, 10000), + randomLongBetween(10000, 20000), + 0L, + 0D, + 0D, + 0D, + 0L, + 0L, + 0L, + 0L, + 0D, + 0D, + 0D + ); + } + private OperationStats getPipelineStats(List pipelineStats, String id) { return pipelineStats.stream().filter(p1 -> p1.getPipelineId().equals(id)).findFirst().map(p2 -> p2.getStats()).orElse(null); } diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index 140ea0fade4b4..cd272d8f626d0 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -125,6 +125,7 @@ import org.opensearch.index.mapper.VersionFieldMapper; import org.opensearch.index.remote.RemoteSegmentStats; import org.opensearch.index.remote.RemoteSegmentTransferTracker; +import org.opensearch.index.remote.RemoteTranslogTransferTracker; import org.opensearch.index.seqno.ReplicationTracker; import org.opensearch.index.seqno.RetentionLease; import org.opensearch.index.seqno.RetentionLeaseSyncer; @@ -137,6 +138,7 @@ import org.opensearch.index.store.StoreUtils; import org.opensearch.index.translog.InternalTranslogFactory; import org.opensearch.index.translog.RemoteBlobStoreInternalTranslogFactory; +import org.opensearch.index.translog.RemoteTranslogStats; import org.opensearch.index.translog.TestTranslog; import org.opensearch.index.translog.Translog; import org.opensearch.index.translog.TranslogStats; @@ -1821,9 +1823,12 @@ public void testShardStatsWithRemoteStoreEnabled() throws IOException { .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) .build() ); - RemoteSegmentTransferTracker remoteRefreshSegmentTracker = shard.getRemoteStoreStatsTrackerFactory() + RemoteSegmentTransferTracker remoteSegmentTransferTracker = shard.getRemoteStoreStatsTrackerFactory() .getRemoteSegmentTransferTracker(shard.shardId); - populateSampleRemoteStoreStats(remoteRefreshSegmentTracker); + RemoteTranslogTransferTracker remoteTranslogTransferTracker = shard.getRemoteStoreStatsTrackerFactory() + .getRemoteTranslogTransferTracker(shard.shardId); + populateSampleRemoteSegmentStats(remoteSegmentTransferTracker); + populateSampleRemoteTranslogStats(remoteTranslogTransferTracker); ShardStats shardStats = new ShardStats( shard.routingEntry(), shard.shardPath(), @@ -1833,9 +1838,9 @@ public void testShardStatsWithRemoteStoreEnabled() throws IOException { shard.getRetentionLeaseStats() ); RemoteSegmentStats remoteSegmentStats = shardStats.getStats().getSegments().getRemoteSegmentStats(); - assertEquals(remoteRefreshSegmentTracker.getUploadBytesStarted(), remoteSegmentStats.getUploadBytesStarted()); - assertEquals(remoteRefreshSegmentTracker.getUploadBytesSucceeded(), remoteSegmentStats.getUploadBytesSucceeded()); - assertEquals(remoteRefreshSegmentTracker.getUploadBytesFailed(), remoteSegmentStats.getUploadBytesFailed()); + assertRemoteSegmentStats(remoteSegmentTransferTracker, remoteSegmentStats); + RemoteTranslogStats remoteTranslogStats = shardStats.getStats().getTranslog().getRemoteTranslogStats(); + assertRemoteTranslogStats(remoteTranslogTransferTracker, remoteTranslogStats); closeShards(shard); } @@ -4901,10 +4906,42 @@ public void testRecordsForceMerges() throws IOException { closeShards(shard); } - private void populateSampleRemoteStoreStats(RemoteSegmentTransferTracker tracker) { - tracker.addUploadBytesStarted(10L); - tracker.addUploadBytesStarted(10L); + private void populateSampleRemoteSegmentStats(RemoteSegmentTransferTracker tracker) { + tracker.addUploadBytesStarted(30L); tracker.addUploadBytesSucceeded(10L); tracker.addUploadBytesFailed(10L); } + + private void populateSampleRemoteTranslogStats(RemoteTranslogTransferTracker tracker) { + tracker.incrementTotalUploadsStarted(); + tracker.incrementTotalUploadsStarted(); + tracker.incrementTotalUploadsStarted(); + tracker.incrementTotalUploadsSucceeded(); + tracker.incrementTotalUploadsFailed(); + int bytesStarted = randomIntBetween(100, 1000); + tracker.addUploadBytesStarted(bytesStarted); + tracker.addUploadBytesSucceeded(randomIntBetween(1, bytesStarted / 2)); + tracker.addUploadBytesFailed(randomIntBetween(1, bytesStarted / 2)); + } + + private static void assertRemoteTranslogStats( + RemoteTranslogTransferTracker remoteTranslogTransferTracker, + RemoteTranslogStats remoteTranslogStats + ) { + assertEquals(remoteTranslogTransferTracker.getTotalUploadsStarted(), remoteTranslogStats.getTotalUploadsStarted()); + assertEquals(remoteTranslogTransferTracker.getTotalUploadsSucceeded(), remoteTranslogStats.getTotalUploadsSucceeded()); + assertEquals(remoteTranslogTransferTracker.getTotalUploadsFailed(), remoteTranslogStats.getTotalUploadsFailed()); + assertEquals(remoteTranslogTransferTracker.getUploadBytesStarted(), remoteTranslogStats.getUploadBytesStarted()); + assertEquals(remoteTranslogTransferTracker.getUploadBytesSucceeded(), remoteTranslogStats.getUploadBytesSucceeded()); + assertEquals(remoteTranslogTransferTracker.getUploadBytesFailed(), remoteTranslogStats.getUploadBytesFailed()); + } + + private static void assertRemoteSegmentStats( + RemoteSegmentTransferTracker remoteSegmentTransferTracker, + RemoteSegmentStats remoteSegmentStats + ) { + assertEquals(remoteSegmentTransferTracker.getUploadBytesStarted(), remoteSegmentStats.getUploadBytesStarted()); + assertEquals(remoteSegmentTransferTracker.getUploadBytesSucceeded(), remoteSegmentStats.getUploadBytesSucceeded()); + assertEquals(remoteSegmentTransferTracker.getUploadBytesFailed(), remoteSegmentStats.getUploadBytesFailed()); + } } diff --git a/server/src/test/java/org/opensearch/index/translog/LocalTranslogTests.java b/server/src/test/java/org/opensearch/index/translog/LocalTranslogTests.java index 836f4e1eda5e7..dbfc66d6de4b3 100644 --- a/server/src/test/java/org/opensearch/index/translog/LocalTranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/LocalTranslogTests.java @@ -519,17 +519,18 @@ public void testStats() throws IOException { builder.startObject(); copy.toXContent(builder, ToXContent.EMPTY_PARAMS); builder.endObject(); - assertThat( - builder.toString(), - equalTo( - "{\"translog\":{\"operations\":4,\"size_in_bytes\":" - + 326 - + ",\"uncommitted_operations\":4,\"uncommitted_size_in_bytes\":" - + 271 - + ",\"earliest_last_modified_age\":" - + stats.getEarliestLastModifiedAge() - + "}}" - ) + assertEquals( + "{\"translog\":{\"operations\":4,\"size_in_bytes\":" + + 326 + + ",\"uncommitted_operations\":4,\"uncommitted_size_in_bytes\":" + + 271 + + ",\"earliest_last_modified_age\":" + + stats.getEarliestLastModifiedAge() + + ",\"remote_store\":{\"upload\":{" + + "\"total_uploads\":{\"started\":0,\"failed\":0,\"succeeded\":0}," + + "\"total_upload_size\":{\"started_bytes\":0,\"failed_bytes\":0,\"succeeded_bytes\":0}" + + "}}}}", + builder.toString() ); } } diff --git a/server/src/test/java/org/opensearch/index/translog/RemoteTranslogStatsTests.java b/server/src/test/java/org/opensearch/index/translog/RemoteTranslogStatsTests.java new file mode 100644 index 0000000000000..aa390cdba1275 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/translog/RemoteTranslogStatsTests.java @@ -0,0 +1,104 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.translog; + +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.index.remote.RemoteTranslogTransferTracker; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; + +public class RemoteTranslogStatsTests extends OpenSearchTestCase { + RemoteTranslogTransferTracker.Stats transferTrackerStats; + RemoteTranslogStats remoteTranslogStats; + + @Override + public void setUp() throws Exception { + super.setUp(); + transferTrackerStats = getRandomTransferTrackerStats(); + remoteTranslogStats = new RemoteTranslogStats(transferTrackerStats); + } + + public void testRemoteTranslogStatsCreationFromTransferTrackerStats() { + assertEquals(transferTrackerStats.totalUploadsStarted, remoteTranslogStats.getTotalUploadsStarted()); + assertEquals(transferTrackerStats.totalUploadsSucceeded, remoteTranslogStats.getTotalUploadsSucceeded()); + assertEquals(transferTrackerStats.totalUploadsFailed, remoteTranslogStats.getTotalUploadsFailed()); + assertEquals(transferTrackerStats.uploadBytesStarted, remoteTranslogStats.getUploadBytesStarted()); + assertEquals(transferTrackerStats.uploadBytesSucceeded, remoteTranslogStats.getUploadBytesSucceeded()); + assertEquals(transferTrackerStats.uploadBytesFailed, remoteTranslogStats.getUploadBytesFailed()); + } + + public void testRemoteTranslogStatsSerialization() throws IOException { + try (BytesStreamOutput out = new BytesStreamOutput()) { + remoteTranslogStats.writeTo(out); + try (StreamInput in = out.bytes().streamInput()) { + RemoteTranslogStats remoteTranslogStatsFromStream = new RemoteTranslogStats(in); + assertEquals(remoteTranslogStats, remoteTranslogStatsFromStream); + } + } + } + + public void testAdd() { + RemoteTranslogTransferTracker.Stats otherTransferTrackerStats = getRandomTransferTrackerStats(); + RemoteTranslogStats otherRemoteTranslogStats = new RemoteTranslogStats(otherTransferTrackerStats); + + otherRemoteTranslogStats.add(remoteTranslogStats); + + assertEquals( + otherRemoteTranslogStats.getTotalUploadsStarted(), + otherTransferTrackerStats.totalUploadsStarted + remoteTranslogStats.getTotalUploadsStarted() + ); + assertEquals( + otherRemoteTranslogStats.getTotalUploadsSucceeded(), + otherTransferTrackerStats.totalUploadsSucceeded + remoteTranslogStats.getTotalUploadsSucceeded() + ); + assertEquals( + otherRemoteTranslogStats.getTotalUploadsFailed(), + otherTransferTrackerStats.totalUploadsFailed + remoteTranslogStats.getTotalUploadsFailed() + ); + assertEquals( + otherRemoteTranslogStats.getUploadBytesStarted(), + otherTransferTrackerStats.uploadBytesStarted + remoteTranslogStats.getUploadBytesStarted() + ); + assertEquals( + otherRemoteTranslogStats.getUploadBytesSucceeded(), + otherTransferTrackerStats.uploadBytesSucceeded + remoteTranslogStats.getUploadBytesSucceeded() + ); + assertEquals( + otherRemoteTranslogStats.getUploadBytesFailed(), + otherTransferTrackerStats.uploadBytesFailed + remoteTranslogStats.getUploadBytesFailed() + ); + } + + private static RemoteTranslogTransferTracker.Stats getRandomTransferTrackerStats() { + return new RemoteTranslogTransferTracker.Stats( + new ShardId("test-idx", "test-idx", randomIntBetween(1, 10)), + 0L, + randomLongBetween(100, 500), + randomLongBetween(50, 100), + randomLongBetween(100, 200), + randomLongBetween(10000, 50000), + randomLongBetween(5000, 10000), + randomLongBetween(10000, 20000), + 0L, + 0D, + 0D, + 0D, + 0L, + 0L, + 0L, + 0L, + 0D, + 0D, + 0D + ); + } +}