diff --git a/dd-java-agent/agent-tooling/src/main/resources/datadog/trace/agent/tooling/bytebuddy/matcher/ignored_class_name.trie b/dd-java-agent/agent-tooling/src/main/resources/datadog/trace/agent/tooling/bytebuddy/matcher/ignored_class_name.trie index ada02b3cf70..9c5de945e20 100644 --- a/dd-java-agent/agent-tooling/src/main/resources/datadog/trace/agent/tooling/bytebuddy/matcher/ignored_class_name.trie +++ b/dd-java-agent/agent-tooling/src/main/resources/datadog/trace/agent/tooling/bytebuddy/matcher/ignored_class_name.trie @@ -122,6 +122,9 @@ 2 akka.http.scaladsl.* 0 akka.http.scaladsl.Http2Ext 0 akka.http.scaladsl.HttpExt +0 akka.http.scaladsl.server.ExceptionHandler$ +0 akka.http.scaladsl.common.StrictForm$ +0 akka.http.impl.engine.server.HttpServerBluePrint$ControllerStage$$anon$* # saves ~0.1s skipping ~407 classes 2 akka.stream.* 0 akka.stream.impl.FanIn$SubInput @@ -133,6 +136,8 @@ 0 akka.http.javadsl.model.HttpHeader 0 akka.http.scaladsl.model.HttpRequest 0 akka.http.scaladsl.model.headers.* +0 akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport +0 akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport$class 0 akka.http.scaladsl.unmarshalling.* 0 akka.http.scaladsl.server.PathMatcher$Matched 0 akka.http.scaladsl.server.directives.ParameterDirectives$class @@ -142,6 +147,7 @@ 0 akka.http.scaladsl.server.directives.FormFieldDirectives$class 0 akka.http.scaladsl.server.directives.FormFieldDirectives 0 akka.http.scaladsl.model.Uri +0 akka.http.scaladsl.model.Multipart$FormData 0 akka.http.scaladsl.model.FormData 0 akka.http.scaladsl.server.RequestContextImpl 0 akka.http.scaladsl.server.directives.CookieDirectives$class diff --git a/dd-java-agent/instrumentation/akka-http-10.0/build.gradle b/dd-java-agent/instrumentation/akka-http-10.0/build.gradle index 9da1132327d..a63f2c4c764 100644 --- a/dd-java-agent/instrumentation/akka-http-10.0/build.gradle +++ b/dd-java-agent/instrumentation/akka-http-10.0/build.gradle @@ -11,7 +11,10 @@ apply plugin: 'call-site-instrumentation' // we put the test classes in the baseTest test set so that the scala // version is not inherited addTestSuite('baseTest') +addTestSuiteExtendingForDir('baseForkedTest', 'baseTest', 'baseTest') addTestSuiteForDir('version101Test', 'baseTest') +addTestSuiteExtendingForDir('version101ForkedTest', 'version101Test', 'baseTest') +addTestSuiteForDir('version102Scala213Test', 'latestDepTest') addTestSuite('latestDepTest') addTestSuite('lagomTest') addTestSuite('iastTest') @@ -87,6 +90,9 @@ artifacts { } sourceSets { + version102Scala213Test.groovy.srcDir sourceSets.baseTest.groovy + version102Scala213Test.scala.srcDir sourceSets.baseTest.scala + latestDepTest.groovy.srcDir sourceSets.baseTest.groovy latestDepTest.scala.srcDir sourceSets.baseTest.scala } @@ -107,8 +113,9 @@ dependencies { // First 10.0.x version with a convenient way to test http2 support baseTestImplementation group: 'com.typesafe.akka', name: 'akka-http_2.11', version: '10.0.10' baseTestImplementation group: 'com.typesafe.akka', name: 'akka-http2-support_2.11', version: '10.0.10' + baseTestImplementation group: 'com.typesafe.akka', name: 'akka-http-jackson_2.11', version: '10.0.10' + baseTestImplementation group: 'com.typesafe.akka', name: 'akka-http-spray-json_2.11', version: '10.0.10' - iastTestImplementation group: 'com.typesafe.akka', name: 'akka-http-jackson_2.11', version: '10.0.10' iastTestImplementation(testFixtures(project(':dd-java-agent:agent-iast'))) iastTestCompileOnly group: 'de.thetaphi', name: 'forbiddenapis', version: '3.4' iastTestRuntimeOnly project(':dd-java-agent:instrumentation:jackson-core') @@ -120,8 +127,9 @@ dependencies { version101TestImplementation group: 'com.typesafe.akka', name: 'akka-http_2.12', version: '10.1.+' version101TestImplementation group: 'com.typesafe.akka', name: 'akka-http2-support_2.12', version: '10.1.+' version101TestImplementation group: 'com.typesafe.akka', name: 'akka-stream_2.12', version: '2.5.+' + version101TestImplementation group: 'com.typesafe.akka', name: 'akka-http-jackson_2.12', version: '10.1.+' + version101TestImplementation group: 'com.typesafe.akka', name: 'akka-http-spray-json_2.12', version: '10.1.+' - version101IastTestImplementation group: 'com.typesafe.akka', name: 'akka-http-jackson_2.12', version: '10.1.+' version101IastTestImplementation(testFixtures(project(':dd-java-agent:agent-iast'))) version102IastTestImplementation deps.scala212 @@ -130,10 +138,18 @@ dependencies { version102IastTestImplementation group: 'com.typesafe.akka', name: 'akka-http-jackson_2.12', version: '10.2.+' version102IastTestImplementation(testFixtures(project(':dd-java-agent:agent-iast'))) + version102Scala213TestImplementation deps.scala213 + version102Scala213TestImplementation group: 'com.typesafe.akka', name: 'akka-http_2.13', version: '10.2.+' + version102Scala213TestImplementation group: 'com.typesafe.akka', name: 'akka-stream_2.13', version: '2.6.+' + version102Scala213TestImplementation group: 'com.typesafe.akka', name: 'akka-http-jackson_2.13', version: '10.2.+' + version102Scala213TestImplementation group: 'com.typesafe.akka', name: 'akka-http-spray-json_2.13', version: '10.2.+' + latestDepTestImplementation deps.scala213 - latestDepTestImplementation group: 'com.typesafe.akka', name: 'akka-http_2.13', version: '10.2.+' + latestDepTestImplementation group: 'com.typesafe.akka', name: 'akka-http_2.13', version: '10.5.+' // http2 support is included in akka-http since 10.2.x - latestDepTestImplementation group: 'com.typesafe.akka', name: 'akka-stream_2.13', version: '2.6.+' + latestDepTestImplementation group: 'com.typesafe.akka', name: 'akka-stream_2.13', version: '2.7.0' + latestDepTestImplementation group: 'com.typesafe.akka', name: 'akka-http-jackson_2.13', version: '10.5.+' + latestDepTestImplementation group: 'com.typesafe.akka', name: 'akka-http-spray-json_2.13', version: '10.5.+' // TODO: test with Scala 3 latestDepIastTestImplementation deps.scala213 @@ -152,20 +168,12 @@ dependencies { lagomTestImplementation group: 'com.lightbend.lagom', name: 'lagom-javadsl-testkit_2.11', version: '1.4.0' } -tasks.named("test").configure { - dependsOn "baseTest" - dependsOn "version101Test" - dependsOn "lagomTest" - dependsOn "iastTest" - dependsOn "version101IastTest" - dependsOn "version102IastTest" -} - -tasks.named('latestDepTest').configure { - dependsOn "latestDepIastTest" +compileBaseTestGroovy { + classpath = classpath.plus(files(compileBaseTestScala.destinationDirectory)) + dependsOn "compileBaseTestScala" } -compileBaseTestGroovy { +compileBaseForkedTestGroovy { classpath = classpath.plus(files(compileBaseTestScala.destinationDirectory)) dependsOn "compileBaseTestScala" } @@ -175,6 +183,16 @@ compileVersion101TestGroovy { dependsOn "compileVersion101TestScala" } +compileVersion101ForkedTestGroovy { + classpath = classpath.plus(files(compileVersion101TestScala.destinationDirectory)) + dependsOn "compileVersion101TestScala" +} + +compileVersion102Scala213TestGroovy { + classpath = classpath.plus(files(compileVersion102Scala213TestScala.destinationDirectory)) + dependsOn "compileVersion102Scala213TestScala" +} + compileLatestDepTestGroovy { classpath = classpath.plus(files(compileLatestDepTestScala.destinationDirectory)) dependsOn "compileLatestDepTestScala" diff --git a/dd-java-agent/instrumentation/akka-http-10.0/src/baseTest/groovy/AkkaHttpClientInstrumentationTest.groovy b/dd-java-agent/instrumentation/akka-http-10.0/src/baseTest/groovy/AkkaHttpClientInstrumentationTest.groovy index 6bb82efb06b..6ae3c72deaf 100644 --- a/dd-java-agent/instrumentation/akka-http-10.0/src/baseTest/groovy/AkkaHttpClientInstrumentationTest.groovy +++ b/dd-java-agent/instrumentation/akka-http-10.0/src/baseTest/groovy/AkkaHttpClientInstrumentationTest.groovy @@ -88,21 +88,18 @@ abstract class AkkaHttpClientInstrumentationTest extends HttpClientTest { span { parent() operationName operation() - resourceName "akka-http.client.request" + resourceName operation() // resource name is not set so defaults to operationName spanType DDSpanTypes.HTTP_CLIENT errored true tags { "$Tags.COMPONENT" "akka-http-client" "$Tags.SPAN_KIND" Tags.SPAN_KIND_CLIENT errorTags(exception) - defaultTags() + defaultTags(false, false) } } } } - - where: - renameService << [false, true] } } diff --git a/dd-java-agent/instrumentation/akka-http-10.0/src/baseTest/groovy/AkkaHttpServerInstrumentationTest.groovy b/dd-java-agent/instrumentation/akka-http-10.0/src/baseTest/groovy/AkkaHttpServerInstrumentationTest.groovy index 4b4160930e1..762b34a863d 100644 --- a/dd-java-agent/instrumentation/akka-http-10.0/src/baseTest/groovy/AkkaHttpServerInstrumentationTest.groovy +++ b/dd-java-agent/instrumentation/akka-http-10.0/src/baseTest/groovy/AkkaHttpServerInstrumentationTest.groovy @@ -3,11 +3,18 @@ import datadog.trace.agent.test.base.HttpServerTest import datadog.trace.agent.test.naming.TestingGenericHttpNamingConventions import datadog.trace.agent.test.utils.ThreadUtils import datadog.trace.instrumentation.akkahttp.AkkaHttpServerDecorator +import okhttp3.HttpUrl +import okhttp3.MultipartBody import okhttp3.Request +import okhttp3.RequestBody import spock.lang.Shared import java.util.concurrent.atomic.AtomicInteger +import static datadog.trace.agent.test.base.HttpServerTest.ServerEndpoint.BODY_JSON +import static datadog.trace.agent.test.base.HttpServerTest.ServerEndpoint.BODY_MULTIPART +import static org.junit.Assume.assumeTrue + abstract class AkkaHttpServerInstrumentationTest extends HttpServerTest { @Override @@ -35,11 +42,51 @@ abstract class AkkaHttpServerInstrumentationTest extends HttpServerTest ActorSystem(name) case Some(config) => ActorSystem(name, config) } } - implicit val materializer = ActorMaterializer() + implicit val materializer: ActorMaterializer = ActorMaterializer() private var port: Int = 0 - private var portBinding: Future[ServerBinding] = null + private var portBinding: Future[ServerBinding] = _ override def start(): Unit = { portBinding = Await.ready(binder.bind(0), 10 seconds) @@ -61,8 +71,8 @@ object AkkaHttpTestWebServer { def config: Option[Config] = None def bind(port: Int)( - implicit system: ActorSystem, - materializer: Materializer + implicit system: ActorSystem, + materializer: Materializer ): Future[ServerBinding] } @@ -70,8 +80,8 @@ object AkkaHttpTestWebServer { override def name: String = "bind-and-handle" override def bind(port: Int)( - implicit system: ActorSystem, - materializer: Materializer + implicit system: ActorSystem, + materializer: Materializer ): Future[ServerBinding] = { import materializer.executionContext Http().bindAndHandle(route, "localhost", port) @@ -81,9 +91,15 @@ object AkkaHttpTestWebServer { val BindAndHandleAsyncWithRouteAsyncHandler: Binder = new Binder { override def name: String = "bind-and-handle-async-with-route-async-handler" + override def config: Option[Config] = Some( + ConfigFactory.load() + .withValue("akka.http.server.request-timeout", ConfigValueFactory.fromAnyRef("300 s")) + .withValue("akka.http.server.idle-timeout", ConfigValueFactory.fromAnyRef("300 s")) + ) + override def bind(port: Int)( - implicit system: ActorSystem, - materializer: Materializer + implicit system: ActorSystem, + materializer: Materializer ): Future[ServerBinding] = { import materializer.executionContext Http().bindAndHandleAsync(Route.asyncHandler(route), "localhost", port) @@ -94,8 +110,8 @@ object AkkaHttpTestWebServer { override def name: String = "bind-and-handle-sync" override def bind(port: Int)( - implicit system: ActorSystem, - materializer: Materializer + implicit system: ActorSystem, + materializer: Materializer ): Future[ServerBinding] = { Http().bindAndHandleSync(syncHandler, "localhost", port) } @@ -105,8 +121,8 @@ object AkkaHttpTestWebServer { override def name: String = "bind-and-handle-async" override def bind(port: Int)( - implicit system: ActorSystem, - materializer: Materializer + implicit system: ActorSystem, + materializer: Materializer ): Future[ServerBinding] = { import materializer.executionContext Http().bindAndHandleAsync(asyncHandler, "localhost", port) @@ -117,8 +133,8 @@ object AkkaHttpTestWebServer { override def name: String = "bind-and-handle-async-http2" override def bind(port: Int)( - implicit system: ActorSystem, - materializer: Materializer + implicit system: ActorSystem, + materializer: Materializer ): Future[ServerBinding] = { import materializer.executionContext val serverSettings = enableHttp2(ServerSettings(system)) @@ -134,7 +150,7 @@ object AkkaHttpTestWebServer { // This part defines the routes using the Scala routing DSL // ---------------------------------------------------------------------- // private val exceptionHandler = ExceptionHandler { - case e: Exception => + case e : Exception if !e.isInstanceOf[BlockingException] => val span = activeSpan() TraceUtils.handleException(span, e) complete( @@ -170,50 +186,140 @@ object AkkaHttpTestWebServer { private val defaultHeader = RawHeader(HttpServerTest.getIG_RESPONSE_HEADER, HttpServerTest.getIG_RESPONSE_HEADER_VALUE) + // force a rejection due to BlockingException to throw so that the error + // can be recorded in the span + private val blockingRejectionHandler: RejectionHandler = RejectionHandler.newBuilder() + .handle({ + case MalformedRequestContentRejection(_, cause: BlockingException) => + throw cause + }).result() + def route(implicit ec: ExecutionContext): Route = withController { - respondWithDefaultHeader(defaultHeader) { - get { - path(SUCCESS.relativePath) { - complete( - HttpResponse(status = SUCCESS.getStatus, entity = SUCCESS.getBody) - ) - } ~ path(FORWARDED.relativePath) { - headerValueByName("x-forwarded-for") { address => + handleRejections(blockingRejectionHandler) { + respondWithDefaultHeader(defaultHeader) { + get { + path(SUCCESS.relativePath) { complete( - HttpResponse(status = FORWARDED.getStatus, entity = address) + HttpResponse(status = SUCCESS.getStatus, entity = SUCCESS.getBody) ) - } - } ~ path( - QUERY_PARAM.relativePath | QUERY_ENCODED_BOTH.relativePath | QUERY_ENCODED_QUERY.relativePath - ) { - parameter("some") { query => - complete( - HttpResponse( - status = QUERY_PARAM.getStatus, - entity = s"some=$query" + } ~ path(FORWARDED.relativePath) { + headerValueByName("x-forwarded-for") { address => + complete( + HttpResponse(status = FORWARDED.getStatus, entity = address) + ) + } + } ~ path( + QUERY_PARAM.relativePath | QUERY_ENCODED_BOTH.relativePath | QUERY_ENCODED_QUERY.relativePath + ) { + parameter("some") { query => + complete( + HttpResponse( + status = QUERY_PARAM.getStatus, + entity = s"some=$query" + ) ) + } + } ~ path(REDIRECT.relativePath) { + redirect(Uri(REDIRECT.getBody), StatusCodes.Found) + } ~ path(USER_BLOCK.relativePath) { + Blocking.forUser("user-to-block").blockIfMatch() + complete( + HttpResponse(status = SUCCESS.getStatus, entity = "Should not be reached") ) - } - } ~ path(REDIRECT.relativePath) { - redirect(Uri(REDIRECT.getBody), StatusCodes.Found) - } ~ path(ERROR.relativePath) { - complete(HttpResponse(status = ERROR.getStatus, entity = ERROR.getBody)) - } ~ path(EXCEPTION.relativePath) { - throw new Exception(EXCEPTION.getBody) - } ~ pathPrefix("injected-id") { - path("ping" / IntNumber) { id => - val traceId = AgentTracer.activeSpan().getTraceId - complete(s"pong $id -> $traceId") - } ~ path("fing" / IntNumber) { id => - // force the response to happen on another thread or in another context - onSuccess(Future { - Thread.sleep(10); - id - }) { fid => + } ~ path(ERROR.relativePath) { + complete(HttpResponse(status = ERROR.getStatus, entity = ERROR.getBody)) + } ~ path(EXCEPTION.relativePath) { + throw new Exception(EXCEPTION.getBody) + } ~ pathPrefix("injected-id") { + path("ping" / IntNumber) { id => val traceId = AgentTracer.activeSpan().getTraceId - complete(s"fong $fid -> $traceId") + complete(s"pong $id -> $traceId") + } ~ path("fing" / IntNumber) { id => + // force the response to happen on another thread or in another context + onSuccess(Future { + Thread.sleep(10) + id + }) { fid => + val traceId = AgentTracer.activeSpan().getTraceId + complete(s"fong $fid -> $traceId") + } } + } ~ path(USER_BLOCK.relativePath()) { + Blocking.forUser("user-to-block").blockIfMatch() + complete(HttpResponse(status = 200, entity = "should never be reached")) } + } ~ post { + path(CREATED.relativePath()) { + entity(as[String]) { s => + complete( + HttpResponse( + status = CREATED.getStatus, + entity = s"created: $s" + ) + ) + } + } ~ + path(BODY_URLENCODED.relativePath()) { + formFieldMultiMap { m => + complete( + HttpResponse( + status = BODY_URLENCODED.getStatus, + entity = m.toStringAsGroovy + ) + ) + } + } ~ + path(BODY_JSON.relativePath()) { + parameter(Symbol("variant") ?) { + case Some("spray") => + entity(Unmarshaller.messageUnmarshallerFromEntityUnmarshaller(sprayMapUnmarshaller)) { m => + complete( + HttpResponse( + status = BODY_JSON.getStatus, + entity = SprayMapFormat.write(m).compactPrint + ) + ) + } + case _ => // jackson + entity(Unmarshaller.messageUnmarshallerFromEntityUnmarshaller(jacksonMapUnmarshaller)) { m => + complete( + HttpResponse( + status = BODY_JSON.getStatus, + entity = SprayMapFormat.write(m).compactPrint + ) + ) + } + } + } ~ + path(BODY_MULTIPART.relativePath()) { + parameter(Symbol("variant") ?) { + case Some("strictUnmarshaller") => + entity(as[Multipart.FormData.Strict]) { formData => + val m = formData.strictParts + .groupBy(_.name) + .mapValues( + _.map((bp: BodyPart.Strict) => + bp.entity.data.utf8String + ).toList + ) + complete( + HttpResponse( + status = BODY_MULTIPART.getStatus, + entity = m.toStringAsGroovy + ) + ) + } + case _ => + formFieldMultiMap { m => + complete( + HttpResponse( + status = BODY_MULTIPART.getStatus, + entity = m.toStringAsGroovy + ) + ) + } + } + } } } } @@ -223,8 +329,8 @@ object AkkaHttpTestWebServer { // ---------------------------------------------------------------------- // val syncHandler: HttpRequest => HttpResponse = { - case HttpRequest(GET, uri: Uri, _, _, _) => { - val path = uri.path.toString() + case HttpRequest(GET, uri: Uri, _, _, _) => + val path = uri.path.toString() val endpoint = HttpServerTest.ServerEndpoint.forPath(path) HttpServerTest .controller( @@ -233,20 +339,25 @@ object AkkaHttpTestWebServer { def doCall(): HttpResponse = { val resp = HttpResponse(status = endpoint.getStatus) endpoint match { - case SUCCESS => resp.withEntity(endpoint.getBody) + case SUCCESS => resp.withEntity(endpoint.getBody) case FORWARDED => resp.withEntity(endpoint.getBody) // cheating case QUERY_PARAM | QUERY_ENCODED_BOTH | QUERY_ENCODED_QUERY => resp.withEntity(uri.queryString().orNull) case REDIRECT => resp.withHeaders(headers.Location(endpoint.getBody)) - case ERROR => resp.withEntity(endpoint.getBody) + case ERROR => resp.withEntity(endpoint.getBody) case EXCEPTION => throw new Exception(endpoint.getBody) + case USER_BLOCK => { + Blocking.forUser("user-to-block").blockIfMatch() + // should never be output: + resp.withEntity("should never be reached") + } case _ => if (path.startsWith("/injected-id/")) { val groups = path.split('/') - if (groups.size == 4) { // The path starts with a / and has 3 segments + if (groups.length == 4) { // The path starts with a / and has 3 segments val traceId = AgentTracer.activeSpan().getTraceId - val id = groups(3).toInt + val id = groups(3).toInt groups(2) match { case "ping" => return HttpResponse(entity = s"pong $id -> $traceId") @@ -263,13 +374,57 @@ object AkkaHttpTestWebServer { } ) .withDefaultHeaders(defaultHeader) - } } def asyncHandler( - implicit ec: ExecutionContext - ): HttpRequest => Future[HttpResponse] = { request => - Future { + implicit ec: ExecutionContext, + mat: Materializer + ): HttpRequest => Future[HttpResponse] = { + case request@HttpRequest(POST, uri, _, entity, _) => + val path = request.uri.path.toString + val endpoint = HttpServerTest.ServerEndpoint.forPath(path) + + endpoint match { + case CREATED => + Unmarshal(entity).to[String].map { bodyStr => + HttpResponse(status = CREATED.getStatus) + .withEntity(s"${CREATED.getBody}: $bodyStr") + } + case BODY_MULTIPART => + uri.query().get("variant") match { + case Some("strictUnmarshaller") => + val eventualStrict = Unmarshal(entity).to[FormData.Strict] + eventualStrict.map { s => + HttpResponse(status = BODY_MULTIPART.getStatus) + .withEntity(s.toStringAsGroovy) + } + case _ => + val fd = Unmarshal(entity).to[Multipart.FormData] + val eventualStrict = fd.flatMap(_.toStrict(500 millis)) + eventualStrict.map { s => + HttpResponse(status = BODY_MULTIPART.getStatus) + .withEntity(s.toStringAsGroovy) + } + } + case BODY_URLENCODED => + val eventualData = Unmarshal(entity).to[model.FormData] + eventualData.map { d => + HttpResponse(status = BODY_URLENCODED.getStatus) + .withEntity(d.toStringAsGroovy) + } + case BODY_JSON => + val unmarshaller = uri.query().get("variant") match { + case Some("spray") => sprayMapUnmarshaller + case _ => jacksonMapUnmarshaller + } + val eventualData = Unmarshal(entity).to[Map[String, String]](unmarshaller, ec, mat) + eventualData.map { d => + HttpResponse(status = BODY_URLENCODED.getStatus) + .withEntity(SprayMapFormat.write(d).compactPrint) + } + case _ => Future.successful(HttpResponse(404)) + } + case request => Future { syncHandler(request) } } @@ -279,4 +434,70 @@ object AkkaHttpTestWebServer { serverSettings.previewServerSettings.withEnableHttp2(true) serverSettings.withPreviewServerSettings(previewServerSettings) } + + implicit class MapExtensions[A](m: Iterable[(String, A)]) { + def toStringAsGroovy: String = { + def valueToString(value: Object) : String = value match { + case seq: Seq[_] => seq.map(x => valueToString(x.asInstanceOf[Object])).mkString("[", ",", "]") + case other => other.toString + } + + m.map { case (key, value) => s"$key:${valueToString(value.asInstanceOf[Object])}" } + .mkString("[", ",", "]") + } + } + + implicit class MultipartFormDataStrictExtensions(strict: Multipart.FormData.Strict) { + def toStringAsGroovy: String = + strict.strictParts + .groupBy(_.name) + .mapValues( + _.map((bp: BodyPart.Strict) => + bp.entity.data.utf8String + ).toList + ).toStringAsGroovy + } + + implicit class FormDataExtensions(formData: model.FormData) { + def toStringAsGroovy: String = formData.fields.toMultiMap.toStringAsGroovy + } + + implicit def strictMultipartFormDataUnmarshaller: FromEntityUnmarshaller[Multipart.FormData.Strict] = { + val toStrictUnmarshaller = Unmarshaller.withMaterializer[HttpEntity, HttpEntity.Strict] { + implicit ec => + implicit mat => + entity => + entity.toStrict(1000.millis) + } + val toFormDataUnmarshaller = MultipartUnmarshallers.multipartFormDataUnmarshaller + val downcastUnmarshaller = Unmarshaller.strict[Multipart.FormData, Multipart.FormData.Strict] { + case strict: Multipart.FormData.Strict => strict + case _ => throw new RuntimeException("Expected Strict form data at this point") + } + + toStrictUnmarshaller.andThen(toFormDataUnmarshaller).andThen(downcastUnmarshaller) + } + + val jacksonMapUnmarshaller: FromEntityUnmarshaller[Map[String,String]] = { + Jackson.unmarshaller(classOf[java.util.Map[String, String]]).asScala.map( + javaMap => { + import scala.collection.JavaConverters._ + javaMap.asScala.toMap + } + ) + } + + object SprayMapFormat extends RootJsonFormat[Map[String, String]] { + def write(map: Map[String, String]): JsObject = JsObject(map.mapValues(JsString(_)).toMap) + + def read(value: JsValue): Map[String, String] = value match { + case JsObject(fields) => fields.collect { + case (k, JsString(v)) => k -> v + } + case _ => deserializationError("Expected a JSON object") + } + } + + val sprayMapUnmarshaller: FromEntityUnmarshaller[Map[String, String]] = + SprayJsonSupport.sprayJsonUnmarshaller[Map[String, String]](SprayMapFormat) } diff --git a/dd-java-agent/instrumentation/akka-http-10.0/src/latestDepTest/groovy/AkkaHttp102ServerInstrumentationTests.groovy b/dd-java-agent/instrumentation/akka-http-10.0/src/latestDepTest/groovy/AkkaHttp102ServerInstrumentationTests.groovy index 1b21641eb23..a8c4134f9f5 100644 --- a/dd-java-agent/instrumentation/akka-http-10.0/src/latestDepTest/groovy/AkkaHttp102ServerInstrumentationTests.groovy +++ b/dd-java-agent/instrumentation/akka-http-10.0/src/latestDepTest/groovy/AkkaHttp102ServerInstrumentationTests.groovy @@ -24,6 +24,27 @@ class AkkaHttp102ServerInstrumentationBindSyncTest extends AkkaHttpServerInstrum HttpServer server() { return new AkkaHttpTestWebServer(AkkaHttp102TestWebServer.ServerBuilderBindSync()) } + + // we test body endpoints only on the async tests + @Override + boolean testRequestBody() { + false + } + + @Override + boolean testBodyMultipart() { + false + } + + @Override + boolean testBodyJson() { + false + } + + @Override + boolean testBodyUrlencoded() { + false + } } class AkkaHttp102ServerInstrumentationBindAsyncHttp2Test extends AkkaHttpServerInstrumentationTest { diff --git a/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/AkkaHttp2ServerInstrumentation.java b/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/AkkaHttp2ServerInstrumentation.java index 3c7091b6fce..e0363d1f130 100644 --- a/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/AkkaHttp2ServerInstrumentation.java +++ b/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/AkkaHttp2ServerInstrumentation.java @@ -9,6 +9,8 @@ import akka.stream.Materializer; import com.google.auto.service.AutoService; import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.agent.tooling.muzzle.Reference; +import datadog.trace.instrumentation.akkahttp.appsec.ScalaListCollectorMuzzleReferences; import net.bytebuddy.asm.Advice; import scala.Function1; import scala.concurrent.Future; @@ -38,10 +40,19 @@ public String[] helperClassNames() { packageName + ".DatadogAsyncHandlerWrapper$2", packageName + ".AkkaHttpServerHeaders", packageName + ".AkkaHttpServerDecorator", + packageName + ".RecoverFromBlockedExceptionPF", packageName + ".UriAdapter", + packageName + ".appsec.AkkaBlockResponseFunction", + packageName + ".appsec.BlockingResponseHelper", + packageName + ".appsec.ScalaListCollector", }; } + @Override + public Reference[] additionalMuzzleReferences() { + return ScalaListCollectorMuzzleReferences.additionalMuzzleReferences(); + } + @Override public void adviceTransformations(AdviceTransformation transformation) { transformation.applyAdvice( @@ -70,7 +81,7 @@ public static void enter( @Advice.Argument(value = 0, readOnly = false) Function1> handler, @Advice.Argument(value = 7) final Materializer materializer) { - handler = new DatadogAsyncHandlerWrapper(handler, materializer.executionContext()); + handler = new DatadogAsyncHandlerWrapper(handler, materializer); } } @@ -80,7 +91,7 @@ public static void enter( @Advice.Argument(value = 0, readOnly = false) Function1> handler, @Advice.Argument(value = 6) final Materializer materializer) { - handler = new DatadogAsyncHandlerWrapper(handler, materializer.executionContext()); + handler = new DatadogAsyncHandlerWrapper(handler, materializer); } } @@ -90,7 +101,7 @@ public static void enter( @Advice.Argument(value = 0, readOnly = false) Function1> handler, @Advice.Argument(value = 5) final Materializer materializer) { - handler = new DatadogAsyncHandlerWrapper(handler, materializer.executionContext()); + handler = new DatadogAsyncHandlerWrapper(handler, materializer); } } } diff --git a/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/AkkaHttpServerDecorator.java b/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/AkkaHttpServerDecorator.java index 8b7290747ef..0d6dd4fed69 100644 --- a/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/AkkaHttpServerDecorator.java +++ b/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/AkkaHttpServerDecorator.java @@ -2,10 +2,12 @@ import akka.http.scaladsl.model.HttpRequest; import akka.http.scaladsl.model.HttpResponse; +import datadog.trace.api.gateway.BlockResponseFunction; import datadog.trace.bootstrap.instrumentation.api.AgentPropagation; import datadog.trace.bootstrap.instrumentation.api.URIDataAdapter; import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; import datadog.trace.bootstrap.instrumentation.decorator.HttpServerDecorator; +import datadog.trace.instrumentation.akkahttp.appsec.AkkaBlockResponseFunction; public class AkkaHttpServerDecorator extends HttpServerDecorator { @@ -64,4 +66,15 @@ protected int peerPort(final HttpRequest httpRequest) { protected int status(final HttpResponse httpResponse) { return httpResponse.status().intValue(); } + + @Override + protected boolean isAppSecOnResponseSeparate() { + return true; + } + + @Override + protected BlockResponseFunction createBlockResponseFunction( + HttpRequest httpRequest, HttpRequest httpRequest2) { + return new AkkaBlockResponseFunction(httpRequest); + } } diff --git a/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/AkkaHttpServerInstrumentation.java b/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/AkkaHttpServerInstrumentation.java index 6984362b902..363c3b26805 100644 --- a/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/AkkaHttpServerInstrumentation.java +++ b/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/AkkaHttpServerInstrumentation.java @@ -11,6 +11,8 @@ import akka.stream.scaladsl.Flow; import com.google.auto.service.AutoService; import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.agent.tooling.muzzle.Reference; +import datadog.trace.instrumentation.akkahttp.appsec.ScalaListCollectorMuzzleReferences; import net.bytebuddy.asm.Advice; /** @@ -71,9 +73,18 @@ public String[] helperClassNames() { packageName + ".AkkaHttpServerHeaders", packageName + ".AkkaHttpServerDecorator", packageName + ".UriAdapter", + packageName + ".RecoverFromBlockedExceptionPF", + packageName + ".appsec.BlockingResponseHelper", + packageName + ".appsec.ScalaListCollector", + packageName + ".appsec.AkkaBlockResponseFunction", }; } + @Override + public Reference[] additionalMuzzleReferences() { + return ScalaListCollectorMuzzleReferences.additionalMuzzleReferences(); + } + @Override public void adviceTransformations(AdviceTransformation transformation) { transformation.applyAdvice( @@ -87,6 +98,7 @@ public static void enter( @Advice.Argument(value = 0, readOnly = false) Flow handler, @Advice.Argument(value = 4, readOnly = false) ServerSettings settings) { + handler = handler.asJava().recover(RecoverFromBlockedExceptionPF.INSTANCE).asScala(); final BidiFlow wrapper = BidiFlow.fromGraph(new DatadogServerRequestResponseFlowWrapper(settings)); handler = wrapper.reversed().join(handler.asJava()).asScala(); diff --git a/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/DatadogAsyncHandlerWrapper.java b/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/DatadogAsyncHandlerWrapper.java index c59577694d8..56f0e5c44a0 100644 --- a/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/DatadogAsyncHandlerWrapper.java +++ b/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/DatadogAsyncHandlerWrapper.java @@ -2,52 +2,81 @@ import akka.http.scaladsl.model.HttpRequest; import akka.http.scaladsl.model.HttpResponse; +import akka.http.scaladsl.util.FastFuture$; +import akka.stream.Materializer; +import datadog.trace.api.gateway.Flow; import datadog.trace.bootstrap.instrumentation.api.AgentScope; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import datadog.trace.instrumentation.akkahttp.appsec.BlockingResponseHelper; import scala.Function1; -import scala.concurrent.ExecutionContext; import scala.concurrent.Future; import scala.runtime.AbstractFunction1; public class DatadogAsyncHandlerWrapper extends AbstractFunction1> { private final Function1> userHandler; - private final ExecutionContext executionContext; + private final Materializer materializer; public DatadogAsyncHandlerWrapper( final Function1> userHandler, - final ExecutionContext executionContext) { + final Materializer materializer) { this.userHandler = userHandler; - this.executionContext = executionContext; + this.materializer = materializer; } @Override public Future apply(final HttpRequest request) { final AgentScope scope = DatadogWrapperHelper.createSpan(request); - Future futureResponse = null; + AgentSpan span = scope.span(); + Future futureResponse; + + // handle blocking in the beginning of the request + Flow.Action.RequestBlockingAction rba; + if ((rba = span.getRequestBlockingAction()) != null) { + request.discardEntityBytes(materializer); + HttpResponse response = BlockingResponseHelper.maybeCreateBlockingResponse(rba, request); + span.getRequestContext().getTraceSegment().effectivelyBlocked(); + DatadogWrapperHelper.finishSpan(span, response); + return FastFuture$.MODULE$.successful().apply(response); + } + try { futureResponse = userHandler.apply(request); } catch (final Throwable t) { scope.close(); - DatadogWrapperHelper.finishSpan(scope.span(), t); + DatadogWrapperHelper.finishSpan(span, t); throw t; } + final Future wrapped = - futureResponse.transform( - new AbstractFunction1() { - @Override - public HttpResponse apply(final HttpResponse response) { - DatadogWrapperHelper.finishSpan(scope.span(), response); - return response; - } - }, - new AbstractFunction1() { - @Override - public Throwable apply(final Throwable t) { - DatadogWrapperHelper.finishSpan(scope.span(), t); - return t; - } - }, - executionContext); + futureResponse + .recoverWith( + RecoverFromBlockedExceptionPF.INSTANCE_FUTURE, materializer.executionContext()) + .transform( + new AbstractFunction1() { + @Override + public HttpResponse apply(HttpResponse response) { + // handle blocking at the middle/end of the request + HttpResponse newResponse = + BlockingResponseHelper.handleFinishForWaf(span, response); + if (newResponse != response) { + span.getRequestContext().getTraceSegment().effectivelyBlocked(); + response.entity().discardBytes(materializer); + response = newResponse; + } + + DatadogWrapperHelper.finishSpan(span, response); + return response; + } + }, + new AbstractFunction1() { + @Override + public Throwable apply(final Throwable t) { + DatadogWrapperHelper.finishSpan(span, t); + return t; + } + }, + materializer.executionContext()); scope.close(); return wrapped; } diff --git a/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/DatadogServerRequestResponseFlowWrapper.java b/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/DatadogServerRequestResponseFlowWrapper.java index f168a91ba62..45ce4b88c00 100644 --- a/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/DatadogServerRequestResponseFlowWrapper.java +++ b/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/DatadogServerRequestResponseFlowWrapper.java @@ -13,7 +13,10 @@ import akka.stream.stage.AbstractOutHandler; import akka.stream.stage.GraphStage; import akka.stream.stage.GraphStageLogic; +import datadog.trace.api.gateway.RequestContext; import datadog.trace.bootstrap.instrumentation.api.AgentScope; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import datadog.trace.instrumentation.akkahttp.appsec.BlockingResponseHelper; import java.util.Queue; import java.util.concurrent.ArrayBlockingQueue; @@ -54,6 +57,7 @@ public GraphStageLogic createLogic(final Attributes inheritedAttributes) throws // close the span at the front of the queue when we receive the response // from the user code, since it will match up to the request for that span. final Queue scopes = new ArrayBlockingQueue<>(pipeliningLimit); + boolean[] skipNextPull = new boolean[] {false}; // This is where the request comes in from the server and TCP layer setHandler( @@ -63,6 +67,23 @@ public GraphStageLogic createLogic(final Attributes inheritedAttributes) throws public void onPush() throws Exception { final HttpRequest request = grab(requestInlet); final AgentScope scope = DatadogWrapperHelper.createSpan(request); + AgentSpan span = scope.span(); + RequestContext requestContext = span.getRequestContext(); + if (requestContext != null) { + HttpResponse response = + BlockingResponseHelper.maybeCreateBlockingResponse(span, request); + if (response != null) { + request.discardEntityBytes(materializer()); + skipNextPull[0] = true; + requestContext.getTraceSegment().effectivelyBlocked(); + emit(responseOutlet, response); + DatadogWrapperHelper.finishSpan(scope.span(), response); + pull(requestInlet); + scope.close(); + return; + } + } + scopes.add(scope); push(requestOutlet, request); // Since we haven't instrumented the akka stream state machine, we can't rely @@ -109,10 +130,18 @@ public void onDownstreamFinish() throws Exception { new AbstractInHandler() { @Override public void onPush() throws Exception { - final HttpResponse response = grab(responseInlet); + HttpResponse response = grab(responseInlet); final AgentScope scope = scopes.poll(); if (scope != null) { - DatadogWrapperHelper.finishSpan(scope.span(), response); + AgentSpan span = scope.span(); + HttpResponse newResponse = + BlockingResponseHelper.handleFinishForWaf(span, response); + if (newResponse != response) { + span.getRequestContext().getTraceSegment().effectivelyBlocked(); + response.discardEntityBytes(materializer()); + response = newResponse; + } + DatadogWrapperHelper.finishSpan(span, response); // Check if the active scope is still the scope from when the request came in, // and close it. If it's not, then it will be cleaned up actor message // processing instrumentation that drives this state machine @@ -160,7 +189,17 @@ public void onUpstreamFailure(final Throwable ex) throws Exception { new AbstractOutHandler() { @Override public void onPull() throws Exception { - pull(responseInlet); + if (isClosed(responseInlet)) { + fail(responseOutlet, new RuntimeException("Failed earlier")); + } + // condition is needed when we emit() directly to the outlet + // The value was not pushed through the response inlet, so we need not + // request more data through the inlet + if (skipNextPull[0]) { + skipNextPull[0] = false; + } else { + pull(responseInlet); + } } @Override diff --git a/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/DefaultExceptionHandlerInstrumentation.java b/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/DefaultExceptionHandlerInstrumentation.java new file mode 100644 index 00000000000..5eaabbf87dd --- /dev/null +++ b/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/DefaultExceptionHandlerInstrumentation.java @@ -0,0 +1,51 @@ +package datadog.trace.instrumentation.akkahttp; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.returns; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +import akka.http.scaladsl.server.ExceptionHandler; +import akka.http.scaladsl.server.ExceptionHandler$; +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.Instrumenter; +import net.bytebuddy.asm.Advice; + +@AutoService(Instrumenter.class) +public class DefaultExceptionHandlerInstrumentation extends Instrumenter.AppSec + implements Instrumenter.ForSingleType { + public DefaultExceptionHandlerInstrumentation() { + super("akka-http", "akka-http-server"); + } + + @Override + public String instrumentedType() { + return "akka.http.scaladsl.server.ExceptionHandler$"; + } + + @Override + public String[] helperClassNames() { + return new String[] { + packageName + ".MarkSpanAsErroredPF", + }; + } + + @Override + public void adviceTransformations(AdviceTransformation transformation) { + transformation.applyAdvice( + isMethod() + .and(returns(named("akka.http.scaladsl.server.ExceptionHandler"))) + .and(takesArguments(1)) + .and(takesArgument(0, named("akka.http.scaladsl.settings.RoutingSettings"))), + DefaultExceptionHandlerInstrumentation.class.getName() + "$DefaultHandlerAdvice"); + } + + static class DefaultHandlerAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + static void after( + @Advice.This ExceptionHandler$ eh, @Advice.Return(readOnly = false) ExceptionHandler ret) { + ret = eh.apply(MarkSpanAsErroredPF.INSTANCE).withFallback(ret); + } + } +} diff --git a/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/MarkSpanAsErroredPF.java b/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/MarkSpanAsErroredPF.java new file mode 100644 index 00000000000..5709c5025cd --- /dev/null +++ b/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/MarkSpanAsErroredPF.java @@ -0,0 +1,31 @@ +package datadog.trace.instrumentation.akkahttp; + +import akka.http.scaladsl.server.RequestContext; +import akka.http.scaladsl.server.RouteResult; +import akka.japi.JavaPartialFunction; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import datadog.trace.bootstrap.instrumentation.api.AgentTracer; +import scala.Function1; +import scala.concurrent.Future; + +/** + * Runs before the default exception handler in {@link + * akka.http.scaladsl.server.ExceptionHandler$#default}, which usually completes with a 500, that + * the exception may be recorded. + */ +public class MarkSpanAsErroredPF + extends JavaPartialFunction>> { + public static final JavaPartialFunction INSTANCE = new MarkSpanAsErroredPF(); + + private MarkSpanAsErroredPF() {} + + @Override + public Function1> apply(Throwable x, boolean isCheck) + throws Exception, Exception { + AgentSpan agentSpan = AgentTracer.activeSpan(); + if (agentSpan != null) { + agentSpan.addThrowable(x); + } + throw noMatch(); + } +} diff --git a/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/RecoverFromBlockedExceptionPF.java b/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/RecoverFromBlockedExceptionPF.java new file mode 100644 index 00000000000..96bfb55194d --- /dev/null +++ b/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/RecoverFromBlockedExceptionPF.java @@ -0,0 +1,52 @@ +package datadog.trace.instrumentation.akkahttp; + +import akka.http.scaladsl.model.HttpEntity$; +import akka.http.scaladsl.model.HttpProtocols; +import akka.http.scaladsl.model.HttpResponse; +import akka.http.scaladsl.model.StatusCode; +import akka.http.scaladsl.util.FastFuture$; +import akka.japi.JavaPartialFunction; +import datadog.appsec.api.blocking.BlockingException; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import datadog.trace.bootstrap.instrumentation.api.AgentTracer; +import scala.PartialFunction; +import scala.collection.immutable.List$; +import scala.compat.java8.JFunction1; +import scala.concurrent.Future; + +public class RecoverFromBlockedExceptionPF extends JavaPartialFunction { + public static final PartialFunction INSTANCE = + new RecoverFromBlockedExceptionPF(); + public static final PartialFunction> INSTANCE_FUTURE; + + static { + JFunction1> f = RecoverFromBlockedExceptionPF::valueToFuture; + INSTANCE_FUTURE = INSTANCE.andThen(f); + } + + @Override + public HttpResponse apply(Throwable x, boolean isCheck) throws Exception { + if (x instanceof BlockingException) { + if (isCheck) { + return null; + } + AgentSpan agentSpan = AgentTracer.activeSpan(); + if (agentSpan != null) { + agentSpan.addThrowable(x); + } + + // will be replaced anyway + return new HttpResponse( + StatusCode.int2StatusCode(500), + List$.MODULE$.empty(), + HttpEntity$.MODULE$.Empty(), + HttpProtocols.HTTP$div1$u002E1()); + } else { + throw noMatch(); + } + } + + private static Future valueToFuture(V value) { + return FastFuture$.MODULE$.successful().apply(value); + } +} diff --git a/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/appsec/AkkaBlockResponseFunction.java b/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/appsec/AkkaBlockResponseFunction.java new file mode 100644 index 00000000000..7421a8f9e01 --- /dev/null +++ b/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/appsec/AkkaBlockResponseFunction.java @@ -0,0 +1,69 @@ +package datadog.trace.instrumentation.akkahttp.appsec; + +import akka.http.scaladsl.model.HttpRequest; +import akka.http.scaladsl.model.HttpResponse; +import datadog.appsec.api.blocking.BlockingContentType; +import datadog.trace.api.gateway.BlockResponseFunction; +import datadog.trace.api.gateway.Flow; +import datadog.trace.api.internal.TraceSegment; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import datadog.trace.bootstrap.instrumentation.api.AgentTracer; +import java.util.Map; + +/** + * This block response function only saves the request blocking action. Usually the blocking request + * function directly commits a response. + * + * @see BlockingResponseHelper#handleFinishForWaf(AgentSpan, HttpResponse) + */ +public class AkkaBlockResponseFunction implements BlockResponseFunction { + private final HttpRequest request; + private Flow.Action.RequestBlockingAction rba; + private boolean unmarshallBlock; + private TraceSegment traceSegment; + + public AkkaBlockResponseFunction(HttpRequest request) { + this.request = request; + } + + public boolean isBlocking() { + return rba != null; + } + + public boolean isUnmarshallBlock() { + return unmarshallBlock; + } + + public void setUnmarshallBlock(boolean unmarshallBlock) { + this.unmarshallBlock = unmarshallBlock; + } + + public HttpResponse maybeCreateAlternativeResponse() { + if (!isBlocking()) { + return null; + } + + HttpResponse httpResponse = BlockingResponseHelper.maybeCreateBlockingResponse(rba, request); + if (httpResponse != null) { + traceSegment.effectivelyBlocked(); + } + return httpResponse; + } + + @Override + public boolean tryCommitBlockingResponse( + TraceSegment segment, + int statusCode, + BlockingContentType templateType, + Map extraHeaders) { + AgentSpan agentSpan = AgentTracer.activeSpan(); + if (agentSpan == null) { + return false; + } + if (rba == null) { + rba = new Flow.Action.RequestBlockingAction(statusCode, templateType, extraHeaders); + this.traceSegment = segment; + } + return true; + } +} diff --git a/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/appsec/BlockingResponseHelper.java b/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/appsec/BlockingResponseHelper.java new file mode 100644 index 00000000000..064448c764f --- /dev/null +++ b/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/appsec/BlockingResponseHelper.java @@ -0,0 +1,101 @@ +package datadog.trace.instrumentation.akkahttp.appsec; + +import static datadog.trace.instrumentation.akkahttp.AkkaHttpServerDecorator.DECORATE; + +import akka.http.javadsl.model.HttpHeader; +import akka.http.javadsl.model.headers.RawHeader; +import akka.http.scaladsl.model.ContentTypes; +import akka.http.scaladsl.model.HttpEntity$; +import akka.http.scaladsl.model.HttpRequest; +import akka.http.scaladsl.model.HttpResponse; +import akka.http.scaladsl.model.ResponseEntity; +import akka.http.scaladsl.model.StatusCode; +import akka.util.ByteString; +import datadog.appsec.api.blocking.BlockingContentType; +import datadog.trace.api.gateway.BlockResponseFunction; +import datadog.trace.api.gateway.Flow; +import datadog.trace.api.gateway.RequestContext; +import datadog.trace.bootstrap.blocking.BlockingActionHelper; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import datadog.trace.instrumentation.akkahttp.AkkaHttpServerHeaders; +import java.util.Optional; +import scala.collection.immutable.List; + +public class BlockingResponseHelper { + private BlockingResponseHelper() {} + + public static HttpResponse handleFinishForWaf(final AgentSpan span, final HttpResponse response) { + RequestContext requestContext = span.getRequestContext(); + BlockResponseFunction brf = requestContext.getBlockResponseFunction(); + if (brf instanceof AkkaBlockResponseFunction) { + HttpResponse altResponse = ((AkkaBlockResponseFunction) brf).maybeCreateAlternativeResponse(); + if (altResponse != null) { + // we already blocked during the request + return altResponse; + } + } + Flow flow = + DECORATE.callIGCallbackResponseAndHeaders( + span, response, response.status().intValue(), AkkaHttpServerHeaders.responseGetter()); + Flow.Action action = flow.getAction(); + if (action instanceof Flow.Action.RequestBlockingAction) { + Flow.Action.RequestBlockingAction rba = (Flow.Action.RequestBlockingAction) action; + if (brf instanceof AkkaBlockResponseFunction) { + brf.tryCommitBlockingResponse( + requestContext.getTraceSegment(), + rba.getStatusCode(), + rba.getBlockingContentType(), + rba.getExtraHeaders()); + HttpResponse altResponse = + ((AkkaBlockResponseFunction) brf).maybeCreateAlternativeResponse(); + if (altResponse != null) { + return altResponse; + } + } + } + + return response; + } + + public static HttpResponse maybeCreateBlockingResponse(AgentSpan span, HttpRequest request) { + return maybeCreateBlockingResponse(span.getRequestBlockingAction(), request); + } + + public static HttpResponse maybeCreateBlockingResponse( + Flow.Action.RequestBlockingAction rba, HttpRequest request) { + if (rba == null) { + return null; + } + Optional accept = request.getHeader("accept"); + BlockingContentType bct = rba.getBlockingContentType(); + int httpCode = BlockingActionHelper.getHttpCode(rba.getStatusCode()); + ResponseEntity entity; + if (bct != BlockingContentType.NONE) { + BlockingActionHelper.TemplateType tt = + BlockingActionHelper.determineTemplateType(bct, accept.map(h -> h.value()).orElse(null)); + byte[] template = BlockingActionHelper.getTemplate(tt); + if (tt == BlockingActionHelper.TemplateType.HTML) { + entity = + HttpEntity$.MODULE$.apply( + ContentTypes.text$divhtml$u0028UTF$minus8$u0029(), ByteString.fromArray(template)); + } else { // json + entity = + HttpEntity$.MODULE$.apply( + ContentTypes.application$divjson(), ByteString.fromArray(template)); + } + } else { + entity = HttpEntity$.MODULE$.Empty(); + } + + List headersList = + rba.getExtraHeaders().entrySet().stream() + .map( + e -> + (akka.http.scaladsl.model.HttpHeader) + RawHeader.create(e.getKey(), e.getValue())) + .collect(ScalaListCollector.toScalaList()); + + return HttpResponse.apply( + StatusCode.int2StatusCode(httpCode), headersList, entity, request.protocol()); + } +} diff --git a/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/appsec/Bug4304Instrumentation.java b/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/appsec/Bug4304Instrumentation.java new file mode 100644 index 00000000000..22c4f2bbba1 --- /dev/null +++ b/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/appsec/Bug4304Instrumentation.java @@ -0,0 +1,113 @@ +package datadog.trace.instrumentation.akkahttp.appsec; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.declaresField; +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.nameStartsWith; +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan; +import static net.bytebuddy.matcher.ElementMatchers.isConstructor; + +import akka.stream.stage.GraphStageLogic; +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers; +import datadog.trace.agent.tooling.muzzle.Reference; +import datadog.trace.api.gateway.BlockResponseFunction; +import datadog.trace.api.gateway.RequestContext; +import datadog.trace.api.gateway.RequestContextSlot; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import java.lang.reflect.Field; +import java.util.regex.Pattern; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; + +/** See https://github.com/akka/akka-http/issues/4304 */ +@AutoService(Instrumenter.class) +public class Bug4304Instrumentation extends Instrumenter.AppSec + implements Instrumenter.ForTypeHierarchy, Instrumenter.WithTypeStructure { + public Bug4304Instrumentation() { + super("akka-http"); + } + + @Override + public String hierarchyMarkerType() { + return "akka.http.impl.engine.server.HttpServerBluePrint"; + } + + @Override + public String[] helperClassNames() { + return new String[] { + packageName + ".AkkaBlockResponseFunction", + packageName + ".BlockingResponseHelper", + packageName + ".ScalaListCollector", + "datadog.trace.instrumentation.akkahttp.AkkaHttpServerDecorator", + "datadog.trace.instrumentation.akkahttp.AkkaHttpServerHeaders", + "datadog.trace.instrumentation.akkahttp.UriAdapter", + }; + } + + @Override + public Reference[] additionalMuzzleReferences() { + return ScalaListCollectorMuzzleReferences.additionalMuzzleReferences(); + } + + @Override + public ElementMatcher hierarchyMatcher() { + return nameStartsWith("akka.http.impl.engine.server.HttpServerBluePrint$ControllerStage$$anon$") + .and(HierarchyMatchers.extendsClass(named("akka.stream.stage.GraphStageLogic"))) + .and(MatchesOneHundredContinueStageAnonClass.INSTANCE); + } + + public static class MatchesOneHundredContinueStageAnonClass + implements ElementMatcher { + public static final ElementMatcher INSTANCE = + new MatchesOneHundredContinueStageAnonClass(); + + private MatchesOneHundredContinueStageAnonClass() {} + + private static final Pattern ANON_CLASS_PATTERN = + Pattern.compile( + "akka\\.http\\.impl\\.engine\\.server\\.HttpServerBluePrint\\$ControllerStage\\$\\$anon\\$" + + "\\d+\\$OneHundredContinueStage\\$\\$anon\\$\\d+"); + + @Override + public boolean matches(TypeDescription td) { + return ANON_CLASS_PATTERN.matcher(td.getName()).matches(); + } + } + + @Override + public ElementMatcher structureMatcher() { + return declaresField(named("oneHundredContinueSent")); + } + + @Override + public void adviceTransformations(AdviceTransformation transformation) { + transformation.applyAdvice( + isConstructor(), Bug4304Instrumentation.class.getName() + "$GraphStageLogicAdvice"); + } + + static class GraphStageLogicAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + static void after(@Advice.This GraphStageLogic thiz) + throws NoSuchFieldException, IllegalAccessException { + AgentSpan span = activeSpan(); + RequestContext reqCtx; + if (span == null + || (reqCtx = span.getRequestContext()) == null + || reqCtx.getData(RequestContextSlot.APPSEC) == null) { + return; + } + + BlockResponseFunction brf = reqCtx.getBlockResponseFunction(); + if (brf instanceof AkkaBlockResponseFunction) { + AkkaBlockResponseFunction abrf = (AkkaBlockResponseFunction) brf; + if (abrf.isBlocking() && abrf.isUnmarshallBlock()) { + Field f = thiz.getClass().getDeclaredField("oneHundredContinueSent"); + f.setAccessible(true); + f.set(thiz, true); + } + } + } + } +} diff --git a/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/appsec/FormDataToStrictInstrumentation.java b/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/appsec/FormDataToStrictInstrumentation.java new file mode 100644 index 00000000000..eb5ea2a7d7b --- /dev/null +++ b/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/appsec/FormDataToStrictInstrumentation.java @@ -0,0 +1,65 @@ +package datadog.trace.instrumentation.akkahttp.appsec; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.isStatic; +import static net.bytebuddy.matcher.ElementMatchers.not; +import static net.bytebuddy.matcher.ElementMatchers.returns; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +import akka.stream.Materializer; +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.Instrumenter; +import net.bytebuddy.asm.Advice; +import scala.concurrent.duration.FiniteDuration; + +/** @see akka.http.scaladsl.model.Multipart.FormData#toStrict(FiniteDuration, Materializer) */ +@AutoService(Instrumenter.class) +public class FormDataToStrictInstrumentation extends Instrumenter.AppSec + implements Instrumenter.ForSingleType, ScalaListCollectorMuzzleReferences { + public FormDataToStrictInstrumentation() { + super("akka-http"); + } + + @Override + public String[] helperClassNames() { + return new String[] { + packageName + ".UnmarshallerHelpers", + packageName + ".AkkaBlockResponseFunction", + packageName + ".BlockingResponseHelper", + packageName + ".ScalaListCollector", + "datadog.trace.instrumentation.akkahttp.AkkaHttpServerDecorator", + "datadog.trace.instrumentation.akkahttp.AkkaHttpServerHeaders", + "datadog.trace.instrumentation.akkahttp.UriAdapter", + }; + } + + @Override + public String instrumentedType() { + return "akka.http.scaladsl.model.Multipart$FormData"; + } + + @Override + public void adviceTransformations(AdviceTransformation transformation) { + transformation.applyAdvice( + isMethod() + .and(not(isStatic())) + .and(named("toStrict")) + .and(takesArguments(2)) + .and(takesArgument(0, named("scala.concurrent.duration.FiniteDuration"))) + .and(takesArgument(1, named("akka.stream.Materializer"))) + .and(returns(named("scala.concurrent.Future"))), + FormDataToStrictInstrumentation.class.getName() + "$ToStrictAdvice"); + } + + static class ToStrictAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + static void before( + @Advice.Return(readOnly = false) + scala.concurrent.Future fut, + @Advice.Argument(1) Materializer mat) { + fut = UnmarshallerHelpers.transformMultiPartFormDataToStrictFuture(fut, mat); + } + } +} diff --git a/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/appsec/JacksonUnmarshallerInstrumentation.java b/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/appsec/JacksonUnmarshallerInstrumentation.java new file mode 100644 index 00000000000..a1758956e70 --- /dev/null +++ b/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/appsec/JacksonUnmarshallerInstrumentation.java @@ -0,0 +1,66 @@ +package datadog.trace.instrumentation.akkahttp.appsec; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.isStatic; +import static net.bytebuddy.matcher.ElementMatchers.returns; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +import akka.http.javadsl.unmarshalling.Unmarshaller; +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.agent.tooling.muzzle.Reference; +import net.bytebuddy.asm.Advice; + +@AutoService(Instrumenter.class) +public class JacksonUnmarshallerInstrumentation extends Instrumenter.AppSec + implements Instrumenter.ForSingleType { + + public JacksonUnmarshallerInstrumentation() { + super("akka-http"); + } + + @Override + public String[] helperClassNames() { + return new String[] { + packageName + ".UnmarshallerHelpers", + packageName + ".AkkaBlockResponseFunction", + packageName + ".BlockingResponseHelper", + packageName + ".ScalaListCollector", + "datadog.trace.instrumentation.akkahttp.AkkaHttpServerDecorator", + "datadog.trace.instrumentation.akkahttp.AkkaHttpServerHeaders", + "datadog.trace.instrumentation.akkahttp.UriAdapter", + }; + } + + @Override + public Reference[] additionalMuzzleReferences() { + return ScalaListCollectorMuzzleReferences.additionalMuzzleReferences(); + } + + @Override + public String instrumentedType() { + return "akka.http.javadsl.marshallers.jackson.Jackson"; + } + + @Override + public void adviceTransformations(AdviceTransformation transformation) { + transformation.applyAdvice( + isMethod() + .and(isStatic()) + .and(returns(named("akka.http.javadsl.unmarshalling.Unmarshaller"))) + .and(named("byteStringUnmarshaller").or(named("unmarshaller"))) + .and(takesArguments(2)) + .and(takesArgument(0, named("com.fasterxml.jackson.databind.ObjectMapper"))) + .and(takesArgument(1, Class.class)), + JacksonUnmarshallerInstrumentation.class.getName() + "$UnmarshallerAdvice"); + } + + static class UnmarshallerAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + static void after(@Advice.Return(readOnly = false) Unmarshaller ret) { + ret = UnmarshallerHelpers.transformJacksonUnmarshaller(ret); + } + } +} diff --git a/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/appsec/MultipartUnmarshallersInstrumentation.java b/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/appsec/MultipartUnmarshallersInstrumentation.java new file mode 100644 index 00000000000..bf96d650dac --- /dev/null +++ b/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/appsec/MultipartUnmarshallersInstrumentation.java @@ -0,0 +1,63 @@ +package datadog.trace.instrumentation.akkahttp.appsec; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static datadog.trace.instrumentation.akkahttp.iast.TraitMethodMatchers.isTraitMethod; +import static net.bytebuddy.matcher.ElementMatchers.returns; + +import akka.http.scaladsl.unmarshalling.MultipartUnmarshallers; +import akka.http.scaladsl.unmarshalling.Unmarshaller; +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.Instrumenter; +import net.bytebuddy.asm.Advice; + +/** @see MultipartUnmarshallers */ +@AutoService(Instrumenter.class) +public class MultipartUnmarshallersInstrumentation extends Instrumenter.AppSec + implements Instrumenter.ForKnownTypes { + + private static final String TRAIT_NAME = + "akka.http.scaladsl.unmarshalling.MultipartUnmarshallers"; + + public MultipartUnmarshallersInstrumentation() { + super("akka-http"); + } + + @Override + public String[] helperClassNames() { + return new String[] { + packageName + ".UnmarshallerHelpers", + packageName + ".AkkaBlockResponseFunction", + packageName + ".BlockingResponseHelper", + packageName + ".ScalaListCollector", + "datadog.trace.instrumentation.akkahttp.AkkaHttpServerDecorator", + "datadog.trace.instrumentation.akkahttp.AkkaHttpServerHeaders", + "datadog.trace.instrumentation.akkahttp.UriAdapter", + }; + } + + @Override + public String[] knownMatchingTypes() { + return new String[] { + TRAIT_NAME, TRAIT_NAME + "$class", + }; + } + + @Override + public void adviceTransformations(AdviceTransformation transformation) { + transformation.applyAdvice( + isTraitMethod( + TRAIT_NAME, + "multipartFormDataUnmarshaller", + "akka.event.LoggingAdapter", + "akka.http.scaladsl.settings.ParserSettings") + .and(returns(named("akka.http.scaladsl.unmarshalling.Unmarshaller"))), + MultipartUnmarshallersInstrumentation.class.getName() + "$UnmarshallerWrappingAdvice"); + } + + static class UnmarshallerWrappingAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + static void after(@Advice.Return(readOnly = false) Unmarshaller unmarshaller) { + unmarshaller = UnmarshallerHelpers.transformMultipartFormDataUnmarshaller(unmarshaller); + } + } +} diff --git a/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/appsec/PredefinedFromEntityUnmarshallersInstrumentation.java b/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/appsec/PredefinedFromEntityUnmarshallersInstrumentation.java new file mode 100644 index 00000000000..72c52c057c2 --- /dev/null +++ b/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/appsec/PredefinedFromEntityUnmarshallersInstrumentation.java @@ -0,0 +1,88 @@ +package datadog.trace.instrumentation.akkahttp.appsec; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.namedOneOf; +import static datadog.trace.instrumentation.akkahttp.iast.TraitMethodMatchers.isTraitMethod; +import static net.bytebuddy.matcher.ElementMatchers.returns; + +import akka.http.scaladsl.unmarshalling.PredefinedFromEntityUnmarshallers; +import akka.http.scaladsl.unmarshalling.Unmarshaller; +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.agent.tooling.muzzle.Reference; +import net.bytebuddy.asm.Advice; +import scala.collection.Seq; + +/** + * @see PredefinedFromEntityUnmarshallers#urlEncodedFormDataUnmarshaller(Seq) + * @see PredefinedFromEntityUnmarshallers#stringUnmarshaller() + */ +@AutoService(Instrumenter.class) +public class PredefinedFromEntityUnmarshallersInstrumentation extends Instrumenter.AppSec + implements Instrumenter.ForKnownTypes { + + private static final String TRAIT_NAME = + "akka.http.scaladsl.unmarshalling.PredefinedFromEntityUnmarshallers"; + + public PredefinedFromEntityUnmarshallersInstrumentation() { + super("akka-http"); + } + + @Override + public String[] helperClassNames() { + return new String[] { + packageName + ".UnmarshallerHelpers", + packageName + ".AkkaBlockResponseFunction", + packageName + ".BlockingResponseHelper", + packageName + ".ScalaListCollector", + "datadog.trace.instrumentation.akkahttp.AkkaHttpServerDecorator", + "datadog.trace.instrumentation.akkahttp.AkkaHttpServerHeaders", + "datadog.trace.instrumentation.akkahttp.UriAdapter", + }; + } + + @Override + public Reference[] additionalMuzzleReferences() { + return ScalaListCollectorMuzzleReferences.additionalMuzzleReferences(); + } + + @Override + public String[] knownMatchingTypes() { + return new String[] { + TRAIT_NAME, TRAIT_NAME + "$class", + }; + } + + @Override + public void adviceTransformations(AdviceTransformation transformation) { + transformation.applyAdvice( + isTraitMethod( + TRAIT_NAME, + "urlEncodedFormDataUnmarshaller", + namedOneOf("scala.collection.Seq", "scala.collection.immutable.Seq")) + .and(returns(named("akka.http.scaladsl.unmarshalling.Unmarshaller"))), + PredefinedFromEntityUnmarshallersInstrumentation.class.getName() + + "$UrlEncodedUnmarshallerWrappingAdvice"); + transformation.applyAdvice( + isTraitMethod(TRAIT_NAME, "stringUnmarshaller") + .and(returns(named("akka.http.scaladsl.unmarshalling.Unmarshaller"))), + PredefinedFromEntityUnmarshallersInstrumentation.class.getName() + + "$StringUnmarshallerWrappingAdvice"); + } + + static class UrlEncodedUnmarshallerWrappingAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + static void after(@Advice.Return(readOnly = false) Unmarshaller unmarshaller) { + unmarshaller = UnmarshallerHelpers.transformUrlEncodedUnmarshaller(unmarshaller); + } + } + + static class StringUnmarshallerWrappingAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + static void after( + @Advice.Return(readOnly = false) + Unmarshaller unmarshaller) { + unmarshaller = UnmarshallerHelpers.transformStringUnmarshaller(unmarshaller); + } + } +} diff --git a/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/appsec/ScalaListCollector.java b/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/appsec/ScalaListCollector.java new file mode 100644 index 00000000000..6410d125c5d --- /dev/null +++ b/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/appsec/ScalaListCollector.java @@ -0,0 +1,108 @@ +package datadog.trace.instrumentation.akkahttp.appsec; + +import static java.lang.invoke.MethodHandles.lookup; +import static java.lang.invoke.MethodType.methodType; + +import java.lang.invoke.MethodHandle; +import java.util.Collections; +import java.util.Set; +import java.util.function.BiConsumer; +import java.util.function.BinaryOperator; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collector; +import scala.collection.immutable.List; +import scala.collection.mutable.ListBuffer; + +public class ScalaListCollector implements Collector, List> { + + private static final Collector INSTANCE_TO_LIST; + private static final MethodHandle PLUS_EQ; + private static final MethodHandle PLUS_PLUS_EQ; + + static { + ClassLoader classLoader = ScalaListCollector.class.getClassLoader(); + if (classLoader == null) { + classLoader = ClassLoader.getSystemClassLoader(); + } + + MethodHandle plusEq; + MethodHandle plusPlusEq; + try { + plusEq = + lookup() + .findVirtual( + ListBuffer.class, "$plus$eq", methodType(ListBuffer.class, Object.class)); + Class traversableOnceCls = classLoader.loadClass("scala.collection.TraversableOnce"); + plusPlusEq = + lookup() + .findVirtual( + ListBuffer.class, + "$plus$plus$eq", + methodType(ListBuffer.class, traversableOnceCls)); + } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException e) { + try { + plusEq = + lookup() + .findVirtual( + ListBuffer.class, "addOne", methodType(ListBuffer.class, Object.class)); + Class iterableOnceCls = classLoader.loadClass("scala.collection.IterableOnce"); + plusPlusEq = + lookup() + .findVirtual( + ListBuffer.class, "addAll", methodType(ListBuffer.class, iterableOnceCls)); + } catch (NoSuchMethodException | IllegalAccessException | ClassNotFoundException ex) { + throw new RuntimeException(ex); + } + } + + PLUS_EQ = plusEq; + PLUS_PLUS_EQ = plusPlusEq; + INSTANCE_TO_LIST = new ScalaListCollector(); + } + + public static Collector> toScalaList() { + return INSTANCE_TO_LIST; + } + + private static ListBuffer addOne(ListBuffer list, T object) { + try { + return (ListBuffer) PLUS_EQ.invoke(list, object); + } catch (Throwable e) { + throw new RuntimeException(e); + } + } + + private static ListBuffer addAll(ListBuffer list, ListBuffer otherList) { + try { + return (ListBuffer) PLUS_PLUS_EQ.invoke(list, otherList); + } catch (Throwable e) { + throw new RuntimeException(e); + } + } + + @Override + public Supplier> supplier() { + return scala.collection.mutable.ListBuffer::new; + } + + @Override + public BiConsumer, T> accumulator() { + return ScalaListCollector::addOne; + } + + @Override + public BinaryOperator> combiner() { + return ScalaListCollector::addAll; + } + + @Override + public Function, List> finisher() { + return scala.collection.mutable.ListBuffer::toList; + } + + @Override + public Set characteristics() { + return Collections.emptySet(); + } +} diff --git a/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/appsec/ScalaListCollectorMuzzleReferences.java b/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/appsec/ScalaListCollectorMuzzleReferences.java new file mode 100644 index 00000000000..08f73d2c7fa --- /dev/null +++ b/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/appsec/ScalaListCollectorMuzzleReferences.java @@ -0,0 +1,38 @@ +package datadog.trace.instrumentation.akkahttp.appsec; + +import datadog.trace.agent.tooling.muzzle.Reference; + +public interface ScalaListCollectorMuzzleReferences { + Reference SCALA_LIST_COLLECTOR = + new Reference.Builder("scala.collection.mutable.ListBuffer") + .withMethod( + new String[0], + Reference.EXPECTS_NON_STATIC | Reference.EXPECTS_PUBLIC, + "$plus$eq", + "Lscala/collection/mutable/ListBuffer;", + "Ljava/lang/Object;") + .withMethod( + new String[0], + Reference.EXPECTS_NON_STATIC | Reference.EXPECTS_PUBLIC, + "$plus$plus$eq", + "Lscala/collection/mutable/ListBuffer;", + "Lscala/collection/TraversableOnce;") + .or() + .withMethod( + new String[0], + Reference.EXPECTS_NON_STATIC | Reference.EXPECTS_PUBLIC, + "addOne", + "Lscala/collection/mutable/ListBuffer;", + "Ljava/lang/Object;") + .withMethod( + new String[0], + Reference.EXPECTS_NON_STATIC | Reference.EXPECTS_PUBLIC, + "addAll", + "Lscala/collection/mutable/ListBuffer;", + "Lscala/collection/IterableOnce;") + .build(); + + static Reference[] additionalMuzzleReferences() { + return new Reference[] {SCALA_LIST_COLLECTOR}; + } +} diff --git a/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/appsec/SprayUnmarshallerInstrumentation.java b/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/appsec/SprayUnmarshallerInstrumentation.java new file mode 100644 index 00000000000..ecf150e7d17 --- /dev/null +++ b/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/appsec/SprayUnmarshallerInstrumentation.java @@ -0,0 +1,72 @@ +package datadog.trace.instrumentation.akkahttp.appsec; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static datadog.trace.instrumentation.akkahttp.iast.TraitMethodMatchers.isTraitMethod; +import static net.bytebuddy.matcher.ElementMatchers.returns; + +import akka.http.scaladsl.unmarshalling.Unmarshaller; +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.agent.tooling.muzzle.Reference; +import net.bytebuddy.asm.Advice; + +// TODO: move to separate module and have better support +@AutoService(Instrumenter.class) +public class SprayUnmarshallerInstrumentation extends Instrumenter.AppSec + implements Instrumenter.ForKnownTypes { + + private static final String TRAIT_NAME = + "akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport"; + + public SprayUnmarshallerInstrumentation() { + super("akka-http"); + } + + @Override + public String[] knownMatchingTypes() { + return new String[] { + TRAIT_NAME, TRAIT_NAME + "$class", + }; + } + + @Override + public String[] helperClassNames() { + return new String[] { + packageName + ".UnmarshallerHelpers", + packageName + ".AkkaBlockResponseFunction", + packageName + ".BlockingResponseHelper", + packageName + ".ScalaListCollector", + "datadog.trace.instrumentation.akkahttp.AkkaHttpServerDecorator", + "datadog.trace.instrumentation.akkahttp.AkkaHttpServerHeaders", + "datadog.trace.instrumentation.akkahttp.UriAdapter", + }; + } + + @Override + public Reference[] additionalMuzzleReferences() { + return ScalaListCollectorMuzzleReferences.additionalMuzzleReferences(); + } + + @Override + public void adviceTransformations(AdviceTransformation transformation) { + transformation.applyAdvice( + isTraitMethod(TRAIT_NAME, "sprayJsonUnmarshaller", "spray.json.RootJsonReader") + .and(returns(named("akka.http.scaladsl.unmarshalling.Unmarshaller"))) + .or( + isTraitMethod( + TRAIT_NAME, "sprayJsonByteStringUnmarshaller", "spray.json.RootJsonReader") + .and(returns(named("akka.http.scaladsl.unmarshalling.Unmarshaller")))), + SprayUnmarshallerInstrumentation.class.getName() + "$ArbitraryTypeAdvice"); + // support is basic: + // * Source[T, NotUsed] is not intercepted + // * neither is the conversion into JsValue. It would need to wrap the JsValue + // to intercept calls to the methods in play.api.libs.json.JsReadable + } + + static class ArbitraryTypeAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + static void after(@Advice.Return(readOnly = false) Unmarshaller ret) { + ret = UnmarshallerHelpers.transformArbitrarySprayUnmarshaller(ret); + } + } +} diff --git a/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/appsec/StrictFormCompanionInstrumentation.java b/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/appsec/StrictFormCompanionInstrumentation.java new file mode 100644 index 00000000000..2e34ee6cfdf --- /dev/null +++ b/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/appsec/StrictFormCompanionInstrumentation.java @@ -0,0 +1,69 @@ +package datadog.trace.instrumentation.akkahttp.appsec; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.isStatic; +import static net.bytebuddy.matcher.ElementMatchers.not; +import static net.bytebuddy.matcher.ElementMatchers.returns; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +import akka.http.scaladsl.common.StrictForm; +import akka.http.scaladsl.model.HttpEntity; +import akka.http.scaladsl.unmarshalling.Unmarshaller; +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.agent.tooling.muzzle.Reference; +import net.bytebuddy.asm.Advice; + +/** @see akka.http.scaladsl.common.StrictForm$#unmarshaller(Unmarshaller, Unmarshaller) */ +@AutoService(Instrumenter.class) +public class StrictFormCompanionInstrumentation extends Instrumenter.AppSec + implements Instrumenter.ForSingleType { + public StrictFormCompanionInstrumentation() { + super("akka-http"); + } + + @Override + public String instrumentedType() { + return "akka.http.scaladsl.common.StrictForm$"; + } + + @Override + public String[] helperClassNames() { + return new String[] { + packageName + ".UnmarshallerHelpers", + packageName + ".AkkaBlockResponseFunction", + packageName + ".BlockingResponseHelper", + packageName + ".ScalaListCollector", + "datadog.trace.instrumentation.akkahttp.AkkaHttpServerDecorator", + "datadog.trace.instrumentation.akkahttp.AkkaHttpServerHeaders", + "datadog.trace.instrumentation.akkahttp.UriAdapter", + }; + } + + @Override + public Reference[] additionalMuzzleReferences() { + return ScalaListCollectorMuzzleReferences.additionalMuzzleReferences(); + } + + @Override + public void adviceTransformations(AdviceTransformation transformation) { + transformation.applyAdvice( + isMethod() + .and(not(isStatic())) + .and(named("unmarshaller")) + .and(returns(named("akka.http.scaladsl.unmarshalling.Unmarshaller"))) + .and(takesArguments(2)) + .and(takesArgument(0, named("akka.http.scaladsl.unmarshalling.Unmarshaller"))) + .and(takesArgument(1, named("akka.http.scaladsl.unmarshalling.Unmarshaller"))), + StrictFormCompanionInstrumentation.class.getName() + "$UnmarshallerAdvice"); + } + + static class UnmarshallerAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + static void after(@Advice.Return(readOnly = false) Unmarshaller ret) { + ret = UnmarshallerHelpers.transformStrictFormUnmarshaller(ret); + } + } +} diff --git a/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/appsec/UnmarshallerHelpers.java b/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/appsec/UnmarshallerHelpers.java new file mode 100644 index 00000000000..5620bb9e23a --- /dev/null +++ b/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/appsec/UnmarshallerHelpers.java @@ -0,0 +1,445 @@ +package datadog.trace.instrumentation.akkahttp.appsec; + +import static datadog.trace.api.gateway.Events.EVENTS; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan; + +import akka.http.javadsl.model.ContentType; +import akka.http.javadsl.model.MediaType; +import akka.http.javadsl.model.MediaTypes; +import akka.http.scaladsl.common.StrictForm; +import akka.http.scaladsl.model.FormData; +import akka.http.scaladsl.model.HttpEntity; +import akka.http.scaladsl.unmarshalling.Unmarshaller; +import akka.http.scaladsl.unmarshalling.Unmarshaller$; +import akka.stream.Materializer; +import datadog.appsec.api.blocking.BlockingException; +import datadog.trace.api.gateway.BlockResponseFunction; +import datadog.trace.api.gateway.CallbackProvider; +import datadog.trace.api.gateway.Flow; +import datadog.trace.api.gateway.RequestContext; +import datadog.trace.api.gateway.RequestContextSlot; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import datadog.trace.bootstrap.instrumentation.api.AgentTracer; +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.WeakHashMap; +import java.util.function.BiFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Function1; +import scala.Tuple2; +import scala.collection.Iterable; +import scala.collection.Iterator; +import scala.compat.java8.JFunction1; +import scala.compat.java8.JFunction2; +import scala.concurrent.ExecutionContext; +import scala.concurrent.Future; + +public class UnmarshallerHelpers { + + public static final int MAX_CONVERSION_DEPTH = 10; + private static final Logger log = LoggerFactory.getLogger(UnmarshallerHelpers.class); + + private static final MediaType APPLICATION_X_WWW_FORM_URLENCODED; + + static { + MediaType t = null; + try { + // subtype of MediaType changes between 10.0 and 10.1 + Field f = MediaTypes.class.getField("APPLICATION_X_WWW_FORM_URLENCODED"); + t = (MediaType) f.get(null); + } catch (NoSuchFieldException | IllegalAccessException e) { + } + APPLICATION_X_WWW_FORM_URLENCODED = t; + } + + private UnmarshallerHelpers() {} + + public static Unmarshaller + transformUrlEncodedUnmarshaller( + Unmarshaller original) { + JFunction1 mapf = + formData -> { + try { + handleFormData(formData); + } catch (Exception e) { + handleException(e, "transformUrlEncodedMarshaller"); + } + + return formData; + }; + + return original.map(mapf); + } + + private static void handleFormData(FormData formData) { + AgentSpan span = activeSpan(); + RequestContext reqCtx; + if (span == null + || (reqCtx = span.getRequestContext()) == null + || reqCtx.getData(RequestContextSlot.APPSEC) == null + || isStrictFormOngoing(span)) { + return; + } + + CallbackProvider cbp = AgentTracer.get().getCallbackProvider(RequestContextSlot.APPSEC); + BiFunction> callback = + cbp.getCallback(EVENTS.requestBodyProcessed()); + if (callback == null) { + return; + } + + Iterator> fieldsIter = formData.fields().iterator(); + Map> conv = new HashMap<>(); + while (fieldsIter.hasNext()) { + Tuple2 pair = fieldsIter.next(); + + String key = pair._1; + List values = conv.get(key); + if (values == null) { + values = new ArrayList<>(); + conv.put(key, values); + } + values.add(pair._2); + } + + if (conv.isEmpty()) { + return; + } + + // callback execution + executeCallback(reqCtx, callback, conv, "urlEncodedFormDataUnmarshaller"); + } + + private static void executeCallback( + RequestContext reqCtx, + BiFunction> callback, + Object conv, + String details) { + Flow flow = callback.apply(reqCtx, conv); + Flow.Action action = flow.getAction(); + if (action instanceof Flow.Action.RequestBlockingAction) { + Flow.Action.RequestBlockingAction rba = (Flow.Action.RequestBlockingAction) action; + BlockResponseFunction blockResponseFunction = reqCtx.getBlockResponseFunction(); + if (blockResponseFunction != null) { + boolean success = + blockResponseFunction.tryCommitBlockingResponse( + reqCtx.getTraceSegment(), + rba.getStatusCode(), + rba.getBlockingContentType(), + rba.getExtraHeaders()); + if (success) { + if (blockResponseFunction instanceof AkkaBlockResponseFunction) { + AkkaBlockResponseFunction abrf = (AkkaBlockResponseFunction) blockResponseFunction; + abrf.setUnmarshallBlock(true); + } + throw new BlockingException("Blocked request (for " + details + ")"); + } + } + } + } + + public static Unmarshaller transformMultipartFormDataUnmarshaller(Unmarshaller original) { + JFunction1< + akka.http.scaladsl.model.Multipart.FormData, + akka.http.scaladsl.model.Multipart.FormData> + mapf = + t -> { + if (!(t instanceof akka.http.scaladsl.model.Multipart$FormData$Strict)) { + // data not loaded yet... + // it's not practical to wrap the object + // rely on instrumentation on toStrict + return t; + } + + try { + handleMultipartStrictFormData( + (akka.http.scaladsl.model.Multipart$FormData$Strict) t); + } catch (Exception e) { + handleException(e, "Error in handleMultipartStrictFormData"); + } + + return t; + }; + + return original.map(mapf); + } + + public static scala.concurrent.Future + transformMultiPartFormDataToStrictFuture( + scala.concurrent.Future future, + Materializer materializer) { + JFunction1< + akka.http.scaladsl.model.Multipart$FormData$Strict, + akka.http.scaladsl.model.Multipart$FormData$Strict> + mapf = + t -> { + try { + AgentSpan span = activeSpan(); + if (span != null && !isStrictFormOngoing(span)) { + handleMultipartStrictFormData(t); + } + } catch (Exception e) { + handleException(e, "Error in transformMultiPartFormDataToStrictFuture"); + } + return t; + }; + return future.map(mapf, materializer.executionContext()); + } + + private static void handleMultipartStrictFormData( + akka.http.scaladsl.model.Multipart$FormData$Strict st) { + AgentSpan span = activeSpan(); + RequestContext reqCtx; + if (span == null + || (reqCtx = span.getRequestContext()) == null + || reqCtx.getData(RequestContextSlot.APPSEC) == null) { + return; + } + + CallbackProvider cbp = AgentTracer.get().getCallbackProvider(RequestContextSlot.APPSEC); + BiFunction> callback = + cbp.getCallback(EVENTS.requestBodyProcessed()); + if (callback == null) { + return; + } + + // conversion to map string -> list of string + java.lang.Iterable strictParts = + st.getStrictParts(); + Map> conv = new HashMap<>(); + for (akka.http.javadsl.model.Multipart.FormData.BodyPart.Strict part : strictParts) { + akka.http.javadsl.model.HttpEntity.Strict entity = part.getEntity(); + if (!(entity instanceof HttpEntity.Strict)) { + continue; + } + + HttpEntity.Strict sentity = (HttpEntity.Strict) entity; + + String name = part.getName(); + List curStrings = conv.get(name); + if (curStrings == null) { + curStrings = new ArrayList<>(); + conv.put(name, curStrings); + } + + String s = + sentity + .getData() + .decodeString( + Unmarshaller$.MODULE$.bestUnmarshallingCharsetFor(sentity).nioCharset()); + curStrings.add(s); + } + + // callback execution + executeCallback(reqCtx, callback, conv, "multipartFormDataUnmarshaller"); + } + + public static Unmarshaller transformStringUnmarshaller( + Unmarshaller original) { + Unmarshaller.EnhancedUnmarshaller enhancedOriginal = + new Unmarshaller.EnhancedUnmarshaller<>(original); + JFunction2 f2 = + (entity, str) -> { + try { + AgentSpan agentSpan = activeSpan(); + if (agentSpan == null || isStrictFormOngoing(agentSpan)) { + return str; + } + + ContentType contentType = entity.getContentType(); + MediaType mediaType = contentType.mediaType(); + if (mediaType != MediaTypes.APPLICATION_JSON + && mediaType != MediaTypes.MULTIPART_FORM_DATA + && mediaType != APPLICATION_X_WWW_FORM_URLENCODED) { + handleArbitraryPostData(str, "HttpEntity -> String unmarshaller"); + } + } catch (Exception e) { + handleException(e, "Error in transformStringUnmarshaller"); + } + + return str; + }; + + return enhancedOriginal.mapWithInput(f2); + } + + public static akka.http.javadsl.unmarshalling.Unmarshaller transformJacksonUnmarshaller( + akka.http.javadsl.unmarshalling.Unmarshaller original) { + return original.thenApply( + ret -> { + try { + handleArbitraryPostData(ret, "jackson unmarshaller"); + } catch (Exception e) { + handleException(e, "Error in transformJacksonUnmarshaller"); + } + return ret; + }); + } + + public static Unmarshaller transformArbitrarySprayUnmarshaller(Unmarshaller original) { + JFunction1 f = + ret -> { + Object conv = tryConvertingScalaContainers(ret, MAX_CONVERSION_DEPTH); + try { + handleArbitraryPostData(conv, "spray unmarshaller"); + } catch (Exception e) { + handleException(e, "Error in transformArbitrarySprayUnmarshaller"); + } + return ret; + }; + return original.map(f); + } + + private static final WeakHashMap STRICT_FORM_SERIALIZATION_ONGOING = + new WeakHashMap<>(); + + private static void markStrictFormOngoing(AgentSpan agentSpan) { + synchronized (STRICT_FORM_SERIALIZATION_ONGOING) { + STRICT_FORM_SERIALIZATION_ONGOING.put(agentSpan.getRequestContext(), Boolean.TRUE); + } + } + + private static boolean isStrictFormOngoing(AgentSpan agentSpan) { + synchronized (STRICT_FORM_SERIALIZATION_ONGOING) { + return STRICT_FORM_SERIALIZATION_ONGOING.getOrDefault( + agentSpan.getRequestContext(), Boolean.FALSE); + } + } + + private static JFunction1 STRICT_FORM_DATA_POST_TRANSF = + sf -> { + try { + handleStrictFormData(sf); + } catch (Exception e) { + handleException(e, "Error in transformStrictFromUnmarshaller"); + } + // we do not remove the span from STRICT_FORM_SERIALIZATION_ONGOING, + // as the string unmarshaller can still run afterwards. This way, the + // advice will still be skipped + return sf; + }; + + public static Unmarshaller transformStrictFormUnmarshaller( + Unmarshaller original) { + JFunction1>>> + wrappedBeforeF = + ec -> { + JFunction1>> g = + mat -> { + JFunction1> h = + entity -> { + AgentSpan agentSpan = activeSpan(); + if (agentSpan != null) { + markStrictFormOngoing(agentSpan); + } + return original.apply(entity, ec, mat); + }; + return h; + }; + return g; + }; + Unmarshaller wrappedBeforeU = + Unmarshaller$.MODULE$.withMaterializer(wrappedBeforeF); + + return wrappedBeforeU.map(STRICT_FORM_DATA_POST_TRANSF); + } + + private static void handleStrictFormData(StrictForm sf) { + Iterator> iterator = sf.fields().iterator(); + Map> conv = new HashMap<>(); + while (iterator.hasNext()) { + Tuple2 next = iterator.next(); + String fieldName = next._1(); + StrictForm.Field field = next._2(); + + List strings = conv.get(fieldName); + if (strings == null) { + strings = new ArrayList<>(); + conv.put(fieldName, strings); + } + + Object strictFieldValue; + try { + Field f = field.getClass().getDeclaredField("value"); + f.setAccessible(true); + strictFieldValue = f.get(field); + } catch (NoSuchFieldException | IllegalAccessException e) { + continue; + } + + if (strictFieldValue instanceof String) { + strings.add((String) strictFieldValue); + } else if (strictFieldValue + instanceof akka.http.scaladsl.model.Multipart$FormData$BodyPart$Strict) { + HttpEntity.Strict sentity = + ((akka.http.scaladsl.model.Multipart$FormData$BodyPart$Strict) strictFieldValue) + .entity(); + String s = + sentity + .getData() + .decodeString( + Unmarshaller$.MODULE$.bestUnmarshallingCharsetFor(sentity).nioCharset()); + strings.add(s); + } + } + + handleArbitraryPostData(conv, "HttpEntity -> StrictForm unmarshaller"); + } + + private static Object tryConvertingScalaContainers(Object obj, int depth) { + if (depth == 0) { + return obj; + } + if (obj instanceof scala.collection.Map) { + scala.collection.Map map = (scala.collection.Map) obj; + Map ret = new HashMap<>(); + Iterator iterator = map.iterator(); + while (iterator.hasNext()) { + Tuple2 next = iterator.next(); + ret.put(next._1(), tryConvertingScalaContainers(next._2(), depth - 1)); + } + return ret; + } else if (obj instanceof scala.collection.Iterable) { + List ret = new ArrayList<>(); + Iterator iterator = ((Iterable) obj).iterator(); + while (iterator.hasNext()) { + Object next = iterator.next(); + ret.add(tryConvertingScalaContainers(next, depth - 1)); + } + return ret; + } + return obj; + } + + private static void handleArbitraryPostData(Object o, String source) { + AgentSpan span = activeSpan(); + RequestContext reqCtx; + if (span == null + || (reqCtx = span.getRequestContext()) == null + || reqCtx.getData(RequestContextSlot.APPSEC) == null) { + return; + } + + CallbackProvider cbp = AgentTracer.get().getCallbackProvider(RequestContextSlot.APPSEC); + BiFunction> callback = + cbp.getCallback(EVENTS.requestBodyProcessed()); + if (callback == null) { + return; + } + + // callback execution + executeCallback(reqCtx, callback, o, source); + } + + private static void handleException(Exception e, String logMessage) { + if (e instanceof BlockingException) { + throw (BlockingException) e; + } + + log.warn(logMessage, e); + } +} diff --git a/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/iast/TraitMethodMatchers.java b/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/iast/TraitMethodMatchers.java index 2d74073aedf..d9db76ece90 100644 --- a/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/iast/TraitMethodMatchers.java +++ b/dd-java-agent/instrumentation/akka-http-10.0/src/main/java/datadog/trace/instrumentation/akkahttp/iast/TraitMethodMatchers.java @@ -9,11 +9,12 @@ import static net.bytebuddy.matcher.ElementMatchers.takesArguments; import net.bytebuddy.description.method.MethodDescription; +import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; public class TraitMethodMatchers { - public static ElementMatcher.Junction isTraitDirectiveMethod( - String traitName, String name, String... argumentTypes) { + public static ElementMatcher.Junction isTraitMethod( + String traitName, String name, Object... argumentTypes) { ElementMatcher.Junction scalaOldArgs = isStatic() @@ -23,13 +24,24 @@ public static ElementMatcher.Junction isTraitDirectiveMethod( not(isStatic()).and(takesArguments(argumentTypes.length)); for (int i = 0; i < argumentTypes.length; i++) { - scalaOldArgs = scalaOldArgs.and(takesArgument(i + 1, named(argumentTypes[i]))); - scalaNewArgs = scalaNewArgs.and(takesArgument(i, named(argumentTypes[i]))); + Object argumentType = argumentTypes[i]; + ElementMatcher matcher; + if (argumentType instanceof ElementMatcher) { + matcher = (ElementMatcher) argumentType; + } else { + matcher = named((String) argumentType); + } + scalaOldArgs = scalaOldArgs.and(takesArgument(i + 1, matcher)); + scalaNewArgs = scalaNewArgs.and(takesArgument(i, matcher)); } - return isMethod() - .and(named(name)) - .and(returns(named("akka.http.scaladsl.server.Directive"))) - .and(scalaOldArgs.or(scalaNewArgs)); + return isMethod().and(named(name)).and(scalaOldArgs.or(scalaNewArgs)); + } + + public static ElementMatcher.Junction isTraitDirectiveMethod( + String traitName, String name, String... argumentTypes) { + + return isTraitMethod(traitName, name, (Object[]) argumentTypes) + .and(returns(named("akka.http.scaladsl.server.Directive"))); } } diff --git a/dd-java-agent/instrumentation/glassfish/src/test/groovy/GlassFishServerTest.groovy b/dd-java-agent/instrumentation/glassfish/src/test/groovy/GlassFishServerTest.groovy index 8a8f86421bf..8f9d3502271 100644 --- a/dd-java-agent/instrumentation/glassfish/src/test/groovy/GlassFishServerTest.groovy +++ b/dd-java-agent/instrumentation/glassfish/src/test/groovy/GlassFishServerTest.groovy @@ -124,11 +124,14 @@ class GlassFishServerTest extends HttpServerTest { @Override boolean testBlocking() { - true - } - - @Override - boolean testUserBlocking() { + // TODO: the servlet instrumentation has no blocking request function yet + // Relying on grizzly or grizzly-filterchain instrumentations doesn't work + // for different reasons: the version of grizzly-http is too old for + // glassfish 4 and for glassfish 5 the span is created after the servlet span. + // The grizzly instrumentation doesn't work for a different reason: the blocking + // exception throw on parseRequestParameters is gobbled inside + // org.glassfish.grizzly.http.server.Request#parseRequestParameters and never + // propagates. false } diff --git a/dd-java-agent/instrumentation/servlet/request-3/src/test/groovy/JettyServlet3Test.groovy b/dd-java-agent/instrumentation/servlet/request-3/src/test/groovy/JettyServlet3Test.groovy index c8216e77079..48f29338d03 100644 --- a/dd-java-agent/instrumentation/servlet/request-3/src/test/groovy/JettyServlet3Test.groovy +++ b/dd-java-agent/instrumentation/servlet/request-3/src/test/groovy/JettyServlet3Test.groovy @@ -297,6 +297,13 @@ class JettyServlet3TestInclude extends JettyServlet3Test { return true } + @Override + boolean testBlocking() { + // setting response code from included dispatches is not supported by servlet, + // and would require version-dependent hacks on Jetty + false + } + @Override boolean testUserBlocking() { false diff --git a/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/base/HttpServerTest.groovy b/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/base/HttpServerTest.groovy index e5dd1bbd393..d04d06f2f1e 100644 --- a/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/base/HttpServerTest.groovy +++ b/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/base/HttpServerTest.groovy @@ -1632,7 +1632,7 @@ abstract class HttpServerTest extends WithHttpServer { def 'test blocking of request for request body variant #variant'() { setup: - assumeTrue(testUserBlocking()) + assumeTrue(testBlocking()) assumeTrue(executeTest) def request = request( diff --git a/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/base/WithHttpServer.groovy b/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/base/WithHttpServer.groovy index 01dee85bbfe..235aa2a7933 100644 --- a/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/base/WithHttpServer.groovy +++ b/dd-java-agent/testing/src/main/groovy/datadog/trace/agent/test/base/WithHttpServer.groovy @@ -8,6 +8,8 @@ import okhttp3.OkHttpClient import spock.lang.Shared import spock.lang.Subject +import java.lang.management.ManagementFactory +import java.lang.management.RuntimeMXBean import java.util.concurrent.TimeUnit import java.util.concurrent.TimeoutException @@ -16,8 +18,12 @@ abstract class WithHttpServer extends VersionedNamingTestBase { @Shared @Subject HttpServer server + + @Lazy + private static int timeoutValue = debugging ? 1500 : 15 + @Shared - OkHttpClient client = OkHttpUtils.client(1500, 1500, TimeUnit.SECONDS) + OkHttpClient client = OkHttpUtils.client(timeoutValue, timeoutValue, TimeUnit.SECONDS) @Shared URI address = null @@ -93,4 +99,15 @@ abstract class WithHttpServer extends VersionedNamingTestBase { void stopServer(SERVER server) { throw new UnsupportedOperationException() } + + private static boolean isDebugging() { + RuntimeMXBean runtimeMXBean = ManagementFactory.runtimeMXBean + List inputArguments = runtimeMXBean.inputArguments + for (String arg : inputArguments) { + if (arg.contains("-agentlib:jdwp")) { + return true + } + } + false + } }