diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index 616b62a30..ab63058e7 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -20,10 +20,10 @@ jobs:
     steps:
     - uses: actions/checkout@v2
-    - name: Set up JDK 1.8
+    - name: Set up JDK 11
       uses: actions/setup-java@v1
       with:
-        java-version: 1.8
+        java-version: 11
     - name: Prepare Mock server for SCE ApiRequestEnrichmentIntegrationTest (launch in background)
       run: python integration-tests/sce-api-lookup-test.py 8001 &
     - name: Prepare Postgres for SCE SqlLookupEnrichmentIntegrationTest (create entities)
@@ -55,10 +55,10 @@ jobs:
     runs-on: ubuntu-latest
     steps:
     - uses: actions/checkout@v2
-    - name: Set up JDK 1.8
+    - name: Set up JDK 11
       uses: actions/setup-java@v1
       with:
-        java-version: 1.8
+        java-version: 11
     - name: Compare SBT version with git tag
       run: .github/check_tag.sh ${GITHUB_REF##*/}
     - name: Docker login
@@ -81,10 +81,10 @@ jobs:
     runs-on: ubuntu-latest
     steps:
     - uses: actions/checkout@v2
-    - name: Set up JDK 1.8
+    - name: Set up JDK 11
      uses: actions/setup-java@v1
      with:
-        java-version: 1.8
+        java-version: 11
     - name: Compare SBT version with git tag
       run: .github/check_tag.sh ${GITHUB_REF##*/}
     - name: Deploy SCE on Bintray Maven and Maven Central
diff --git a/.jvmopts b/.jvmopts
index ef0f90285..00c1707db 100644
--- a/.jvmopts
+++ b/.jvmopts
@@ -8,6 +8,5 @@
 -XX:+TieredCompilation
 -XX:-UseGCOverheadLimit
 # effectively adds GC to Perm space
--XX:+UseConcMarkSweepGC
 -XX:+CMSClassUnloadingEnabled
 # must be enabled for CMSClassUnloadingEnabled to work
diff --git a/build.sbt b/build.sbt
index 09db15707..7e8930f9b 100644
--- a/build.sbt
+++ b/build.sbt
@@ -19,7 +19,7 @@ lazy val root = project.in(file("."))
   .settings(name := "enrich")
   .settings(BuildSettings.basicSettings)
-  .aggregate(common, beam, stream, kinesis, kafka, nsq, stdin, integrationTests)
+  .aggregate(common, beam, stream, kinesis, kafka, nsq, stdin)
 
 lazy val common = project
   .in(file("modules/common"))
@@ -178,13 +178,4 @@ lazy val beam =
   )
   .enablePlugins(JavaAppPackaging, DockerPlugin, BuildInfoPlugin)
 
-lazy val integrationTests = project
-  .in(file("modules/integration-tests"))
-  .settings(moduleName := "integration-tests")
-  .settings(allStreamSettings)
-  .settings(BuildSettings.addExampleConfToTestCp)
-  .settings(libraryDependencies ++= Seq(
-    Dependencies.Libraries.kafka,
-    Dependencies.Libraries.jinJava
-  ))
-  .dependsOn(stream % "test->test", kafka % "test->compile")
+Global / onChangedBuildSource := ReloadOnSourceChanges
diff --git a/modules/beam/src/test/scala/com.snowplowanalytics.snowplow.enrich.beam/enrichments/YauaaEnrichmentSpec.scala b/modules/beam/src/test/scala/com.snowplowanalytics.snowplow.enrich.beam/enrichments/YauaaEnrichmentSpec.scala
index 1610c046b..ac0288786 100644
--- a/modules/beam/src/test/scala/com.snowplowanalytics.snowplow.enrich.beam/enrichments/YauaaEnrichmentSpec.scala
+++ b/modules/beam/src/test/scala/com.snowplowanalytics.snowplow.enrich.beam/enrichments/YauaaEnrichmentSpec.scala
@@ -36,7 +36,8 @@ object YauaaEnrichmentSpec {
     "event_format" -> "jsonschema",
     "event_version" -> "1-0-0",
     "event" -> "page_ping",
-    "derived_contexts" -> json"""{"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1","data":[{"schema":"iglu:nl.basjes/yauaa_context/jsonschema/1-0-1","data":{"deviceBrand":"Unknown","deviceName":"Desktop","operatingSystemVersionMajor":"7","layoutEngineNameVersion":"Gecko 12.0","operatingSystemNameVersion":"Windows 7","layoutEngineBuild":"20100101","layoutEngineNameVersionMajor":"Gecko 12","operatingSystemName":"Windows NT","agentVersionMajor":"12","layoutEngineVersionMajor":"12","deviceClass":"Desktop","agentNameVersionMajor":"Firefox 12","operatingSystemNameVersionMajor":"Windows 7","deviceCpuBits":"64","operatingSystemClass":"Desktop","layoutEngineName":"Gecko","agentName":"Firefox","agentVersion":"12.0","layoutEngineClass":"Browser","agentNameVersion":"Firefox 12.0","operatingSystemVersion":"7","deviceCpu":"Intel x86_64","agentClass":"Browser","layoutEngineVersion":"12.0"}}]}""".noSpaces)
+    "derived_contexts" -> json"""{"schema":"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1","data":[{"schema":"iglu:nl.basjes/yauaa_context/jsonschema/1-0-1","data":{"deviceBrand":"Unknown","deviceName":"Desktop","operatingSystemVersionMajor":"7","layoutEngineNameVersion":"Gecko 12.0","operatingSystemNameVersion":"Windows 7","layoutEngineBuild":"20100101","layoutEngineNameVersionMajor":"Gecko 12","operatingSystemName":"Windows NT","agentVersionMajor":"12","layoutEngineVersionMajor":"12","deviceClass":"Desktop","agentNameVersionMajor":"Firefox 12","operatingSystemNameVersionMajor":"Windows 7","deviceCpuBits":"64","operatingSystemClass":"Desktop","layoutEngineName":"Gecko","agentName":"Firefox","agentVersion":"12.0","layoutEngineClass":"Browser","agentNameVersion":"Firefox 12.0","operatingSystemVersion":"7","deviceCpu":"Intel x86_64","agentClass":"Browser","layoutEngineVersion":"12.0"}}]}""".noSpaces
+  )
 }
 
 class YauaaEnrichmentSpec extends PipelineSpec {
diff --git a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/outputs/EnrichedEventSpec.scala b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/outputs/EnrichedEventSpec.scala
index e36a02b43..74cd42d4b 100644
--- a/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/outputs/EnrichedEventSpec.scala
+++ b/modules/common/src/test/scala/com.snowplowanalytics.snowplow.enrich.common/outputs/EnrichedEventSpec.scala
@@ -171,7 +171,6 @@ class EnrichedEventSpec extends Specification {
     testField(_.event_version = "event_version", _.event_version)
     testField(_.event_fingerprint = "event_fingerprint", _.event_fingerprint)
     testField(_.true_tstamp = "true_tstamp", _.true_tstamp)
-
   }
 }
diff --git a/modules/integration-tests/src/test/scala/com.snowplowanalytics.snowplow.enrich.stream/KafkaIntegrationSpec.scala b/modules/integration-tests/src/test/scala/com.snowplowanalytics.snowplow.enrich.stream/KafkaIntegrationSpec.scala
deleted file mode 100644
index 08eaaf015..000000000
--- a/modules/integration-tests/src/test/scala/com.snowplowanalytics.snowplow.enrich.stream/KafkaIntegrationSpec.scala
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Copyright (c) 2013-2020 Snowplow Analytics Ltd. All rights reserved.
- *
- * This program is licensed to you under the Apache License Version 2.0, and
- * you may not use this file except in compliance with the Apache License
- * Version 2.0. You may obtain a copy of the Apache License Version 2.0 at
- * http://www.apache.org/licenses/LICENSE-2.0.
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the Apache License Version 2.0 is distributed on an "AS
- * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the Apache License Version 2.0 for the specific language
- * governing permissions and limitations there under.
- */
-
-package com.snowplowanalytics.snowplow.enrich.stream
-
-import java.time.{Duration => JDuration}
-import java.util.Properties
-import java.util.concurrent.ForkJoinPool
-
-import scala.concurrent._
-import scala.concurrent.duration.Duration
-import scala.util.Try
-import scala.collection.JavaConverters._
-
-import cats.Id
-import com.snowplowanalytics.iglu.client.Client
-import com.snowplowanalytics.snowplow.badrows.Processor
-import com.snowplowanalytics.snowplow.enrich.common.adapters.AdapterRegistry
-import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry
-import com.snowplowanalytics.snowplow.scalatracker.Tracker
-import org.apache.kafka.clients.consumer.{ConsumerRecords, KafkaConsumer}
-import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
-import org.specs2.matcher.{TraversableMatchers, TryMatchers}
-import io.circe.Json
-
-import model.StreamsConfig
-
-/*
- * Extending this trait creates a new integration test with a new instance of kafka
- * See PiiEmitSpec for an example of how to use it
- */
-trait KafkaIntegrationSpec extends TryMatchers with TraversableMatchers {
-
-  import KafkaIntegrationSpecValues._
-  implicit val ec = ExecutionContext.fromExecutor(new ForkJoinPool(16))
-  val kafkaTopics = Set(testGoodIn, testGood, testBad, testPii)
-
-  def expectedGood: Int
-  def expectedBad: Int
-  def expectedPii: Int
-
-  def inputGood: List[Array[Byte]]
-
-  def getMainApplicationFuture(
-    configuration: StreamsConfig,
-    client: Client[Id, Json],
-    adapterRegistry: AdapterRegistry,
-    registry: EnrichmentRegistry[Id],
-    tracker: Option[Tracker[Id]]
-  ): Future[Unit] =
-    Future {
-      val p = Processor("test", "1.0.0")
-      KafkaEnrich
-        .getSource(configuration, None, client, adapterRegistry, registry, tracker, p)
-        .toOption
-        .get
-        .run()
-    }
-
-  def producerTimeoutSec: Int
-  def inputProduced(address: String): Try[Unit] =
-    Try(Await.result(produce(address: String), Duration(s"$producerTimeoutSec sec")))
-  def testKafkaPropertiesProducer(address: String) = {
-    val props = new Properties()
-    props.put("bootstrap.servers", address)
-    props.put("client.id", "producer-george")
-    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
-    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer")
-    props
-  }
-  def produce(address: String): Future[Unit] =
-    Future {
-      val testProducer = new KafkaProducer[String, Array[Byte]](testKafkaPropertiesProducer(address))
-      val events = inputGood
-      events.foreach { r =>
-        testProducer.send(new ProducerRecord(testGoodIn, "key", r))
-      }
-      testProducer.flush
-      testProducer.close
-    }
-  private def getListOfRecords(cr: ConsumerRecords[String, String]): List[String] =
-    cr.asScala.map(_.value).toList
-
-  val POLL_TIME_MSEC = 100L
-
-  def getRecords(
-    topic: String,
-    expectedRecords: Int,
-    timeoutSec: Int,
-    address: String
-  ): Future[List[String]] =
-    Future {
-      val started = System.currentTimeMillis
-      val testKafkaPropertiesConsumer = {
-        val props = new Properties()
-        props.put("bootstrap.servers", address)
-        props.put("auto.offset.reset", "earliest")
-        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
-        props
-          .put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
-        props.put("group.id", s"consumer-$topic")
-        props
-      }
-      val testConsumerPii = new KafkaConsumer[String, String](testKafkaPropertiesConsumer)
-      testConsumerPii.subscribe(List(topic).asJava)
-      var records = getListOfRecords(testConsumerPii.poll(JDuration.ofMillis(POLL_TIME_MSEC)))
-      while (((System.currentTimeMillis - started) / 1000 < timeoutSec - 1) && records.size < expectedRecords)
-        records = records ++ getListOfRecords(
-          testConsumerPii.poll(JDuration.ofMillis(POLL_TIME_MSEC))
-        )
-      testConsumerPii.close()
-      records
-    }
-
-  def consumerExecutionTimeoutSec: Int
-  def producedBadRecords(address: String): Future[List[String]] =
-    getRecords(testBad, expectedBad, consumerExecutionTimeoutSec, address)
-  def producedGoodRecords(address: String): Future[List[String]] =
-    getRecords(testGood, expectedGood, consumerExecutionTimeoutSec, address)
-  def producedPiiRecords(address: String): Future[List[String]] =
-    getRecords(testPii, expectedPii, consumerExecutionTimeoutSec, address)
-  def allResults(address: String): Future[(List[String], List[String], List[String])] =
-    for {
-      good <- producedGoodRecords(address)
-      bad <- producedBadRecords(address)
-      pii <- producedPiiRecords(address)
-    } yield (good, bad, pii)
-
-}
-
-object KafkaIntegrationSpecValues {
-  val (testGoodIn, testGood, testBad, testPii) =
-    ("testGoodIn", "testEnrichedGood", "testEnrichedBad", "testEnrichedUglyPii")
-}
diff --git a/modules/integration-tests/src/test/scala/com.snowplowanalytics.snowplow.enrich.stream/KafkaTestUtils.scala b/modules/integration-tests/src/test/scala/com.snowplowanalytics.snowplow.enrich.stream/KafkaTestUtils.scala
deleted file mode 100644
index 6ae18b881..000000000
--- a/modules/integration-tests/src/test/scala/com.snowplowanalytics.snowplow.enrich.stream/KafkaTestUtils.scala
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Copyright (c) 2013-2020 Snowplow Analytics Ltd. All rights reserved.
- *
- * This program is licensed to you under the Apache License Version 2.0, and
- * you may not use this file except in compliance with the Apache License
- * Version 2.0. You may obtain a copy of the Apache License Version 2.0 at
- * http://www.apache.org/licenses/LICENSE-2.0.
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the Apache License Version 2.0 is distributed on an "AS
- * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the Apache License Version 2.0 for the specific language
- * governing permissions and limitations there under.
- */
-
-package com.snowplowanalytics.snowplow.enrich.stream
-
-import java.io.File
-import java.net.InetSocketAddress
-import java.util.Properties
-
-import scala.collection.JavaConverters._
-import scala.util.Random
-
-import kafka.server.{KafkaConfig, KafkaServerStartable}
-import org.apache.zookeeper.server.{NIOServerCnxnFactory, ZooKeeperServer}
-import org.apache.kafka.clients.admin.AdminClient
-import org.apache.kafka.clients.admin.NewTopic
-
-class KafkaTestUtils {
-  // zk
-  private val zkHost = "localhost"
-  private val zkPort = 2181
-  private var zk: EmbeddedZookeeper = _
-  private var zkReady = false
-
-  // kafka
-  private val brokerHost = "localhost"
-  private val brokerPort = 9092
-  private var kafkaServer: KafkaServerStartable = _
-  private var adminClient: AdminClient = _
-  private var topicCountMap = Map.empty[String, Int]
-  private var brokerReady = false
-
-  /** Zookeeper address */
-  def zkAddress: String = {
-    assert(zkReady, "Zk not ready, cannot get address")
-    s"$zkHost:$zkPort"
-  }
-
-  /** Kafka broker address */
-  def brokerAddress: String = {
-    assert(brokerReady, "Broker not ready, cannot get address")
-    s"$brokerHost:$brokerPort"
-  }
-
-  /** Start the Zookeeper and Kafka servers */
-  def setup(): Unit = {
-    setupEmbeddedZookeeper()
-    setupEmbeddedKafkaServer()
-  }
-
-  private def setupEmbeddedZookeeper(): Unit = {
-    zk = new EmbeddedZookeeper(zkHost, zkPort)
-    zkReady = true
-  }
-
-  private def setupEmbeddedKafkaServer(): Unit = {
-    assert(zkReady, "Zk should be setup beforehand")
-    val kafkaConfig = new KafkaConfig(brokerProps)
-    kafkaServer = new KafkaServerStartable(kafkaConfig)
-    kafkaServer.startup()
-    brokerReady = true
-    val adminProps = {
-      val props = new Properties()
-      props.put("bootstrap.servers", brokerAddress)
-      props
-    }
-    adminClient = AdminClient.create(adminProps)
-  }
-
-  /** Close the Kafka as well as the Zookeeper client and server */
-  def tearDown(): Unit = {
-    brokerReady = false
-    zkReady = false
-
-    if (adminClient != null) {
-      adminClient.close()
-      adminClient = null
-    }
-
-    if (kafkaServer != null) {
-      kafkaServer.shutdown()
-      kafkaServer = null
-    }
-
-    if (zk != null) {
-      zk.shutdown()
-      zk = null
-    }
-
-    topicCountMap = Map.empty
-  }
-
-  /** Create one or more topics */
-  @scala.annotation.varargs
-  def createTopics(topics: String*): Unit =
-    for (topic <- topics) {
-      adminClient.createTopics(List(new NewTopic(topic, 1, 1)).asJava)
-      Thread.sleep(1000)
-      topicCountMap = topicCountMap + (topic -> 1)
-    }
-
-  private def brokerProps: Properties = {
-    val props = new Properties
-    props.put("broker.id", "0")
-    props.put("host.name", brokerHost)
-    props.put("offsets.topic.replication.factor", "1")
-    props.put(
-      "log.dir", {
-        val dir = System.getProperty("java.io.tmpdir") +
-          "/logDir-" + new Random().nextInt(Int.MaxValue)
-        val f = new File(dir)
-        f.mkdirs()
-        dir
-      }
-    )
-    props.put("port", brokerPort.toString)
-    props.put("zookeeper.connect", zkAddress)
-    props.put("zookeeper.connection.timeout.ms", "10000")
-    props
-  }
-
-  private class EmbeddedZookeeper(hostname: String, port: Int) {
-    private val snapshotDir = {
-      val f = new File(
-        System.getProperty("java.io.tmpdir"),
-        "snapshotDir-" + Random.nextInt(Int.MaxValue)
-      )
-      f.mkdirs()
-      f
-    }
-    private val logDir = {
-      val f =
-        new File(System.getProperty("java.io.tmpdir"), "logDir-" + Random.nextInt(Int.MaxValue))
-      f.mkdirs()
-      f
-    }
-
-    private val factory = {
-      val zkTickTime = 500
-      val zk = new ZooKeeperServer(snapshotDir, logDir, zkTickTime)
-      val f = new NIOServerCnxnFactory
-      val maxCnxn = 16
-      f.configure(new InetSocketAddress(hostname, port), maxCnxn)
-      f.startup(zk)
-      f
-    }
-
-    def shutdown(): Unit = {
-      factory.shutdown()
-      snapshotDir.delete()
-      logDir.delete()
-      ()
-    }
-  }
-}
diff --git a/modules/integration-tests/src/test/scala/com.snowplowanalytics.snowplow.enrich.stream/PiiEmitSpec.scala b/modules/integration-tests/src/test/scala/com.snowplowanalytics.snowplow.enrich.stream/PiiEmitSpec.scala
deleted file mode 100644
index d058015a7..000000000
--- a/modules/integration-tests/src/test/scala/com.snowplowanalytics.snowplow.enrich.stream/PiiEmitSpec.scala
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Copyright (c) 2013-2020 Snowplow Analytics Ltd. All rights reserved.
- *
- * This program is licensed to you under the Apache License Version 2.0, and
- * you may not use this file except in compliance with the Apache License
- * Version 2.0. You may obtain a copy of the Apache License Version 2.0 at
- * http://www.apache.org/licenses/LICENSE-2.0.
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the Apache License Version 2.0 is distributed on an "AS
- * IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the Apache License Version 2.0 for the specific language
- * governing permissions and limitations there under.
- */
-package com.snowplowanalytics.snowplow.enrich.stream
-
-import java.util.regex.Pattern
-import java.util.concurrent.TimeUnit
-
-import scala.util.{Failure, Success, Try}
-import scala.collection.JavaConverters._
-import scala.concurrent.duration.FiniteDuration
-import scala.io.Source
-import com.hubspot.jinjava.Jinjava
-import com.typesafe.config.ConfigFactory
-import org.apache.commons.codec.binary.Base64
-import org.specs2.concurrent.ExecutionEnv
-import org.specs2.mutable.Specification
-import org.specs2.matcher.{FutureMatchers, Matcher}
-import org.specs2.specification.BeforeAfterAll
-import pureconfig._
-import pureconfig.generic.auto._
-import pureconfig.generic.{FieldCoproductHint, ProductHint}
-import good._
-import model.{StreamsConfig, TargetPlatformConfig}
-
-class PiiEmitSpec(implicit ee: ExecutionEnv) extends Specification with FutureMatchers with KafkaIntegrationSpec with BeforeAfterAll {
-
-  var ktu: KafkaTestUtils = _
-  override def beforeAll(): Unit = {
-    ktu = new KafkaTestUtils
-    ktu.setup()
-    ktu.createTopics(kafkaTopics.toList: _*)
-  }
-  override def afterAll(): Unit =
-    if (ktu != null)
-      ktu = null
-
-  import KafkaIntegrationSpecValues._
-
-  def configValues =
-    Map(
-      "sinkType" -> "kafka",
-      "streamsInRaw" -> s"$testGoodIn",
-      "outEnriched" -> s"$testGood",
-      "outPii" -> s"$testPii",
-      "outBad" -> s"$testBad",
-      "partitionKeyName" -> "\"\"",
-      "kafkaBrokers" -> ktu.brokerAddress,
-      "bufferTimeThreshold" -> "1",
-      "bufferRecordThreshold" -> "1",
-      "bufferByteThreshold" -> "100000",
-      "enrichAppName" -> "Jim",
-      "enrichStreamsOutMaxBackoff" -> "1000",
-      "enrichStreamsOutMinBackoff" -> "1000",
-      "appName" -> "jim"
-    )
-
-  def config: String =
-    Try {
-      val configRes = getClass.getResourceAsStream("/config.hocon.sample")
-      Source.fromInputStream(configRes).getLines.mkString("\n")
-    } match {
-      case Failure(t) =>
-        println(s"Unable to get config.hocon.sample: $t"); throw new Exception(t)
-      case Success(s) => s
-    }
-
-  def configInstance: String = {
-    val jinJava = new Jinjava()
-    jinJava.render(config, configValues.asJava)
-  }
-
-  private def decode(s: String): Array[Byte] = Base64.decodeBase64(s)
-
-  // Input
-  override val inputGood = List(
-    decode(PagePingWithContextSpec.raw),
-    decode(PageViewWithContextSpec.raw),
-    decode(StructEventSpec.raw),
-    decode(StructEventWithContextSpec.raw),
-    decode(TransactionItemSpec.raw),
-    decode(TransactionSpec.raw)
-  )
-  // Expected output counts
-  override val (expectedGood, expectedBad, expectedPii) = (inputGood.size, 0, inputGood.size)
-
-  // Timeout for the producer
-  override val producerTimeoutSec = 5
-
-  // Timeout for all the consumers (good, bad, and pii) (running in parallel)
-  // You may want to adjust this if you are doing lots of slow work in the app
-  // Ordinarily the consumers return in less than 1 sec
-  override val consumerExecutionTimeoutSec = 15
-
-  implicit def hint[T]: ProductHint[T] =
-    ProductHint[T](ConfigFieldMapping(CamelCase, CamelCase))
-  implicit val _: FieldCoproductHint[TargetPlatformConfig] =
-    new FieldCoproductHint[TargetPlatformConfig]("enabled")
-
-  "Pii" should {
-    "emit all events" in {
-
-      val parsedConfig = ConfigFactory.parseString(configInstance).resolve()
-      val configObject = Try {
-        loadConfigOrThrow[StreamsConfig](parsedConfig.getConfig("enrich.streams"))
-      }
-      configObject aka "enrichment config loading" must not beAFailedTry
-
-      getMainApplicationFuture(
-        configObject.get,
-        SpecHelpers.client,
-        SpecHelpers.adapterRegistry,
-        SpecHelpers.enrichmentRegistry,
-        None
-      )
-      inputProduced(ktu.brokerAddress) aka "sending input" must beSuccessfulTry
-
-      def spaceJoinResult(expected: List[StringOrRegex]) =
-        expected
-          .flatMap({
-            case JustRegex(r) => Some(r.toString)
-            case JustString(s) if s.nonEmpty => Some(Pattern.quote(s))
-            case _ => None
-          })
-          .mkString("\\s*")
-
-      val expectedMatcher: Matcher[(List[String], List[String], List[String])] = beLike {
-        case (good: List[String], bad: List[String], pii: List[String]) =>
-          bad aka "bad result list" must have size expectedBad
-          pii aka "pii result list" must have size expectedPii
-          good aka "good result list" must have size expectedGood
-          good aka "good result list" must containMatch(
-            spaceJoinResult(PagePingWithContextSpec.expected)
-          )
-          pii aka "pii result list" must containMatch(spaceJoinResult(PagePingWithContextSpec.pii))
-          good aka "good result list" must containMatch(
-            spaceJoinResult(PageViewWithContextSpec.expected)
-          )
-          pii aka "pii result list" must containMatch(spaceJoinResult(PageViewWithContextSpec.pii))
-          good aka "good result list" must containMatch(spaceJoinResult(StructEventSpec.expected))
-          pii aka "pii result list" must containMatch(spaceJoinResult(StructEventSpec.pii))
-          good aka "good result list" must containMatch(
-            spaceJoinResult(StructEventWithContextSpec.expected)
-          )
-          pii aka "pii result list" must containMatch(
-            spaceJoinResult(StructEventWithContextSpec.pii)
-          )
-          good aka "good result list" must containMatch(
-            spaceJoinResult(TransactionItemSpec.expected)
-          )
-          pii aka "pii result list" must containMatch(spaceJoinResult(TransactionItemSpec.pii))
-          good aka "good result list" must containMatch(spaceJoinResult(TransactionSpec.expected))
-          pii aka "pii result list" must containMatch(spaceJoinResult(TransactionSpec.pii))
-      }
-      allResults(ktu.brokerAddress) must expectedMatcher.await(
-        retries = 0,
-        timeout = FiniteDuration(consumerExecutionTimeoutSec.toLong, TimeUnit.SECONDS)
-      )
-    }
-  }
-}
diff --git a/modules/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.stream/sinks/KafkaSink.scala b/modules/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.stream/sinks/KafkaSink.scala
index 889911594..b3175b060 100644
--- a/modules/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.stream/sinks/KafkaSink.scala
+++ b/modules/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.stream/sinks/KafkaSink.scala
@@ -22,8 +22,6 @@ package sinks
 
 import java.util.Properties
 
-import scala.collection.JavaConverters._
-
 import cats.syntax.either._
 import org.apache.kafka.clients.producer._
 
@@ -41,7 +39,7 @@ object KafkaSink {
    */
   private def createProducer(kafkaConfig: Kafka, bufferConfig: BufferConfig): KafkaProducer[String, String] = {
     val properties = createProperties(kafkaConfig, bufferConfig)
-    properties.putAll(kafkaConfig.producerConf.getOrElse(Map()).asJava)
+    kafkaConfig.producerConf.getOrElse(Map()).foreach { case (k, v) => properties.setProperty(k, v) }
     new KafkaProducer[String, String](properties)
   }
 
diff --git a/modules/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.stream/sources/KafkaSource.scala b/modules/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.stream/sources/KafkaSource.scala
index 1f6fe48d7..214754dfe 100644
--- a/modules/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.stream/sources/KafkaSource.scala
+++ b/modules/kafka/src/main/scala/com.snowplowanalytics.snowplow.enrich.stream/sources/KafkaSource.scala
@@ -133,7 +133,7 @@ class KafkaSource private (
 
   private def createConsumer(brokers: String, groupId: String): KafkaConsumer[String, Array[Byte]] = {
     val properties = createProperties(brokers, groupId)
-    properties.putAll(kafkaConfig.consumerConf.getOrElse(Map()).asJava)
+    kafkaConfig.consumerConf.getOrElse(Map()).foreach { case (k, v) => properties.setProperty(k, v) }
     new KafkaConsumer[String, Array[Byte]](properties)
   }
 
diff --git a/project/BuildSettings.scala b/project/BuildSettings.scala
index 21f92c32b..f4ac99775 100644
--- a/project/BuildSettings.scala
+++ b/project/BuildSettings.scala
@@ -37,7 +37,7 @@ object BuildSettings {
     organization := "com.snowplowanalytics",
     scalaVersion := "2.12.11",
     version := "1.3.2",
-    javacOptions := Seq("-source", "1.8", "-target", "1.8"),
+    javacOptions := Seq("-source", "11", "-target", "11"),
     resolvers ++= Dependencies.resolutionRepos
   )
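
Note on the KafkaSink/KafkaSource hunks above: the old code converted the optional producer/consumer override map with JavaConverters and passed it to Properties.putAll; the new code copies the entries one by one with setProperty, which avoids the Scala-to-Java collection conversion entirely (a pattern that is generally friendlier when compiling against newer JDKs). The sketch below is a minimal, self-contained illustration of that pattern only; the object and helper names (KafkaPropertiesExample, baseProperties, withOverrides) and the sample override values are invented for the example and do not exist in this codebase.

import java.util.Properties

object KafkaPropertiesExample {

  // Stand-in for what createProperties builds in the real sink/source (illustrative values).
  def baseProperties(brokers: String): Properties = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", brokers)
    props.setProperty("acks", "all")
    props
  }

  // The pattern adopted in the diff: fold an optional Map of config overrides into the
  // Properties one entry at a time, instead of converting the Map with .asJava and
  // calling putAll on it.
  def withOverrides(props: Properties, overrides: Option[Map[String, String]]): Properties = {
    overrides.getOrElse(Map.empty).foreach { case (k, v) => props.setProperty(k, v) }
    props
  }

  def main(args: Array[String]): Unit = {
    val props = withOverrides(baseProperties("localhost:9092"), Some(Map("retries" -> "5")))
    println(props) // prints bootstrap.servers, acks and the retries override (order not guaranteed)
  }
}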