Skip to content

Commit

Permalink
Merge pull request #42091 from cescoffier/redis-fix-subscribeAsMessages
Browse files Browse the repository at this point in the history
Fix Redis Pub/Sub subscribeAsMessages method
  • Loading branch information
gsmet committed Jul 23, 2024
2 parents 52130c8 + 3743f32 commit 1c81a17
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -260,10 +260,14 @@ public Multi<RedisPubSubMessage<V>> subscribeAsMessages(String... channels) {

List<String> list = List.of(channels);
return Multi.createFrom().emitter(emitter -> {
subscribe(list, (channel, value) -> new DefaultRedisPubSubMessage<>(value, channel), emitter::complete,
emitter::fail)
.subscribe().with(subscriber -> emitter
.onTermination(() -> subscriber.unsubscribe(channels).subscribe().asCompletionStage()));
subscribe(list,
(channel, value) -> emitter.emit(new DefaultRedisPubSubMessage<>(value, channel)),
emitter::complete, emitter::fail)
.subscribe().with(x -> {
emitter.onTermination(() -> {
x.unsubscribe(channels).subscribe().asCompletionStage();
});
}, emitter::fail);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ void initialize() {
ds = new BlockingRedisDataSourceImpl(vertx, redis, api, Duration.ofSeconds(5));
pubsub = ds.pubsub(Person.class);

ReactiveRedisDataSourceImpl reactiveDS = new ReactiveRedisDataSourceImpl(vertx, redis, api);
var reactiveDS = new ReactiveRedisDataSourceImpl(vertx, redis, api);
reactive = reactiveDS.pubsub(Person.class);
}

Expand Down Expand Up @@ -371,6 +371,35 @@ void subscribeToSingleWithMultiAsMessages() {

}

@Test
void testSubscribeAsMessages() {
List<RedisPubSubMessage<Person>> people = new CopyOnWriteArrayList<>();
Multi<RedisPubSubMessage<Person>> multi = reactive.subscribeAsMessages(channel);

Cancellable cancellable = multi.subscribe().with(people::add);

pubsub.publish("foo", new Person("luke", "skywalker"));
pubsub.publish(channel, new Person("luke", "skywalker"));

Awaitility.await().until(() -> people.size() == 1);

pubsub.publish(channel, new Person("leia", "skywalker"));
pubsub.publish(channel, new Person("leia", "skywalker"));
pubsub.publish(channel, new Person("leia", "skywalker"));

Awaitility.await().until(() -> people.size() == 4);

assertThat(people).allSatisfy(m -> {
assertThat(m.getChannel()).isNotBlank();
assertThat(m.getPayload()).isNotNull();
});

cancellable.cancel();

awaitNoMoreActiveChannels();

}

@Test
void unsubscribe() {

Expand Down

0 comments on commit 1c81a17

Please sign in to comment.