diff --git a/processing/src/main/java/org/apache/druid/frame/allocation/AppendableMemory.java b/processing/src/main/java/org/apache/druid/frame/allocation/AppendableMemory.java index 628b7c041678..11edf396c9b2 100644 --- a/processing/src/main/java/org/apache/druid/frame/allocation/AppendableMemory.java +++ b/processing/src/main/java/org/apache/druid/frame/allocation/AppendableMemory.java @@ -135,8 +135,13 @@ public boolean reserveAdditional(final int bytes) if (idx < 0 || bytes + limits.getInt(idx) > blockHolders.get(idx).get().getCapacity()) { // Allocation needed. // Math.max(allocationSize, bytes) in case "bytes" is greater than SOFT_MAXIMUM_ALLOCATION_SIZE. + // However, cap the allocation request to the available bytes in the allocator, in case the requested bytes + // are less than what are available in the allocator, however the SOFT_MAXIMUM_ALLOCATION_SIZE is greater than the + // bytes available in the allocator. In such a case where bytes < available < SOFT_MAXIMUM_ALLOCATION_SIZE, we + // want to allocate all the available memory in the allocator, and in the other cases where available is the greatest + // of all, we want to allocate according to the max of bytes & SOFT_MAXIMUM_ALLOCATION_SIZE final Optional> newMemory = - allocator.allocate(Math.max(nextAllocationSize, bytes)); + allocator.allocate(Math.min(allocator.available(), Math.max(nextAllocationSize, bytes))); if (!newMemory.isPresent()) { return false; diff --git a/processing/src/main/java/org/apache/druid/frame/allocation/HeapMemoryAllocator.java b/processing/src/main/java/org/apache/druid/frame/allocation/HeapMemoryAllocator.java index 1961398b2054..ee3af073f882 100644 --- a/processing/src/main/java/org/apache/druid/frame/allocation/HeapMemoryAllocator.java +++ b/processing/src/main/java/org/apache/druid/frame/allocation/HeapMemoryAllocator.java @@ -19,6 +19,7 @@ package org.apache.druid.frame.allocation; +import com.google.common.annotations.VisibleForTesting; import com.google.common.primitives.Ints; import org.apache.datasketches.memory.WritableMemory; import org.apache.druid.collections.ResourceHolder; @@ -37,7 +38,8 @@ public class HeapMemoryAllocator implements MemoryAllocator private long bytesAllocated = 0; - private HeapMemoryAllocator(final long capacity) + @VisibleForTesting + HeapMemoryAllocator(final long capacity) { this.capacity = capacity; } @@ -53,7 +55,7 @@ public static HeapMemoryAllocator unlimited() @Override public Optional> allocate(final long size) { - if (bytesAllocated < capacity - size) { + if (size <= capacity - bytesAllocated) { bytesAllocated += size; return Optional.of( diff --git a/processing/src/main/java/org/apache/druid/frame/segment/FrameCursorUtils.java b/processing/src/main/java/org/apache/druid/frame/segment/FrameCursorUtils.java index 27da7cd51cec..3cb5c686e9d6 100644 --- a/processing/src/main/java/org/apache/druid/frame/segment/FrameCursorUtils.java +++ b/processing/src/main/java/org/apache/druid/frame/segment/FrameCursorUtils.java @@ -26,6 +26,7 @@ import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; +import org.apache.druid.query.QueryContexts; import org.apache.druid.query.filter.BoundDimFilter; import org.apache.druid.query.filter.Filter; import org.apache.druid.query.ordering.StringComparators; @@ -42,6 +43,25 @@ public class FrameCursorUtils { + + /** + * Exception to be thrown when the subquery's rows are too wide to fit in a single frame. In such case, byte based + * limiting should be disabled or the user should modify the query. + *

+ * NOTE: This error message is not appropriate when a similar exception is hit in MSQ, since this workaround + * is not applicable in that scenario + */ + public static final DruidException SUBQUERY_ROW_TOO_LARGE_EXCEPTION = + DruidException + .forPersona(DruidException.Persona.OPERATOR) + .ofCategory(DruidException.Category.CAPACITY_EXCEEDED) + .build( + "Subquery's row size exceeds the frame size and therefore cannot write the subquery's " + + "row to the frame. Either modify the subqueries to materialize smaller rows by removing wide columns, " + + "or disable byte based limiting by setting '%s' to 'disabled'", + QueryContexts.MAX_SUBQUERY_BYTES_KEY + ); + private FrameCursorUtils() { // No instantiation. @@ -79,60 +99,66 @@ public static Filter buildFilter(@Nullable Filter filter, Interval interval) /** * Writes a {@link Cursor} to a sequence of {@link Frame}. This method iterates over the rows of the cursor, - * and writes the columns to the frames - * - * @param cursor Cursor to write to the frame - * @param frameWriterFactory Frame writer factory to write to the frame. - * Determines the signature of the rows that are written to the frames + * and writes the columns to the frames. The iterable is lazy, and it traverses the required portion of the cursor + * as required */ - public static Sequence cursorToFrames( - Cursor cursor, - FrameWriterFactory frameWriterFactory + public static Iterable cursorToFramesIterable( + final Cursor cursor, + final FrameWriterFactory frameWriterFactory ) { + return () -> new Iterator() + { + @Override + public boolean hasNext() + { + return !cursor.isDone(); + } - return Sequences.simple( - () -> new Iterator() - { - @Override - public boolean hasNext() - { - return !cursor.isDone(); - } - - @Override - public Frame next() - { - // Makes sure that cursor contains some elements prior. This ensures if no row is written, then the row size - // is larger than the MemoryAllocators returned by the provided factory - if (!hasNext()) { - throw new NoSuchElementException(); + @Override + public Frame next() + { + // Makes sure that cursor contains some elements prior. This ensures if no row is written, then the row size + // is larger than the MemoryAllocators returned by the provided factory + if (!hasNext()) { + throw new NoSuchElementException(); + } + boolean firstRowWritten = false; + Frame frame; + try (final FrameWriter frameWriter = frameWriterFactory.newFrameWriter(cursor.getColumnSelectorFactory())) { + while (!cursor.isDone()) { + if (!frameWriter.addSelection()) { + break; } - boolean firstRowWritten = false; - Frame frame; - try (final FrameWriter frameWriter = frameWriterFactory.newFrameWriter(cursor.getColumnSelectorFactory())) { - while (!cursor.isDone()) { - if (!frameWriter.addSelection()) { - break; - } - firstRowWritten = true; - cursor.advance(); - } - - if (!firstRowWritten) { - throw DruidException - .forPersona(DruidException.Persona.DEVELOPER) - .ofCategory(DruidException.Category.CAPACITY_EXCEEDED) - .build("Subquery's row size exceeds the frame size and therefore cannot write the subquery's " - + "row to the frame. This is a non-configurable static limit that can only be modified by the " - + "developer."); - } + firstRowWritten = true; + cursor.advance(); + } - frame = Frame.wrap(frameWriter.toByteArray()); - } - return frame; + if (!firstRowWritten) { + throw SUBQUERY_ROW_TOO_LARGE_EXCEPTION; } + + frame = Frame.wrap(frameWriter.toByteArray()); } - ); + return frame; + } + }; + } + + /** + * Writes a {@link Cursor} to a sequence of {@link Frame}. This method iterates over the rows of the cursor, + * and writes the columns to the frames + * + * @param cursor Cursor to write to the frame + * @param frameWriterFactory Frame writer factory to write to the frame. + * It also determines the signature of the rows that are written to the frames + */ + public static Sequence cursorToFramesSequence( + final Cursor cursor, + final FrameWriterFactory frameWriterFactory + ) + { + + return Sequences.simple(cursorToFramesIterable(cursor, frameWriterFactory)); } } diff --git a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java index 96b8c7ec69f4..66ba25d6b347 100644 --- a/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/groupby/GroupByQueryQueryToolChest.java @@ -736,7 +736,7 @@ public Optional> resultsAsFrames( Cursor cursor = cursorAndCloseable.lhs; Closeable closeble = cursorAndCloseable.rhs; - Sequence frames = FrameCursorUtils.cursorToFrames(cursor, frameWriterFactory).withBaggage(closeble); + Sequence frames = FrameCursorUtils.cursorToFramesSequence(cursor, frameWriterFactory).withBaggage(closeble); return Optional.of(frames.map(frame -> new FrameSignaturePair(frame, modifiedRowSignature))); } diff --git a/processing/src/main/java/org/apache/druid/query/scan/ConcatCursor.java b/processing/src/main/java/org/apache/druid/query/scan/ConcatCursor.java deleted file mode 100644 index 12fa165c6db6..000000000000 --- a/processing/src/main/java/org/apache/druid/query/scan/ConcatCursor.java +++ /dev/null @@ -1,328 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.query.scan; - -import com.google.common.math.IntMath; -import org.apache.druid.math.expr.ExpressionType; -import org.apache.druid.query.dimension.DimensionSpec; -import org.apache.druid.query.filter.DruidPredicateFactory; -import org.apache.druid.query.filter.ValueMatcher; -import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; -import org.apache.druid.segment.ColumnSelectorFactory; -import org.apache.druid.segment.ColumnValueSelector; -import org.apache.druid.segment.Cursor; -import org.apache.druid.segment.DimensionSelector; -import org.apache.druid.segment.IdLookup; -import org.apache.druid.segment.RowIdSupplier; -import org.apache.druid.segment.column.ColumnCapabilities; -import org.apache.druid.segment.data.IndexedInts; -import org.joda.time.DateTime; - -import javax.annotation.Nullable; -import java.util.List; - -/** - * Combines multiple cursors and iterates over them. It skips over the empty cursors - * The {@link DimensionSelector} and {@link ColumnValueSelector} it generates hold the reference to the original object - * because the cursor might be advanced independently after extracting out the {@link ColumnSelectorFactory} like in - * {@link org.apache.druid.frame.segment.FrameCursorUtils#cursorToFrames}. This ensures that the selectors always return - * the value pointed by the {@link #currentCursor}. - */ -public class ConcatCursor implements Cursor -{ - - private final List cursors; - private int currentCursor; - - public ConcatCursor( - List cursors - ) - { - this.cursors = cursors; - currentCursor = 0; - skipEmptyCursors(); - } - - @Override - public ColumnSelectorFactory getColumnSelectorFactory() - { - return new ColumnSelectorFactory() - { - @Override - public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec) - { - return new DimensionSelector() - { - @Override - public IndexedInts getRow() - { - return cursors.get(currentCursor).getColumnSelectorFactory().makeDimensionSelector(dimensionSpec).getRow(); - } - - @Override - public ValueMatcher makeValueMatcher(@Nullable String value) - { - return cursors.get(currentCursor) - .getColumnSelectorFactory() - .makeDimensionSelector(dimensionSpec) - .makeValueMatcher(value); - } - - @Override - public ValueMatcher makeValueMatcher(DruidPredicateFactory predicateFactory) - { - return cursors.get(currentCursor) - .getColumnSelectorFactory() - .makeDimensionSelector(dimensionSpec) - .makeValueMatcher(predicateFactory); - } - - @Override - public void inspectRuntimeShape(RuntimeShapeInspector inspector) - { - cursors.get(currentCursor) - .getColumnSelectorFactory() - .makeDimensionSelector(dimensionSpec) - .inspectRuntimeShape(inspector); - } - - @Nullable - @Override - public Object getObject() - { - return cursors.get(currentCursor) - .getColumnSelectorFactory() - .makeDimensionSelector(dimensionSpec) - .getObject(); - } - - @Override - public Class classOfObject() - { - return cursors.get(currentCursor) - .getColumnSelectorFactory() - .makeDimensionSelector(dimensionSpec) - .classOfObject(); - } - - @Override - public int getValueCardinality() - { - return cursors.get(currentCursor) - .getColumnSelectorFactory() - .makeDimensionSelector(dimensionSpec) - .getValueCardinality(); - } - - @Nullable - @Override - public String lookupName(int id) - { - return cursors.get(currentCursor) - .getColumnSelectorFactory() - .makeDimensionSelector(dimensionSpec) - .lookupName(id); - } - - @Override - public boolean nameLookupPossibleInAdvance() - { - return cursors.get(currentCursor) - .getColumnSelectorFactory() - .makeDimensionSelector(dimensionSpec) - .nameLookupPossibleInAdvance(); - } - - @Nullable - @Override - public IdLookup idLookup() - { - return cursors.get(currentCursor) - .getColumnSelectorFactory() - .makeDimensionSelector(dimensionSpec) - .idLookup(); - } - }; - } - - @Override - public ColumnValueSelector makeColumnValueSelector(String columnName) - { - return new ColumnValueSelector() - { - @Override - public double getDouble() - { - return cursors.get(currentCursor) - .getColumnSelectorFactory() - .makeColumnValueSelector(columnName) - .getDouble(); - } - - @Override - public float getFloat() - { - return cursors.get(currentCursor).getColumnSelectorFactory().makeColumnValueSelector(columnName).getFloat(); - } - - @Override - public long getLong() - { - return cursors.get(currentCursor).getColumnSelectorFactory().makeColumnValueSelector(columnName).getLong(); - } - - @Override - public void inspectRuntimeShape(RuntimeShapeInspector inspector) - { - cursors.get(currentCursor) - .getColumnSelectorFactory() - .makeColumnValueSelector(columnName) - .inspectRuntimeShape(inspector); - } - - @Override - public boolean isNull() - { - return cursors.get(currentCursor).getColumnSelectorFactory().makeColumnValueSelector(columnName).isNull(); - } - - @Nullable - @Override - public Object getObject() - { - return cursors.get(currentCursor) - .getColumnSelectorFactory() - .makeColumnValueSelector(columnName) - .getObject(); - } - - @Override - public Class classOfObject() - { - return cursors.get(currentCursor) - .getColumnSelectorFactory() - .makeColumnValueSelector(columnName) - .classOfObject(); - } - }; - } - - @Override - public ColumnCapabilities getColumnCapabilitiesWithDefault(String column, ColumnCapabilities defaultCapabilites) - { - return cursors.get(currentCursor) - .getColumnSelectorFactory() - .getColumnCapabilitiesWithDefault(column, defaultCapabilites); - } - - @Nullable - @Override - public ExpressionType getType(String name) - { - return cursors.get(currentCursor).getColumnSelectorFactory().getType(name); - } - - @Nullable - @Override - public ColumnCapabilities getColumnCapabilities(String column) - { - return cursors.get(currentCursor).getColumnSelectorFactory().getColumnCapabilities(column); - } - - @Nullable - @Override - public RowIdSupplier getRowIdSupplier() - { - return cursors.get(currentCursor).getColumnSelectorFactory().getRowIdSupplier(); - } - }; - } - - @Override - public DateTime getTime() - { - return cursors.get(currentCursor).getTime(); - } - - @Override - public void advance() - { - if (currentCursor < cursors.size()) { - cursors.get(currentCursor).advance(); - advanceCursor(); - } - } - - @Override - public void advanceUninterruptibly() - { - if (currentCursor < cursors.size()) { - cursors.get(currentCursor).advanceUninterruptibly(); - advanceCursor(); - } - } - - @Override - public boolean isDone() - { - return currentCursor == cursors.size(); - } - - @Override - public boolean isDoneOrInterrupted() - { - return isDone() || Thread.currentThread().isInterrupted(); - } - - @Override - public void reset() - { - while (currentCursor >= 0) { - if (currentCursor < cursors.size()) { - cursors.get(currentCursor).reset(); - } - currentCursor = IntMath.checkedSubtract(currentCursor, 1); - } - currentCursor = 0; - skipEmptyCursors(); - } - - /** - * This method should be called whenever the currentCursor gets updated. It skips over the empty cursors so that the - * current pointer is pointing to a valid cursor - */ - private void skipEmptyCursors() - { - while (currentCursor < cursors.size() && cursors.get(currentCursor).isDone()) { - currentCursor = IntMath.checkedAdd(currentCursor, 1); - } - } - - /** - * This method updates the current cursor. This is used to update the current cursor under question. - */ - private void advanceCursor() - { - if (cursors.get(currentCursor).isDone()) { - currentCursor = IntMath.checkedAdd(currentCursor, 1); - skipEmptyCursors(); - } - } -} diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryQueryToolChest.java index 8b5f28570b2b..63722db74afb 100644 --- a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryQueryToolChest.java @@ -22,41 +22,24 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.base.Function; import com.google.common.base.Functions; -import com.google.common.base.Preconditions; import com.google.common.collect.Iterables; -import com.google.common.collect.Iterators; -import com.google.common.collect.Lists; -import com.google.common.collect.PeekingIterator; import com.google.inject.Inject; -import org.apache.druid.frame.Frame; -import org.apache.druid.frame.FrameType; import org.apache.druid.frame.allocation.MemoryAllocatorFactory; -import org.apache.druid.frame.segment.FrameCursorUtils; -import org.apache.druid.frame.write.FrameWriterFactory; -import org.apache.druid.frame.write.FrameWriterUtils; -import org.apache.druid.frame.write.FrameWriters; import org.apache.druid.java.util.common.ISE; -import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.UOE; import org.apache.druid.java.util.common.guava.BaseSequence; import org.apache.druid.java.util.common.guava.Sequence; import org.apache.druid.java.util.common.guava.Sequences; -import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.query.FrameSignaturePair; import org.apache.druid.query.GenericQueryMetricsFactory; -import org.apache.druid.query.IterableRowsCursorHelper; import org.apache.druid.query.Query; import org.apache.druid.query.QueryMetrics; import org.apache.druid.query.QueryRunner; import org.apache.druid.query.QueryToolChest; import org.apache.druid.query.aggregation.MetricManipulationFn; -import org.apache.druid.segment.Cursor; import org.apache.druid.segment.column.RowSignature; import org.apache.druid.utils.CloseableUtils; -import java.io.Closeable; -import java.util.ArrayList; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; @@ -189,99 +172,17 @@ public Optional> resultsAsFrames( ) { final RowSignature defaultRowSignature = resultArraySignature(query); - ScanResultValueIterator resultSequenceIterator = new ScanResultValueIterator(resultSequence); - - Iterable> retVal = () -> new Iterator>() - { - PeekingIterator scanResultValuePeekingIterator = Iterators.peekingIterator(resultSequenceIterator); - - @Override - public boolean hasNext() - { - return scanResultValuePeekingIterator.hasNext(); - } - - @Override - public Sequence next() - { - final List batch = new ArrayList<>(); - final ScanResultValue scanResultValue = scanResultValuePeekingIterator.next(); - batch.add(scanResultValue); - // If the rowSignature is not provided, assume that the scanResultValue can contain any number of the columns - // that appear in the original scan query - final RowSignature rowSignature = scanResultValue.getRowSignature() != null - ? scanResultValue.getRowSignature() - : defaultRowSignature; - while (scanResultValuePeekingIterator.hasNext()) { - RowSignature nextRowSignature = scanResultValuePeekingIterator.peek().getRowSignature(); - if (nextRowSignature == null) { - nextRowSignature = defaultRowSignature; - } - if (nextRowSignature != null && nextRowSignature.equals(rowSignature)) { - batch.add(scanResultValuePeekingIterator.next()); - } else { - break; - } - } - return convertScanResultValuesToFrame( - batch, - rowSignature, - query, - memoryAllocatorFactory, - useNestedForUnknownTypes - ); - } - }; - return Optional.of(Sequences.concat(retVal).withBaggage(resultSequenceIterator)); - } - - private Sequence convertScanResultValuesToFrame( - List batch, - RowSignature rowSignature, - ScanQuery query, - MemoryAllocatorFactory memoryAllocatorFactory, - boolean useNestedForUnknownTypes - ) - { - Preconditions.checkNotNull(rowSignature, "'rowSignature' must be provided"); - - List cursors = new ArrayList<>(); - Closer closer = Closer.create(); - - for (ScanResultValue scanResultValue : batch) { - final List rows = (List) scanResultValue.getEvents(); - final Function mapper = getResultFormatMapper(query.getResultFormat(), rowSignature.getColumnNames()); - final Iterable formattedRows = Lists.newArrayList(Iterables.transform(rows, (Function) mapper)); - - Pair cursorAndCloseable = IterableRowsCursorHelper.getCursorFromIterable( - formattedRows, - rowSignature - ); - Cursor cursor = cursorAndCloseable.lhs; - Closeable closeable = cursorAndCloseable.rhs; - cursors.add(cursor); - // Cursors created from iterators don't have any resources, therefore this is mostly a defensive check - closer.register(closeable); - } - - RowSignature modifiedRowSignature = useNestedForUnknownTypes - ? FrameWriterUtils.replaceUnknownTypesWithNestedColumns(rowSignature) - : rowSignature; - FrameWriterFactory frameWriterFactory = FrameWriters.makeFrameWriterFactory( - FrameType.COLUMNAR, - memoryAllocatorFactory, - modifiedRowSignature, - new ArrayList<>() - ); - - - Cursor concatCursor = new ConcatCursor(cursors); - Sequence frames = FrameCursorUtils.cursorToFrames( - concatCursor, - frameWriterFactory + return Optional.of( + Sequences.simple( + new ScanResultValueFramesIterable( + resultSequence, + memoryAllocatorFactory, + useNestedForUnknownTypes, + defaultRowSignature, + rowSignature -> getResultFormatMapper(query.getResultFormat(), rowSignature.getColumnNames()) + ) + ) ); - - return frames.map(frame -> new FrameSignaturePair(frame, modifiedRowSignature)).withBaggage(closer); } @Override @@ -299,7 +200,7 @@ public Sequence resultsAsArrays(final ScanQuery query, final Sequence< ); } - private Function getResultFormatMapper(ScanQuery.ResultFormat resultFormat, List fields) + private static Function getResultFormatMapper(ScanQuery.ResultFormat resultFormat, List fields) { Function mapper; diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanResultValueFramesIterable.java b/processing/src/main/java/org/apache/druid/query/scan/ScanResultValueFramesIterable.java new file mode 100644 index 000000000000..42f57628461b --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/scan/ScanResultValueFramesIterable.java @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.scan; + +import com.google.common.base.Function; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import org.apache.druid.frame.Frame; +import org.apache.druid.frame.FrameType; +import org.apache.druid.frame.allocation.MemoryAllocatorFactory; +import org.apache.druid.frame.segment.FrameCursorUtils; +import org.apache.druid.frame.write.FrameWriter; +import org.apache.druid.frame.write.FrameWriterFactory; +import org.apache.druid.frame.write.FrameWriterUtils; +import org.apache.druid.frame.write.FrameWriters; +import org.apache.druid.java.util.common.Pair; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.query.FrameSignaturePair; +import org.apache.druid.query.IterableRowsCursorHelper; +import org.apache.druid.segment.Cursor; +import org.apache.druid.segment.column.RowSignature; + +import java.io.Closeable; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +/** + * Returns a thread-unsafe iterable, that converts a sequence of {@link ScanResultValue} to an iterable of {@link FrameSignaturePair}. + * ScanResultValues can have heterogenous row signatures, and the returned sequence would have batched + * them into frames appropriately. + *

+ * The batching process greedily merges the values from the scan result values that have the same signature, while + * still maintaining the manageable frame sizes that is determined by the memory allocator by splitting the rows + * whenever necessary. + *

+ * It is necessary that we don't batch and store the ScanResultValues somewhere (like a List) while we do this processing + * to prevent the heap from exhausting, without limit. It has to be done online - as the scan result values get materialized, + * we produce frames. A few ScanResultValues might be stored however (if the frame got cut off in the middle) + *

+ * Assuming that we have a sequence of scan result values like: + *

+ * ScanResultValue1 - RowSignatureA - 3 rows + * ScanResultValue2 - RowSignatureB - 2 rows + * ScanResultValue3 - RowSignatureA - 1 rows + * ScanResultValue4 - RowSignatureA - 4 rows + * ScanResultValue5 - RowSignatureB - 3 rows + *

+ * Also, assume that each individual frame can hold two rows (in practice, it is determined by the row size and + * the memory block allocated by the memory allocator factory) + *

+ * The output would be a sequence like: + * Frame1 - RowSignatureA - rows 1-2 from ScanResultValue1 + * Frame2 - RowSignatureA - row 3 from ScanResultValue1 + * Frame3 - RowSignatureB - rows 1-2 from ScanResultValue2 + * Frame4 - RowSignatureA - row 1 from ScanResultValue3, row 1 from ScanResultValue4 + * Frame5 - RowSignatureA - row 2-3 from ScanResultValue4 + * Frame6 - RowSignatureA - row 4 from ScanResultValue4 + * Frame7 - RowSignatureB - row 1-2 from ScanResultValue5 + * Frame8 - RowSignatureB - row 3 from ScanResultValue6 + *

+ */ + +public class ScanResultValueFramesIterable implements Iterable +{ + + final Sequence resultSequence; + final MemoryAllocatorFactory memoryAllocatorFactory; + final boolean useNestedForUnknownTypes; + final RowSignature defaultRowSignature; + final Function> resultFormatMapper; + + public ScanResultValueFramesIterable( + Sequence resultSequence, + MemoryAllocatorFactory memoryAllocatorFactory, + boolean useNestedForUnknownTypes, + RowSignature defaultRowSignature, + Function> resultFormatMapper + ) + { + this.resultSequence = resultSequence; + this.memoryAllocatorFactory = memoryAllocatorFactory; + this.useNestedForUnknownTypes = useNestedForUnknownTypes; + this.defaultRowSignature = defaultRowSignature; + this.resultFormatMapper = resultFormatMapper; + } + + @Override + public Iterator iterator() + { + return new ScanResultValueFramesIterator( + resultSequence, + memoryAllocatorFactory, + useNestedForUnknownTypes, + defaultRowSignature, + resultFormatMapper + ); + } + + private static class ScanResultValueFramesIterator implements Iterator + { + + /** + * Memory allocator factory to use for frame writers + */ + final MemoryAllocatorFactory memoryAllocatorFactory; + + /** + * Replace unknown types in the row signature with {@code COMPLEX} + */ + final boolean useNestedForUnknownTypes; + + /** + * Default row signature to use if the scan result value doesn't cantain any row signature. This will usually be + * the row signature of the scan query containing only the column names (and no types) + */ + final RowSignature defaultRowSignature; + + /** + * Mapper to convert the scan result value to rows + */ + final Function> resultFormatMapper; + + /** + * Accumulating the closers for all the resources created so far + */ + final Closer closer = Closer.create(); + + /** + * Iterator from the scan result value's sequence, so that we can fetch the individual values. Closer registers the + * iterator so that we can clean up any resources held by the sequence's iterator, and prevent leaking + */ + final ScanResultValueIterator resultSequenceIterator; + + /** + * Either null, or points to the current non-empty cursor (and the cursor would point to the current row) + */ + Cursor currentCursor = null; + + /** + * Row signature of the current row + */ + RowSignature currentRowSignature = null; + + + public ScanResultValueFramesIterator( + Sequence resultSequence, + MemoryAllocatorFactory memoryAllocatorFactory, + boolean useNestedForUnknownTypes, + RowSignature defaultRowSignature, + Function> resultFormatMapper + ) + { + this.memoryAllocatorFactory = memoryAllocatorFactory; + this.useNestedForUnknownTypes = useNestedForUnknownTypes; + this.defaultRowSignature = defaultRowSignature; + this.resultFormatMapper = resultFormatMapper; + this.resultSequenceIterator = new ScanResultValueIterator(resultSequence); + + closer.register(resultSequenceIterator); + + // Makes sure that we run through all the empty scan result values at the beginning and are pointing to a valid + // row + populateCursor(); + } + + @Override + public boolean hasNext() + { + return !done(); + } + + @Override + public FrameSignaturePair next() + { + if (!hasNext()) { + throw new NoSuchElementException("No more frames to produce. Call `hasNext()` before calling `next()`"); + } + + // It would ensure that the cursor and the currentRowSignature is populated properly before we + // start all the processing + populateCursor(); + boolean firstRowWritten = false; + // While calling populateCursor() repeatedly, currentRowSignature might change. Therefore we store the signature + // with which we have written the frames + final RowSignature writtenSignature = currentRowSignature; + FrameWriterFactory frameWriterFactory = FrameWriters.makeFrameWriterFactory( + FrameType.COLUMNAR, + memoryAllocatorFactory, + currentRowSignature, + Collections.emptyList() + ); + Frame frame; + try (final FrameWriter frameWriter = frameWriterFactory.newFrameWriter(new SettableCursorColumnSelectorFactory( + () -> currentCursor, + currentRowSignature + ))) { + while (populateCursor()) { // Do till we don't have any more rows, or the next row isn't compatible with the current row + if (!frameWriter.addSelection()) { // Add the cursor's row to the frame, till the frame is full + break; + } + firstRowWritten = true; + currentCursor.advance(); + } + + if (!firstRowWritten) { + throw FrameCursorUtils.SUBQUERY_ROW_TOO_LARGE_EXCEPTION; + } + frame = Frame.wrap(frameWriter.toByteArray()); + } + + return new FrameSignaturePair(frame, writtenSignature); + } + + /** + * Returns true if the iterator has no more frames to produce + */ + private boolean done() + { + return + (currentCursor == null || currentCursor.isDone()) + // If the current cursor is not done, then we can generate rows from the current cursor itself + && !(resultSequenceIterator.hasNext()); // If the cursor is done, but we still have more frames in the input + } + + /** + * This is the most important method of this iterator. This determines if two consecutive scan result values can + * be batched or not, populates the value of the {@link #currentCursor} and {@link #currentRowSignature}, + * during the course of the iterator, and facilitates the {@link #next()} + *

+ * Multiple calls to populateCursor, without advancing the {@link #currentCursor} is idempotent. This allows successive + * calls to this method in next(), done() and hasNext() methods without having any additional logic in the callers + *

+ * Preconditions: none (This method can be called safely any time) + *

+ * Postconditions - + * if (hasNext()) was false before calling the method - none + * if (hasNext()) was true before calling the method - + * 1. {@link #currentCursor} - Points to the cursor with non-empty value (i.e. isDone()) is false, and the cursor points + * to the next row present in the sequence of the scan result values. This row would get materialized to frame + * 2. {@link #currentRowSignature} - Row signature of the row. + *

+ * Return value - + * if (hasNext()) is false before calling the method - returns false + * if (hasNext()) is true before calling the method - returns true if previousRowSignature == currentRowSignature + */ + private boolean populateCursor() + { + if (currentCursor != null && !currentCursor.isDone()) { + return true; + } + + if (done()) { + return false; + } + + // At this point, we know that we need to move to the next non-empty cursor, AND it exists, because + // done() is not false + ScanResultValue scanResultValue = resultSequenceIterator.next(); + final RowSignature rowSignature = scanResultValue.getRowSignature() != null + ? scanResultValue.getRowSignature() + : defaultRowSignature; + RowSignature modifiedRowSignature = useNestedForUnknownTypes + ? FrameWriterUtils.replaceUnknownTypesWithNestedColumns(rowSignature) + : rowSignature; + + // currentRowSignature at this time points to the previous row's signature + final boolean compatible = modifiedRowSignature != null + && modifiedRowSignature.equals(currentRowSignature); + + final List rows = (List) scanResultValue.getEvents(); + final Iterable formattedRows = Lists.newArrayList(Iterables.transform( + rows, + (Function) resultFormatMapper.apply(modifiedRowSignature) + )); + + Pair cursorAndCloseable = IterableRowsCursorHelper.getCursorFromIterable( + formattedRows, + modifiedRowSignature + ); + + currentCursor = cursorAndCloseable.lhs; + closer.register(cursorAndCloseable.rhs); + + // Donot update the previous rowSignature before ensuring that the cursor is not null + if (currentCursor.isDone()) { + return populateCursor(); + } + + currentRowSignature = modifiedRowSignature; + return compatible; + } + } +} diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanResultValueIterator.java b/processing/src/main/java/org/apache/druid/query/scan/ScanResultValueIterator.java index 646c69eaf185..eb8183b19030 100644 --- a/processing/src/main/java/org/apache/druid/query/scan/ScanResultValueIterator.java +++ b/processing/src/main/java/org/apache/druid/query/scan/ScanResultValueIterator.java @@ -31,7 +31,7 @@ * underlying sequence. Similar to {@link Yielder}, once close is called on the iterator, the calls to the rest of the * iterator's methods are undefined. */ -public class ScanResultValueIterator implements CloseableIterator +public class ScanResultValueIterator implements CloseableIterator { Yielder yielder; @@ -53,7 +53,7 @@ public boolean hasNext() } @Override - public Object next() + public ScanResultValue next() { ScanResultValue scanResultValue = yielder.get(); yielder = yielder.next(null); diff --git a/processing/src/main/java/org/apache/druid/query/scan/SettableCursorColumnSelectorFactory.java b/processing/src/main/java/org/apache/druid/query/scan/SettableCursorColumnSelectorFactory.java new file mode 100644 index 000000000000..182d5e2c7462 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/query/scan/SettableCursorColumnSelectorFactory.java @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.scan; + +import org.apache.druid.query.dimension.DimensionSpec; +import org.apache.druid.query.filter.DruidPredicateFactory; +import org.apache.druid.query.filter.ValueMatcher; +import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector; +import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.ColumnValueSelector; +import org.apache.druid.segment.Cursor; +import org.apache.druid.segment.DimensionSelector; +import org.apache.druid.segment.IdLookup; +import org.apache.druid.segment.RowIdSupplier; +import org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.segment.data.IndexedInts; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.NotThreadSafe; +import java.util.function.Supplier; + +/** + * A column selector factory, that represents the column values from multiple underlying cursors. It essentially + * wraps over the multiple cursors and can be passed to methods that expect column value selector from a single cursor. + * It is the duty of the caller to know when to switch from one cursor to another + * + * It is expected to work correctly if the individual cursors have the same corresponding columns, columnTypes, + * and column capabilities since the usual pattern throughout the code is to cache these values and + * assume they are fixed throughout. + */ +@NotThreadSafe +public class SettableCursorColumnSelectorFactory implements ColumnSelectorFactory +{ + /** + * Cursor supplier whose supplied value will determine what cursor and the values to expose to the caller + */ + private final Supplier cursorSupplier; + + /** + * Overarching row signature of the values returned by the cursor + */ + private final RowSignature rowSignature; + + public SettableCursorColumnSelectorFactory(final Supplier cursorSupplier, final RowSignature rowSignature) + { + this.cursorSupplier = cursorSupplier; + this.rowSignature = rowSignature; + } + + @Override + public DimensionSelector makeDimensionSelector(DimensionSpec dimensionSpec) + { + return new DimensionSelector() + { + @Override + public IndexedInts getRow() + { + return cursorSupplier.get().getColumnSelectorFactory().makeDimensionSelector(dimensionSpec).getRow(); + } + + @Override + public ValueMatcher makeValueMatcher(@Nullable String value) + { + return new ValueMatcher() + { + @Override + public boolean matches(boolean includeUnknown) + { + return cursorSupplier.get() + .getColumnSelectorFactory() + .makeDimensionSelector(dimensionSpec) + .makeValueMatcher(value) + .matches(includeUnknown); + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + cursorSupplier.get() + .getColumnSelectorFactory() + .makeDimensionSelector(dimensionSpec) + .makeValueMatcher(value) + .inspectRuntimeShape(inspector); + } + }; + } + + @Override + public ValueMatcher makeValueMatcher(DruidPredicateFactory predicateFactory) + { + return new ValueMatcher() + { + @Override + public boolean matches(boolean includeUnknown) + { + return cursorSupplier.get() + .getColumnSelectorFactory() + .makeDimensionSelector(dimensionSpec) + .makeValueMatcher(predicateFactory) + .matches(includeUnknown); + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + cursorSupplier.get() + .getColumnSelectorFactory() + .makeDimensionSelector(dimensionSpec) + .makeValueMatcher(predicateFactory) + .inspectRuntimeShape(inspector); + } + }; + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + cursorSupplier.get() + .getColumnSelectorFactory() + .makeDimensionSelector(dimensionSpec) + .inspectRuntimeShape(inspector); + + } + + @Nullable + @Override + public Object getObject() + { + return cursorSupplier.get() + .getColumnSelectorFactory() + .makeDimensionSelector(dimensionSpec) + .getObject(); + } + + @Override + public Class classOfObject() + { + return cursorSupplier.get() + .getColumnSelectorFactory() + .makeDimensionSelector(dimensionSpec) + .classOfObject(); + + } + + /** + * Cardinality of the concatenation of these selectors would be unknown + */ + @Override + public int getValueCardinality() + { + return CARDINALITY_UNKNOWN; + } + + @Nullable + @Override + public String lookupName(int id) + { + return cursorSupplier.get() + .getColumnSelectorFactory() + .makeDimensionSelector(dimensionSpec) + .lookupName(id); + } + + /** + * Name lookup is not possible in advance, because not all may support name lookups in the first place and they + * will have their own name lookups that will conflict with one another + */ + @Override + public boolean nameLookupPossibleInAdvance() + { + return false; + } + + @Nullable + @Override + public IdLookup idLookup() + { + return null; + } + }; + } + + /** + * Create {@link ColumnValueSelector} for the give column name + */ + @Override + public ColumnValueSelector makeColumnValueSelector(String columnName) + { + return new ColumnValueSelector() + { + @Override + public double getDouble() + { + return cursorSupplier + .get() + .getColumnSelectorFactory() + .makeColumnValueSelector(columnName) + .getDouble(); + } + + @Override + public float getFloat() + { + return cursorSupplier.get().getColumnSelectorFactory().makeColumnValueSelector(columnName).getFloat(); + } + + @Override + public long getLong() + { + return cursorSupplier.get().getColumnSelectorFactory().makeColumnValueSelector(columnName).getLong(); + } + + @Override + public void inspectRuntimeShape(RuntimeShapeInspector inspector) + { + cursorSupplier.get() + .getColumnSelectorFactory() + .makeColumnValueSelector(columnName) + .inspectRuntimeShape(inspector); + + } + + @Override + public boolean isNull() + { + return cursorSupplier.get().getColumnSelectorFactory().makeColumnValueSelector(columnName).isNull(); + } + + @Nullable + @Override + public Object getObject() + { + return cursorSupplier.get() + .getColumnSelectorFactory() + .makeColumnValueSelector(columnName) + .getObject(); + + } + + @Override + public Class classOfObject() + { + return cursorSupplier.get() + .getColumnSelectorFactory() + .makeColumnValueSelector(columnName) + .classOfObject(); + + } + }; + } + + /** + * Return column capabilities from the signature. This returns the capabilities with the minimal assumptions. + * For example, one cursor can have capabilities with multivalues set to FALSE, while other can have it set to TRUE + * Creating the capabilites from the signature would leave the state undefined. If the caller has more knowledge + * about all the cursors that will get created, the caller can subclass and override the function with that information + */ + @Nullable + @Override + public ColumnCapabilities getColumnCapabilities(String column) + { + // Assume only the type of the capabilities + return rowSignature.getColumnCapabilities(column); + } + + /** + * Since we don't know all the cursors beforehand, we can't reconcile their row id suppliers to provide + * one here + */ + @Nullable + @Override + public RowIdSupplier getRowIdSupplier() + { + return null; + } +} diff --git a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java index cd8e553bf512..71d36bb9bbed 100644 --- a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryQueryToolChest.java @@ -493,7 +493,7 @@ public Optional> resultsAsFrames( new ArrayList<>() ); - Sequence frames = FrameCursorUtils.cursorToFrames(cursor, frameWriterFactory).withBaggage(closeable); + Sequence frames = FrameCursorUtils.cursorToFramesSequence(cursor, frameWriterFactory).withBaggage(closeable); // All frames are generated with the same signature therefore we can attach the row signature return Optional.of(frames.map(frame -> new FrameSignaturePair(frame, modifiedRowSignature))); diff --git a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java index 87b50e0e4677..b850114f3bcf 100644 --- a/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java +++ b/processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java @@ -577,7 +577,7 @@ public Optional> resultsAsFrames( new ArrayList<>() ); - Sequence frames = FrameCursorUtils.cursorToFrames(cursor, frameWriterFactory).withBaggage(closeable); + Sequence frames = FrameCursorUtils.cursorToFramesSequence(cursor, frameWriterFactory).withBaggage(closeable); return Optional.of(frames.map(frame -> new FrameSignaturePair(frame, modifiedRowSignature))); } diff --git a/processing/src/main/java/org/apache/druid/segment/ColumnInspector.java b/processing/src/main/java/org/apache/druid/segment/ColumnInspector.java index 7d91d50e851b..d8b4aaf083fc 100644 --- a/processing/src/main/java/org/apache/druid/segment/ColumnInspector.java +++ b/processing/src/main/java/org/apache/druid/segment/ColumnInspector.java @@ -37,15 +37,6 @@ public interface ColumnInspector extends Expr.InputBindingInspector @Nullable ColumnCapabilities getColumnCapabilities(String column); - default ColumnCapabilities getColumnCapabilitiesWithDefault(String column, ColumnCapabilities defaultCapabilites) - { - final ColumnCapabilities capabilities = getColumnCapabilities(column); - if (capabilities != null) { - return capabilities; - } - return defaultCapabilites; - } - @Nullable @Override default ExpressionType getType(String name) diff --git a/processing/src/test/java/org/apache/druid/frame/allocation/HeapMemoryAllocatorTest.java b/processing/src/test/java/org/apache/druid/frame/allocation/HeapMemoryAllocatorTest.java new file mode 100644 index 000000000000..ac69eaee8a5a --- /dev/null +++ b/processing/src/test/java/org/apache/druid/frame/allocation/HeapMemoryAllocatorTest.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.frame.allocation; + +import org.apache.datasketches.memory.WritableMemory; +import org.apache.druid.collections.ResourceHolder; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Optional; + +public class HeapMemoryAllocatorTest +{ + private static final int ALLOCATOR_SIZE = 10; + + @Test + public void testAllocationInSinglePass() + { + MemoryAllocator heapMemoryAllocator = new HeapMemoryAllocator(ALLOCATOR_SIZE); + Optional> memoryResourceHolderOptional = heapMemoryAllocator.allocate(ALLOCATOR_SIZE); + Assert.assertTrue(memoryResourceHolderOptional.isPresent()); + ResourceHolder memoryResourceHolder = memoryResourceHolderOptional.get(); + WritableMemory memory = memoryResourceHolder.get(); + for (int i = 0; i < ALLOCATOR_SIZE; ++i) { + memory.putByte(i, (byte) 0xFF); + } + } + + @Test + public void testAllocationInMultiplePasses() + { + MemoryAllocator heapMemoryAllocator = new HeapMemoryAllocator(ALLOCATOR_SIZE); + + Optional> memoryResourceHolderOptional1 = heapMemoryAllocator.allocate(ALLOCATOR_SIZE + - 4); + Assert.assertTrue(memoryResourceHolderOptional1.isPresent()); + ResourceHolder memoryResourceHolder1 = memoryResourceHolderOptional1.get(); + WritableMemory memory1 = memoryResourceHolder1.get(); + + Optional> memoryResourceHolderOptional2 = heapMemoryAllocator.allocate(4); + Assert.assertTrue(memoryResourceHolderOptional2.isPresent()); + ResourceHolder memoryResourceHolder2 = memoryResourceHolderOptional2.get(); + WritableMemory memory2 = memoryResourceHolder2.get(); + + for (int i = 0; i < ALLOCATOR_SIZE - 4; ++i) { + memory1.putByte(i, (byte) 0xFF); + } + for (int i = 0; i < 4; ++i) { + memory2.putByte(i, (byte) 0xFE); + } + // Readback to ensure that value hasn't been overwritten + for (int i = 0; i < ALLOCATOR_SIZE - 4; ++i) { + Assert.assertEquals((byte) 0xFF, memory1.getByte(i)); + } + for (int i = 0; i < 4; ++i) { + Assert.assertEquals((byte) 0xFE, memory2.getByte(i)); + } + } + + @Test + public void testOverallocationInSinglePass() + { + MemoryAllocator heapMemoryAllocator = new HeapMemoryAllocator(ALLOCATOR_SIZE); + Optional> memoryResourceHolderOptional = + heapMemoryAllocator.allocate(ALLOCATOR_SIZE + 1); + Assert.assertFalse(memoryResourceHolderOptional.isPresent()); + } + + @Test + public void testOverallocationInMultiplePasses() + { + MemoryAllocator heapMemoryAllocator = new HeapMemoryAllocator(ALLOCATOR_SIZE); + Optional> memoryResourceHolderOptional = + heapMemoryAllocator.allocate(ALLOCATOR_SIZE - 4); + Assert.assertTrue(memoryResourceHolderOptional.isPresent()); + Assert.assertFalse(heapMemoryAllocator.allocate(5).isPresent()); + } + +} diff --git a/processing/src/test/java/org/apache/druid/query/FrameBasedInlineDataSourceSerializerTest.java b/processing/src/test/java/org/apache/druid/query/FrameBasedInlineDataSourceSerializerTest.java index e01c9459fa12..7899a6aed84f 100644 --- a/processing/src/test/java/org/apache/druid/query/FrameBasedInlineDataSourceSerializerTest.java +++ b/processing/src/test/java/org/apache/druid/query/FrameBasedInlineDataSourceSerializerTest.java @@ -132,7 +132,7 @@ private FrameBasedInlineDataSource convertToFrameBasedInlineDataSource( ); Cursor cursor = cursorAndCloseable.lhs; RowSignature modifiedRowSignature = FrameWriterUtils.replaceUnknownTypesWithNestedColumns(rowSignature); - Sequence frames = FrameCursorUtils.cursorToFrames( + Sequence frames = FrameCursorUtils.cursorToFramesSequence( cursor, FrameWriters.makeFrameWriterFactory( FrameType.ROW_BASED, diff --git a/processing/src/test/java/org/apache/druid/query/scan/ConcatCursorTest.java b/processing/src/test/java/org/apache/druid/query/scan/ConcatCursorTest.java deleted file mode 100644 index afe7a95cfdcf..000000000000 --- a/processing/src/test/java/org/apache/druid/query/scan/ConcatCursorTest.java +++ /dev/null @@ -1,287 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.query.scan; - -import com.google.common.collect.ImmutableList; -import org.apache.druid.segment.Cursor; -import org.apache.druid.segment.ListCursor; -import org.junit.Assert; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.List; - -public class ConcatCursorTest -{ - @Test - public void testConcatCursor() - { - Cursor dummyCursor1 = new ListCursor(new ArrayList<>()); - Cursor cursor1 = new ListCursor(ImmutableList.of("a", "b")); - Cursor dummyCursor2 = new ListCursor(new ArrayList<>()); - Cursor cursor2 = new ListCursor(ImmutableList.of("c", "d")); - Cursor dummyCursor3 = new ListCursor(new ArrayList<>()); - - Cursor concatCursor = new ConcatCursor(ImmutableList.of( - dummyCursor1, - cursor1, - dummyCursor2, - cursor2, - dummyCursor3 - )); - - List tempList = new ArrayList<>(); - // Initial iteration - while (!concatCursor.isDone()) { - tempList.add(concatCursor.getColumnSelectorFactory().makeColumnValueSelector("ignored").getObject()); - concatCursor.advance(); - } - Assert.assertEquals(ImmutableList.of("a", "b", "c", "d"), tempList); - - // Check if reset() works after exhausting the cursor - concatCursor.reset(); - tempList.clear(); - for (int i = 0; i < 3; ++i) { - tempList.add(concatCursor.getColumnSelectorFactory().makeColumnValueSelector("ignored").getObject()); - concatCursor.advance(); - } - Assert.assertEquals(ImmutableList.of("a", "b", "c"), tempList); - - // Check if reset() works from the middle - concatCursor.reset(); - tempList.clear(); - while (!concatCursor.isDone()) { - tempList.add(concatCursor.getColumnSelectorFactory().makeColumnValueSelector("ignored").getObject()); - concatCursor.advance(); - } - Assert.assertEquals(ImmutableList.of("a", "b", "c", "d"), tempList); - } - - @Test - public void testConcatCursorOfEmptyCursors() - { - Cursor dummyCursor1 = new ListCursor(new ArrayList<>()); - Cursor dummyCursor2 = new ListCursor(new ArrayList<>()); - Cursor concatCursor = new ConcatCursor(ImmutableList.of( - dummyCursor1, - dummyCursor2 - )); - Assert.assertTrue(concatCursor.isDone()); - } - - @Test - public void testConcatCursorWhenBeginningCursorIsEmpty() - { - Cursor dummyCursor1 = new ListCursor(new ArrayList<>()); - Cursor cursor1 = new ListCursor(ImmutableList.of("a", "b")); - Cursor concatCursor = new ConcatCursor(ImmutableList.of( - dummyCursor1, - cursor1 - )); - - List tempList = new ArrayList<>(); - - while (!concatCursor.isDone()) { - tempList.add(concatCursor.getColumnSelectorFactory().makeColumnValueSelector("ignored").getObject()); - concatCursor.advance(); - } - Assert.assertEquals(ImmutableList.of("a", "b"), tempList); - - // Check if reset() works after exhausting the cursor - concatCursor.reset(); - tempList.clear(); - for (int i = 0; i < 1; ++i) { - tempList.add(concatCursor.getColumnSelectorFactory().makeColumnValueSelector("ignored").getObject()); - concatCursor.advance(); - } - Assert.assertEquals(ImmutableList.of("a"), tempList); - - // Check if reset() works from the middle - concatCursor.reset(); - tempList.clear(); - while (!concatCursor.isDone()) { - tempList.add(concatCursor.getColumnSelectorFactory().makeColumnValueSelector("ignored").getObject()); - concatCursor.advance(); - } - Assert.assertEquals(ImmutableList.of("a", "b"), tempList); - } - - @Test - public void testConcatCursorWhenEndingCursorIsEmpty() - { - Cursor dummyCursor1 = new ListCursor(new ArrayList<>()); - Cursor cursor1 = new ListCursor(ImmutableList.of("a", "b")); - Cursor concatCursor = new ConcatCursor(ImmutableList.of( - cursor1, - dummyCursor1 - )); - - List tempList = new ArrayList<>(); - - while (!concatCursor.isDone()) { - tempList.add(concatCursor.getColumnSelectorFactory().makeColumnValueSelector("ignored").getObject()); - concatCursor.advance(); - } - Assert.assertEquals(ImmutableList.of("a", "b"), tempList); - - // Check if reset() works after exhausting the cursor - concatCursor.reset(); - tempList.clear(); - for (int i = 0; i < 1; ++i) { - tempList.add(concatCursor.getColumnSelectorFactory().makeColumnValueSelector("ignored").getObject()); - concatCursor.advance(); - } - Assert.assertEquals(ImmutableList.of("a"), tempList); - - // Check if reset() works from the middle - concatCursor.reset(); - tempList.clear(); - while (!concatCursor.isDone()) { - tempList.add(concatCursor.getColumnSelectorFactory().makeColumnValueSelector("ignored").getObject()); - concatCursor.advance(); - } - Assert.assertEquals(ImmutableList.of("a", "b"), tempList); - } - - @Test - public void testConcatCursorWhenMultipleEmptyCursorsAtBeginning() - { - Cursor dummyCursor1 = new ListCursor(new ArrayList<>()); - Cursor dummyCursor2 = new ListCursor(new ArrayList<>()); - Cursor dummyCursor3 = new ListCursor(new ArrayList<>()); - Cursor cursor1 = new ListCursor(ImmutableList.of("a", "b")); - Cursor concatCursor = new ConcatCursor(ImmutableList.of( - dummyCursor1, - dummyCursor2, - dummyCursor3, - cursor1 - )); - - List tempList = new ArrayList<>(); - - while (!concatCursor.isDone()) { - tempList.add(concatCursor.getColumnSelectorFactory().makeColumnValueSelector("ignored").getObject()); - concatCursor.advance(); - } - Assert.assertEquals(ImmutableList.of("a", "b"), tempList); - - // Check if reset() works after exhausting the cursor - concatCursor.reset(); - tempList.clear(); - for (int i = 0; i < 1; ++i) { - tempList.add(concatCursor.getColumnSelectorFactory().makeColumnValueSelector("ignored").getObject()); - concatCursor.advance(); - } - Assert.assertEquals(ImmutableList.of("a"), tempList); - - // Check if reset() works from the middle - concatCursor.reset(); - tempList.clear(); - while (!concatCursor.isDone()) { - tempList.add(concatCursor.getColumnSelectorFactory().makeColumnValueSelector("ignored").getObject()); - concatCursor.advance(); - } - Assert.assertEquals(ImmutableList.of("a", "b"), tempList); - } - - @Test - public void testConcatCursorWhenMultipleEmptyCursorsAtEnd() - { - Cursor dummyCursor1 = new ListCursor(new ArrayList<>()); - Cursor dummyCursor2 = new ListCursor(new ArrayList<>()); - Cursor dummyCursor3 = new ListCursor(new ArrayList<>()); - Cursor cursor1 = new ListCursor(ImmutableList.of("a", "b")); - Cursor concatCursor = new ConcatCursor(ImmutableList.of( - cursor1, - dummyCursor1, - dummyCursor2, - dummyCursor3 - )); - - List tempList = new ArrayList<>(); - - while (!concatCursor.isDone()) { - tempList.add(concatCursor.getColumnSelectorFactory().makeColumnValueSelector("ignored").getObject()); - concatCursor.advance(); - } - Assert.assertEquals(ImmutableList.of("a", "b"), tempList); - - // Check if reset() works after exhausting the cursor - concatCursor.reset(); - tempList.clear(); - for (int i = 0; i < 1; ++i) { - tempList.add(concatCursor.getColumnSelectorFactory().makeColumnValueSelector("ignored").getObject()); - concatCursor.advance(); - } - Assert.assertEquals(ImmutableList.of("a"), tempList); - - // Check if reset() works from the middle - concatCursor.reset(); - tempList.clear(); - while (!concatCursor.isDone()) { - tempList.add(concatCursor.getColumnSelectorFactory().makeColumnValueSelector("ignored").getObject()); - concatCursor.advance(); - } - Assert.assertEquals(ImmutableList.of("a", "b"), tempList); - } - - @Test - public void testConcatCursorWhenMultipleEmptyCursorsAtTheMiddle() - { - Cursor dummyCursor1 = new ListCursor(new ArrayList<>()); - Cursor dummyCursor2 = new ListCursor(new ArrayList<>()); - Cursor dummyCursor3 = new ListCursor(new ArrayList<>()); - Cursor cursor1 = new ListCursor(ImmutableList.of("a")); - Cursor cursor2 = new ListCursor(ImmutableList.of("b")); - Cursor concatCursor = new ConcatCursor(ImmutableList.of( - cursor1, - dummyCursor1, - dummyCursor2, - dummyCursor3, - cursor2 - )); - - List tempList = new ArrayList<>(); - - while (!concatCursor.isDone()) { - tempList.add(concatCursor.getColumnSelectorFactory().makeColumnValueSelector("ignored").getObject()); - concatCursor.advance(); - } - Assert.assertEquals(ImmutableList.of("a", "b"), tempList); - - // Check if reset() works after exhausting the cursor - concatCursor.reset(); - tempList.clear(); - for (int i = 0; i < 1; ++i) { - tempList.add(concatCursor.getColumnSelectorFactory().makeColumnValueSelector("ignored").getObject()); - concatCursor.advance(); - } - Assert.assertEquals(ImmutableList.of("a"), tempList); - - // Check if reset() works from the middle - concatCursor.reset(); - tempList.clear(); - while (!concatCursor.isDone()) { - tempList.add(concatCursor.getColumnSelectorFactory().makeColumnValueSelector("ignored").getObject()); - concatCursor.advance(); - } - Assert.assertEquals(ImmutableList.of("a", "b"), tempList); - } -} diff --git a/processing/src/test/java/org/apache/druid/query/scan/ScanResultValueFramesIterableTest.java b/processing/src/test/java/org/apache/druid/query/scan/ScanResultValueFramesIterableTest.java new file mode 100644 index 000000000000..bdd64c1c8bd8 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/query/scan/ScanResultValueFramesIterableTest.java @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.query.scan; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import org.apache.druid.frame.allocation.ArenaMemoryAllocatorFactory; +import org.apache.druid.java.util.common.guava.Sequences; +import org.apache.druid.query.FrameBasedInlineDataSource; +import org.apache.druid.query.FrameSignaturePair; +import org.apache.druid.query.QueryToolChestTestHelper; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.testing.InitializedNullHandlingTest; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Test cases that ensure the correctness of {@link ScanResultValueFramesIterable} in presence of different signatures. + * There are some more in {@link ScanQueryQueryToolChestTest} that verify the workings of the iterable in bigger picture. + */ +public class ScanResultValueFramesIterableTest extends InitializedNullHandlingTest +{ + + private static final RowSignature SIGNATURE1 = RowSignature.builder() + .add("col1", ColumnType.LONG) + .add("col2", ColumnType.DOUBLE) + .build(); + + private static final RowSignature SIGNATURE2 = RowSignature.builder() + .add("col1", ColumnType.DOUBLE) + .add("col2", ColumnType.LONG) + .build(); + + + @Test + public void testEmptySequence() + { + ScanResultValueFramesIterable iterable = createIterable(); + List frames = Lists.newArrayList(iterable); + Assert.assertEquals(0, frames.size()); + } + + @Test + public void testAllEmptyScanResultValuesInSequence() + { + + List frames1 = Lists.newArrayList( + createIterable( + scanResultValue1(0) + ) + ); + Assert.assertEquals(0, frames1.size()); + + List frames2 = Lists.newArrayList( + createIterable( + scanResultValue1(0), + scanResultValue2(0), + scanResultValue1(0) + ) + ); + Assert.assertEquals(0, frames2.size()); + } + + @Test + public void testBatchingWithHomogenousScanResultValues() + { + List frames = Lists.newArrayList( + createIterable( + scanResultValue1(2), + scanResultValue1(2) + ) + ); + Assert.assertEquals(1, frames.size()); + QueryToolChestTestHelper.assertArrayResultsEquals( + ImmutableList.of( + new Object[]{1L, 1.0D}, + new Object[]{2L, 2.0D}, + new Object[]{1L, 1.0D}, + new Object[]{2L, 2.0D} + ), + new FrameBasedInlineDataSource(frames, SIGNATURE1).getRowsAsSequence() + ); + } + + @Test + public void testBatchingWithHomogenousAndEmptyScanResultValues() + { + final List[] framesList = new List[3]; + framesList[0] = Lists.newArrayList( + createIterable( + scanResultValue1(0), + scanResultValue1(0), + scanResultValue1(2), + scanResultValue1(0), + scanResultValue1(0), + scanResultValue1(0), + scanResultValue1(2), + scanResultValue1(0) + ) + ); + + framesList[1] = Lists.newArrayList( + createIterable( + scanResultValue2(0), + scanResultValue2(0), + scanResultValue1(2), + scanResultValue2(0), + scanResultValue2(0), + scanResultValue2(0), + scanResultValue1(2), + scanResultValue2(0) + ) + ); + + framesList[2] = Lists.newArrayList( + createIterable( + scanResultValue1(0), + scanResultValue2(0), + scanResultValue1(2), + scanResultValue2(0), + scanResultValue2(0), + scanResultValue1(0), + scanResultValue1(2), + scanResultValue1(0) + ) + ); + + for (List frames : framesList) { + Assert.assertEquals(1, frames.size()); + QueryToolChestTestHelper.assertArrayResultsEquals( + ImmutableList.of( + new Object[]{1L, 1.0D}, + new Object[]{2L, 2.0D}, + new Object[]{1L, 1.0D}, + new Object[]{2L, 2.0D} + ), + new FrameBasedInlineDataSource(frames, SIGNATURE1).getRowsAsSequence() + ); + + } + } + + + @Test + public void testBatchingWithHeterogenousScanResultValues() + { + List frames = Lists.newArrayList( + createIterable( + scanResultValue1(2), + scanResultValue2(2) + ) + ); + Assert.assertEquals(2, frames.size()); + QueryToolChestTestHelper.assertArrayResultsEquals( + ImmutableList.of( + new Object[]{1L, 1.0D}, + new Object[]{2L, 2.0D} + ), + new FrameBasedInlineDataSource(Collections.singletonList(frames.get(0)), SIGNATURE1).getRowsAsSequence() + ); + QueryToolChestTestHelper.assertArrayResultsEquals( + ImmutableList.of( + new Object[]{3.0D, 3L}, + new Object[]{4.0D, 4L} + ), + new FrameBasedInlineDataSource(Collections.singletonList(frames.get(1)), SIGNATURE2).getRowsAsSequence() + ); + } + + @Test + public void testBatchingWithHeterogenousAndEmptyScanResultValues() + { + List frames = Lists.newArrayList( + createIterable( + scanResultValue1(0), + scanResultValue2(0), + scanResultValue1(2), + scanResultValue1(0), + scanResultValue2(2), + scanResultValue2(0), + scanResultValue2(0) + ) + ); + Assert.assertEquals(2, frames.size()); + QueryToolChestTestHelper.assertArrayResultsEquals( + ImmutableList.of( + new Object[]{1L, 1.0D}, + new Object[]{2L, 2.0D} + ), + new FrameBasedInlineDataSource(Collections.singletonList(frames.get(0)), SIGNATURE1).getRowsAsSequence() + ); + QueryToolChestTestHelper.assertArrayResultsEquals( + ImmutableList.of( + new Object[]{3.0D, 3L}, + new Object[]{4.0D, 4L} + ), + new FrameBasedInlineDataSource(Collections.singletonList(frames.get(1)), SIGNATURE2).getRowsAsSequence() + ); + } + + @Test + public void testSplitting() + { + List frames = Lists.newArrayList( + createIterable( + Collections.nCopies(100, scanResultValue1(2)).toArray(new ScanResultValue[0]) + ) + ); + Assert.assertEquals(5, frames.size()); + } + + private static ScanResultValueFramesIterable createIterable( + ScanResultValue... scanResultValues + ) + { + return new ScanResultValueFramesIterable( + Sequences.simple(Arrays.asList(scanResultValues)), + new ArenaMemoryAllocatorFactory(1000), + false, + null, + rowSignature -> (Object[] rows) -> rows + ); + } + + // Signature: col1: LONG, col2: DOUBLE + private static ScanResultValue scanResultValue1(int numRows) + { + return new ScanResultValue( + "dummy", + ImmutableList.of("col1", "col2"), + IntStream.range(1, 1 + numRows).mapToObj(i -> new Object[]{i, (double) i}).collect(Collectors.toList()), + SIGNATURE1 + ); + } + + // Signature: col1: DOUBLE, col2: LONG + private static ScanResultValue scanResultValue2(int numRows) + { + return new ScanResultValue( + "dummy", + ImmutableList.of("col1", "col2"), + IntStream.range(3, 3 + numRows).mapToObj(i -> new Object[]{(double) i, i}).collect(Collectors.toList()), + SIGNATURE2 + ); + } +} diff --git a/processing/src/test/java/org/apache/druid/segment/join/table/FrameBasedIndexedTableTest.java b/processing/src/test/java/org/apache/druid/segment/join/table/FrameBasedIndexedTableTest.java index 6a244e9bae97..8b093184d574 100644 --- a/processing/src/test/java/org/apache/druid/segment/join/table/FrameBasedIndexedTableTest.java +++ b/processing/src/test/java/org/apache/druid/segment/join/table/FrameBasedIndexedTableTest.java @@ -223,7 +223,7 @@ public void setup() ROW_SIGNATURE, new ArrayList<>() ); - Frame frame = Iterables.getOnlyElement(FrameCursorUtils.cursorToFrames(cursor, frameWriterFactory).toList()); + Frame frame = Iterables.getOnlyElement(FrameCursorUtils.cursorToFramesSequence(cursor, frameWriterFactory).toList()); dataSource = new FrameBasedInlineDataSource( ImmutableList.of(new FrameSignaturePair(frame, ROW_SIGNATURE)), diff --git a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java index 80c6d0a86af0..c8525771ceb5 100644 --- a/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java +++ b/server/src/main/java/org/apache/druid/server/ClientQuerySegmentWalker.java @@ -45,6 +45,7 @@ import org.apache.druid.query.InlineDataSource; import org.apache.druid.query.PostProcessingOperator; import org.apache.druid.query.Query; +import org.apache.druid.query.QueryContexts; import org.apache.druid.query.QueryDataSource; import org.apache.druid.query.QueryPlus; import org.apache.druid.query.QueryRunner; @@ -669,10 +670,7 @@ private static > DataSource toInlineDataSource( case ROW_LIMIT: if (limitAccumulator.get() >= rowLimitToUse) { subqueryStatsProvider.incrementQueriesExceedingRowLimit(); - throw ResourceLimitExceededException.withMessage( - "Cannot issue the query, subqueries generated results beyond maximum[%d] rows", - rowLimitToUse - ); + throw ResourceLimitExceededException.withMessage(rowLimitExceededMessage(rowLimitToUse)); } subqueryStatsProvider.incrementSubqueriesWithRowLimit(); dataSource = materializeResultsAsArray( @@ -687,10 +685,7 @@ private static > DataSource toInlineDataSource( case MEMORY_LIMIT: if (memoryLimitAccumulator.get() >= memoryLimit) { subqueryStatsProvider.incrementQueriesExceedingByteLimit(); - throw ResourceLimitExceededException.withMessage( - "Cannot issue the query, subqueries generated results beyond maximum[%d] bytes", - memoryLimit - ); + throw ResourceLimitExceededException.withMessage(byteLimitExceededMessage(memoryLimit)); } Optional maybeDataSource = materializeResultsAsFrames( query, @@ -707,10 +702,7 @@ private static > DataSource toInlineDataSource( // Check if the previous row limit accumulator has exceeded the memory results if (limitAccumulator.get() >= rowLimitToUse) { subqueryStatsProvider.incrementQueriesExceedingRowLimit(); - throw ResourceLimitExceededException.withMessage( - "Cannot issue the query, subqueries generated results beyond maximum[%d] rows", - rowLimitToUse - ); + throw ResourceLimitExceededException.withMessage(rowLimitExceededMessage(rowLimitToUse)); } subqueryStatsProvider.incrementSubqueriesWithRowLimit(); subqueryStatsProvider.incrementSubqueriesFallingBackToRowLimit(); @@ -769,10 +761,7 @@ private static > Optional materializeR limitAccumulator.addAndGet(frame.getFrame().numRows()); if (memoryLimitAccumulator.addAndGet(frame.getFrame().numBytes()) >= memoryLimit) { subqueryStatsProvider.incrementQueriesExceedingByteLimit(); - throw ResourceLimitExceededException.withMessage( - "Subquery generated results beyond maximum[%d] bytes", - memoryLimit - ); + throw ResourceLimitExceededException.withMessage(byteLimitExceededMessage(memoryLimit)); } frameSignaturePairs.add(frame); @@ -819,10 +808,7 @@ private static > DataSource materializeResultsAsAr (acc, in) -> { if (limitAccumulator.getAndIncrement() >= rowLimitToUse) { subqueryStatsProvider.incrementQueriesExceedingRowLimit(); - throw ResourceLimitExceededException.withMessage( - "Subquery generated results beyond maximum[%d] rows", - rowLimitToUse - ); + throw ResourceLimitExceededException.withMessage(rowLimitExceededMessage(rowLimitToUse)); } acc.add(in); return acc; @@ -831,6 +817,32 @@ private static > DataSource materializeResultsAsAr return InlineDataSource.fromIterable(resultList, signature); } + private static String byteLimitExceededMessage(final long memoryLimit) + { + return org.apache.druid.java.util.common.StringUtils.format( + "Cannot issue the query, subqueries generated results beyond maximum[%d] bytes. Increase the " + + "JVM's memory or set the '%s' in the query context to increase the space allocated for subqueries to " + + "materialize their results. Manually alter the value carefully as it can cause the broker to go out of memory.", + memoryLimit, + QueryContexts.MAX_SUBQUERY_BYTES_KEY + ); + } + + private static String rowLimitExceededMessage(final int rowLimitUsed) + { + return org.apache.druid.java.util.common.StringUtils.format( + "Cannot issue the query, subqueries generated results beyond maximum[%d] rows. Try setting the " + + "'%s' in the query context to '%s' for enabling byte based limit, which chooses an optimal limit based on " + + "memory size and result's heap usage or manually configure the values of either '%s' or '%s' in the query " + + "context. Manually alter the value carefully as it can cause the broker to go out of memory.", + rowLimitUsed, + QueryContexts.MAX_SUBQUERY_BYTES_KEY, + SubqueryGuardrailHelper.AUTO_LIMIT_VALUE, + QueryContexts.MAX_SUBQUERY_BYTES_KEY, + QueryContexts.MAX_SUBQUERY_ROWS_KEY + ); + } + /** * A {@link QueryRunner} which validates that a *specific* query is passed in, and then swaps it with another one. * Useful since the inlining we do relies on passing the modified query to the underlying {@link QuerySegmentWalker}, diff --git a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java index 563f4db0d5ec..ae808e2eb20d 100644 --- a/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java +++ b/server/src/test/java/org/apache/druid/server/ClientQuerySegmentWalkerTest.java @@ -800,7 +800,13 @@ public void testTimeseriesOnGroupByOnTableErrorTooManyRows() .withId(DUMMY_QUERY_ID); expectedException.expect(ResourceLimitExceededException.class); - expectedException.expectMessage("Subquery generated results beyond maximum[2] rows"); + expectedException.expectMessage( + "Cannot issue the query, subqueries generated results beyond maximum[2] rows. Try setting the " + + "'maxSubqueryBytes' in the query context to 'auto' for enabling byte based limit, which chooses an optimal " + + "limit based on memory size and result's heap usage or manually configure the values of either 'maxSubqueryBytes' " + + "or 'maxSubqueryRows' in the query context. Manually alter the value carefully as it can cause the broker to go out " + + "of memory." + ); testQuery(query, ImmutableList.of(), ImmutableList.of()); } @@ -895,7 +901,12 @@ public void testTimeseriesOnGroupByOnTableErrorTooLarge() .withId(DUMMY_QUERY_ID); expectedException.expect(ResourceLimitExceededException.class); - expectedException.expectMessage("Subquery generated results beyond maximum[1] bytes"); + expectedException.expectMessage( + "Cannot issue the query, subqueries generated results beyond maximum[1] bytes. Increase the " + + "JVM's memory or set the 'maxSubqueryBytes' in the query context to increase the space " + + "allocated for subqueries to materialize their results. Manually alter the value carefully as it can cause " + + "the broker to go out of memory." + ); testQuery(query, ImmutableList.of(), ImmutableList.of()); } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSubqueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSubqueryTest.java index ea8089e4c917..91316512e35f 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSubqueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteSubqueryTest.java @@ -676,7 +676,13 @@ public void testMaxSubqueryRows() { if ("without memory limit".equals(testName)) { expectedException.expect(ResourceLimitExceededException.class); - expectedException.expectMessage("Subquery generated results beyond maximum[1]"); + expectedException.expectMessage( + "Cannot issue the query, subqueries generated results beyond maximum[1] rows. Try setting the " + + "'maxSubqueryBytes' in the query context to 'auto' for enabling byte based limit, which chooses an optimal " + + "limit based on memory size and result's heap usage or manually configure the values of either 'maxSubqueryBytes' " + + "or 'maxSubqueryRows' in the query context. Manually alter the value carefully as it can cause the broker to " + + "go out of memory." + ); Map modifiedQueryContext = new HashMap<>(queryContext); modifiedQueryContext.put(QueryContexts.MAX_SUBQUERY_ROWS_KEY, 1);