feat(llmobs): submit span events rather than llmobs records #8339

Merged 38 commits on Feb 15, 2024

Commits
3f5a59a
Make DD_API_KEY, DD_APP_KEY, DD_SITE global configs
Yun-Kim Jan 31, 2024
cca1be4
Fix openai/langchain/bedrock tests for api/app key config
Yun-Kim Feb 1, 2024
3741ee0
Add DD_API_KEY, DD_APP_KEY, DD_SITE to test/utils override global config
Yun-Kim Feb 1, 2024
d8addd6
Move llmobs_enabled, llmobs_sample_rate to global config
Yun-Kim Feb 1, 2024
2839a5b
Add auto LLMObs enabling to ddtrace-run
Yun-Kim Feb 6, 2024
c2095ba
Merge branch 'main' into yunkim/llmobs-enable-flag
Yun-Kim Feb 6, 2024
0364144
revert enabling in ddtrace-run
Yun-Kim Feb 6, 2024
7a9fed5
fmt
Yun-Kim Feb 6, 2024
6b0a08e
wip: llmobs service
Yun-Kim Feb 6, 2024
01396e0
Merge branch 'main' into yunkim/llmobs-chain
Yun-Kim Feb 7, 2024
0857615
Enable on openai patch, remove span starting logic so far
Yun-Kim Feb 7, 2024
e7d10a2
Enable bedrock llmobs service, service properly initializes
Yun-Kim Feb 8, 2024
5004dd9
fmt
Yun-Kim Feb 8, 2024
40a445c
fmt
Yun-Kim Feb 8, 2024
b58ef37
Move ddtrace/internal/llmobs to ddtrace/llmobs/, set up new test suite
Yun-Kim Feb 8, 2024
70608c0
Revert setting llmobs span type to future PR
Yun-Kim Feb 8, 2024
496e100
fmt
Yun-Kim Feb 8, 2024
285865b
Missed import changes
Yun-Kim Feb 8, 2024
f085b03
fmt
Yun-Kim Feb 8, 2024
24b8762
missing import changes
Yun-Kim Feb 8, 2024
59acf84
Rename test cassette files, riot lockfiles
Yun-Kim Feb 8, 2024
3ba3f95
Revert to using pytest 7.4.4
Yun-Kim Feb 9, 2024
f5e7aad
Add tests for llmobs service enabling
Yun-Kim Feb 9, 2024
2117e30
fmt
Yun-Kim Feb 9, 2024
b6b774e
Revert LLMObsTraceProcessor logic for now
Yun-Kim Feb 9, 2024
e63a110
fmt
Yun-Kim Feb 9, 2024
57a3c4b
fix test
Yun-Kim Feb 9, 2024
6d727eb
Submit spans rather than llmobs records:
Yun-Kim Feb 9, 2024
f8a6a60
Change llmobs writer tests to submit spans not records
Yun-Kim Feb 12, 2024
0288f00
Fix openai/bedrock/llmobs writer tests to expect LLMObs spans (not
Yun-Kim Feb 13, 2024
4105e1d
Set LLM span type on completion/chat-completion spans
Yun-Kim Feb 13, 2024
bed9879
Merge branch 'main' into yunkim/llmobs-span
Yun-Kim Feb 13, 2024
b90c62f
fmt
Yun-Kim Feb 13, 2024
6bee4e8
Revert any complex trace processor behavior other than extracting LLM
Yun-Kim Feb 13, 2024
985c4b7
Merge branch 'main' into yunkim/llmobs-span
emmettbutler Feb 14, 2024
32f3bff
Rename variable to match span event rather than removed llm record
Yun-Kim Feb 14, 2024
c7d15a1
Span struct changes:
Yun-Kim Feb 14, 2024
5010eb8
Merge branch 'main' into yunkim/llmobs-span
Yun-Kim Feb 15, 2024
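
The net effect, condensed from the file diffs below: instead of building a standalone LLMObs record and enqueuing it on a per-integration LLMObsWriter, each integration now tags an LLM-typed APM span and leaves it to a trace processor to turn the span into a span event. The sketch below is illustrative only; the ml_obs.* tag keys and SpanTypes.LLM come from this PR, while the span name and payload values are placeholders.

import json

from ddtrace import tracer
from ddtrace.ext import SpanTypes

# Old path (removed in ddtrace/llmobs/_integrations/base.py below): build a record dict
# and enqueue it on the integration's own LLMObsWriter.
# self._llmobs_writer.enqueue(record)

# New path: JSON-dump the LLMObs payload into tags on an LLM-typed span.
with tracer.trace("bedrock.command", span_type=SpanTypes.LLM) as span:
    meta = {"kind": "llm", "input": {"messages": [{"content": "hello"}]}}
    metrics = {"prompt_tokens": 5, "completion_tokens": 7, "total_tokens": 12}
    span.set_tag_str("ml_obs.meta", json.dumps(meta))
    span.set_tag_str("ml_obs.metrics", json.dumps(metrics))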
2 changes: 2 additions & 0 deletions ddtrace/contrib/botocore/patch.py
@@ -102,6 +102,8 @@ def unpatch():
botocore.client._datadog_patch = False
unwrap(botocore.parsers.ResponseParser, "parse")
unwrap(botocore.client.BaseClient, "_make_api_call")
if LLMObs.enabled:
LLMObs.disable()


def patch_submodules(submodules):
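
With record submission moving out of the integrations, the contrib patch/unpatch hooks now manage the shared LLMObs service. A minimal sketch of the expected lifecycle, assuming an enable() counterpart and import path that are not shown in this hunk (only LLMObs.enabled and LLMObs.disable() appear above):

from ddtrace.llmobs import LLMObs  # assumed import path


def patch():
    # ... wrap the botocore client entry points ...
    if not LLMObs.enabled:
        LLMObs.enable()  # assumption: mirrors the teardown in unpatch()


def unpatch():
    # ... unwrap the botocore client entry points ...
    if LLMObs.enabled:
        LLMObs.disable()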
17 changes: 10 additions & 7 deletions ddtrace/contrib/botocore/services/bedrock.py
@@ -6,6 +6,7 @@
from typing import Optional

from ddtrace._trace.span import Span
from ddtrace.ext import SpanTypes
from ddtrace.internal.logger import get_logger
from ddtrace.llmobs._integrations import BedrockIntegration
from ddtrace.vendor import wrapt
@@ -106,17 +107,17 @@ def _process_response(self, formatted_response: Dict[str, Any], metadata: Dict[s
"bedrock.response.choices.{}.finish_reason".format(i), str(formatted_response["finish_reason"][i])
)
if self._datadog_integration.is_pc_sampled_llmobs(self._datadog_span):
self._datadog_integration.generate_llm_record(
self._datadog_integration.llmobs_set_tags(
self._datadog_span, formatted_response=formatted_response, prompt=self._prompt
)


def _handle_exception(span, integration, prompt, exc_info):
"""Helper method to finish the span on stream read error."""
span.set_exc_info(*exc_info)
span.finish()
if integration.is_pc_sampled_llmobs(span):
integration.generate_llm_record(span, formatted_response=None, prompt=prompt, err=1)
integration.llmobs_set_tags(span, formatted_response=None, prompt=prompt, err=True)
span.finish()


def _extract_request_params(params: Dict[str, Any], provider: str) -> Dict[str, Any]:
@@ -291,10 +292,11 @@ def handle_bedrock_request(span: Span, integration: BedrockIntegration, params:
span.set_tag_str("bedrock.request.model", model_name)
prompt = None
for k, v in request_params.items():
if k == "prompt" and integration.is_pc_sampled_span(span):
v = integration.trunc(str(v))
if k == "prompt" and integration.is_pc_sampled_llmobs(span):
prompt = v
if k == "prompt":
if integration.is_pc_sampled_llmobs(span):
prompt = v
if integration.is_pc_sampled_span(span):
v = integration.trunc(str(v))
span.set_tag_str("bedrock.request.{}".format(k), str(v))
return prompt

@@ -329,6 +331,7 @@ def patched_bedrock_api_call(original_func, instance, args, kwargs, function_var
service=schematize_service_name("{}.{}".format(pin.service, endpoint_name)),
resource=operation,
activate=False,
span_type=SpanTypes.LLM,
)
prompt = None
try:
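
Because the hunk above interleaves removed and added lines, the resulting prompt handling reads more clearly in isolation: the untruncated prompt is captured for LLMObs first, and only then is the tag value truncated for the APM span. A self-contained illustration with the sampling checks omitted and a made-up helper name:

def tag_request_params(request_params, span_tags, char_limit=128):
    """Mimic the reworked loop in handle_bedrock_request: keep the full prompt, truncate the tag."""
    prompt = None
    for k, v in request_params.items():
        if k == "prompt":
            prompt = v  # full prompt, later handed to llmobs_set_tags()
            if len(str(v)) > char_limit:
                v = str(v)[:char_limit] + "..."  # truncated copy for the bedrock.request.prompt tag
        span_tags["bedrock.request.{}".format(k)] = str(v)
    return prompt


tags = {}
full_prompt = tag_request_params({"prompt": "Summarize this document. " * 20, "temperature": 0.5}, tags)
assert len(full_prompt) > len(tags["bedrock.request.prompt"])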
8 changes: 6 additions & 2 deletions ddtrace/contrib/openai/_endpoint_hooks.py
@@ -1,3 +1,5 @@
from ddtrace.ext import SpanTypes

from .utils import _compute_prompt_token_count
from .utils import _format_openai_api_key
from .utils import _is_async_generator
@@ -203,6 +205,7 @@ class _CompletionHook(_BaseCompletionHook):

def _record_request(self, pin, integration, span, args, kwargs):
super()._record_request(pin, integration, span, args, kwargs)
span.span_type = SpanTypes.LLM
if integration.is_pc_sampled_span(span):
prompt = kwargs.get("prompt", "")
if isinstance(prompt, str):
@@ -225,7 +228,7 @@ def _record_response(self, pin, integration, span, args, kwargs, resp, error):
span, "info" if error is None else "error", "sampled %s" % self.OPERATION_ID, attrs=attrs_dict
)
if integration.is_pc_sampled_llmobs(span):
integration.generate_completion_llm_records(resp, error, span, kwargs)
integration.llmobs_set_tags("completion", resp, error, span, kwargs)
if not resp:
return
for choice in resp.choices:
@@ -258,6 +261,7 @@ class _ChatCompletionHook(_BaseCompletionHook):

def _record_request(self, pin, integration, span, args, kwargs):
super()._record_request(pin, integration, span, args, kwargs)
span.span_type = SpanTypes.LLM
for idx, m in enumerate(kwargs.get("messages", [])):
if integration.is_pc_sampled_span(span):
span.set_tag_str(
@@ -279,7 +283,7 @@ def _record_response(self, pin, integration, span, args, kwargs, resp, error):
span, "info" if error is None else "error", "sampled %s" % self.OPERATION_ID, attrs=attrs_dict
)
if integration.is_pc_sampled_llmobs(span):
integration.generate_chat_llm_records(resp, error, span, kwargs)
integration.llmobs_set_tags("chat", resp, error, span, kwargs)
if not resp:
return
for choice in resp.choices:
3 changes: 2 additions & 1 deletion ddtrace/contrib/openai/patch.py
@@ -202,7 +202,8 @@ def patch():
def unpatch():
# FIXME: add unpatching. The current wrapping.unwrap method requires
# the wrapper function to be provided which we don't keep a reference to.
pass
if LLMObs.enabled:
LLMObs.disable()


def _patched_client_init(openai, integration):
1 change: 1 addition & 0 deletions ddtrace/ext/__init__.py
@@ -14,6 +14,7 @@ class SpanTypes(object):
WORKER = "worker"
AUTH = "auth"
SYSTEM = "system"
LLM = "llm"


class SpanKind(object):
49 changes: 0 additions & 49 deletions ddtrace/llmobs/_integrations/base.py
@@ -16,7 +16,6 @@
from ddtrace.internal.hostname import get_hostname
from ddtrace.internal.utils.formats import asbool
from ddtrace.llmobs._log_writer import V2LogWriter
from ddtrace.llmobs._writer import LLMObsWriter
from ddtrace.sampler import RateSampler
from ddtrace.settings import IntegrationConfig

@@ -30,7 +29,6 @@ def __init__(self, integration_config: IntegrationConfig) -> None:
# Ideally the metrics client should live on the tracer or some other core
# object that is strongly linked with configuration.
self._log_writer = None
self._llmobs_writer = None
self._statsd = None
self.integration_config = integration_config
self._span_pc_sampler = RateSampler(sample_rate=integration_config.span_prompt_completion_sample_rate)
@@ -52,29 +50,8 @@ def __init__(self, integration_config: IntegrationConfig) -> None:
)
self._log_pc_sampler = RateSampler(sample_rate=integration_config.log_prompt_completion_sample_rate)
self.start_log_writer()

if self.llmobs_enabled:
if not config._dd_api_key:
raise ValueError(
f"DD_API_KEY is required for sending LLMObs data from the {self._integration_name} integration. "
f"To use the {self._integration_name} integration without LLMObs, "
f"set `DD_{self._integration_name.upper()}_LLMOBS_ENABLED=false`."
)
if not config._dd_app_key:
raise ValueError(
f"DD_APP_KEY is required for sending LLMObs payloads from the {self._integration_name} integration."
f" To use the {self._integration_name} integration without LLMObs, "
f"set `DD_{self._integration_name.upper()}_LLMOBS_ENABLED=false`."
)
self._llmobs_writer = LLMObsWriter(
site=config._dd_site,
api_key=config._dd_api_key,
app_key=config._dd_app_key,
interval=float(os.getenv("_DD_%s_LLM_WRITER_INTERVAL" % self._integration_name.upper(), "1.0")),
timeout=float(os.getenv("_DD_%s_LLM_WRITER_TIMEOUT" % self._integration_name.upper(), "2.0")),
)
self._llmobs_pc_sampler = RateSampler(sample_rate=config._llmobs_sample_rate)
self.start_llm_writer()

@property
def metrics_enabled(self) -> bool:
@@ -116,11 +93,6 @@ def start_log_writer(self) -> None:
return
self._log_writer.start()

def start_llm_writer(self) -> None:
if not self.llmobs_enabled or self._llmobs_writer is None:
return
self._llmobs_writer.start()

@abc.abstractmethod
def _set_base_span_tags(self, span: Span, **kwargs) -> None:
"""Set default LLM span attributes when possible."""
@@ -202,24 +174,3 @@ def trunc(self, text: str) -> str:
if len(text) > self.integration_config.span_char_limit:
text = text[: self.integration_config.span_char_limit] + "..."
return text

@classmethod
@abc.abstractmethod
def _llmobs_tags(cls, span: Span) -> List[str]:
"""Generate a list of llmobs tags from a given span."""
return []

def llm_record(self, span: Span, attrs: Dict[str, Any], tags: Optional[List[str]] = None) -> None:
"""Create a LLM record to send to the LLM Obs intake."""
if not self.llmobs_enabled or self._llmobs_writer is None:
return
llmobs_tags = self._llmobs_tags(span)
if span is not None and span.sampled:
# FIXME: this is a temporary workaround until we figure out why 128 bit trace IDs are stored as decimals.
llmobs_tags.insert(0, "dd.trace_id:{:x}".format(span.trace_id))
# llmobs_tags.insert(0, "dd.trace_id:{}".format(span.trace_id))
llmobs_tags.insert(1, "dd.span_id:{}".format(span.span_id))
if tags:
llmobs_tags += tags
attrs["ddtags"] = llmobs_tags
self._llmobs_writer.enqueue(attrs) # type: ignore[arg-type]
90 changes: 31 additions & 59 deletions ddtrace/llmobs/_integrations/bedrock.py
@@ -1,80 +1,52 @@
import time
import json
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
import uuid

from ddtrace import config
from ddtrace._trace.span import Span

from .base import BaseLLMIntegration
from ddtrace.llmobs._integrations import BaseLLMIntegration


class BedrockIntegration(BaseLLMIntegration):
_integration_name = "bedrock"

@classmethod
def _llmobs_tags(cls, span: Span) -> List[str]:
tags = [
"version:%s" % (config.version or ""),
"env:%s" % (config.env or ""),
"service:%s" % (span.service or ""),
"source:integration",
"model_name:%s" % (span.get_tag("bedrock.request.model") or ""),
"model_provider:%s" % (span.get_tag("bedrock.request.model_provider") or ""),
"error:%d" % span.error,
]
err_type = span.get_tag("error.type")
if err_type:
tags.append("error_type:%s" % err_type)
return tags

def generate_llm_record(
def llmobs_set_tags(
self,
span: Span,
formatted_response: Optional[Dict[str, Any]] = None,
prompt: Optional[str] = None,
err: bool = False,
) -> None:
"""Generate payloads for the LLM Obs API from a completion."""
if not self.llmobs_enabled:
return
metrics = self._set_llmobs_metrics(span, formatted_response)
meta = {
"model_name": span.get_tag("bedrock.request.model"),
"model_provider": span.get_tag("bedrock.request.model_provider"),
"kind": "llm",
"input": {
"messages": [{"content": prompt}],
"parameters": {
"temperature": float(span.get_tag("bedrock.request.temperature") or 0.0),
"max_tokens": int(span.get_tag("bedrock.request.max_tokens") or 0),
},
},
}
if err or formatted_response is None:
record = _llmobs_record(span, prompt)
record["id"] = str(uuid.uuid4())
record["output"]["completions"] = [{"content": ""}]
record["output"]["errors"] = [span.get_tag("error.message")]
self.llm_record(span, record)
return
for i in range(len(formatted_response["text"])):
meta["output"] = {"messages": [{"content": ""}]}
else:
meta["output"] = {"messages": [{"content": completion} for completion in formatted_response["text"]]}
# Since span tags have to be strings, we have to json dump the data here and load on the trace processor.
span.set_tag_str("ml_obs.meta", json.dumps(meta))
span.set_tag_str("ml_obs.metrics", json.dumps(metrics))

@staticmethod
def _set_llmobs_metrics(span: Span, formatted_response: Optional[Dict[str, Any]]) -> Dict[str, Any]:
metrics = {}
if formatted_response and formatted_response.get("text"):
prompt_tokens = int(span.get_tag("bedrock.usage.prompt_tokens") or 0)
completion_tokens = int(span.get_tag("bedrock.usage.completion_tokens") or 0)
record = _llmobs_record(span, prompt)
record["id"] = span.get_tag("bedrock.response.id")
record["input"]["prompt_tokens"] = [prompt_tokens]
record["output"]["completions"] = [{"content": formatted_response["text"][i]}]
record["output"]["completion_tokens"] = [completion_tokens]
record["output"]["total_tokens"] = [prompt_tokens + completion_tokens]
self.llm_record(span, record)


def _llmobs_record(span: Span, prompt: Optional[str]) -> Dict[str, Any]:
"""LLMObs bedrock record template."""
now = time.time()
record = {
"type": "completion",
"id": str(uuid.uuid4()),
"timestamp": int(span.start * 1000),
"model": span.get_tag("bedrock.request.model"),
"model_provider": span.get_tag("bedrock.request.model_provider"),
"input": {
"prompts": [prompt],
"temperature": float(span.get_tag("bedrock.request.temperature") or 0.0),
"max_tokens": int(span.get_tag("bedrock.request.max_tokens") or 0),
},
"output": {
"durations": [now - span.start],
},
}
return record
metrics["prompt_tokens"] = prompt_tokens
metrics["completion_tokens"] = completion_tokens
metrics["total_tokens"] = prompt_tokens + completion_tokens
return metrics
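
The comment in the hunk above notes that the JSON-dumped tags are meant to be loaded back on the trace processor; that side is not part of this diff. The sketch below is therefore an assumption about the consuming end, limited to re-hydrating the ml_obs.* tags from an LLM-typed span (the event fields beyond meta/metrics are made up):

import json

from ddtrace.ext import SpanTypes


def extract_llmobs_span_event(span):
    """Hypothetical processor-side helper: rebuild the event dict from the ml_obs.* tags."""
    if span.span_type != SpanTypes.LLM:
        return None
    return {
        "trace_id": "{:x}".format(span.trace_id),  # hex form, matching the removed llm_record() tagging
        "span_id": str(span.span_id),
        "meta": json.loads(span.get_tag("ml_obs.meta") or "{}"),
        "metrics": json.loads(span.get_tag("ml_obs.metrics") or "{}"),
    }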