diff --git a/kafka-0.10.x/src/main/scala/monix/kafka/KafkaProducerSink.scala b/kafka-0.10.x/src/main/scala/monix/kafka/KafkaProducerSink.scala index 4395a0b9..294ab906 100644 --- a/kafka-0.10.x/src/main/scala/monix/kafka/KafkaProducerSink.scala +++ b/kafka-0.10.x/src/main/scala/monix/kafka/KafkaProducerSink.scala @@ -16,7 +16,6 @@ package monix.kafka -import cats.effect.concurrent.Semaphore import com.typesafe.scalalogging.StrictLogging import monix.eval.{Coeval, Task} import monix.execution.Ack.{Continue, Stop} @@ -47,7 +46,6 @@ final class KafkaProducerSink[K, V] private ( implicit val scheduler: Scheduler = s private[this] val p = producer.memoize private[this] var isActive = true - private[this] val semaphore: Task[Semaphore[Task]] = Semaphore[Task](parallelism).memoize def onNext(list: Seq[ProducerRecord[K, V]]): Future[Ack] = self.synchronized { @@ -56,12 +54,8 @@ final class KafkaProducerSink[K, V] private ( val sendTask: Task[Seq[Option[RecordMetadata]]] = if (parallelism == 1) Task.traverse(list)(p.value().send(_)) - else { - for { - s <- semaphore - res <- Task.wander(list)(r => s.withPermit(p.value().send(r))) - } yield res - } + else + Task.wanderN(parallelism)(list)(r => p.value().send(r)) val recovered = sendTask.map(_ => Continue).onErrorHandle { ex => logger.error("Unexpected error in KafkaProducerSink", ex) diff --git a/kafka-0.11.x/src/main/scala/monix/kafka/KafkaProducerSink.scala b/kafka-0.11.x/src/main/scala/monix/kafka/KafkaProducerSink.scala index 4395a0b9..294ab906 100644 --- a/kafka-0.11.x/src/main/scala/monix/kafka/KafkaProducerSink.scala +++ b/kafka-0.11.x/src/main/scala/monix/kafka/KafkaProducerSink.scala @@ -16,7 +16,6 @@ package monix.kafka -import cats.effect.concurrent.Semaphore import com.typesafe.scalalogging.StrictLogging import monix.eval.{Coeval, Task} import monix.execution.Ack.{Continue, Stop} @@ -47,7 +46,6 @@ final class KafkaProducerSink[K, V] private ( implicit val scheduler: Scheduler = s private[this] val p = producer.memoize private[this] var isActive = true - private[this] val semaphore: Task[Semaphore[Task]] = Semaphore[Task](parallelism).memoize def onNext(list: Seq[ProducerRecord[K, V]]): Future[Ack] = self.synchronized { @@ -56,12 +54,8 @@ final class KafkaProducerSink[K, V] private ( val sendTask: Task[Seq[Option[RecordMetadata]]] = if (parallelism == 1) Task.traverse(list)(p.value().send(_)) - else { - for { - s <- semaphore - res <- Task.wander(list)(r => s.withPermit(p.value().send(r))) - } yield res - } + else + Task.wanderN(parallelism)(list)(r => p.value().send(r)) val recovered = sendTask.map(_ => Continue).onErrorHandle { ex => logger.error("Unexpected error in KafkaProducerSink", ex) diff --git a/kafka-0.9.x/src/main/scala/monix/kafka/KafkaProducerSink.scala b/kafka-0.9.x/src/main/scala/monix/kafka/KafkaProducerSink.scala index 4395a0b9..294ab906 100644 --- a/kafka-0.9.x/src/main/scala/monix/kafka/KafkaProducerSink.scala +++ b/kafka-0.9.x/src/main/scala/monix/kafka/KafkaProducerSink.scala @@ -16,7 +16,6 @@ package monix.kafka -import cats.effect.concurrent.Semaphore import com.typesafe.scalalogging.StrictLogging import monix.eval.{Coeval, Task} import monix.execution.Ack.{Continue, Stop} @@ -47,7 +46,6 @@ final class KafkaProducerSink[K, V] private ( implicit val scheduler: Scheduler = s private[this] val p = producer.memoize private[this] var isActive = true - private[this] val semaphore: Task[Semaphore[Task]] = Semaphore[Task](parallelism).memoize def onNext(list: Seq[ProducerRecord[K, V]]): Future[Ack] = self.synchronized { @@ -56,12 +54,8 @@ final class KafkaProducerSink[K, V] private ( val sendTask: Task[Seq[Option[RecordMetadata]]] = if (parallelism == 1) Task.traverse(list)(p.value().send(_)) - else { - for { - s <- semaphore - res <- Task.wander(list)(r => s.withPermit(p.value().send(r))) - } yield res - } + else + Task.wanderN(parallelism)(list)(r => p.value().send(r)) val recovered = sendTask.map(_ => Continue).onErrorHandle { ex => logger.error("Unexpected error in KafkaProducerSink", ex) diff --git a/kafka-1.0.x/src/main/scala/monix/kafka/KafkaProducerSink.scala b/kafka-1.0.x/src/main/scala/monix/kafka/KafkaProducerSink.scala index 4395a0b9..294ab906 100644 --- a/kafka-1.0.x/src/main/scala/monix/kafka/KafkaProducerSink.scala +++ b/kafka-1.0.x/src/main/scala/monix/kafka/KafkaProducerSink.scala @@ -16,7 +16,6 @@ package monix.kafka -import cats.effect.concurrent.Semaphore import com.typesafe.scalalogging.StrictLogging import monix.eval.{Coeval, Task} import monix.execution.Ack.{Continue, Stop} @@ -47,7 +46,6 @@ final class KafkaProducerSink[K, V] private ( implicit val scheduler: Scheduler = s private[this] val p = producer.memoize private[this] var isActive = true - private[this] val semaphore: Task[Semaphore[Task]] = Semaphore[Task](parallelism).memoize def onNext(list: Seq[ProducerRecord[K, V]]): Future[Ack] = self.synchronized { @@ -56,12 +54,8 @@ final class KafkaProducerSink[K, V] private ( val sendTask: Task[Seq[Option[RecordMetadata]]] = if (parallelism == 1) Task.traverse(list)(p.value().send(_)) - else { - for { - s <- semaphore - res <- Task.wander(list)(r => s.withPermit(p.value().send(r))) - } yield res - } + else + Task.wanderN(parallelism)(list)(r => p.value().send(r)) val recovered = sendTask.map(_ => Continue).onErrorHandle { ex => logger.error("Unexpected error in KafkaProducerSink", ex) diff --git a/kafka-1.0.x/src/test/scala/monix/kafka/MonixKafkaTopicListTest.scala b/kafka-1.0.x/src/test/scala/monix/kafka/MonixKafkaTopicListTest.scala index 2e84efa6..7c42c3a7 100644 --- a/kafka-1.0.x/src/test/scala/monix/kafka/MonixKafkaTopicListTest.scala +++ b/kafka-1.0.x/src/test/scala/monix/kafka/MonixKafkaTopicListTest.scala @@ -22,8 +22,8 @@ import monix.eval.Task import monix.execution.Scheduler.Implicits.global import monix.kafka.config.AutoOffsetReset import monix.reactive.Observable -import org.apache.kafka.common.TopicPartition import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.common.TopicPartition import org.scalatest.FunSuite import scala.collection.JavaConverters._ @@ -189,4 +189,35 @@ class MonixKafkaTopicListTest extends FunSuite with KafkaTestKit { assert(first.isDefined && second.isRight && third.isEmpty) } } + + test("slow batches processing doesn't cause rebalancing") { + withRunningKafka { + val count = 10000 + + val consumerConfig = consumerCfg.copy( + maxPollInterval = 200.millis, + pollInterval = 100.millis + ) + + val producer = KafkaProducerSink[String, String](producerCfg, io) + val consumer = KafkaConsumerObservable[String, String](consumerConfig, List(topicName)).executeOn(io) + + val pushT = Observable + .range(0, count) + .map(msg => new ProducerRecord(topicName, "obs", msg.toString)) + .bufferIntrospective(1024) + .consumeWith(producer) + + val listT = consumer + .take(count) + .map(_.value()) + .bufferTumbling(count / 4) + .mapEval(s => Task.sleep(2.second) >> Task.delay(s)) + .flatMap(Observable.fromIterable) + .toListL + + val (result, _) = Await.result(Task.parZip2(listT.executeAsync, pushT.executeAsync).runToFuture, 60.seconds) + assert(result.map(_.toInt).sum === (0 until count).sum) + } + } }