Skip to content

Commit

Permalink
Common: add benchmarking module (close #370)
Browse files Browse the repository at this point in the history
  • Loading branch information
chuwy committed Oct 16, 2020
1 parent 9a4c4bd commit 0d35c6e
Show file tree
Hide file tree
Showing 7 changed files with 247 additions and 0 deletions.
5 changes: 5 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -233,3 +233,8 @@ lazy val fs2 = project
.enablePlugins(BuildInfoPlugin)
.settings(BuildSettings.dockerSettings)
.enablePlugins(BuildInfoPlugin, JavaAppPackaging, DockerPlugin)

lazy val bench = project
.in(file("modules/bench"))
.dependsOn(fs2 % "test->test")
.enablePlugins(JmhPlugin)
6 changes: 6 additions & 0 deletions modules/bench/build.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
sourceDirectory in Jmh := (sourceDirectory in Test).value
classDirectory in Jmh := (classDirectory in Test).value
dependencyClasspath in Jmh := (dependencyClasspath in Test).value
// rewire tasks, so that 'jmh:run' automatically invokes 'jmh:compile' (otherwise a clean 'jmh:run' would fail)
compile in Jmh := (compile in Jmh).dependsOn(compile in Test).value
run in Jmh := (run in Jmh).dependsOn(Keys.compile in Jmh).evaluated
2 changes: 2 additions & 0 deletions modules/bench/src/test/resources/simplelogger.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
org.slf4j.simpleLogger.log.com.snowplowanalytics.snowplow.enrich.fs2.Assets=off
org.slf4j.simpleLogger.log.com.snowplowanalytics.snowplow.enrich.fs2.test.TestEnvironment=off
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.enrich.bench

import org.openjdk.jmh.annotations._
import java.util.concurrent.TimeUnit

import cats.effect.{ContextShift, IO, Clock}

import fs2.Stream

import com.snowplowanalytics.iglu.client.Client

import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry
import com.snowplowanalytics.snowplow.enrich.common.loaders.ThriftLoader
import com.snowplowanalytics.snowplow.enrich.fs2.test.TestEnvironment
import com.snowplowanalytics.snowplow.enrich.fs2.{Enrich, Environment, EnrichSpec, Payload}

import org.apache.http.message.BasicNameValuePair


/**
* @example
* {{{
* jmh:run -i 15 -wi 10 -f1 -t1 EnrichBench
* }}}
*/
@State(Scope.Thread)
@BenchmarkMode(Array(Mode.AverageTime))
@OutputTimeUnit(TimeUnit.MICROSECONDS)
class EnrichBench {

implicit val ioClock: Clock[IO] = Clock.create[IO]

@Benchmark
def measureEnrichWithMinimalPayload(state: EnrichBench.BenchState) = {
Enrich.enrichWith[IO](IO.pure(EnrichmentRegistry()), Client.IgluCentral, None, (_: Option[Long]) => IO.unit)(state.raw).unsafeRunSync()
}

@Benchmark
def measureToCollectorPayload(state: EnrichBench.BenchState) = {
ThriftLoader.toCollectorPayload(state.raw.data, Enrich.processor)
}

@Benchmark
@OperationsPerInvocation(50) // 5 events repetated 10 times
def measureRunWithNoEnrichments(state: EnrichBench.BenchState) = {
// We used this benchmark to check if running the whole `enrichWith` on a blocking
// thread-pool will give us increase in performance. Results haven't confirm it:
// EnrichBench.measureRunWithNoEnrichments avgt 15 341.144 ± 18.884 us/op <- smaller blocker
// EnrichBench.measureRunWithNoEnrichments avgt 15 326.608 ± 16.714 us/op <- wrapping blocker
// EnrichBench.measureRunWithNoEnrichments avgt 15 292.907 ± 15.894 us/op <- no blocker at all
// However, I'm still leaving the "smaller blocker" in a hope that with actual IO enrichments
// it will give the expected increase in performance
implicit val CS: ContextShift[IO] = state.contextShift
state.useEnvironment(e => Enrich.run[IO](e).compile.drain).unsafeRunSync()
}
}

object EnrichBench {
@State(Scope.Benchmark)
class BenchState {
var raw: Payload[IO, Array[Byte]] = _
var useEnvironment: (Environment[IO] => IO[Unit]) => IO[Unit] = _
var contextShift: ContextShift[IO] = _

@Setup(Level.Trial)
def setup(): Unit = {

raw = EnrichSpec.payload[IO]

val input = Stream.emits(List(
EnrichSpec.colllectorPayload.copy(
querystring = new BasicNameValuePair("ip", "125.12.2.40") :: EnrichSpec.querystring
),
EnrichSpec.colllectorPayload.copy(
querystring = new BasicNameValuePair("ip", "125.12.2.41") :: EnrichSpec.querystring
),
EnrichSpec.colllectorPayload.copy(
querystring = new BasicNameValuePair("ip", "125.12.2.42") :: EnrichSpec.querystring
),
EnrichSpec.colllectorPayload.copy(
querystring = new BasicNameValuePair("ip", "125.12.2.43") :: EnrichSpec.querystring
),
EnrichSpec.colllectorPayload.copy(
querystring = new BasicNameValuePair("ip", "125.12.2.44") :: EnrichSpec.querystring
),
)).repeatN(10).map(cp => Payload(cp.toRaw, IO.unit)).covary[IO]

useEnvironment = TestEnvironment.make(input).map(_.env).use(_: Environment[IO] => IO[Unit])

contextShift = IO.contextShift(scala.concurrent.ExecutionContext.global)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.enrich.bench

import org.openjdk.jmh.annotations._

import java.util.concurrent.TimeUnit

import cats.Id
import cats.data.Validated

import cats.effect.{IO, Clock}

import io.circe.Json

import com.snowplowanalytics.iglu.client.{Resolver, Client, CirceValidator}

import com.snowplowanalytics.snowplow.enrich.common.EtlPipeline
import com.snowplowanalytics.snowplow.enrich.common.adapters.AdapterRegistry
import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry

import com.snowplowanalytics.snowplow.enrich.fs2.{Enrich, EnrichSpec}

import org.joda.time.DateTime

@State(Scope.Thread)
@BenchmarkMode(Array(Mode.AverageTime, Mode.Throughput))
@OutputTimeUnit(TimeUnit.MICROSECONDS)
class EtlPipelineBench {

private implicit val ioClock: Clock[IO] = Clock.create[IO]

private implicit val idClock: Clock[Id] = new Clock[Id] {
final def realTime(unit: TimeUnit): Id[Long] =
unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
final def monotonic(unit: TimeUnit): Id[Long] =
unit.convert(System.nanoTime(), TimeUnit.NANOSECONDS)
}

@Benchmark
def measureProcessEventsIO(state: EtlPipelineBench.BenchState) = {
val payload = EnrichSpec.colllectorPayload
EtlPipeline.processEvents[IO](state.adapterRegistry, state.enrichmentRegistryIo, Client.IgluCentral, Enrich.processor, state.dateTime, Validated.Valid(Some(payload))).unsafeRunSync()
}

@Benchmark
def measureProcessEventsId(state: EtlPipelineBench.BenchState) = {
val payload = EnrichSpec.colllectorPayload
EtlPipeline.processEvents[Id](state.adapterRegistry, state.enrichmentRegistryId, state.clientId, Enrich.processor, state.dateTime, Validated.Valid(Some(payload)))
}
}

object EtlPipelineBench {


@State(Scope.Benchmark)
class BenchState {
var dateTime: DateTime = _
var adapterRegistry: AdapterRegistry = _
var enrichmentRegistryId: EnrichmentRegistry[Id] = _
var enrichmentRegistryIo: EnrichmentRegistry[IO] = _
var clientId: Client[Id, Json] = _
var clientIO: Client[IO, Json] = _

@Setup(Level.Trial)
def setup(): Unit = {
dateTime = DateTime.parse("2010-06-30T01:20+02:00")
adapterRegistry = new AdapterRegistry()
enrichmentRegistryId = EnrichmentRegistry[Id]()
enrichmentRegistryIo = EnrichmentRegistry[IO]()
clientId = Client[Id, Json](Resolver(List(), None), CirceValidator)
clientIO = Client[IO, Json](Resolver(List(), None), CirceValidator)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.snowplow.enrich.bench

import org.openjdk.jmh.annotations._

import java.util.concurrent.TimeUnit

import com.snowplowanalytics.snowplow.enrich.common.loaders.ThriftLoader
import com.snowplowanalytics.snowplow.enrich.fs2.{Enrich, EnrichSpec}

@State(Scope.Thread)
@BenchmarkMode(Array(Mode.AverageTime, Mode.Throughput))
@OutputTimeUnit(TimeUnit.MICROSECONDS)
class ThriftLoaderBench {

@Benchmark
def measureToCollectorPayload(state: EnrichBench.BenchState) =
ThriftLoader.toCollectorPayload(state.raw.data, Enrich.processor)
}

object ThriftLoaderBench {
@State(Scope.Benchmark)
class BenchState {
var data: Array[Byte] = _

@Setup(Level.Trial)
def setup(): Unit = {
data = EnrichSpec.colllectorPayload.toRaw
}
}
}

1 change: 1 addition & 0 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ addSbtPlugin("org.foundweekends" % "sbt-bintray" % "0.5.4")
addSbtPlugin("org.scoverage" % "sbt-coveralls" % "1.2.7")
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.6.1")
addSbtPlugin("net.virtual-void" % "sbt-dependency-graph" % "0.10.0-RC1")
addSbtPlugin("pl.project13.scala" % "sbt-jmh" % "0.4.0")

0 comments on commit 0d35c6e

Please sign in to comment.