Skip to content

Commit

Permalink
nit fixes
Browse files Browse the repository at this point in the history
Signed-off-by: Sarthak Aggarwal <sarthagg@amazon.com>
  • Loading branch information
sarthakaggarwal97 committed Aug 7, 2024
1 parent 0344858 commit 41697e6
Show file tree
Hide file tree
Showing 9 changed files with 138 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,17 @@ class CountValueAggregator implements ValueAggregator<Long> {

public static final long DEFAULT_INITIAL_VALUE = 1L;
private final StarTreeNumericType starTreeNumericType;
/** Data type of the aggregated count value; a per-class constant, so it is declared static. */
private static final StarTreeNumericType VALUE_AGGREGATOR_TYPE = StarTreeNumericType.LONG;

/**
* Creates a count aggregator.
*
* @param starTreeNumericType numeric type stored on this instance; NOTE(review): presumably the
* numeric type of the source metric field - confirm against callers
*/
public CountValueAggregator(StarTreeNumericType starTreeNumericType) {
this.starTreeNumericType = starTreeNumericType;
}

/**
* Returns the data type of the aggregated value, which for this aggregator is
* {@code VALUE_AGGREGATOR_TYPE} ({@code StarTreeNumericType.LONG}).
*/
@Override
public StarTreeNumericType getAggregatedValueType() {
return VALUE_AGGREGATOR_TYPE;
}

@Override
public Long getInitialAggregatedValueForSegmentDocValue(Long segmentDocValue) {

Expand All @@ -35,10 +41,11 @@ public Long getInitialAggregatedValueForSegmentDocValue(Long segmentDocValue) {

@Override
public Long mergeAggregatedValueAndSegmentValue(Long value, Long segmentDocValue) {
if (value == null) {
return getIdentityMetricValue();
assert value != null;
if (segmentDocValue != null) {
return value + 1;
}
return value + 1;
return value;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,18 @@ abstract class StatelessDoubleValueAggregator implements ValueAggregator<Double>

protected final StarTreeNumericType starTreeNumericType;
protected final Double identityValue;
/** Data type of the aggregated value; a per-class constant, so it is declared static. */
private static final StarTreeNumericType VALUE_AGGREGATOR_TYPE = StarTreeNumericType.DOUBLE;

/**
* Creates a stateless double aggregator.
*
* @param starTreeNumericType numeric type stored on this instance; NOTE(review): presumably the
* numeric type of the source metric field - confirm against callers
* @param identityValue value stored in {@code identityValue}; NOTE(review): presumably the
* aggregation's identity metric value - confirm against subclasses
*/
public StatelessDoubleValueAggregator(StarTreeNumericType starTreeNumericType, Double identityValue) {
this.starTreeNumericType = starTreeNumericType;
this.identityValue = identityValue;
}

/**
* Returns the data type of the aggregated value, which for this aggregator is
* {@code VALUE_AGGREGATOR_TYPE} ({@code StarTreeNumericType.DOUBLE}).
*/
@Override
public StarTreeNumericType getAggregatedValueType() {
return VALUE_AGGREGATOR_TYPE;
}

@Override
public Double getInitialAggregatedValueForSegmentDocValue(Long segmentDocValue) {
if (segmentDocValue == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,19 @@
class SumValueAggregator implements ValueAggregator<Double> {

private final StarTreeNumericType starTreeNumericType;
/** Data type of the aggregated sum value; a per-class constant, so it is declared static. */
private static final StarTreeNumericType VALUE_AGGREGATOR_TYPE = StarTreeNumericType.DOUBLE;

private CompensatedSum kahanSummation = new CompensatedSum(0, 0);

/**
* Creates a sum aggregator.
*
* @param starTreeNumericType numeric type stored on this instance; NOTE(review): presumably the
* numeric type of the source metric field - confirm against callers
*/
public SumValueAggregator(StarTreeNumericType starTreeNumericType) {
this.starTreeNumericType = starTreeNumericType;
}

/**
* Returns the data type of the aggregated value, which for this aggregator is
* {@code VALUE_AGGREGATOR_TYPE} ({@code StarTreeNumericType.DOUBLE}).
*/
@Override
public StarTreeNumericType getAggregatedValueType() {
return VALUE_AGGREGATOR_TYPE;
}

@Override
public Double getInitialAggregatedValueForSegmentDocValue(Long segmentDocValue) {
kahanSummation.reset(0, 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,20 @@
*/
package org.opensearch.index.compositeindex.datacube.startree.aggregators;

import org.opensearch.index.compositeindex.datacube.startree.aggregators.numerictype.StarTreeNumericType;

/**
* A value aggregator that pre-aggregates on the input values for a specific type of aggregation.
*
* @opensearch.experimental
*/
public interface ValueAggregator<A> {

/**
* Returns the data type of the aggregated value.
*/
StarTreeNumericType getAggregatedValueType();

/**
* Returns the initial aggregated value.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,12 +175,7 @@ protected long readMetrics(RandomAccessInput input, long offset, int numMetrics,
throw new IllegalStateException("Unsupported metric type");
}
}
offset += StarTreeDocumentBitSetUtil.readBitSet(
input,
offset,
metrics,
index -> metricAggregatorInfos.get(index).getValueAggregators().getIdentityMetricValue()
);
offset += StarTreeDocumentBitSetUtil.readBitSet(input, offset, metrics, index -> null);
return offset;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -470,17 +470,18 @@ public List<SequentialDocValuesIterator> getMetricReaders(SegmentWriteState stat
List<SequentialDocValuesIterator> metricReaders = new ArrayList<>();
for (Metric metric : this.starTreeField.getMetrics()) {
for (MetricStat metricStat : metric.getMetrics()) {
SequentialDocValuesIterator metricReader;
FieldInfo metricFieldInfo = state.fieldInfos.fieldInfo(metric.getField());
if (metricFieldInfo == null) {
metricFieldInfo = getFieldInfo(metric.getField());
}
// TODO
// if (metricStat != MetricStat.COUNT) {
// Need not initialize the metric reader for COUNT metric type
SequentialDocValuesIterator metricReader = new SequentialDocValuesIterator(
fieldProducerMap.get(metricFieldInfo.name).getSortedNumeric(metricFieldInfo)
);
// }
if (metricStat != MetricStat.COUNT) {
metricReader = new SequentialDocValuesIterator(
fieldProducerMap.get(metricFieldInfo.name).getSortedNumeric(metricFieldInfo)
);
} else {
metricReader = new SequentialDocValuesIterator();
}

metricReaders.add(metricReader);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

package org.opensearch.index.compositeindex.datacube.startree.utils;

import org.apache.lucene.index.DocValues;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.search.DocIdSetIterator;
import org.opensearch.common.annotation.ExperimentalApi;
Expand Down Expand Up @@ -42,6 +43,14 @@ public SequentialDocValuesIterator(DocIdSetIterator docIdSetIterator) {
this.docIdSetIterator = docIdSetIterator;
}

/**
* Constructs a new SequentialDocValuesIterator whose underlying iterator is the empty
* sorted-numeric doc values instance returned by {@link DocValues#emptySortedNumeric()}.
*/
public SequentialDocValuesIterator() {
this.docIdSetIterator = DocValues.emptySortedNumeric();
}

/**
* Returns the id of the latest document.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,11 @@ public void testGetInitialAggregatedValueForSegmentDocNullValue() {
}

public void testMergeAggregatedNullValueAndSegmentNullValue() {
assertEquals(aggregator.getIdentityMetricValue(), aggregator.mergeAggregatedValueAndSegmentValue(null, null));
if (aggregator instanceof CountValueAggregator) {
assertThrows(AssertionError.class, () -> aggregator.mergeAggregatedValueAndSegmentValue(null, null));
} else {
assertEquals(aggregator.getIdentityMetricValue(), aggregator.mergeAggregatedValueAndSegmentValue(null, null));
}
}

public void testMergeAggregatedNullValues() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,8 +392,14 @@ public void test_sortAndAggregateStarTreeDocuments_nullAndMinusOneInDimensionFie
new Long[] { 2L, null, 3L, 4L },
new Double[] { 12.0, null, randomDouble(), 8.0, 20.0 }
);
starTreeDocuments[1] = new StarTreeDocument(new Long[] { null, 4L, 2L, 1L }, new Double[] { 10.0, null, randomDouble(), 12.0, 10.0 });
starTreeDocuments[2] = new StarTreeDocument(new Long[] { null, 4L, 2L, 1L }, new Double[] { 14.0, null, randomDouble(), 6.0, 24.0 });
starTreeDocuments[1] = new StarTreeDocument(
new Long[] { null, 4L, 2L, 1L },
new Double[] { 10.0, null, randomDouble(), 12.0, 10.0 }
);
starTreeDocuments[2] = new StarTreeDocument(
new Long[] { null, 4L, 2L, 1L },
new Double[] { 14.0, null, randomDouble(), 6.0, 24.0 }
);
starTreeDocuments[3] = new StarTreeDocument(new Long[] { 2L, null, 3L, 4L }, new Double[] { 9.0, null, randomDouble(), 9.0, 12.0 });
starTreeDocuments[4] = new StarTreeDocument(new Long[] { -1L, 4L, 2L, 1L }, new Double[] { 11.0, null, randomDouble(), 8.0, 13.0 });

Expand Down Expand Up @@ -453,7 +459,7 @@ public void test_sortAndAggregateStarTreeDocuments_nullDimensionsAndNullMetrics(
starTreeDocuments[4] = new StarTreeDocument(new Long[] { null, null, null, null }, new Double[] { null, null, null, null, null });

List<StarTreeDocument> inorderStarTreeDocuments = List.of(
new StarTreeDocument(new Long[] { null, null, null, null }, new Object[] { 0.0, 0.0, 5L })
new StarTreeDocument(new Long[] { null, null, null, null }, new Object[] { 0.0, 0.0, 0L, null, null })
);
Iterator<StarTreeDocument> expectedStarTreeDocumentIterator = inorderStarTreeDocuments.iterator();

Expand All @@ -469,11 +475,87 @@ public void test_sortAndAggregateStarTreeDocuments_nullDimensionsAndNullMetrics(
? NumericUtils.doubleToSortableLong((Double) starTreeDocuments[i].metrics[2])
: null;
Long metric4 = starTreeDocuments[i].metrics[3] != null
? NumericUtils.doubleToSortableLong((Double) starTreeDocuments[i].metrics[2])
? NumericUtils.doubleToSortableLong((Double) starTreeDocuments[i].metrics[3])
: null;
Long metric5 = starTreeDocuments[i].metrics[4] != null
? NumericUtils.doubleToSortableLong((Double) starTreeDocuments[i].metrics[4])
: null;
segmentStarTreeDocuments[i] = new StarTreeDocument(
starTreeDocuments[i].dimensions,
new Object[] { metric1, metric2, metric3, metric4, metric5 }
);
}
SequentialDocValuesIterator[] dimsIterators = getDimensionIterators(segmentStarTreeDocuments);
List<SequentialDocValuesIterator> metricsIterators = getMetricIterators(segmentStarTreeDocuments);
builder = getStarTreeBuilder(compositeField, writeState, mapperService);
Iterator<StarTreeDocument> segmentStarTreeDocumentIterator = builder.sortAndAggregateSegmentDocuments(
dimsIterators,
metricsIterators
);

while (segmentStarTreeDocumentIterator.hasNext() && expectedStarTreeDocumentIterator.hasNext()) {
StarTreeDocument resultStarTreeDocument = segmentStarTreeDocumentIterator.next();
StarTreeDocument expectedStarTreeDocument = expectedStarTreeDocumentIterator.next();
assertEquals(expectedStarTreeDocument.dimensions[0], resultStarTreeDocument.dimensions[0]);
assertEquals(expectedStarTreeDocument.dimensions[1], resultStarTreeDocument.dimensions[1]);
assertEquals(expectedStarTreeDocument.dimensions[2], resultStarTreeDocument.dimensions[2]);
assertEquals(expectedStarTreeDocument.dimensions[3], resultStarTreeDocument.dimensions[3]);
assertEquals(expectedStarTreeDocument.metrics[0], resultStarTreeDocument.metrics[0]);
assertEquals(expectedStarTreeDocument.metrics[1], resultStarTreeDocument.metrics[1]);
assertEquals(expectedStarTreeDocument.metrics[2], resultStarTreeDocument.metrics[2]);
assertEquals(expectedStarTreeDocument.metrics[3], resultStarTreeDocument.metrics[3]);
assertEquals(expectedStarTreeDocument.metrics[4], resultStarTreeDocument.metrics[4]);
}
builder.build(segmentStarTreeDocumentIterator);
validateStarTree(builder.getRootNode(), 4, 1, builder.getStarTreeDocuments());
}

public void test_sortAndAggregateStarTreeDocuments_nullDimensionsAndFewNullMetrics() throws IOException {
int noOfStarTreeDocuments = 5;
StarTreeDocument[] starTreeDocuments = new StarTreeDocument[noOfStarTreeDocuments];

double sumValue = randomDouble();
double minValue = randomDouble();
double maxValue = randomDouble();

// Set the second metric iterator to an empty sorted numeric, indicating that the metric field is null
starTreeDocuments[0] = new StarTreeDocument(
new Long[] { null, null, null, null },
new Double[] { null, null, randomDouble(), null, maxValue }
);
starTreeDocuments[1] = new StarTreeDocument(new Long[] { null, null, null, null }, new Double[] { null, null, null, null, null });
starTreeDocuments[2] = new StarTreeDocument(
new Long[] { null, null, null, null },
new Double[] { null, null, null, minValue, null }
);
starTreeDocuments[3] = new StarTreeDocument(new Long[] { null, null, null, null }, new Double[] { null, null, null, null, null });
starTreeDocuments[4] = new StarTreeDocument(
new Long[] { null, null, null, null },
new Double[] { sumValue, null, randomDouble(), null, null }
);

List<StarTreeDocument> inorderStarTreeDocuments = List.of(
new StarTreeDocument(new Long[] { null, null, null, null }, new Object[] { sumValue, 0.0, 2L, minValue, maxValue })
);
Iterator<StarTreeDocument> expectedStarTreeDocumentIterator = inorderStarTreeDocuments.iterator();

StarTreeDocument[] segmentStarTreeDocuments = new StarTreeDocument[noOfStarTreeDocuments];
for (int i = 0; i < noOfStarTreeDocuments; i++) {
Long metric1 = starTreeDocuments[i].metrics[0] != null
? NumericUtils.doubleToSortableLong((Double) starTreeDocuments[i].metrics[0])
: null;
Long metric2 = starTreeDocuments[i].metrics[1] != null
? NumericUtils.doubleToSortableLong((Double) starTreeDocuments[i].metrics[1])
: null;
Long metric3 = starTreeDocuments[i].metrics[2] != null
? NumericUtils.doubleToSortableLong((Double) starTreeDocuments[i].metrics[2])
: null;
Long metric4 = starTreeDocuments[i].metrics[3] != null
? NumericUtils.doubleToSortableLong((Double) starTreeDocuments[i].metrics[3])
: null;
Long metric5 = starTreeDocuments[i].metrics[4] != null
? NumericUtils.doubleToSortableLong((Double) starTreeDocuments[i].metrics[4])
: null;
segmentStarTreeDocuments[i] = new StarTreeDocument(
starTreeDocuments[i].dimensions,
new Object[] { metric1, metric2, metric3, metric4, metric5 }
Expand Down

0 comments on commit 41697e6

Please sign in to comment.