Skip to content

Commit

Permalink
Stream FS2: allow source/sink concurrency settings to be configurable (
Browse files Browse the repository at this point in the history
…close #458)
  • Loading branch information
istreeter authored and oguzhanunlu committed May 6, 2021
1 parent ea7282d commit 6a3e074
Show file tree
Hide file tree
Showing 8 changed files with 91 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,8 @@ object Environment {

private def validateAttributes(output: OutputConfig): ValidationResult[OutputConfig] =
output match {
case OutputConfig.PubSub(_, optAttributes) =>
optAttributes
case ps: OutputConfig.PubSub =>
ps.attributes
.fold[ValidationResult[OutputConfig]](output.valid) { attributes =>
val invalidAttributes = attributes.filterNot(enrichedFieldsMap.contains)
if (invalidAttributes.nonEmpty) NonEmptyList(invalidAttributes.head, invalidAttributes.tail.toList).invalid
Expand All @@ -249,13 +249,13 @@ object Environment {

private[fs2] def outputAttributes(output: OutputConfig): EnrichedEvent => Map[String, String] =
output match {
case OutputConfig.PubSub(_, Some(attributes)) =>
case OutputConfig.PubSub(_, Some(attributes), _, _, _, _) =>
val fields = enrichedFieldsMap.filter {
case (s, _) =>
attributes.contains(s)
}
attributesFromFields(fields)
case OutputConfig.PubSub(_, None) => _ => Map.empty
case OutputConfig.PubSub(_, None, _, _, _, _) => _ => Map.empty
case OutputConfig.FileSystem(_) =>
_ => Map.empty
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ object Payload {
* Controls the maximum number of payloads we can be waiting to get sunk
*
* For the Pubsub sink this should at least exceed the number events we can sink within
* [[io.Sinks.DelayThreshold]].
* [[io.Sinks.DefaultDelayThreshold]].
*
* For the FileSystem source this is the primary way that we control the memory footprint of the
* app.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,11 @@ object io {

object Input {

case class PubSub private (subscription: String) extends Input {
case class PubSub private (
subscription: String,
parallelPullCount: Option[Int],
maxQueueSize: Option[Int]
) extends Input {
val (project, name) =
subscription.split("/").toList match {
case List("projects", project, "subscriptions", name) =>
Expand All @@ -58,24 +62,40 @@ object io {
case class FileSystem(dir: Path) extends Input

implicit val inputDecoder: Decoder[Input] =
deriveConfiguredDecoder[Input].emap {
case s @ PubSub(sub) =>
sub.split("/").toList match {
case List("projects", _, "subscriptions", _) =>
s.asRight
case _ =>
s"Subscription must conform projects/project-name/subscriptions/subscription-name format, $s given".asLeft
}
case other => other.asRight
}
deriveConfiguredDecoder[Input]
.emap {
case s @ PubSub(sub, _, _) =>
sub.split("/").toList match {
case List("projects", _, "subscriptions", _) =>
s.asRight
case _ =>
s"Subscription must conform projects/project-name/subscriptions/subscription-name format, $s given".asLeft
}
case other => other.asRight
}
.emap {
case PubSub(_, Some(p), _) if p < 0 =>
"PubSub parallelPullCount must be > 0".asLeft
case PubSub(_, _, Some(m)) if m < 0 =>
"PubSub maxQueueSize must be > 0".asLeft
case other =>
other.asRight
}
implicit val inputEncoder: Encoder[Input] =
deriveConfiguredEncoder[Input]
}

sealed trait Output

object Output {
case class PubSub private (topic: String, attributes: Option[Set[String]]) extends Output {
case class PubSub private (
topic: String,
attributes: Option[Set[String]],
delayThreshold: Option[FiniteDuration],
maxBatchSize: Option[Long],
maxBatchBytes: Option[Long],
numCallbackExecutors: Option[Int]
) extends Output {
val (project, name) =
topic.split("/").toList match {
case List("projects", project, "topics", name) =>
Expand All @@ -87,16 +107,32 @@ object io {
case class FileSystem(file: Path) extends Output

implicit val outputDecoder: Decoder[Output] =
deriveConfiguredDecoder[Output].emap {
case s @ PubSub(top, _) =>
top.split("/").toList match {
case List("projects", _, "topics", _) =>
s.asRight
case _ =>
s"Topic must conform projects/project-name/topics/topic-name format, $s given".asLeft
}
case other => other.asRight
}
deriveConfiguredDecoder[Output]
.emap {
case s @ PubSub(top, _, _, _, _, _) =>
top.split("/").toList match {
case List("projects", _, "topics", _) =>
s.asRight
case _ =>
s"Topic must conform projects/project-name/topics/topic-name format, $top given".asLeft
}
case other => other.asRight
}
.emap {
case PubSub(_, _, Some(d), _, _, _) if d < Duration.Zero =>
"PubSub delay threshold cannot be less than 0".asLeft
case PubSub(_, _, _, Some(m), _, _) if m < 0 =>
"PubSub max batch size cannot be less than 0".asLeft
case PubSub(_, _, _, _, Some(m), _) if m < 0 =>
"PubSub max batch bytes cannot be less than 0".asLeft
case PubSub(_, _, _, _, _, Some(m)) if m < 0 =>
"PubSub callback executors cannot be less than 0".asLeft
case other =>
other.asRight
}

import ConfigFile.finiteDurationEncoder

implicit val outputEncoder: Encoder[Output] =
deriveConfiguredEncoder[Output]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,27 +43,27 @@ object Sinks {
* batches if DelayThreshold is too large. If the source MaxQueueSize is sufficiently large,
* then the DelayThreshold should not ever cause blocking.
*/
val DelayThreshold: FiniteDuration = 200.milliseconds
val DefaultDelayThreshold: FiniteDuration = 200.milliseconds

/**
* A batch of messages will be emitted to PubSub when the batch reaches 1000 messages.
* We use 1000 because it is the maximum batch size allowed by PubSub.
* This overrides the permutive library default of `5`
*/
val PubsubMaxBatchSize = 1000L
val DefaultPubsubMaxBatchSize = 1000L

/**
* A batch of messages will be emitted to PubSub when the batch reaches 10 MB.
* We use 10MB because it is the maximum batch size allowed by PubSub.
*/
val PubsubMaxBatchBytes = 10000000L
val DefaultPubsubMaxBatchBytes = 10000000L

/**
* The number of threads used internally by permutive library to process the callback after message delivery.
* The callback does very little "work" so we use the minimum number of threads.
* This overrides the permutive library default of 3 * num processors
*/
val NumCallbackExecutors = 1
val DefaultNumCallbackExecutors = 1

private implicit def unsafeLogger[F[_]: Sync]: Logger[F] =
Slf4jLogger.getLogger[F]
Expand Down Expand Up @@ -96,10 +96,10 @@ object Sinks {
output: Output.PubSub
): Resource[F, AttributedData[A] => F[Unit]] = {
val config = PubsubProducerConfig[F](
batchSize = PubsubMaxBatchSize,
requestByteThreshold = Some(PubsubMaxBatchBytes),
delayThreshold = DelayThreshold,
callbackExecutors = NumCallbackExecutors,
batchSize = output.maxBatchSize.getOrElse(DefaultPubsubMaxBatchSize),
requestByteThreshold = Some(output.maxBatchBytes.getOrElse(DefaultPubsubMaxBatchBytes)),
delayThreshold = output.delayThreshold.getOrElse(DefaultDelayThreshold),
callbackExecutors = output.numCallbackExecutors.getOrElse(DefaultNumCallbackExecutors),
onFailedTerminate = err => Logger[F].error(err)("PubSub sink termination error")
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ object Source {
* These threads do very little "work" apart from writing the message to a concurrent Queue.
* Overrides the permutive library default of `3`.
*/
val ParallelPullCount = 1
val DefaultParallelPullCount = 1

/**
* Configures the "max outstanding element count" of pubSub.
Expand All @@ -48,7 +48,7 @@ object Source {
*
* (`1000` is the permutive library default; we re-define it here to be explicit)
*/
val MaxQueueSize = 1000
val DefaultMaxQueueSize = 1000

def read[F[_]: Concurrent: ContextShift](
blocker: Blocker,
Expand All @@ -74,7 +74,11 @@ object Source {
val onFailedTerminate: Throwable => F[Unit] =
e => Sync[F].delay(System.err.println(s"Cannot terminate ${e.getMessage}"))
val pubSubConfig =
PubsubGoogleConsumerConfig(onFailedTerminate = onFailedTerminate, parallelPullCount = ParallelPullCount, maxQueueSize = MaxQueueSize)
PubsubGoogleConsumerConfig(
onFailedTerminate = onFailedTerminate,
parallelPullCount = input.parallelPullCount.getOrElse(DefaultParallelPullCount),
maxQueueSize = input.maxQueueSize.getOrElse(DefaultMaxQueueSize)
)
val projectId = Model.ProjectId(input.project)
val subscriptionId = Model.Subscription(input.name)
val errorHandler: (PubsubMessage, Throwable, F[Unit], F[Unit]) => F[Unit] = // Should be useless
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,10 @@ class CliConfigSpec extends Specification with CatsIO {

val expected = ConfigFile(
io.Authentication.Gcp,
io.Input.PubSub("projects/test-project/subscriptions/inputSub"),
io.Output.PubSub("projects/test-project/topics/good-topic", None),
Some(io.Output.PubSub("projects/test-project/topics/pii-topic", Some(Set("app_id", "platform")))),
io.Output.PubSub("projects/test-project/topics/bad-topic", None),
io.Input.PubSub("projects/test-project/subscriptions/inputSub", None, None),
io.Output.PubSub("projects/test-project/topics/good-topic", None, None, None, None, None),
Some(io.Output.PubSub("projects/test-project/topics/pii-topic", Some(Set("app_id", "platform")), None, None, None, None)),
io.Output.PubSub("projects/test-project/topics/bad-topic", None, None, None, None, None),
None,
None
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ class ConfigFileSpec extends Specification with CatsIO {
val configPath = Paths.get(getClass.getResource("/config.fs2.hocon.sample").toURI)
val expected = ConfigFile(
io.Authentication.Gcp,
io.Input.PubSub("projects/test-project/subscriptions/inputSub"),
io.Output.PubSub("projects/test-project/topics/good-topic", Some(Set("app_id"))),
Some(io.Output.PubSub("projects/test-project/topics/pii-topic", None)),
io.Output.PubSub("projects/test-project/topics/bad-topic", None),
io.Input.PubSub("projects/test-project/subscriptions/inputSub", None, None),
io.Output.PubSub("projects/test-project/topics/good-topic", Some(Set("app_id")), None, None, None, None),
Some(io.Output.PubSub("projects/test-project/topics/pii-topic", None, None, None, None, None)),
io.Output.PubSub("projects/test-project/topics/bad-topic", None, None, None, None, None),
Some(7.days),
Some(
io.Monitoring(
Expand Down Expand Up @@ -100,10 +100,10 @@ class ConfigFileSpec extends Specification with CatsIO {

val configFile = ConfigFile(
io.Authentication.Gcp,
io.Input.PubSub("projects/test-project/subscriptions/inputSub"),
io.Output.PubSub("projects/test-project/topics/good-topic", Some(Set("app_id", invalidAttr1))),
Some(io.Output.PubSub("projects/test-project/topics/pii-topic", Some(Set("app_id", invalidAttr2)))),
io.Output.PubSub("projects/test-project/topics/bad-topic", None),
io.Input.PubSub("projects/test-project/subscriptions/inputSub", None, None),
io.Output.PubSub("projects/test-project/topics/good-topic", Some(Set("app_id", invalidAttr1)), None, None, None, None),
Some(io.Output.PubSub("projects/test-project/topics/pii-topic", Some(Set("app_id", invalidAttr2)), None, None, None, None)),
io.Output.PubSub("projects/test-project/topics/bad-topic", None, None, None, None, None),
Some(7.days),
Some(
io.Monitoring(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class EnvironmentSpec extends Specification with CatsIO {

"outputAttributes" should {
"fetch attribute values" in {
val output = io.Output.PubSub("projects/test-project/topics/good-topic", Some(Set("app_id")))
val output = io.Output.PubSub("projects/test-project/topics/good-topic", Some(Set("app_id")), None, None, None, None)
val ee = new EnrichedEvent()
ee.app_id = "test_app"

Expand Down

0 comments on commit 6a3e074

Please sign in to comment.