Python: Add metrics instrumentation (microsoft#8317)
### Motivation and Context

We recently added instrumentation to kernel functions in
microsoft#8280. That PR added
logs and traces for observing a kernel function. This PR adds metrics
and modifies the telemetry sample app to show how to export metric
instruments to Application Insights.

### Description

1. Kernel functions now produce metrics data for observability.
2. Add code to export metric instruments in the telemetry sample.
3. Add Google AI as a second service to the telemetry sample to
showcase nested spans.
4. Misc optimizations.
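The duration metrics mirror the existing .NET instrumentation: two histograms (`semantic_kernel.function.invocation.duration` and `semantic_kernel.function.streaming.duration`), recorded in seconds and tagged with the function name, in a try/finally so failures are measured too. A minimal stdlib-only sketch of that recording pattern — a plain dict stands in for the OpenTelemetry `Histogram`, and `invoke_with_metrics` is a hypothetical helper, not SDK API:

```python
import time
from collections import defaultdict

MEASUREMENT_FUNCTION_TAG_NAME = "semantic_kernel.function.name"

# Stand-in for the OpenTelemetry histogram: maps an attribute tuple to
# the list of recorded duration values (in seconds).
invocation_duration: dict[tuple, list[float]] = defaultdict(list)


def invoke_with_metrics(fn, *args, **kwargs):
    """Time a call and record its duration even when it raises,
    mirroring the try/finally pattern used by the kernel function."""
    attributes = (MEASUREMENT_FUNCTION_TAG_NAME, fn.__name__)
    start = time.perf_counter()
    try:
        return fn(*args, **kwargs)
    finally:
        invocation_duration[attributes].append(time.perf_counter() - start)


print(invoke_with_metrics(sum, [1, 2, 3]))  # prints 6
print(len(invocation_duration[(MEASUREMENT_FUNCTION_TAG_NAME, "sum")]))  # prints 1
```

In the real change the record call lives in the `finally` block of `invoke`/`invoke_stream`, so a measurement is emitted whether the function succeeds or throws.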


![image](https://github.com/user-attachments/assets/141e9bd2-06b2-4399-a913-f8aeb12aa8af)


![image](https://github.com/user-attachments/assets/fd35ad84-c59f-418c-b9e6-84beb4b22eb1)


![image](https://github.com/user-attachments/assets/1f40fac0-23fa-4b3e-b6c9-644e654c0cbd)


### Contribution Checklist


- [x] The code builds clean without any errors or warnings
- [x] The PR follows the [SK Contribution
Guidelines](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md)
and the [pre-submission formatting
script](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md#development-scripts)
raises no violations
- [x] All unit tests pass, and I have added new tests where possible
- [x] I didn't break anyone 😄
TaoChenOSU committed Aug 22, 2024
1 parent 85b72a3 commit 1e46c79
Showing 5 changed files with 129 additions and 27 deletions.
@@ -37,7 +37,7 @@ public abstract class KernelFunction
private static readonly Histogram<double> s_invocationDuration = s_meter.CreateHistogram<double>(
name: "semantic_kernel.function.invocation.duration",
unit: "s",
description: "Measures the duration of a functions execution");
description: "Measures the duration of a function's execution");

/// <summary><see cref="Histogram{T}"/> to record function streaming duration.</summary>
/// <remarks>
@@ -47,7 +47,7 @@ public abstract class KernelFunction
private static readonly Histogram<double> s_streamingDuration = s_meter.CreateHistogram<double>(
name: "semantic_kernel.function.streaming.duration",
unit: "s",
description: "Measures the duration of a functions streaming execution");
description: "Measures the duration of a function's streaming execution");

/// <summary>
/// Gets the name of the function.
@@ -38,4 +38,12 @@ pip install azure-monitor-opentelemetry-exporter==1.0.0b24

### Logs and traces

Go to your Application Insights instance and click on _Transaction search_ in the left menu. Use the operation id output by the program to search for the logs and traces associated with the operation. Click on any of the search results to view the end-to-end transaction details. Read more [here](https://learn.microsoft.com/en-us/azure/azure-monitor/app/transaction-search-and-diagnostics?tabs=transaction-search).
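The sample prints the trace id as a decimal integer, while Transaction search matches on the 32-character lowercase hex form (the W3C trace id). A small stdlib sketch of the conversion — `to_operation_id` is a hypothetical helper and the trace id below is just the W3C example value, not output from this sample:

```python
def to_operation_id(trace_id: int) -> str:
    """Format an OpenTelemetry trace id (an int) as the 32-character
    lowercase hex string used as the operation id in Transaction search."""
    return format(trace_id, "032x")


# Hypothetical trace id, as an int like the sample's `Trace ID:` line prints.
print(to_operation_id(0x0AF7651916CD43DD8448EB211C80319C))
# prints 0af7651916cd43dd8448eb211c80319c
```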

### Metrics

Running the application once will only generate one set of measurements (for each metric). Run the application a couple of times to generate more sets of measurements.

> Note: Make sure not to run the program too frequently. Otherwise, you may get throttled.

Please refer to the documentation on how to analyze metrics in [Azure Monitor](https://learn.microsoft.com/en-us/azure/azure-monitor/essentials/analyze-metrics).
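One run yields one set of measurements because a histogram instrument does not export raw samples; each export interval it emits an aggregated summary (count, sum, min, max) of whatever was recorded. A stdlib sketch of that aggregation over several runs' durations — the numbers are purely illustrative, not from the sample:

```python
# Durations (in seconds) collected across a few sample runs -- made-up values.
durations = [0.82, 1.10, 0.95, 1.42]

# The kind of summary a histogram aggregation exports per interval.
aggregated = {
    "count": len(durations),
    "sum": sum(durations),
    "min": min(durations),
    "max": max(durations),
}
aggregated["avg"] = aggregated["sum"] / aggregated["count"]
print(aggregated)
```

With only one run there is a single value behind the summary, so charts in Azure Monitor stay flat until more measurements arrive.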
106 changes: 87 additions & 19 deletions python/samples/demos/telemetry_with_application_insights/main.py
@@ -3,30 +3,45 @@
import asyncio
import logging

from azure.monitor.opentelemetry.exporter import AzureMonitorLogExporter, AzureMonitorTraceExporter
from azure.monitor.opentelemetry.exporter import (
AzureMonitorLogExporter,
AzureMonitorMetricExporter,
AzureMonitorTraceExporter,
)
from opentelemetry import trace
from opentelemetry._logs import set_logger_provider
from opentelemetry.metrics import set_meter_provider
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.metrics.view import DropAggregation, View
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.semconv.resource import ResourceAttributes
from opentelemetry.trace import set_tracer_provider

from samples.demos.telemetry_with_application_insights.repo_utils import get_sample_plugin_path
from samples.demos.telemetry_with_application_insights.telemetry_sample_settings import TelemetrySampleSettings
from semantic_kernel.connectors.ai.google.google_ai.services.google_ai_chat_completion import GoogleAIChatCompletion
from semantic_kernel.connectors.ai.open_ai.services.open_ai_chat_completion import OpenAIChatCompletion
from semantic_kernel.connectors.ai.prompt_execution_settings import PromptExecutionSettings
from semantic_kernel.functions.kernel_arguments import KernelArguments
from semantic_kernel.kernel import Kernel

# Load settings
settings = TelemetrySampleSettings.create()

# Create a resource to represent the service/sample
resource = Resource.create({ResourceAttributes.SERVICE_NAME: "TelemetryExample"})


def set_up_logging():
log_exporter = AzureMonitorLogExporter(connection_string=settings.connection_string)

# Create and set a global logger provider for the application.
logger_provider = LoggerProvider()
logger_provider = LoggerProvider(resource=resource)
# Log processors are initialized with an exporter which is responsible
# for sending the telemetry data to a particular backend.
logger_provider.add_log_record_processor(BatchLogRecordProcessor(log_exporter))
@@ -49,22 +64,85 @@ def set_up_tracing():
trace_exporter = AzureMonitorTraceExporter(connection_string=settings.connection_string)

# Initialize a trace provider for the application. This is a factory for creating tracers.
tracer_provider = TracerProvider()
tracer_provider = TracerProvider(resource=resource)
# Span processors are initialized with an exporter which is responsible
# for sending the telemetry data to a particular backend.
tracer_provider.add_span_processor(BatchSpanProcessor(trace_exporter))
# Sets the global default tracer provider
set_tracer_provider(tracer_provider)


def set_up_metrics():
metric_exporter = AzureMonitorMetricExporter(connection_string=settings.connection_string)

# Initialize a metric provider for the application. This is a factory for creating meters.
metric_reader = PeriodicExportingMetricReader(metric_exporter, export_interval_millis=5000)
meter_provider = MeterProvider(
metric_readers=[metric_reader],
resource=resource,
views=[
# Dropping all instrument names except for those starting with "semantic_kernel"
View(instrument_name="*", aggregation=DropAggregation()),
View(instrument_name="semantic_kernel*"),
],
)
# Sets the global default meter provider
set_meter_provider(meter_provider)


set_up_logging()
set_up_tracing()
set_up_metrics()


async def run_plugin(kernel: Kernel, plugin_name: str, service_id: str):
"""Run a plugin with the given service ID."""
plugin = kernel.get_plugin(plugin_name)

poem = await kernel.invoke(
function=plugin["ShortPoem"],
arguments=KernelArguments(
input="Write a poem about John Doe.",
settings={
service_id: PromptExecutionSettings(service_id=service_id),
},
),
)
print(f"Poem:\n{poem}")

print("\nTranslated poem:")
async for update in kernel.invoke_stream(
function=plugin["Translate"],
arguments=KernelArguments(
input=poem,
language="Italian",
settings={
service_id: PromptExecutionSettings(service_id=service_id),
},
),
):
print(update[0].content, end="")
print()


async def run_service(kernel: Kernel, plugin_name: str, service_id: str):
"""Run a service with the given service ID."""
print(f"================ Running service {service_id} ================")

tracer = trace.get_tracer(__name__)
with tracer.start_as_current_span(service_id) as current_span:
try:
await run_plugin(kernel, plugin_name=plugin_name, service_id=service_id)
except Exception as e:
current_span.record_exception(e)
print(f"Error running service {service_id}: {e}")


async def main():
# Initialize the kernel
kernel = Kernel()
kernel.add_service(OpenAIChatCompletion(service_id="open_ai", ai_model_id="gpt-3.5-turbo"))
kernel.add_service(OpenAIChatCompletion(service_id="open_ai"))
kernel.add_service(GoogleAIChatCompletion(service_id="google_ai"))

# Add the sample plugin
if (sample_plugin_path := get_sample_plugin_path()) is None:
@@ -79,21 +157,11 @@ async def main():
with tracer.start_as_current_span("main") as current_span:
print(f"Trace ID: {current_span.get_span_context().trace_id}")

poem = await kernel.invoke(
function=plugin["ShortPoem"],
arguments=KernelArguments(input="Write a poem about John Doe."),
)
print(f"Poem:\n{poem}")

print("\nTranslated poem:")
async for update in kernel.invoke_stream(
function=plugin["Translate"],
arguments=KernelArguments(
input=poem,
language="Italian",
),
):
print(update[0].content, end="")
# Run the OpenAI service
await run_service(kernel, plugin_name=plugin.name, service_id="open_ai")

# Run the GoogleAI service
await run_service(kernel, plugin_name=plugin.name, service_id="google_ai")


if __name__ == "__main__":
32 changes: 27 additions & 5 deletions python/semantic_kernel/functions/kernel_function.py
@@ -8,7 +8,8 @@
from inspect import isasyncgen, isgenerator
from typing import TYPE_CHECKING, Any

from opentelemetry import trace
from opentelemetry import metrics, trace
from opentelemetry.semconv.attributes.error_attributes import ERROR_TYPE

from semantic_kernel.filters.filter_types import FilterTypes
from semantic_kernel.filters.functions.function_invocation_context import FunctionInvocationContext
@@ -38,8 +39,11 @@
from semantic_kernel.prompt_template.prompt_template_base import PromptTemplateBase
from semantic_kernel.prompt_template.prompt_template_config import PromptTemplateConfig

# Logger, tracer and meter for observability
logger: logging.Logger = logging.getLogger(__name__)
tracer: trace.Tracer = trace.get_tracer(__name__)
meter: metrics.Meter = metrics.get_meter_provider().get_meter(__name__)
MEASUREMENT_FUNCTION_TAG_NAME: str = "semantic_kernel.function.name"

TEMPLATE_FORMAT_MAP = {
KERNEL_TEMPLATE_FORMAT_NAME: KernelPromptTemplate,
@@ -71,6 +75,17 @@ class KernelFunction(KernelBaseModel):

metadata: KernelFunctionMetadata

invocation_duration_histogram: metrics.Histogram = meter.create_histogram(
"semantic_kernel.function.invocation.duration",
unit="s",
description="Measures the duration of a function's execution",
)
streaming_duration_histogram: metrics.Histogram = meter.create_histogram(
"semantic_kernel.function.streaming.duration",
unit="s",
description="Measures the duration of a function's streaming execution",
)

@classmethod
def from_prompt(
cls,
@@ -212,6 +227,7 @@ async def invoke(
KernelFunctionLogMessages.log_function_invoking(logger, self.fully_qualified_name)
KernelFunctionLogMessages.log_function_arguments(logger, arguments)

attributes = {MEASUREMENT_FUNCTION_TAG_NAME: self.fully_qualified_name}
starting_time_stamp = time.perf_counter()
try:
stack = kernel.construct_call_stack(
@@ -225,10 +241,11 @@

return function_context.result
except Exception as e:
self._handle_exception(current_span, e)
self._handle_exception(current_span, e, attributes)
raise e
finally:
duration = time.perf_counter() - starting_time_stamp
self.invocation_duration_histogram.record(duration, attributes)
KernelFunctionLogMessages.log_function_completed(logger, duration)

@abstractmethod
@@ -270,6 +287,7 @@ async def invoke_stream(
KernelFunctionLogMessages.log_function_streaming_invoking(logger, self.fully_qualified_name)
KernelFunctionLogMessages.log_function_arguments(logger, arguments)

attributes = {MEASUREMENT_FUNCTION_TAG_NAME: self.fully_qualified_name}
starting_time_stamp = time.perf_counter()
try:
stack = kernel.construct_call_stack(
@@ -288,10 +306,11 @@
else:
yield function_context.result
except Exception as e:
self._handle_exception(current_span, e)
self._handle_exception(current_span, e, attributes)
raise e
finally:
duration = time.perf_counter() - starting_time_stamp
self.streaming_duration_histogram.record(duration, attributes)
KernelFunctionLogMessages.log_function_streaming_completed(logger, duration)

def function_copy(self, plugin_name: str | None = None) -> "KernelFunction":
Expand All @@ -309,15 +328,18 @@ def function_copy(self, plugin_name: str | None = None) -> "KernelFunction":
cop.metadata.plugin_name = plugin_name
return cop

def _handle_exception(self, current_span: trace.Span, exception: Exception) -> None:
def _handle_exception(self, current_span: trace.Span, exception: Exception, attributes: dict[str, str]) -> None:
"""Handle the exception.
Args:
current_span (trace.Span): The current span.
exception (Exception): The exception.
attributes (dict[str, str]): The attributes to be modified.
"""
attributes[ERROR_TYPE] = type(exception).__name__

current_span.record_exception(exception)
current_span.set_attribute(ERROR_TYPE, type(exception).__name__)
current_span.set_status(trace.StatusCode.ERROR, description=str(exception))
current_span.set_attribute("error.type", type(exception).__name__)

KernelFunctionLogMessages.log_function_error(logger, exception)
@@ -1,5 +1,6 @@
# Copyright (c) Microsoft. All rights reserved.

import logging
from logging import Logger

from semantic_kernel.functions.function_result import FunctionResult
@@ -30,6 +31,9 @@ def log_function_invoked_success(logger: Logger, kernel_function_name: str):
@staticmethod
def log_function_result_value(logger: Logger, function_result: FunctionResult | None):
"""Log message when a kernel function result is returned."""
if not logger.isEnabledFor(logging.DEBUG):
return

if function_result is not None:
try:
logger.debug("Function result: %s", function_result)