Skip to content

Commit

Permalink
initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
wconti27 committed Jun 22, 2023
1 parent a054991 commit a3ca78d
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 13 deletions.
84 changes: 71 additions & 13 deletions ddapm_test_agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import sys
from typing import Awaitable
from typing import Callable
from typing import Dict
from typing import List
from typing import Literal
from typing import Optional
Expand Down Expand Up @@ -159,7 +160,8 @@ def __init__(self):
# Token to be used if running test cases synchronously
self._requests: List[Request] = []
self._rc_server = RemoteConfigServer()
self._trace_failures: List[str] = []
self._trace_failures: Dict[str, List[str]] = {"default": []}
self._trace_check_results_by_check: Dict[str, Dict[str, int]] = {}
self._forward_endpoints: List[str] = [
"/v0.4/traces",
"/v0.5/traces",
Expand Down Expand Up @@ -187,17 +189,56 @@ async def traces(self) -> TraceMap:
_traces[trace_id].append(s)
return _traces

async def clear_trace_check_failures(self, request: Request) -> web.Response:
"""Clear traces by session token or all traces if none is provided."""
token = request["session_token"]
if token:
trace_failures = self._trace_failures.get(token, [])
log.info(f"Clearing {len(trace_failures)} Trace Check Failures for Token {token}")
log.info(trace_failures)
del self._trace_failures[token]
self._trace_check_results_by_check = {}
else:
trace_check_failures = [
fails for fails_by_session in self._trace_failures.values() for fails in fails_by_session
]
n_failures = len(trace_check_failures)
log.info(f"Clearing {n_failures} Trace Check Failures for Default Session (No Token).")
log.info(self._trace_failures)
log.info(self._trace_check_results_by_check)
self._trace_failures = {"default": []}
self._trace_check_results_by_check = {}
return web.HTTPOk()

def get_trace_check_failures(self, request: Request) -> web.Response:
"""Return the Trace Check failures that occurred, if pooling is enabled as a request."""
trace_failures = self._trace_failures
token = request["session_token"]

if token:
trace_check_failures = self._trace_failures.get(token, [])
n_failures = len(trace_check_failures)
log.info(f"{n_failures} Trace Failures Occurred for Token {token}.")
else:
trace_check_failures = [
fails for fails_by_session in self._trace_failures.values() for fails in fails_by_session
]
n_failures = len(trace_check_failures)
log.info(f"{n_failures} Trace Failures Occurred for Default Session (No Token, counting all checks)")

if len(trace_failures) > 0:
failure_message = f"APM Test Agent Validation failed with {len(trace_failures)} Trace Check failures.\n"
for trace_check_message in trace_failures:
trace_check_full_results = self._trace_check_results_by_check

if n_failures > 0:
failure_message = f"APM Test Agent Validation failed with {n_failures} Trace Check failures.\n"
for trace_check_message in trace_check_failures:
failure_message += trace_check_message

failure_message += f"\nAPM Test Agent Trace Check Results by Check -------------------------------"
failure_message += f"\n{json.dumps(trace_check_full_results, indent=4)}"
return web.HTTPBadRequest(text=failure_message)
else:
return web.HTTPOk()
message = f"APM Test Agent Trace Check Results by Check --------------------------------"
message += f"\n{json.dumps(trace_check_full_results, indent=4)}"
return web.HTTPOk(text=message)

async def apmtelemetry(self) -> List[TelemetryEvent]:
"""Return the telemetry events stored by the agent"""
Expand Down Expand Up @@ -718,25 +759,41 @@ async def check_failure_middleware(self, request: Request, handler: _Handler) ->
try:
response = await handler(request)
except AssertionError as e:
token = request["session_token"]
self._trace_check_results_by_check = trace.get_results(self._trace_check_results_by_check)
# only save trace failures to memory if necessary
msg = str(trace) + str(e)
log.error(msg)
if request.app["pool_trace_check_failures"]:
log.info(f"storing failure with message {msg}")
self._trace_failures.append(msg)
log.info(self._trace_failures)
if token:
log.info(f"Storing Trace Check Failure for Session Token: {token}.")
self._trace_failures[token] = (
[msg] if token not in self._trace_failures else self._trace_failures[token].append(msg)
)
else:
log.info(f"Storing Trace Check Failure for Default Session (No Token).")
self._trace_failures["default"].append(msg)
log.error(msg)
return web.HTTPBadRequest(body=msg)
else:
token = request["session_token"]
self._trace_check_results_by_check = trace.get_results(self._trace_check_results_by_check)
if trace.has_fails():
# only save trace failures to memory if necessary
msg = str(trace)
log.error(msg)
if request.app["pool_trace_check_failures"]:
log.info(f"storing failure with message {msg}")
self._trace_failures.append(msg)
if token:
log.info(f"Storing Trace Check Failure for Session Token: {token}.")
self._trace_failures[token] = (
[msg] if token not in self._trace_failures else self._trace_failures[token].append(msg)
)
else:
log.info(f"Storing Trace Check Failure for Default Session (No Token).")
self._trace_failures["default"].append(msg)
log.error(msg)
if request.app["disable_error_responses"]:
return response
return web.HTTPBadRequest(body=msg)
self._trace_check_results_by_check = trace.get_results(self._trace_check_results_by_check)
return response


Expand Down Expand Up @@ -791,6 +848,7 @@ def make_app(
# web.get("/test/benchmark", agent.handle_test_traces),
web.get("/test/trace/analyze", agent.handle_trace_analyze),
web.get("/test/trace_check/failures", agent.get_trace_check_failures),
web.get("/test/trace_check/clear", agent.clear_trace_check_failures),
]
)
checks = Checks(
Expand Down
47 changes: 47 additions & 0 deletions ddapm_test_agent/checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import dataclasses
import textwrap
from typing import Any
from typing import Dict
from typing import Generator
from typing import List
from typing import Tuple
Expand Down Expand Up @@ -39,6 +40,36 @@ def has_fails(self) -> bool:
return True
return False

def get_results(self, results: Dict[str, Dict[str, int]]) -> Dict[str, Dict[str, int]]:
"""
results = {
check.name: {
"Passed_Checks": int
"Failed_Checks": int
"Skipped_Checks": int
}
...
}
"""
for c in self._checks:
if c.failed:
if c.name in results:
results[c.name]["Failed_Checks"] += 1
else:
results[c.name] = {"Passed_Checks": 0, "Failed_Checks": 1, "Skipped_Checks": 0}
elif c.skipped:
if c.name in results:
results[c.name]["Skipped_Checks"] += 1
else:
results[c.name] = {"Passed_Checks": 0, "Failed_Checks": 0, "Skipped_Checks": 1}
else:
if c.name in results:
results[c.name]["Passed_Checks"] += 1
else:
results[c.name] = {"Passed_Checks": 1, "Failed_Checks": 0, "Skipped_Checks": 0}
return results

def __repr__(self) -> str:
return f"<CheckTraceFrame name='{self._name}' children={len(self._children)}>"

Expand Down Expand Up @@ -85,6 +116,11 @@ def frames_dfs(self) -> Generator[Tuple[CheckTraceFrame, int], None, None]:
def has_fails(self) -> bool:
return len([f for f in self.frames() if f.has_fails()]) > 0

def get_results(self, results: Dict[str, Dict[str, int]]) -> Dict[str, Dict[str, int]]:
for f in self.frames():
results = f.get_results(results)
return results

def __str__(self) -> str:
s = ""
# TODO?: only include frames that have fails
Expand Down Expand Up @@ -165,6 +201,17 @@ async def check(self, name: str, *args: Any, **kwargs: Any) -> None:
else:
check.check(*args, **kwargs)

def get_check_results(self) -> Dict[str, Dict[str, int]]:
results: Dict[str, Dict[str, int]] = {}
for c in self.checks:
if hasattr(c, "passed_checks"):
results[c.name] = {
"passed_checks": c.passed_checks,
"failed_checks": c.failed_checks,
"skipped_checks": c.skipped_checks,
}
return results


def start_trace(msg: str) -> CheckTrace:
trace = CheckTrace(CheckTraceFrame(name=msg))
Expand Down

0 comments on commit a3ca78d

Please sign in to comment.