Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parse arrays of objects in AWS WAF logs #459

Merged
merged 5 commits into from
Aug 17, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions aws/logs_monitoring/lambda_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from parsing import (
parse,
separate_security_hub_findings,
parse_aws_waf_logs,
)
from telemetry import (
DD_FORWARDER_TELEMETRY_NAMESPACE_PREFIX,
Expand Down Expand Up @@ -194,6 +195,10 @@ def transform(events):
events.remove(event)
events.extend(findings)

waf = parse_aws_waf_logs(event)
if waf != event:
events.remove(event)
events.append(waf)
return events


Expand Down
113 changes: 113 additions & 0 deletions aws/logs_monitoring/parsing.py
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,119 @@ def cwevent_handler(event, metadata):
yield data


def parse_aws_waf_logs(event):
    """Parse out complex arrays of objects in AWS WAF logs

    Attributes to convert:
        httpRequest.headers
        nonTerminatingMatchingRules
        rateBasedRuleList
        ruleGroupList

    This prevents having an unparsable array of objects in the final log.

    Args:
        event: Log event (dict). Its "message" may be a dict or a JSON
            string encoding a dict.

    Returns:
        The original ``event`` object, untouched, when it is not a WAF event
        or when its message cannot be decoded into a dict. Otherwise a deep
        copy of the event whose "message" is the decoded dict with the
        attributes listed above converted from arrays to objects. The input
        event is never mutated.
    """
    if event.get(DD_SOURCE) != "waf":
        return event

    # Work on a copy so the caller's event is never modified.
    event_copy = copy.deepcopy(event)

    message = event_copy.get("message", {})
    if isinstance(message, str):
        try:
            message = json.loads(message)
        except json.JSONDecodeError:
            logger.debug("Failed to decode waf message")
            # Hand back the original event: callers compare the result with
            # their input to decide whether a replacement is needed, so
            # returning None here would replace the event with None.
            return event
    if not isinstance(message, dict):
        # Nothing to restructure (e.g. the JSON decoded to a list or scalar).
        return event

    http_request = message.get("httpRequest")
    if isinstance(http_request, dict):
        headers = http_request.get("headers")
        if headers and isinstance(headers, list):
            # [{"name": n, "value": v}, ...] -> {n: v, ...}
            http_request["headers"] = {
                header["name"]: header["value"]
                for header in headers
                if isinstance(header, dict) and "name" in header and "value" in header
            }

    rule_groups = message.get("ruleGroupList")
    if rule_groups and isinstance(rule_groups, list):
        message["ruleGroupList"] = _nest_waf_rule_groups(rule_groups)

    top_level_rule_lists = (
        ("rateBasedRuleList", "rateBasedRuleName"),
        ("nonTerminatingMatchingRules", "ruleId"),
    )
    for attribute, id_key in top_level_rule_lists:
        rules = message.get(attribute)
        if rules and isinstance(rules, list):
            message[attribute] = _nest_waf_rules_by_key(rules, id_key)

    event_copy["message"] = message
    return event_copy


def _nest_waf_rules_by_key(rules, id_key):
    """Map each rule's ``id_key`` value to the rule with that key removed.

    Entries that are not dicts, or that have no truthy ``id_key`` value, are
    skipped. The rule dicts are modified in place (the key is popped).
    """
    nested = {}
    for rule in rules:
        if isinstance(rule, dict) and rule.get(id_key):
            nested[rule.pop(id_key)] = rule
    return nested


def _merge_waf_group_rules(group, attribute, rules):
    """Merge ``rules`` (nested by ruleId) into ``group[attribute]``.

    The attribute is only created when at least one rule carries a ruleId.
    """
    nested = _nest_waf_rules_by_key(rules, "ruleId")
    if nested:
        group.setdefault(attribute, {}).update(nested)


def _nest_waf_rule_groups(rule_groups):
    """Nest the rules of each WAF rule group under its group id and rule id.

    Entries sharing a ruleGroupId are merged into one object. Only the
    terminatingRule, nonTerminatingMatchingRules and excludedRules attributes
    of a group are carried over.
    """
    grouped = {}
    for rule_group in rule_groups:
        if not isinstance(rule_group, dict):
            continue

        # Groups without a usable id are all collected under the None key.
        group_id = rule_group.get("ruleGroupId") or None
        group = grouped.setdefault(group_id, {})

        # The terminating rule is a single object rather than an array.
        terminating_rule = rule_group.get("terminatingRule")
        if isinstance(terminating_rule, dict):
            _merge_waf_group_rules(group, "terminatingRule", [terminating_rule])

        for attribute in ("nonTerminatingMatchingRules", "excludedRules"):
            rules = rule_group.get(attribute)
            if isinstance(rules, list):
                _merge_waf_group_rules(group, attribute, rules)

    return grouped


def separate_security_hub_findings(event):
"""Replace Security Hub event with series of events based on findings

Expand Down
242 changes: 241 additions & 1 deletion aws/logs_monitoring/tests/test_parsing.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@

env_patch = patch.dict(os.environ, {"DD_API_KEY": "11111111111111111111111111111111"})
env_patch.start()
from parsing import parse_event_source, separate_security_hub_findings
from parsing import (
parse_event_source,
separate_security_hub_findings,
parse_aws_waf_logs,
)

env_patch.stop()

Expand Down Expand Up @@ -233,6 +237,242 @@ def test_s3_source_if_none_found(self):
self.assertEqual(parse_event_source({"Records": ["logs-from-s3"]}, ""), "s3")


class TestParseAwsWafLogs(unittest.TestCase):
    """Tests for parse_aws_waf_logs: arrays of objects become keyed objects."""

    def test_waf_headers(self):
        """httpRequest.headers is turned into a name -> value object."""
        event = {
            "ddsource": "waf",
            "message": {
                "httpRequest": {
                    "headers": [
                        {"name": "header1", "value": "value1"},
                        {"name": "header2", "value": "value2"},
                    ]
                }
            },
        }
        self.assertEqual(
            parse_aws_waf_logs(event),
            {
                "ddsource": "waf",
                "message": {
                    "httpRequest": {
                        "headers": {"header1": "value1", "header2": "value2"}
                    }
                },
            },
        )

    def test_waf_message_as_valid_json_string(self):
        """A message delivered as a JSON string is decoded and converted."""
        event = {
            "ddsource": "waf",
            "message": (
                '{"httpRequest": {"headers": ['
                '{"name": "header1", "value": "value1"}, '
                '{"name": "header2", "value": "value2"}]}}'
            ),
        }
        self.assertEqual(
            parse_aws_waf_logs(event),
            {
                "ddsource": "waf",
                "message": {
                    "httpRequest": {
                        "headers": {"header1": "value1", "header2": "value2"}
                    }
                },
            },
        )

    def test_waf_message_as_invalid_json_string(self):
        """A message that is not valid JSON must not raise or alter the input."""
        event = {"ddsource": "waf", "message": "this is { not valid json"}
        parse_aws_waf_logs(event)
        self.assertEqual(
            event, {"ddsource": "waf", "message": "this is { not valid json"}
        )

    def test_waf_non_terminating_matching_rules(self):
        """Top-level nonTerminatingMatchingRules are keyed by ruleId."""
        event = {
            "ddsource": "waf",
            "message": {
                "nonTerminatingMatchingRules": [
                    {"ruleId": "nonterminating1", "action": "COUNT"},
                    {"ruleId": "nonterminating2", "action": "COUNT"},
                ]
            },
        }
        self.assertEqual(
            parse_aws_waf_logs(event),
            {
                "ddsource": "waf",
                "message": {
                    "nonTerminatingMatchingRules": {
                        "nonterminating2": {"action": "COUNT"},
                        "nonterminating1": {"action": "COUNT"},
                    }
                },
            },
        )

    def test_waf_rate_based_rules(self):
        """rateBasedRuleList entries are keyed by rateBasedRuleName."""
        event = {
            "ddsource": "waf",
            "message": {
                "rateBasedRuleList": [
                    {
                        "limitValue": "195.154.122.189",
                        "rateBasedRuleName": "tf-rate-limit-5-min",
                        "rateBasedRuleId": "arn:aws:wafv2:ap-southeast-2:068133125972_MANAGED:regional/ipset/0f94bd8b-0fa5-4865-81ce-d11a60051fb4_fef50279-8b9a-4062-b733-88ecd1cfd889_IPV4/fef50279-8b9a-4062-b733-88ecd1cfd889",
                        "maxRateAllowed": 300,
                        "limitKey": "IP",
                    },
                    {
                        "limitValue": "195.154.122.189",
                        "rateBasedRuleName": "no-rate-limit",
                        "rateBasedRuleId": "arn:aws:wafv2:ap-southeast-2:068133125972_MANAGED:regional/ipset/0f94bd8b-0fa5-4865-81ce-d11a60051fb4_fef50279-8b9a-4062-b733-88ecd1cfd889_IPV4/fef50279-8b9a-4062-b733-88ecd1cfd889",
                        "maxRateAllowed": 300,
                        "limitKey": "IP",
                    },
                ]
            },
        }
        self.assertEqual(
            parse_aws_waf_logs(event),
            {
                "ddsource": "waf",
                "message": {
                    "rateBasedRuleList": {
                        "tf-rate-limit-5-min": {
                            "rateBasedRuleId": "arn:aws:wafv2:ap-southeast-2:068133125972_MANAGED:regional/ipset/0f94bd8b-0fa5-4865-81ce-d11a60051fb4_fef50279-8b9a-4062-b733-88ecd1cfd889_IPV4/fef50279-8b9a-4062-b733-88ecd1cfd889",
                            "limitValue": "195.154.122.189",
                            "maxRateAllowed": 300,
                            "limitKey": "IP",
                        },
                        "no-rate-limit": {
                            "rateBasedRuleId": "arn:aws:wafv2:ap-southeast-2:068133125972_MANAGED:regional/ipset/0f94bd8b-0fa5-4865-81ce-d11a60051fb4_fef50279-8b9a-4062-b733-88ecd1cfd889_IPV4/fef50279-8b9a-4062-b733-88ecd1cfd889",
                            "limitValue": "195.154.122.189",
                            "maxRateAllowed": 300,
                            "limitKey": "IP",
                        },
                    }
                },
            },
        )

    def test_waf_rule_group_with_excluded_and_nonterminating_rules(self):
        """All three per-group rule attributes are nested under the group id."""
        event = {
            "ddsource": "waf",
            "message": {
                "ruleGroupList": [
                    {
                        "ruleGroupId": "AWS#AWSManagedRulesSQLiRuleSet",
                        "terminatingRule": {
                            "ruleId": "SQLi_QUERYARGUMENTS",
                            "action": "BLOCK",
                        },
                        "nonTerminatingMatchingRules": [
                            {
                                "exclusionType": "REGULAR",
                                "ruleId": "first_nonterminating",
                            },
                            {
                                "exclusionType": "REGULAR",
                                "ruleId": "second_nonterminating",
                            },
                        ],
                        "excludedRules": [
                            {
                                "exclusionType": "EXCLUDED_AS_COUNT",
                                "ruleId": "GenericRFI_BODY",
                            },
                            {
                                "exclusionType": "EXCLUDED_AS_COUNT",
                                "ruleId": "second_exclude",
                            },
                        ],
                    }
                ]
            },
        }
        self.assertEqual(
            parse_aws_waf_logs(event),
            {
                "ddsource": "waf",
                "message": {
                    "ruleGroupList": {
                        "AWS#AWSManagedRulesSQLiRuleSet": {
                            "nonTerminatingMatchingRules": {
                                "second_nonterminating": {"exclusionType": "REGULAR"},
                                "first_nonterminating": {"exclusionType": "REGULAR"},
                            },
                            "excludedRules": {
                                "GenericRFI_BODY": {
                                    "exclusionType": "EXCLUDED_AS_COUNT"
                                },
                                "second_exclude": {
                                    "exclusionType": "EXCLUDED_AS_COUNT"
                                },
                            },
                            "terminatingRule": {
                                "SQLi_QUERYARGUMENTS": {"action": "BLOCK"}
                            },
                        }
                    }
                },
            },
        )

    def test_waf_rule_group_two_rules_same_group_id(self):
        """Entries sharing a ruleGroupId are merged into a single group."""
        event = {
            "ddsource": "waf",
            "message": {
                "ruleGroupList": [
                    {
                        "ruleGroupId": "AWS#AWSManagedRulesSQLiRuleSet",
                        "terminatingRule": {
                            "ruleId": "SQLi_QUERYARGUMENTS",
                            "action": "BLOCK",
                        },
                    },
                    {
                        "ruleGroupId": "AWS#AWSManagedRulesSQLiRuleSet",
                        "terminatingRule": {"ruleId": "secondRULE", "action": "BLOCK"},
                    },
                ]
            },
        }
        self.assertEqual(
            parse_aws_waf_logs(event),
            {
                "ddsource": "waf",
                "message": {
                    "ruleGroupList": {
                        "AWS#AWSManagedRulesSQLiRuleSet": {
                            "terminatingRule": {
                                "SQLi_QUERYARGUMENTS": {"action": "BLOCK"},
                                "secondRULE": {"action": "BLOCK"},
                            }
                        }
                    }
                },
            },
        )

    def test_waf_rule_group_three_rules_two_group_ids(self):
        """Distinct ruleGroupIds produce distinct groups in the output."""
        event = {
            "ddsource": "waf",
            "message": {
                "ruleGroupList": [
                    {
                        "ruleGroupId": "AWS#AWSManagedRulesSQLiRuleSet",
                        "terminatingRule": {
                            "ruleId": "SQLi_QUERYARGUMENTS",
                            "action": "BLOCK",
                        },
                    },
                    {
                        "ruleGroupId": "AWS#AWSManagedRulesSQLiRuleSet",
                        "terminatingRule": {"ruleId": "secondRULE", "action": "BLOCK"},
                    },
                    {
                        "ruleGroupId": "A_DIFFERENT_ID",
                        "terminatingRule": {"ruleId": "thirdRULE", "action": "BLOCK"},
                    },
                ]
            },
        }
        self.assertEqual(
            parse_aws_waf_logs(event),
            {
                "ddsource": "waf",
                "message": {
                    "ruleGroupList": {
                        "AWS#AWSManagedRulesSQLiRuleSet": {
                            "terminatingRule": {
                                "SQLi_QUERYARGUMENTS": {"action": "BLOCK"},
                                "secondRULE": {"action": "BLOCK"},
                            }
                        },
                        "A_DIFFERENT_ID": {
                            "terminatingRule": {"thirdRULE": {"action": "BLOCK"}}
                        },
                    }
                },
            },
        )


class TestParseSecurityHubEvents(unittest.TestCase):
def test_security_hub_no_findings(self):
event = {"ddsource": "securityhub"}
Expand Down