From 229efb8d6bc31023612f1c74a995bbcbc7644fa9 Mon Sep 17 00:00:00 2001 From: Anton Parkhomenko Date: Sat, 10 Oct 2020 15:18:36 +0300 Subject: [PATCH] Common: add benchmarking module (close #370) --- build.sbt | 5 + modules/bench/build.sbt | 6 + .../test/resources/simplelogger.properties | 2 + .../EnrichBench.scala | 105 ++++++++++++++++++ .../EtlPipelineBench.scala | 85 ++++++++++++++ .../ThriftLoaderBench.scala | 43 +++++++ project/plugins.sbt | 1 + 7 files changed, 247 insertions(+) create mode 100644 modules/bench/build.sbt create mode 100644 modules/bench/src/test/resources/simplelogger.properties create mode 100644 modules/bench/src/test/scala/com.snowplowanalytics.snowplow.enrich.bench/EnrichBench.scala create mode 100644 modules/bench/src/test/scala/com.snowplowanalytics.snowplow.enrich.bench/EtlPipelineBench.scala create mode 100644 modules/bench/src/test/scala/com.snowplowanalytics.snowplow.enrich.bench/ThriftLoaderBench.scala diff --git a/build.sbt b/build.sbt index d2a5d921c..0dd1566b0 100644 --- a/build.sbt +++ b/build.sbt @@ -233,3 +233,8 @@ lazy val fs2 = project .enablePlugins(BuildInfoPlugin) .settings(BuildSettings.dockerSettings) .enablePlugins(BuildInfoPlugin, JavaAppPackaging, DockerPlugin) + +lazy val bench = project + .in(file("modules/bench")) + .dependsOn(fs2 % "test->test") + .enablePlugins(JmhPlugin) diff --git a/modules/bench/build.sbt b/modules/bench/build.sbt new file mode 100644 index 000000000..6e115c649 --- /dev/null +++ b/modules/bench/build.sbt @@ -0,0 +1,6 @@ +sourceDirectory in Jmh := (sourceDirectory in Test).value +classDirectory in Jmh := (classDirectory in Test).value +dependencyClasspath in Jmh := (dependencyClasspath in Test).value +// rewire tasks, so that 'jmh:run' automatically invokes 'jmh:compile' (otherwise a clean 'jmh:run' would fail) +compile in Jmh := (compile in Jmh).dependsOn(compile in Test).value +run in Jmh := (run in Jmh).dependsOn(Keys.compile in Jmh).evaluated \ No newline at end of file diff --git a/modules/bench/src/test/resources/simplelogger.properties b/modules/bench/src/test/resources/simplelogger.properties new file mode 100644 index 000000000..7c0551b2b --- /dev/null +++ b/modules/bench/src/test/resources/simplelogger.properties @@ -0,0 +1,2 @@ +org.slf4j.simpleLogger.log.com.snowplowanalytics.snowplow.enrich.fs2.Assets=off +org.slf4j.simpleLogger.log.com.snowplowanalytics.snowplow.enrich.fs2.test.TestEnvironment=off diff --git a/modules/bench/src/test/scala/com.snowplowanalytics.snowplow.enrich.bench/EnrichBench.scala b/modules/bench/src/test/scala/com.snowplowanalytics.snowplow.enrich.bench/EnrichBench.scala new file mode 100644 index 000000000..9fd5ff1bd --- /dev/null +++ b/modules/bench/src/test/scala/com.snowplowanalytics.snowplow.enrich.bench/EnrichBench.scala @@ -0,0 +1,105 @@ +/* + * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package com.snowplowanalytics.snowplow.enrich.bench + +import org.openjdk.jmh.annotations._ +import java.util.concurrent.TimeUnit + +import cats.effect.{ContextShift, IO, Clock} + +import fs2.Stream + +import com.snowplowanalytics.iglu.client.Client + +import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry +import com.snowplowanalytics.snowplow.enrich.common.loaders.ThriftLoader +import com.snowplowanalytics.snowplow.enrich.fs2.test.TestEnvironment +import com.snowplowanalytics.snowplow.enrich.fs2.{Enrich, Environment, EnrichSpec, Payload} + +import org.apache.http.message.BasicNameValuePair + + +/** + * @example + * {{{ + * jmh:run -i 15 -wi 10 -f1 -t1 EnrichBench + * }}} + */ +@State(Scope.Thread) +@BenchmarkMode(Array(Mode.AverageTime)) +@OutputTimeUnit(TimeUnit.MICROSECONDS) +class EnrichBench { + + implicit val ioClock: Clock[IO] = Clock.create[IO] + + @Benchmark + def measureEnrichWithMinimalPayload(state: EnrichBench.BenchState) = { + Enrich.enrichWith[IO](IO.pure(EnrichmentRegistry()), Client.IgluCentral, None, (_: Option[Long]) => IO.unit)(state.raw).unsafeRunSync() + } + + @Benchmark + def measureToCollectorPayload(state: EnrichBench.BenchState) = { + ThriftLoader.toCollectorPayload(state.raw.data, Enrich.processor) + } + + @Benchmark + @OperationsPerInvocation(50) // 5 events repetated 10 times + def measureRunWithNoEnrichments(state: EnrichBench.BenchState) = { + // We used this benchmark to check if running the whole `enrichWith` on a blocking + // thread-pool will give us increase in performance. Results haven't confirm it: + // EnrichBench.measureRunWithNoEnrichments avgt 15 341.144 ± 18.884 us/op <- smaller blocker + // EnrichBench.measureRunWithNoEnrichments avgt 15 326.608 ± 16.714 us/op <- wrapping blocker + // EnrichBench.measureRunWithNoEnrichments avgt 15 292.907 ± 15.894 us/op <- no blocker at all + // However, I'm still leaving the "smaller blocker" in a hope that with actual IO enrichments + // it will give the expected increase in performance + implicit val CS: ContextShift[IO] = state.contextShift + state.useEnvironment(e => Enrich.run[IO](e).compile.drain).unsafeRunSync() + } +} + +object EnrichBench { + @State(Scope.Benchmark) + class BenchState { + var raw: Payload[IO, Array[Byte]] = _ + var useEnvironment: (Environment[IO] => IO[Unit]) => IO[Unit] = _ + var contextShift: ContextShift[IO] = _ + + @Setup(Level.Trial) + def setup(): Unit = { + + raw = EnrichSpec.payload[IO] + + val input = Stream.emits(List( + EnrichSpec.colllectorPayload.copy( + querystring = new BasicNameValuePair("ip", "125.12.2.40") :: EnrichSpec.querystring + ), + EnrichSpec.colllectorPayload.copy( + querystring = new BasicNameValuePair("ip", "125.12.2.41") :: EnrichSpec.querystring + ), + EnrichSpec.colllectorPayload.copy( + querystring = new BasicNameValuePair("ip", "125.12.2.42") :: EnrichSpec.querystring + ), + EnrichSpec.colllectorPayload.copy( + querystring = new BasicNameValuePair("ip", "125.12.2.43") :: EnrichSpec.querystring + ), + EnrichSpec.colllectorPayload.copy( + querystring = new BasicNameValuePair("ip", "125.12.2.44") :: EnrichSpec.querystring + ), + )).repeatN(10).map(cp => Payload(cp.toRaw, IO.unit)).covary[IO] + + useEnvironment = TestEnvironment.make(input).map(_.env).use(_: Environment[IO] => IO[Unit]) + + contextShift = IO.contextShift(scala.concurrent.ExecutionContext.global) + } + } +} \ No newline at end of file diff --git a/modules/bench/src/test/scala/com.snowplowanalytics.snowplow.enrich.bench/EtlPipelineBench.scala b/modules/bench/src/test/scala/com.snowplowanalytics.snowplow.enrich.bench/EtlPipelineBench.scala new file mode 100644 index 000000000..5b65e66de --- /dev/null +++ b/modules/bench/src/test/scala/com.snowplowanalytics.snowplow.enrich.bench/EtlPipelineBench.scala @@ -0,0 +1,85 @@ +/* + * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package com.snowplowanalytics.snowplow.enrich.bench + +import org.openjdk.jmh.annotations._ + +import java.util.concurrent.TimeUnit + +import cats.Id +import cats.data.Validated + +import cats.effect.{IO, Clock} + +import io.circe.Json + +import com.snowplowanalytics.iglu.client.{Resolver, Client, CirceValidator} + +import com.snowplowanalytics.snowplow.enrich.common.EtlPipeline +import com.snowplowanalytics.snowplow.enrich.common.adapters.AdapterRegistry +import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry + +import com.snowplowanalytics.snowplow.enrich.fs2.{Enrich, EnrichSpec} + +import org.joda.time.DateTime + +@State(Scope.Thread) +@BenchmarkMode(Array(Mode.AverageTime, Mode.Throughput)) +@OutputTimeUnit(TimeUnit.MICROSECONDS) +class EtlPipelineBench { + + private implicit val ioClock: Clock[IO] = Clock.create[IO] + + private implicit val idClock: Clock[Id] = new Clock[Id] { + final def realTime(unit: TimeUnit): Id[Long] = + unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS) + final def monotonic(unit: TimeUnit): Id[Long] = + unit.convert(System.nanoTime(), TimeUnit.NANOSECONDS) + } + + @Benchmark + def measureProcessEventsIO(state: EtlPipelineBench.BenchState) = { + val payload = EnrichSpec.colllectorPayload + EtlPipeline.processEvents[IO](state.adapterRegistry, state.enrichmentRegistryIo, Client.IgluCentral, Enrich.processor, state.dateTime, Validated.Valid(Some(payload))).unsafeRunSync() + } + + @Benchmark + def measureProcessEventsId(state: EtlPipelineBench.BenchState) = { + val payload = EnrichSpec.colllectorPayload + EtlPipeline.processEvents[Id](state.adapterRegistry, state.enrichmentRegistryId, state.clientId, Enrich.processor, state.dateTime, Validated.Valid(Some(payload))) + } +} + +object EtlPipelineBench { + + + @State(Scope.Benchmark) + class BenchState { + var dateTime: DateTime = _ + var adapterRegistry: AdapterRegistry = _ + var enrichmentRegistryId: EnrichmentRegistry[Id] = _ + var enrichmentRegistryIo: EnrichmentRegistry[IO] = _ + var clientId: Client[Id, Json] = _ + var clientIO: Client[IO, Json] = _ + + @Setup(Level.Trial) + def setup(): Unit = { + dateTime = DateTime.parse("2010-06-30T01:20+02:00") + adapterRegistry = new AdapterRegistry() + enrichmentRegistryId = EnrichmentRegistry[Id]() + enrichmentRegistryIo = EnrichmentRegistry[IO]() + clientId = Client[Id, Json](Resolver(List(), None), CirceValidator) + clientIO = Client[IO, Json](Resolver(List(), None), CirceValidator) + } + } +} diff --git a/modules/bench/src/test/scala/com.snowplowanalytics.snowplow.enrich.bench/ThriftLoaderBench.scala b/modules/bench/src/test/scala/com.snowplowanalytics.snowplow.enrich.bench/ThriftLoaderBench.scala new file mode 100644 index 000000000..77e9af0f5 --- /dev/null +++ b/modules/bench/src/test/scala/com.snowplowanalytics.snowplow.enrich.bench/ThriftLoaderBench.scala @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package com.snowplowanalytics.snowplow.enrich.bench + +import org.openjdk.jmh.annotations._ + +import java.util.concurrent.TimeUnit + +import com.snowplowanalytics.snowplow.enrich.common.loaders.ThriftLoader +import com.snowplowanalytics.snowplow.enrich.fs2.{Enrich, EnrichSpec} + +@State(Scope.Thread) +@BenchmarkMode(Array(Mode.AverageTime, Mode.Throughput)) +@OutputTimeUnit(TimeUnit.MICROSECONDS) +class ThriftLoaderBench { + + @Benchmark + def measureToCollectorPayload(state: EnrichBench.BenchState) = + ThriftLoader.toCollectorPayload(state.raw.data, Enrich.processor) +} + +object ThriftLoaderBench { + @State(Scope.Benchmark) + class BenchState { + var data: Array[Byte] = _ + + @Setup(Level.Trial) + def setup(): Unit = { + data = EnrichSpec.colllectorPayload.toRaw + } + } +} + diff --git a/project/plugins.sbt b/project/plugins.sbt index 21136454b..180ed82ca 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -7,3 +7,4 @@ addSbtPlugin("org.foundweekends" % "sbt-bintray" % "0.5.4") addSbtPlugin("org.scoverage" % "sbt-coveralls" % "1.2.7") addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.6.1") addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.10.0-RC1") +addSbtPlugin("pl.project13.scala" % "sbt-jmh" % "0.4.0") \ No newline at end of file