Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(iast): duplicated Flask headers [backport 2.8] #9021

Merged
merged 1 commit into from
Apr 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions ddtrace/appsec/_iast/_patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,13 @@ def if_iast_taint_yield_tuple_for(origins, wrapped, instance, args, kwargs):
if not AppSecIastSpanProcessor.is_span_analyzed():
for key, value in wrapped(*args, **kwargs):
yield key, value

for key, value in wrapped(*args, **kwargs):
new_key = taint_pyobject(pyobject=key, source_name=key, source_value=key, source_origin=origins[0])
new_value = taint_pyobject(pyobject=value, source_name=key, source_value=value, source_origin=origins[1])
yield new_key, new_value
else:
for key, value in wrapped(*args, **kwargs):
new_key = taint_pyobject(pyobject=key, source_name=key, source_value=key, source_origin=origins[0])
new_value = taint_pyobject(
pyobject=value, source_name=key, source_value=value, source_origin=origins[1]
)
yield new_key, new_value

else:
for key, value in wrapped(*args, **kwargs):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
fixes:
- |
Code Security: Ensure that when tainting the headers of a Flask application, iterating over the headers
(i.e., with `headers.items()`) does not duplicate them.
9 changes: 9 additions & 0 deletions tests/appsec/iast_tdd_propagation/flask_propagation_app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from flask_propagation_views import create_app

from ddtrace import auto # noqa: F401


app = create_app()

if __name__ == "__main__":
app.run(debug=False, port=8000)
47 changes: 47 additions & 0 deletions tests/appsec/iast_tdd_propagation/flask_propagation_views.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import sys

from flask import Flask
from flask import request

from ddtrace import tracer
from ddtrace.appsec._iast._taint_tracking import is_pyobject_tainted


class ResultResponse:
param = ""
sources = ""
vulnerabilities = ""

def __init__(self, param):
self.param = param

def json(self):
return {
"param": self.param,
"sources": self.sources,
"vulnerabilities": self.vulnerabilities,
"params_are_tainted": is_pyobject_tainted(self.param),
}


def create_app():
app = Flask(__name__)

@app.route("/shutdown")
def shutdown():
tracer.shutdown()
sys.exit(0)

@app.route("/")
def index():
return "OK"

@app.route("/check-headers")
def check_headers():
headers = list(request.headers.items())

response = ResultResponse(headers)

return response.json()

return app
4 changes: 0 additions & 4 deletions tests/appsec/iast_tdd_propagation/flask_taint_sinks_app.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
#!/usr/bin/env python3

""" This Flask application is imported on tests.appsec.appsec_utils.gunicorn_server
"""
from flask_taint_sinks_views import create_app

from ddtrace import auto # noqa: F401
Expand Down
25 changes: 25 additions & 0 deletions tests/appsec/iast_tdd_propagation/test_flask.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def test_iast_flask_orm(orm, xfail):


def test_iast_flask_weak_cipher():
"""Verify a segmentation fault on pycriptodome and AES"""
with flask_server(
iast_enabled="true",
tracer_enabled="true",
Expand Down Expand Up @@ -83,3 +84,27 @@ def test_iast_flask_weak_cipher():
assert content["sources"] == ""
assert content["vulnerabilities"] == "WEAK_CIPHER"
assert content["params_are_tainted"] is True


def test_iast_flask_headers():
"""Verify duplicated headers in the request"""
with flask_server(
iast_enabled="true",
tracer_enabled="true",
remote_configuration_enabled="false",
token=None,
# app="tests/appsec/iast_tdd_propagation/flask_propagation_app.py",
app="flask_propagation_app.py",
) as context:
server_process, client, pid = context
tainted_response = client.get("/check-headers", headers={"Accept-Encoding": "gzip, deflate, br"})

assert tainted_response.status_code == 200
content = json.loads(tainted_response.content)
assert content["param"] == [
["Host", "0.0.0.0:8000"],
["User-Agent", "python-requests/2.31.0"],
["Accept-Encoding", "gzip, deflate, br"],
["Accept", "*/*"],
["Connection", "keep-alive"],
]
Loading