Skip to content

Commit

Permalink
Add test for (monix#101)
Browse files Browse the repository at this point in the history
  • Loading branch information
voidconductor committed Sep 23, 2019
1 parent d5b6c18 commit 9365a02
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 33 deletions.
10 changes: 2 additions & 8 deletions kafka-0.10.x/src/main/scala/monix/kafka/KafkaProducerSink.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package monix.kafka

import cats.effect.concurrent.Semaphore
import com.typesafe.scalalogging.StrictLogging
import monix.eval.{Coeval, Task}
import monix.execution.Ack.{Continue, Stop}
Expand Down Expand Up @@ -47,7 +46,6 @@ final class KafkaProducerSink[K, V] private (
implicit val scheduler: Scheduler = s
private[this] val p = producer.memoize
private[this] var isActive = true
private[this] val semaphore: Task[Semaphore[Task]] = Semaphore[Task](parallelism).memoize

def onNext(list: Seq[ProducerRecord[K, V]]): Future[Ack] =
self.synchronized {
Expand All @@ -56,12 +54,8 @@ final class KafkaProducerSink[K, V] private (
val sendTask: Task[Seq[Option[RecordMetadata]]] =
if (parallelism == 1)
Task.traverse(list)(p.value().send(_))
else {
for {
s <- semaphore
res <- Task.wander(list)(r => s.withPermit(p.value().send(r)))
} yield res
}
else
Task.wanderN(parallelism)(list)(r => p.value().send(r))

val recovered = sendTask.map(_ => Continue).onErrorHandle { ex =>
logger.error("Unexpected error in KafkaProducerSink", ex)
Expand Down
10 changes: 2 additions & 8 deletions kafka-0.11.x/src/main/scala/monix/kafka/KafkaProducerSink.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package monix.kafka

import cats.effect.concurrent.Semaphore
import com.typesafe.scalalogging.StrictLogging
import monix.eval.{Coeval, Task}
import monix.execution.Ack.{Continue, Stop}
Expand Down Expand Up @@ -47,7 +46,6 @@ final class KafkaProducerSink[K, V] private (
implicit val scheduler: Scheduler = s
private[this] val p = producer.memoize
private[this] var isActive = true
private[this] val semaphore: Task[Semaphore[Task]] = Semaphore[Task](parallelism).memoize

def onNext(list: Seq[ProducerRecord[K, V]]): Future[Ack] =
self.synchronized {
Expand All @@ -56,12 +54,8 @@ final class KafkaProducerSink[K, V] private (
val sendTask: Task[Seq[Option[RecordMetadata]]] =
if (parallelism == 1)
Task.traverse(list)(p.value().send(_))
else {
for {
s <- semaphore
res <- Task.wander(list)(r => s.withPermit(p.value().send(r)))
} yield res
}
else
Task.wanderN(parallelism)(list)(r => p.value().send(r))

val recovered = sendTask.map(_ => Continue).onErrorHandle { ex =>
logger.error("Unexpected error in KafkaProducerSink", ex)
Expand Down
10 changes: 2 additions & 8 deletions kafka-0.9.x/src/main/scala/monix/kafka/KafkaProducerSink.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package monix.kafka

import cats.effect.concurrent.Semaphore
import com.typesafe.scalalogging.StrictLogging
import monix.eval.{Coeval, Task}
import monix.execution.Ack.{Continue, Stop}
Expand Down Expand Up @@ -47,7 +46,6 @@ final class KafkaProducerSink[K, V] private (
implicit val scheduler: Scheduler = s
private[this] val p = producer.memoize
private[this] var isActive = true
private[this] val semaphore: Task[Semaphore[Task]] = Semaphore[Task](parallelism).memoize

def onNext(list: Seq[ProducerRecord[K, V]]): Future[Ack] =
self.synchronized {
Expand All @@ -56,12 +54,8 @@ final class KafkaProducerSink[K, V] private (
val sendTask: Task[Seq[Option[RecordMetadata]]] =
if (parallelism == 1)
Task.traverse(list)(p.value().send(_))
else {
for {
s <- semaphore
res <- Task.wander(list)(r => s.withPermit(p.value().send(r)))
} yield res
}
else
Task.wanderN(parallelism)(list)(r => p.value().send(r))

val recovered = sendTask.map(_ => Continue).onErrorHandle { ex =>
logger.error("Unexpected error in KafkaProducerSink", ex)
Expand Down
10 changes: 2 additions & 8 deletions kafka-1.0.x/src/main/scala/monix/kafka/KafkaProducerSink.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package monix.kafka

import cats.effect.concurrent.Semaphore
import com.typesafe.scalalogging.StrictLogging
import monix.eval.{Coeval, Task}
import monix.execution.Ack.{Continue, Stop}
Expand Down Expand Up @@ -47,7 +46,6 @@ final class KafkaProducerSink[K, V] private (
implicit val scheduler: Scheduler = s
private[this] val p = producer.memoize
private[this] var isActive = true
private[this] val semaphore: Task[Semaphore[Task]] = Semaphore[Task](parallelism).memoize

def onNext(list: Seq[ProducerRecord[K, V]]): Future[Ack] =
self.synchronized {
Expand All @@ -56,12 +54,8 @@ final class KafkaProducerSink[K, V] private (
val sendTask: Task[Seq[Option[RecordMetadata]]] =
if (parallelism == 1)
Task.traverse(list)(p.value().send(_))
else {
for {
s <- semaphore
res <- Task.wander(list)(r => s.withPermit(p.value().send(r)))
} yield res
}
else
Task.wanderN(parallelism)(list)(r => p.value().send(r))

val recovered = sendTask.map(_ => Continue).onErrorHandle { ex =>
logger.error("Unexpected error in KafkaProducerSink", ex)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.kafka.config.AutoOffsetReset
import monix.reactive.Observable
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
import org.scalatest.FunSuite

import scala.collection.JavaConverters._
Expand Down Expand Up @@ -189,4 +189,35 @@ class MonixKafkaTopicListTest extends FunSuite with KafkaTestKit {
assert(first.isDefined && second.isRight && third.isEmpty)
}
}

test("slow batches processing doesn't cause rebalancing") {
withRunningKafka {
val count = 10000

val consumerConfig = consumerCfg.copy(
maxPollInterval = 200.millis,
pollInterval = 100.millis
)

val producer = KafkaProducerSink[String, String](producerCfg, io)
val consumer = KafkaConsumerObservable[String, String](consumerConfig, List(topicName)).executeOn(io)

val pushT = Observable
.range(0, count)
.map(msg => new ProducerRecord(topicName, "obs", msg.toString))
.bufferIntrospective(1024)
.consumeWith(producer)

val listT = consumer
.take(count)
.map(_.value())
.bufferTumbling(count / 4)
.mapEval(s => Task.sleep(2.second) >> Task.delay(s))
.flatMap(Observable.fromIterable)
.toListL

val (result, _) = Await.result(Task.parZip2(listT.executeAsync, pushT.executeAsync).runToFuture, 60.seconds)
assert(result.map(_.toInt).sum === (0 until count).sum)
}
}
}

0 comments on commit 9365a02

Please sign in to comment.