FluxBuffer hangs when buffer Supplier returns a Set #3821

Closed
Sage-Pierce opened this issue Jun 1, 2024 · 0 comments · Fixed by #3822
Labels
type/enhancement A general enhancement
Sage-Pierce commented Jun 1, 2024

If Flux.buffer(int maxSize, Supplier<Collection> bufferSupplier) is invoked with a bufferSupplier that returns a Set, the stream can hang when upstream emits duplicate elements and downstream demand is bounded.

Extra Context

I have a use case where I'd like to process batches of n distinct elements from a source at a time, and using Flux.buffer(n, LinkedHashSet::new) would be a convenient, concise way to do so.

Expected Behavior

The stream should not hang; it should complete successfully.

Actual Behavior

The stream hangs and never completes.

Steps to Reproduce

The following test fails by timing out:

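	@Test
	public void supplierUsesSet() {
		// The duplicate 1 is absorbed by the Set, so two upstream items fill only one slot
		Flux.just(1, 1, 2)
			.<Set<Integer>>buffer(2, HashSet::new)
			// Bounded demand: exactly one buffer is requested from the buffer operator
			.take(1, true)
			.as(StepVerifier::create)
			.expectNext(Stream.of(1, 2).collect(Collectors.toSet()))
			.expectComplete()
			// Without a fix, verification times out here
			.verify(Duration.ofSeconds(2));
	}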

If the test is changed to not contain duplicates, i.e. Flux.just(1, 2), the test passes.

Possible Solution

FluxBuffer does not check whether adding an element actually changes the buffer. When the add leaves the buffer unmodified (for example, a duplicate offered to a Set), an extra s.request(1) should be issued so that the consumed upstream demand is replenished.
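
To illustrate the demand accounting behind this suggestion, here is a minimal plain-Java sketch. It is not Reactor code and not the actual FluxBuffer implementation: the class BufferDemandSketch, the method runOnce, and the replenish flag are hypothetical names made up for this illustration. It assumes a single downstream request for one buffer is translated into an upstream request for size items, and that the operator flushes a partial buffer when the source completes.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical model of a buffering operator's demand bookkeeping (not Reactor's code).
class BufferDemandSketch {

    // Simulates one downstream request for a single buffer and returns what was emitted.
    // An empty result means the operator stalled with elements still pending upstream.
    static <T> List<Collection<T>> runOnce(
            List<T> source, int size, Supplier<Collection<T>> bufferSupplier, boolean replenish) {
        Iterator<T> upstream = source.iterator();
        List<Collection<T>> emitted = new ArrayList<>();
        Collection<T> buffer = bufferSupplier.get();
        long demand = size; // assumption: request(1) downstream becomes request(size) upstream

        while (demand > 0 && upstream.hasNext()) {
            T item = upstream.next();
            demand--;
            boolean modified = buffer.add(item);
            if (!modified && replenish) {
                demand++; // stand-in for the proposed extra s.request(1)
                continue;
            }
            if (buffer.size() == size) {
                emitted.add(buffer);
                buffer = bufferSupplier.get();
            }
        }
        // Assumption: on source completion a non-empty partial buffer is flushed.
        if (!upstream.hasNext() && !buffer.isEmpty()) {
            emitted.add(buffer);
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Mirrors Flux.just(1, 1, 2) with buffer size 2 and a Set-backed buffer.
        System.out.println(runOnce(List.of(1, 1, 2), 2, LinkedHashSet::new, false)); // prints []
        System.out.println(runOnce(List.of(1, 1, 2), 2, LinkedHashSet::new, true));  // prints [[1, 2]]
    }
}
```

In this model, the run without replenishment consumes both requested items (1 and the duplicate 1), leaves the buffer at size 1, and has no demand left to pull the 2, which corresponds to the hang in the test above. With replenishment, the duplicate costs no net demand and the buffer fills.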

Your Environment

  • Reactor version(s) used: 3.7.0-SNAPSHOT
  • Other relevant libraries versions (eg. netty, ...): N/A
  • JVM version (java -version): 17.0.8
  • OS and version (eg uname -a): MacOS Darwin 23.5.0