Fix bug while materializing scan's result to frames #15987

Merged 6 commits on Mar 7, 2024
@@ -135,8 +135,13 @@ public boolean reserveAdditional(final int bytes)
if (idx < 0 || bytes + limits.getInt(idx) > blockHolders.get(idx).get().getCapacity()) {
// Allocation needed.
// Math.max(allocationSize, bytes) in case "bytes" is greater than SOFT_MAXIMUM_ALLOCATION_SIZE.
// However, cap the allocation request to the available bytes in the allocator, in case the requested bytes
// are less than what are available in the allocator, however the SOFT_MAXIMUM_ALLOCATION_SIZE is greater than the
// bytes available in the allocator. In such a case where bytes < available < SOFT_MAXIMUM_ALLOCATION_SIZE, we
// want to allocate all the available memory in the allocator, and in the other cases where available is the greatest
// of all, we want to allocate according to the max of bytes & SOFT_MAXIMUM_ALLOCATION_SIZE
final Optional<ResourceHolder<WritableMemory>> newMemory =
allocator.allocate(Math.max(nextAllocationSize, bytes));
allocator.allocate(Math.min(allocator.available(), Math.max(nextAllocationSize, bytes)));
Contributor

  • Could you please explain this statement?
    Line 129 already guarantees that bytes <= allocator.available(), so why would we allocate a new chunk that is exactly allocator.available()?
    We should never allocate more than the bytes required, right?
    nextAllocationSize is more of a minimum check.

Contributor Author

I'll explain with an example here, consider the following scenario:

  • nextAllocationSize doubles with each allocation; let's say that for a particular run of the appendable memory, it is 1024.
  • Someone wants to allocate 600 bytes, i.e. bytes = 600
  • Remaining memory in the allocator is 700, therefore allocator.available() = 700

Since someone wants to allocate 600 bytes, and the available bytes in the allocator are 700, the allocation should succeed, and we should be able to give them 600 bytes (This is what Line 129 also checks)

However, the original code does
allocator.allocate(Math.max(1024, 600)), i.e. allocator.allocate(1024), which would fail.

The new code does
allocator.allocate(Math.min(700, Math.max(1024, 600))), i.e. allocator.allocate(700), which should pass (the required behavior).

nextAllocationSize is more like a "2x buffer allocator limit": every time we hit the allocation limit, we multiply the size of the next block by 2. So we do allocations like 1, 2, 4, 8, 16, ..., to minimize the number of allocations (and amortize their cost, as per my understanding). I think a similar principle applies when hash-based dictionaries resize dynamically.

Therefore, even though the caller requests bytes, we give the caller a larger block so that the next request does not trigger another allocation. However, this misses an edge case: when the allocator can satisfy the requested bytes but not nextAllocationSize, we fail even though we should pass. Hence the allocation is capped at the available memory. In normal cases, allocator.available() is much larger than Math.max(nextAllocationSize, bytes), so most of the time the code behaves as it did before.
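
The arithmetic in the example above can be checked with a small standalone sketch. The class and method names below are hypothetical stand-ins written for illustration; they are not the Druid classes touched by this PR, and only the two Math.max / Math.min expressions are taken from the diff.

```java
/** Hypothetical helper mirroring the two size computations discussed above. */
final class AllocationSizeSketch
{
  private AllocationSizeSketch() {}

  /** Size requested before the fix: ignores how much the allocator has left. */
  static long uncappedSize(final long nextAllocationSize, final long bytes)
  {
    return Math.max(nextAllocationSize, bytes);
  }

  /** Size requested after the fix: never more than what the allocator can still provide. */
  static long cappedSize(final long available, final long nextAllocationSize, final long bytes)
  {
    return Math.min(available, Math.max(nextAllocationSize, bytes));
  }

  /** Assumption for this sketch: a request succeeds only if it fits in the remaining bytes. */
  static boolean fits(final long available, final long requested)
  {
    return requested <= available;
  }
}
```

With available = 700, nextAllocationSize = 1024 and bytes = 600, uncappedSize gives 1024, which does not fit, while cappedSize gives 700, which fits and still covers the 600 bytes the caller asked for.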

Contributor Author

Added a comment in the code to clarify.

Contributor

Thanks for the explanation. SGTM


if (!newMemory.isPresent()) {
return false;
@@ -19,6 +19,7 @@

package org.apache.druid.frame.allocation;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.primitives.Ints;
import org.apache.datasketches.memory.WritableMemory;
import org.apache.druid.collections.ResourceHolder;
@@ -37,7 +38,8 @@ public class HeapMemoryAllocator implements MemoryAllocator

private long bytesAllocated = 0;

private HeapMemoryAllocator(final long capacity)
@VisibleForTesting
HeapMemoryAllocator(final long capacity)
{
this.capacity = capacity;
}
@@ -53,7 +55,7 @@ public static HeapMemoryAllocator unlimited()
@Override
public Optional<ResourceHolder<WritableMemory>> allocate(final long size)
{
if (bytesAllocated < capacity - size) {
if (size <= capacity - bytesAllocated) {
bytesAllocated += size;

return Optional.of(
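
For reference, the two capacity conditions in the hunk above differ only at the boundary where a request exactly fills the remaining capacity. A minimal sketch of that difference follows; the class and method names are hypothetical, and only the two boolean expressions come from the diff.

```java
/** Hypothetical side-by-side of the old and new capacity conditions. */
final class CapacityCheckSketch
{
  private CapacityCheckSketch() {}

  /** Old condition: equivalent to bytesAllocated + size < capacity (strict). */
  static boolean oldCheck(final long capacity, final long bytesAllocated, final long size)
  {
    return bytesAllocated < capacity - size;
  }

  /** New condition: equivalent to bytesAllocated + size <= capacity (inclusive). */
  static boolean newCheck(final long capacity, final long bytesAllocated, final long size)
  {
    return size <= capacity - bytesAllocated;
  }
}
```

With capacity = 1000 and bytesAllocated = 300, a request for exactly the remaining 700 bytes is rejected by oldCheck and accepted by newCheck. That matters here because the capped request in reserveAdditional can be exactly the number of bytes still available.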
@@ -26,6 +26,7 @@
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.filter.BoundDimFilter;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.query.ordering.StringComparators;
@@ -42,6 +43,25 @@

public class FrameCursorUtils
{

/**
* Exception to be thrown when the subquery's rows are too wide to fit in a single frame. In such case, byte based
* limiting should be disabled or the user should modify the query.
* <p>
* NOTE: This error message is not appropriate when a similar exception is hit in MSQ, since this workaround
* is not applicable in that scenario
*/
public static final DruidException SUBQUERY_ROW_TOO_LARGE_EXCEPTION =
DruidException
.forPersona(DruidException.Persona.OPERATOR)
.ofCategory(DruidException.Category.CAPACITY_EXCEEDED)
.build(
"Subquery's row size exceeds the frame size and therefore cannot write the subquery's "
+ "row to the frame. Either modify the subqueries to materialize smaller rows by removing wide columns, "
+ "or disable byte based limiting by setting '%s' to 'disabled'",
QueryContexts.MAX_SUBQUERY_BYTES_KEY
);

private FrameCursorUtils()
{
// No instantiation.
@@ -79,60 +99,66 @@ public static Filter buildFilter(@Nullable Filter filter, Interval interval)

/**
* Writes a {@link Cursor} to a sequence of {@link Frame}. This method iterates over the rows of the cursor,
* and writes the columns to the frames
*
* @param cursor Cursor to write to the frame
* @param frameWriterFactory Frame writer factory to write to the frame.
* Determines the signature of the rows that are written to the frames
* and writes the columns to the frames. The iterable is lazy, and it traverses the required portion of the cursor
* as required
*/
public static Sequence<Frame> cursorToFrames(
Cursor cursor,
FrameWriterFactory frameWriterFactory
public static Iterable<Frame> cursorToFramesIterable(
final Cursor cursor,
final FrameWriterFactory frameWriterFactory
)
{
return () -> new Iterator<Frame>()
{
@Override
public boolean hasNext()
{
return !cursor.isDone();
}

return Sequences.simple(
() -> new Iterator<Frame>()
{
@Override
public boolean hasNext()
{
return !cursor.isDone();
}

@Override
public Frame next()
{
// Makes sure that cursor contains some elements prior. This ensures if no row is written, then the row size
// is larger than the MemoryAllocators returned by the provided factory
if (!hasNext()) {
throw new NoSuchElementException();
@Override
public Frame next()
{
// Makes sure that cursor contains some elements prior. This ensures if no row is written, then the row size
// is larger than the MemoryAllocators returned by the provided factory
if (!hasNext()) {
throw new NoSuchElementException();
}
boolean firstRowWritten = false;
Frame frame;
try (final FrameWriter frameWriter = frameWriterFactory.newFrameWriter(cursor.getColumnSelectorFactory())) {
while (!cursor.isDone()) {
if (!frameWriter.addSelection()) {
break;
}
boolean firstRowWritten = false;
Frame frame;
try (final FrameWriter frameWriter = frameWriterFactory.newFrameWriter(cursor.getColumnSelectorFactory())) {
while (!cursor.isDone()) {
if (!frameWriter.addSelection()) {
break;
}
firstRowWritten = true;
cursor.advance();
}

if (!firstRowWritten) {
throw DruidException
.forPersona(DruidException.Persona.DEVELOPER)
.ofCategory(DruidException.Category.CAPACITY_EXCEEDED)
.build("Subquery's row size exceeds the frame size and therefore cannot write the subquery's "
+ "row to the frame. This is a non-configurable static limit that can only be modified by the "
+ "developer.");
}
firstRowWritten = true;
cursor.advance();
}

frame = Frame.wrap(frameWriter.toByteArray());
}
return frame;
if (!firstRowWritten) {
throw SUBQUERY_ROW_TOO_LARGE_EXCEPTION;
}

frame = Frame.wrap(frameWriter.toByteArray());
}
);
return frame;
}
};
}

/**
* Writes a {@link Cursor} to a sequence of {@link Frame}. This method iterates over the rows of the cursor,
* and writes the columns to the frames
*
* @param cursor Cursor to write to the frame
* @param frameWriterFactory Frame writer factory to write to the frame.
* It also determines the signature of the rows that are written to the frames
*/
public static Sequence<Frame> cursorToFramesSequence(
final Cursor cursor,
final FrameWriterFactory frameWriterFactory
)
{

return Sequences.simple(cursorToFramesIterable(cursor, frameWriterFactory));
}
}
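
The lazy-iterable pattern used by cursorToFramesIterable can be illustrated without any Druid types. The sketch below is an assumption-laden analogy, not the PR's code: rows are plain integers standing for row sizes, a "frame" is a list whose total size may not exceed a hypothetical batchCapacity, and IllegalStateException stands in for SUBQUERY_ROW_TOO_LARGE_EXCEPTION.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Hypothetical analogy for a lazy cursor-to-frames iterable. */
final class LazyBatchingSketch
{
  private LazyBatchingSketch() {}

  /** Lazily groups row sizes into batches whose total stays within batchCapacity. */
  static Iterable<List<Integer>> batches(final Iterator<Integer> rows, final int batchCapacity)
  {
    return () -> new Iterator<List<Integer>>()
    {
      // Row already read from the source but not yet placed in a batch.
      private Integer pending = null;

      @Override
      public boolean hasNext()
      {
        return pending != null || rows.hasNext();
      }

      @Override
      public List<Integer> next()
      {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        final List<Integer> batch = new ArrayList<>();
        int used = 0;
        while (hasNext()) {
          if (pending == null) {
            pending = rows.next();
          }
          if (used + pending > batchCapacity) {
            break; // Batch is full; the pending row starts the next batch.
          }
          used += pending;
          batch.add(pending);
          pending = null;
        }
        if (batch.isEmpty()) {
          // Not even the first row fit, so no batch size could ever hold it.
          throw new IllegalStateException(
              "Row of size " + pending + " exceeds batch capacity " + batchCapacity
          );
        }
        return batch;
      }
    };
  }
}
```

No row is read until next() is called, and each call consumes only as many rows as fit in one batch. As with the cursor in the diff, the underlying source is shared, so the iterable is meant to be traversed once.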
@@ -736,7 +736,7 @@ public Optional<Sequence<FrameSignaturePair>> resultsAsFrames(
Cursor cursor = cursorAndCloseable.lhs;
Closeable closeble = cursorAndCloseable.rhs;

Sequence<Frame> frames = FrameCursorUtils.cursorToFrames(cursor, frameWriterFactory).withBaggage(closeble);
Sequence<Frame> frames = FrameCursorUtils.cursorToFramesSequence(cursor, frameWriterFactory).withBaggage(closeble);

return Optional.of(frames.map(frame -> new FrameSignaturePair(frame, modifiedRowSignature)));
}