From b25aa33ef8465a756a021c6948b3a16aef7ce6ca Mon Sep 17 00:00:00 2001 From: YjyJeff <1731939194@qq.com> Date: Fri, 13 Sep 2024 00:08:02 +0800 Subject: [PATCH] fix: nested loop join requires outer table to be a FusedStream (#12189) Co-authored-by: jefffffyang --- datafusion/physical-plan/src/joins/nested_loop_join.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index dadd20714ead..c6f1833c13e0 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -316,6 +316,7 @@ impl ExecutionPlan for NestedLoopJoinExec { self.right().output_partitioning().partition_count(), ) }); + let outer_table = self.right.execute(partition, context)?; Ok(Box::pin(NestedLoopJoinStream { @@ -471,6 +472,12 @@ impl NestedLoopJoinStream { // Get or initialize visited_left_side bitmap if required by join type let visited_left_side = left_data.bitmap(); + // Check is_exhausted before polling the outer_table, such that when the outer table + // does not support `FusedStream`, Self will not poll it again + if self.is_exhausted { + return Poll::Ready(None); + } + self.outer_table .poll_next_unpin(cx) .map(|maybe_batch| match maybe_batch { @@ -501,8 +508,7 @@ impl NestedLoopJoinStream { } Some(err) => Some(err), None => { - if need_produce_result_in_final(self.join_type) && !self.is_exhausted - { + if need_produce_result_in_final(self.join_type) { // At this stage `visited_left_side` won't be updated, so it's // safe to report about probe completion. //