Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] add pickup morsel strategy to control memory usage #21037

Closed
wants to merge 28 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -756,6 +756,8 @@ CONF_Int32(io_coalesce_read_max_buffer_size, "8388608");
CONF_Int32(io_coalesce_read_max_distance_size, "1048576");
CONF_Int32(io_tasks_per_scan_operator, "4");
CONF_Int32(connector_io_tasks_per_scan_operator, "16");
CONF_Int32(io_sleep_ms, "0");
CONF_Int32(connector_io_tasks_min_size, "4");

// Enable output trace logs in aws-sdk-cpp for diagnosis purpose.
// Once logging is enabled in your application, the SDK will generate log files in your current working directory
Expand Down
7 changes: 7 additions & 0 deletions be/src/exec/connector_scan_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,13 @@ pipeline::OpFactories ConnectorScanNode::decompose_to_pipeline(pipeline::Pipelin

// Ported from olap scan node: by capping chunk buffer usage we bound memory consumption and avoid OOM.
size_t max_buffer_capacity = pipeline::ScanOperator::max_buffer_capacity() * dop;

if (_limit != -1) {
size_t max_chunks = std::max<size_t>(_limit / (runtime_state()->chunk_size()), 1);
max_buffer_capacity = std::min(max_buffer_capacity, max_chunks);
}
VLOG_FILE << "[ZZZ] limit = " << _limit << ", max buffer capacity = " << max_buffer_capacity;

size_t default_buffer_capacity = std::min<size_t>(max_buffer_capacity, _estimated_max_concurrent_chunks());
pipeline::ChunkBufferLimiterPtr buffer_limiter = std::make_unique<pipeline::DynamicChunkBufferLimiter>(
max_buffer_capacity, default_buffer_capacity, mem_limit, runtime_state()->chunk_size());
Expand Down
1 change: 1 addition & 0 deletions be/src/exec/pipeline/pipeline_driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -637,6 +637,7 @@ void PipelineDriver::_update_statistics(size_t total_chunks_moved, size_t total_
if (ScanOperator* scan = source_scan_operator()) {
query_ctx()->incr_cur_scan_rows_num(scan->get_last_scan_rows_num());
query_ctx()->incr_cur_scan_bytes(scan->get_last_scan_bytes());
scan->finish_driver_process();
}

// Update cpu cost of this query
Expand Down
91 changes: 91 additions & 0 deletions be/src/exec/pipeline/scan/connector_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,12 @@ ConnectorScanOperator::ConnectorScanOperator(OperatorFactory* factory, int32_t i
: ScanOperator(factory, id, driver_sequence, dop, scan_node) {}

// Prepare-time hook: publishes whether shared scan is enabled to the
// operator metrics and honors the session-level switch that turns the
// adaptive IO-task strategy on or off (when the option is set).
Status ConnectorScanOperator::do_prepare(RuntimeState* state) {
    const bool is_shared_scan = _scan_node->is_shared_scan_enabled();
    _unique_metrics->add_info_string("SharedScan", is_shared_scan ? "True" : "False");

    const TQueryOptions& query_options = state->query_options();
    if (query_options.__isset.enable_connector_adaptive_io_tasks) {
        _enable_adaptive_io_tasks = query_options.enable_connector_adaptive_io_tasks;
    }
    return Status::OK();
}

Expand Down Expand Up @@ -152,6 +156,93 @@ connector::ConnectorType ConnectorScanOperator::connector_type() {
return scan_node->connector_type();
}

bool ConnectorScanOperator::is_running_all_io_tasks() const {
PickupMorselState& state = _pickup_morsel_state;
// VLOG_FILE << "[ZZZ] running all. seq = " << _driver_sequence << ", io = " << _num_running_io_tasks
// << ", exp = " << state.max_io_tasks;
bool ret = (state.max_io_tasks != 0) && (_num_running_io_tasks >= state.max_io_tasks);
return ret;
// return ScanOperator::is_running_all_io_tasks();
}

// Resets the per-round adaptive pickup-morsel state once the driver finishes
// a process round, so the next round re-evaluates the IO-task parallelism.
void ConnectorScanOperator::finish_driver_process() {
    auto& morsel_state = _pickup_morsel_state;
    morsel_state.early_cut = 0;
    morsel_state.adjusted_io_tasks = false;
}

// Delegates to ScanOperator::has_output(); while output is available this
// operator forces "unplug" mode so buffered chunks keep being drained.
// NOTE(review): _unpluging is mutated from a const method — presumably it is
// declared mutable in the base class; confirm. Removed the commented-out
// alternative that reset adjusted_io_tasks here (that reset now happens only
// in finish_driver_process()).
bool ConnectorScanOperator::has_output() const {
    const bool output_ready = ScanOperator::has_output();
    if (output_ready) {
        _unpluging = true;
    }
    return output_ready;
}

// Adaptively chooses how many concurrent IO tasks this operator may run,
// based on a sliding window of recent buffered-chunk counts.
//
// Strategy: compare the current number of buffered chunks against the window
// average. If the buffer is at or below average, consumers are keeping up and
// parallelism is nudged up; otherwise it is nudged down. The result is clamped
// to [config::connector_io_tasks_min_size, _io_tasks_per_scan_operator].
// The decision is made at most once per driver process round
// (state.adjusted_io_tasks, reset by finish_driver_process()).
//
// Returns the chosen cap, which is also stored in state.max_io_tasks.
// Cleanup vs. the draft version: dead commented-out code removed, the
// redundant second `adjusted_io_tasks = true` removed, and the
// immediately-invoked lambda inlined. Behavior is unchanged.
int ConnectorScanOperator::update_pickup_morsel_state() {
    PickupMorselState& state = _pickup_morsel_state;
    int& io_tasks = state.max_io_tasks;
    // With adaptation disabled, always allow the configured maximum.
    if (!_enable_adaptive_io_tasks) {
        io_tasks = _io_tasks_per_scan_operator;
        return _io_tasks_per_scan_operator;
    }

    size_t chunks = num_buffered_chunks();
    size_t thres = _buffer_unplug_threshold();
    const int MIN_IO_TASKS = config::connector_io_tasks_min_size;
    // Only adjust once per driver process round.
    if (state.adjusted_io_tasks) return io_tasks;
    state.adjusted_io_tasks = true;

    // Window sums of buffered-chunk counts (C) and past parallelism picks (P).
    size_t C = 0;
    size_t P = 0;
    for (int i = 0; i < state.N; i++) {
        C += state.history_chunks[i];
        P += state.history_P[i];
    }
    double avgc = C * 1.0 / state.N;
    double avgp = P * 1.0 / state.N;
    state.history_chunks[state.history_index] = chunks;
    // Buffer at/below average => scale up; above average => back off.
    if (chunks <= avgc) {
        avgp += 1;
    } else {
        avgp -= 1;
    }
    int value = int(avgp + 0.5); // round to nearest integer
    value = std::max<int>(value, MIN_IO_TASKS);
    value = std::min<int>(value, _io_tasks_per_scan_operator);
    state.history_P[state.history_index] = value;
    state.history_index = (state.history_index + 1) % state.N;

    VLOG_FILE << "[XX] id = " << _driver_sequence << ", chunks = " << chunks << ", c=" << avgc << ", p=" << avgp
              << ", P=" << value;

    io_tasks = value;
    VLOG_FILE << "[XXX] pickup morsel. id = " << _driver_sequence << ", P = " << value << ", chunk/thres = " << chunks
              << "/" << thres;
    return value;
}

// ==================== ConnectorChunkSource ====================
ConnectorChunkSource::ConnectorChunkSource(ScanOperator* op, RuntimeProfile* runtime_profile, MorselPtr&& morsel,
ConnectorScanNode* scan_node, BalancedChunkBuffer& chunk_buffer)
Expand Down
20 changes: 20 additions & 0 deletions be/src/exec/pipeline/scan/connector_scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,26 @@ class ConnectorScanOperator : public ScanOperator {
ChunkBufferTokenPtr pin_chunk(int num_chunks) override;
bool is_buffer_full() const override;
void set_buffer_finished() override;

bool is_running_all_io_tasks() const override;
int update_pickup_morsel_state() override;
void finish_driver_process() override;
bool has_output() const override;

private:
// Per-driver state for the adaptive pickup-morsel strategy.
struct PickupMorselState {
    // Size of the sliding history window.
    static constexpr int N = 32;
    // Recent buffered-chunk counts. Zero-initialized: the window sums in
    // update_pickup_morsel_state() read every slot before it is ever
    // written, so leaving these uninitialized was undefined behavior.
    int history_chunks[N] = {};
    // Recent chosen IO-task parallelism values (same initialization rationale).
    int history_P[N] = {};
    int history_index = 0;
    int flip_counter = 0;
    int moving_value = 0;
    // Current cap on concurrent IO tasks; 0 means "no cap decided yet".
    int max_io_tasks = 0;
    int64_t early_cut = 0; // standard fixed-width type instead of `int64`
    bool adjusted_io_tasks = false;
};
mutable PickupMorselState _pickup_morsel_state;
bool _enable_adaptive_io_tasks = true;
};

class ConnectorChunkSource : public ChunkSource {
Expand Down
30 changes: 27 additions & 3 deletions be/src/exec/pipeline/scan/scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ Status ScanOperator::prepare(RuntimeState* state) {
_submit_task_counter = ADD_COUNTER(_unique_metrics, "SubmitTaskCount", TUnit::UNIT);
_peak_scan_task_queue_size_counter = _unique_metrics->AddHighWaterMarkCounter(
"PeakScanTaskQueueSize", TUnit::UNIT, RuntimeProfile::Counter::create_strategy(TUnit::UNIT));
_peak_io_tasks_counter = _unique_metrics->AddHighWaterMarkCounter(
"PeakIOTasks", TUnit::UNIT, RuntimeProfile::Counter::create_strategy(TCounterAggregateType::AVG));

RETURN_IF_ERROR(do_prepare(state));
return Status::OK();
Expand Down Expand Up @@ -111,6 +113,10 @@ size_t ScanOperator::_buffer_unplug_threshold() const {
return threshold;
}

// True when every IO-task slot of this scan operator is occupied.
// Subclasses (e.g. ConnectorScanOperator) may override with an adaptive cap.
bool ScanOperator::is_running_all_io_tasks() const {
    const bool all_slots_busy = _num_running_io_tasks >= _io_tasks_per_scan_operator;
    return all_slots_busy;
}

bool ScanOperator::has_output() const {
if (_is_finished) {
return false;
Expand All @@ -132,6 +138,7 @@ bool ScanOperator::has_output() const {
// 2.4 No more tasks: pull_chunk, so return true

size_t chunk_number = num_buffered_chunks();
// VLOG_FILE << "[ZZZ] chunk = " << chunk_number << ", unplug = " << _unpluging;
if (_unpluging) {
if (chunk_number > 0) {
return true;
Expand All @@ -150,7 +157,7 @@ bool ScanOperator::has_output() const {
return chunk_number > 0;
}

if (_num_running_io_tasks >= _io_tasks_per_scan_operator) {
if (is_running_all_io_tasks()) {
return false;
}

Expand Down Expand Up @@ -250,6 +257,10 @@ int64_t ScanOperator::global_rf_wait_timeout_ns() const {
return 1000'000L * global_rf_collector->scan_wait_timeout_ms();
}
Status ScanOperator::_try_to_trigger_next_scan(RuntimeState* state) {
// Be sure to keep this call here, before any scheduling below,
// because we want to update the pickup-morsel state based on the raw counters.
int total_cnt = update_pickup_morsel_state();

if (_num_running_io_tasks >= _io_tasks_per_scan_operator) {
return Status::OK();
}
Expand All @@ -259,21 +270,34 @@ Status ScanOperator::_try_to_trigger_next_scan(RuntimeState* state) {
// Avoid uneven distribution when io tasks execute very fast, so we start
// traverse the chunk_source array from last visit idx
int cnt = _io_tasks_per_scan_operator;
int to_sched[_io_tasks_per_scan_operator];
int size = 0;

// pick up already started chunk source.
while (--cnt >= 0) {
_chunk_source_idx = (_chunk_source_idx + 1) % _io_tasks_per_scan_operator;
int i = _chunk_source_idx;
if (_is_io_task_running[i]) {
total_cnt -= 1;
continue;
}

if (_chunk_sources[i] != nullptr && _chunk_sources[i]->has_next_chunk()) {
RETURN_IF_ERROR(_trigger_next_scan(state, i));
total_cnt -= 1;
} else {
RETURN_IF_ERROR(_pickup_morsel(state, i));
to_sched[size++] = i;
}
}

size = std::min(size, total_cnt);
// pick up new chunk source.
for (int i = 0; i < size; i++) {
int idx = to_sched[i];
RETURN_IF_ERROR(_pickup_morsel(state, idx));
}

_peak_io_tasks_counter->set(_num_running_io_tasks);

return Status::OK();
}

Expand Down
5 changes: 5 additions & 0 deletions be/src/exec/pipeline/scan/scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ class ScanOperator : public SourceOperator {

void set_query_ctx(const QueryContextPtr& query_ctx);

virtual bool is_running_all_io_tasks() const;
virtual int update_pickup_morsel_state() { return _io_tasks_per_scan_operator; }
virtual void finish_driver_process() {}

protected:
static constexpr size_t kIOTaskBatchSize = 64;

Expand Down Expand Up @@ -173,6 +177,7 @@ class ScanOperator : public SourceOperator {
RuntimeProfile::HighWaterMarkCounter* _peak_scan_task_queue_size_counter = nullptr;
// The total number of the original tablets in this fragment instance.
RuntimeProfile::Counter* _tablets_counter = nullptr;
RuntimeProfile::HighWaterMarkCounter* _peak_io_tasks_counter = nullptr;
};

class ScanOperatorFactory : public SourceOperatorFactory {
Expand Down
6 changes: 6 additions & 0 deletions be/src/io/s3_input_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
#include "metrics/metrics.h"
#endif

#include "common/config.h"

namespace starrocks::io {

inline Status make_error_status(const Aws::S3::S3Error& error) {
Expand All @@ -46,6 +48,10 @@ StatusOr<int64_t> S3InputStream::read(void* out, int64_t count) {
request.SetKey(_object);
request.SetRange(std::move(range));

if (config::io_sleep_ms > 0) {
usleep(config::io_sleep_ms * 1000);
}

Aws::S3::Model::GetObjectOutcome outcome = _s3client->GetObject(request);
if (outcome.IsSuccess()) {
Aws::IOStream& body = outcome.GetResult().GetBody();
Expand Down
1 change: 0 additions & 1 deletion be/test/exprs/hyperloglog_functions_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,6 @@ TEST_F(HyperLogLogFunctionsTest, hllSerializeTest) {
ASSERT_TRUE(v->is_numeric());
auto expect = ColumnHelper::cast_to<TYPE_BIGINT>(v);


v = HyperloglogFunctions::hll_serialize(ctx, {col1}).value();
ASSERT_TRUE(v->is_binary());
v = HyperloglogFunctions::hll_deserialize(ctx, {v}).value();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1013,9 +1013,6 @@ public FragmentScanRangeAssignment getFragmentScanRangeAssignment(PlanFragmentId
@VisibleForTesting
void computeScanRangeAssignment() throws Exception {
SessionVariable sv = connectContext.getSessionVariable();



// set scan ranges/locations for scan nodes
for (ScanNode scanNode : scanNodes) {
// the parameters of getScanRangeLocations may be ignored; they don't take effect
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,7 @@ public class SessionVariable implements Serializable, Writable, Cloneable {
public static final String HUDI_MOR_FORCE_JNI_READER = "hudi_mor_force_jni_reader";
public static final String IO_TASKS_PER_SCAN_OPERATOR = "io_tasks_per_scan_operator";
public static final String CONNECTOR_IO_TASKS_PER_SCAN_OPERATOR = "connector_io_tasks_per_scan_operator";
public static final String ENABLE_CONNECTOR_ADAPTIVE_IO_TASKS = "enable_connector_adaptive_io_tasks";

public static final String ENABLE_QUERY_CACHE = "enable_query_cache";
public static final String QUERY_CACHE_FORCE_POPULATE = "query_cache_force_populate";
Expand Down Expand Up @@ -907,6 +908,9 @@ public void setEnableParallelMerge(boolean enableParallelMerge) {
@VariableMgr.VarAttr(name = CONNECTOR_IO_TASKS_PER_SCAN_OPERATOR)
private int connectorIoTasksPerScanOperator = 16;

@VariableMgr.VarAttr(name = ENABLE_CONNECTOR_ADAPTIVE_IO_TASKS)
private boolean enableConnectorAdaptiveIoTasks = true;

@VariableMgr.VarAttr(name = ENABLE_POPULATE_BLOCK_CACHE)
private boolean enablePopulateBlockCache = true;

Expand Down Expand Up @@ -2041,6 +2045,7 @@ public TQueryOptions toThrift() {
tResult.setHudi_mor_force_jni_reader(hudiMORForceJNIReader);
tResult.setIo_tasks_per_scan_operator(ioTasksPerScanOperator);
tResult.setConnector_io_tasks_per_scan_operator(connectorIoTasksPerScanOperator);
tResult.setEnable_connector_adaptive_io_tasks(enableConnectorAdaptiveIoTasks);
return tResult;
}

Expand Down
1 change: 1 addition & 0 deletions gensrc/thrift/InternalService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ struct TQueryOptions {
86: optional i32 io_tasks_per_scan_operator = 4;
87: optional i32 connector_io_tasks_per_scan_operator = 16;
88: optional double runtime_filter_early_return_selectivity = 0.05;
89: optional bool enable_connector_adaptive_io_tasks = true;
}


Expand Down