Skip to content

Commit

Permalink
Add support to Writeable interface
Browse files Browse the repository at this point in the history
UploadStats and UploadMetric will be passed between Nodes during
StatsRequest.

Signed-off-by: Vijayan Balasubramanian <balasvij@amazon.com>
  • Loading branch information
VijayanB committed May 11, 2022
1 parent 67f1288 commit cb1d95d
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,16 @@
import java.util.Objects;

import org.opensearch.common.Strings;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.common.xcontent.ToXContentFragment;
import org.opensearch.common.xcontent.XContentBuilder;

/**
* UploadMetric stores metric for an upload API
*/
public final class UploadMetric implements ToXContentFragment {
public final class UploadMetric implements ToXContentFragment, Writeable {

public enum FIELDS {
UPLOAD,
Expand Down Expand Up @@ -116,6 +119,16 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
return builder;
}

@Override
public void writeTo(StreamOutput output) throws IOException {
output.writeString(metricID);
output.writeString(type);
output.writeVLong(uploadCount);
output.writeVLong(successCount);
output.writeVLong(failedCount);
output.writeVLong(duration);
}

/**
* Builder to create {@link UploadMetric}
*/
Expand Down Expand Up @@ -166,5 +179,21 @@ public UploadMetric build() {
return new UploadMetric(this);
}

/**
* Deserialize {@link UploadMetric} from given {@link StreamInput}
* @param input StreamInput instance
* @return UploadMetric returns {@link UploadMetric} from {@link StreamInput}
* @throws IOException if unable to read from {@link StreamInput}
*/
public static UploadMetric fromStreamInput(StreamInput input) throws IOException {
String metricId = input.readString();
String type = input.readString();
UploadMetricBuilder builder = new UploadMetricBuilder(metricId, type).uploadCount(input.readVLong())
.successCount(input.readVLong())
.failedCount(input.readVLong())
.duration(input.readVLong());
return builder.build();
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,21 @@

package org.opensearch.geospatial.stats.upload;

import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.common.metrics.CounterMetric;

/**
* Contains the total upload stats
*/
public final class UploadStats {
public final class UploadStats implements Writeable {

private static final UploadStats instance = new UploadStats();

Expand All @@ -34,6 +38,19 @@ public static UploadStats getInstance() {
totalAPICount = new CounterMetric();
}

/**
* Get UploadStats from {@link StreamInput}.
* @param input contains {@link UploadStats} in serialized form
* @return UploadStats instance
* @throws IOException if cannot read {@link UploadStats} from given input
*/
public static UploadStats fromStreamInput(StreamInput input) throws IOException {
UploadStats instance = new UploadStats();
instance.totalAPICount.inc(input.readVLong());
instance.metrics.addAll(input.readSet(UploadMetric.UploadMetricBuilder::fromStreamInput));
return instance;
}

/**
* Add new metric to {@link UploadStats}
* @param newMetric {@link UploadMetric} to be added to Stats
Expand Down Expand Up @@ -72,4 +89,9 @@ public List<UploadMetric> getMetrics() {
return List.copyOf(metrics);
}

@Override
public void writeTo(StreamOutput output) throws IOException {
output.writeVLong(getTotalAPICount());
output.writeCollection(metrics);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@
import static org.opensearch.geospatial.GeospatialTestHelper.buildFieldNameValuePair;
import static org.opensearch.geospatial.GeospatialTestHelper.randomLowerCaseString;

import java.io.IOException;

import org.opensearch.common.Strings;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.geospatial.GeospatialTestHelper;
import org.opensearch.test.OpenSearchTestCase;

Expand Down Expand Up @@ -73,4 +77,45 @@ public void testToXContent() {
assertTrue(metricAsString.contains(buildFieldNameValuePair(UploadMetric.FIELDS.FAILED, actualMetric.getFailedCount())));
assertTrue(metricAsString.contains(buildFieldNameValuePair(UploadMetric.FIELDS.SUCCESS, actualMetric.getSuccessCount())));
}

public void testStreams() throws IOException {
UploadMetric actualMetric = GeospatialTestHelper.generateRandomUploadMetric();
BytesStreamOutput output = new BytesStreamOutput();
actualMetric.writeTo(output);
StreamInput in = StreamInput.wrap(output.bytes().toBytesRef().bytes);

UploadMetric serializedMetric = UploadMetric.UploadMetricBuilder.fromStreamInput(in);
assertNotNull("serialized metric cannot be null", serializedMetric);
assertEquals(
"upload count is not matching between serialized and deserialized",
actualMetric.getUploadCount(),
serializedMetric.getUploadCount()
);
assertEquals(
"success count is not matching between serialized and deserialized",
actualMetric.getSuccessCount(),
serializedMetric.getSuccessCount()
);
assertEquals(
"failed count is not matching between serialized and deserialized",
actualMetric.getFailedCount(),
serializedMetric.getFailedCount()
);
assertEquals(
"duration is not matching between serialized and deserialized",
actualMetric.getDuration(),
serializedMetric.getDuration()
);
assertEquals(
"metric id is not matching between serialized and deserialized",
actualMetric.getMetricID(),
serializedMetric.getMetricID()
);
assertEquals(
"geospatial type is not matching between serialized and deserialized",
actualMetric.getType(),
serializedMetric.getType()
);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@

import static org.opensearch.geospatial.GeospatialTestHelper.GEOJSON;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.IntStream;

import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.geospatial.GeospatialTestHelper;
import org.opensearch.test.OpenSearchTestCase;

Expand Down Expand Up @@ -60,4 +63,23 @@ public void testAddMetricFailsForDuplicateMetrics() {
stats.addMetric(randomMetric);
assertThrows("duplicate metrics are not allowed", IllegalArgumentException.class, () -> stats.addMetric(randomMetric));
}

public void testStreams() throws IOException {
UploadStats stats = new UploadStats();
int metricCount = randomIntBetween(MIN_API_CALLED, MAX_API_CALLED);
Set<UploadMetric> expectedMetrics = new HashSet<>();
IntStream.rangeClosed(MIN_API_CALLED, metricCount).forEach(unUsed -> {
UploadMetric randomMetric = GeospatialTestHelper.generateRandomUploadMetric();
expectedMetrics.add(randomMetric);
stats.addMetric(randomMetric);
stats.incrementAPICount();
});
BytesStreamOutput output = new BytesStreamOutput();
stats.writeTo(output);
StreamInput in = StreamInput.wrap(output.bytes().toBytesRef().bytes);
UploadStats serializedStats = UploadStats.fromStreamInput(in);
assertNotNull("serialized stats cannot be null", serializedStats);
assertEquals("api count is ", stats.getTotalAPICount(), serializedStats.getTotalAPICount());
assertEquals("failed to serialize metrics", stats.getMetrics().size(), serializedStats.getMetrics().size());
}
}

0 comments on commit cb1d95d

Please sign in to comment.