From 1e46c794cc44f56d891acbaa4d00af0b481af72a Mon Sep 17 00:00:00 2001 From: Tao Chen Date: Thu, 22 Aug 2024 08:34:28 -0700 Subject: [PATCH] Python: Add metrics instrumentation (#8317) ### Motivation and Context We very recently added instrumentation to kernel functions: https://github.com/microsoft/semantic-kernel/pull/8280. That PR added logs and traces for observing a kernel function. This PR adds metrics and modifies the telemetry sample app to show how to export metric instruments to Application Insights. ### Description 1. Kernel function now produces metrics data for observability. 2. Add code to export metric instruments to the telemetry sample. 3. Add Google AI as a second service to the telemetry sample to showcase nested spans. 4. Misc optimizations. ![image](https://github.com/user-attachments/assets/141e9bd2-06b2-4399-a913-f8aeb12aa8af) ![image](https://github.com/user-attachments/assets/fd35ad84-c59f-418c-b9e6-84beb4b22eb1) ![image](https://github.com/user-attachments/assets/1f40fac0-23fa-4b3e-b6c9-644e654c0cbd) ### Contribution Checklist - [x] The code builds clean without any errors or warnings - [x] The PR follows the [SK Contribution Guidelines](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md) and the [pre-submission formatting script](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md#development-scripts) raises no violations - [x] All unit tests pass, and I have added new tests where possible - [x] I didn't break anyone :smile: --- .../Functions/KernelFunction.cs | 4 +- .../README.md | 10 +- .../main.py | 106 ++++++++++++++---- .../functions/kernel_function.py | 32 +++++- .../functions/kernel_function_log_messages.py | 4 + 5 files changed, 129 insertions(+), 27 deletions(-) diff --git a/dotnet/src/SemanticKernel.Abstractions/Functions/KernelFunction.cs b/dotnet/src/SemanticKernel.Abstractions/Functions/KernelFunction.cs index b838d7b30261..149dbf108ece --- 
a/dotnet/src/SemanticKernel.Abstractions/Functions/KernelFunction.cs +++ b/dotnet/src/SemanticKernel.Abstractions/Functions/KernelFunction.cs @@ -37,7 +37,7 @@ public abstract class KernelFunction private static readonly Histogram s_invocationDuration = s_meter.CreateHistogram( name: "semantic_kernel.function.invocation.duration", unit: "s", - description: "Measures the duration of a function’s execution"); + description: "Measures the duration of a function's execution"); /// to record function streaming duration. /// @@ -47,7 +47,7 @@ public abstract class KernelFunction private static readonly Histogram s_streamingDuration = s_meter.CreateHistogram( name: "semantic_kernel.function.streaming.duration", unit: "s", - description: "Measures the duration of a function’s streaming execution"); + description: "Measures the duration of a function's streaming execution"); /// /// Gets the name of the function. diff --git a/python/samples/demos/telemetry_with_application_insights/README.md b/python/samples/demos/telemetry_with_application_insights/README.md index d5773c71f503..feadef29c0d0 100644 --- a/python/samples/demos/telemetry_with_application_insights/README.md +++ b/python/samples/demos/telemetry_with_application_insights/README.md @@ -38,4 +38,12 @@ pip install azure-monitor-opentelemetry-exporter==1.0.0b24 ### Logs and traces -Go to your Application Insights instance, click on _Transaction search_ on the left menu. Use the operation id output by the program to search for the logs and traces associated with the operation. Click on any of the search result to view the end-to-end transaction details. Read more [here](https://learn.microsoft.com/en-us/azure/azure-monitor/app/transaction-search-and-diagnostics?tabs=transaction-search). \ No newline at end of file +Go to your Application Insights instance, click on _Transaction search_ on the left menu. Use the operation id output by the program to search for the logs and traces associated with the operation. 
Click on any of the search result to view the end-to-end transaction details. Read more [here](https://learn.microsoft.com/en-us/azure/azure-monitor/app/transaction-search-and-diagnostics?tabs=transaction-search). + +### Metrics + +Running the application once will only generate one set of measurements (for each metrics). Run the application a couple times to generate more sets of measurements. + +> Note: Make sure not to run the program too frequently. Otherwise, you may get throttled. + +Please refer to here on how to analyze metrics in [Azure Monitor](https://learn.microsoft.com/en-us/azure/azure-monitor/essentials/analyze-metrics). \ No newline at end of file diff --git a/python/samples/demos/telemetry_with_application_insights/main.py b/python/samples/demos/telemetry_with_application_insights/main.py index 7c4837373c8a..724737d2033f 100644 --- a/python/samples/demos/telemetry_with_application_insights/main.py +++ b/python/samples/demos/telemetry_with_application_insights/main.py @@ -3,30 +3,45 @@ import asyncio import logging -from azure.monitor.opentelemetry.exporter import AzureMonitorLogExporter, AzureMonitorTraceExporter +from azure.monitor.opentelemetry.exporter import ( + AzureMonitorLogExporter, + AzureMonitorMetricExporter, + AzureMonitorTraceExporter, +) from opentelemetry import trace from opentelemetry._logs import set_logger_provider +from opentelemetry.metrics import set_meter_provider from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler from opentelemetry.sdk._logs.export import BatchLogRecordProcessor +from opentelemetry.sdk.metrics import MeterProvider +from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader +from opentelemetry.sdk.metrics.view import DropAggregation, View +from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor +from opentelemetry.semconv.resource import ResourceAttributes from 
opentelemetry.trace import set_tracer_provider from samples.demos.telemetry_with_application_insights.repo_utils import get_sample_plugin_path from samples.demos.telemetry_with_application_insights.telemetry_sample_settings import TelemetrySampleSettings +from semantic_kernel.connectors.ai.google.google_ai.services.google_ai_chat_completion import GoogleAIChatCompletion from semantic_kernel.connectors.ai.open_ai.services.open_ai_chat_completion import OpenAIChatCompletion +from semantic_kernel.connectors.ai.prompt_execution_settings import PromptExecutionSettings from semantic_kernel.functions.kernel_arguments import KernelArguments from semantic_kernel.kernel import Kernel # Load settings settings = TelemetrySampleSettings.create() +# Create a resource to represent the service/sample +resource = Resource.create({ResourceAttributes.SERVICE_NAME: "TelemetryExample"}) + def set_up_logging(): log_exporter = AzureMonitorLogExporter(connection_string=settings.connection_string) # Create and set a global logger provider for the application. - logger_provider = LoggerProvider() + logger_provider = LoggerProvider(resource=resource) # Log processors are initialized with an exporter which is responsible # for sending the telemetry data to a particular backend. logger_provider.add_log_record_processor(BatchLogRecordProcessor(log_exporter)) @@ -49,7 +64,7 @@ def set_up_tracing(): trace_exporter = AzureMonitorTraceExporter(connection_string=settings.connection_string) # Initialize a trace provider for the application. This is a factory for creating tracers. - tracer_provider = TracerProvider() + tracer_provider = TracerProvider(resource=resource) # Span processors are initialized with an exporter which is responsible # for sending the telemetry data to a particular backend. 
tracer_provider.add_span_processor(BatchSpanProcessor(trace_exporter)) @@ -57,14 +72,77 @@ def set_up_tracing(): set_tracer_provider(tracer_provider) +def set_up_metrics(): + metric_exporter = AzureMonitorMetricExporter(connection_string=settings.connection_string) + + # Initialize a metric provider for the application. This is a factory for creating meters. + metric_reader = PeriodicExportingMetricReader(metric_exporter, export_interval_millis=5000) + meter_provider = MeterProvider( + metric_readers=[metric_reader], + resource=resource, + views=[ + # Dropping all instrument names except for those starting with "semantic_kernel" + View(instrument_name="*", aggregation=DropAggregation()), + View(instrument_name="semantic_kernel*"), + ], + ) + # Sets the global default meter provider + set_meter_provider(meter_provider) + + set_up_logging() set_up_tracing() +set_up_metrics() + + +async def run_plugin(kernel: Kernel, plugin_name: str, service_id: str): + """Run a plugin with the given service ID.""" + plugin = kernel.get_plugin(plugin_name) + + poem = await kernel.invoke( + function=plugin["ShortPoem"], + arguments=KernelArguments( + input="Write a poem about John Doe.", + settings={ + service_id: PromptExecutionSettings(service_id=service_id), + }, + ), + ) + print(f"Poem:\n{poem}") + + print("\nTranslated poem:") + async for update in kernel.invoke_stream( + function=plugin["Translate"], + arguments=KernelArguments( + input=poem, + language="Italian", + settings={ + service_id: PromptExecutionSettings(service_id=service_id), + }, + ), + ): + print(update[0].content, end="") + print() + + +async def run_service(kernel: Kernel, plugin_name: str, service_id: str): + """Run a service with the given service ID.""" + print(f"================ Running service {service_id} ================") + + tracer = trace.get_tracer(__name__) + with tracer.start_as_current_span(service_id) as current_span: + try: + await run_plugin(kernel, plugin_name=plugin_name, service_id=service_id) 
+ except Exception as e: + current_span.record_exception(e) + print(f"Error running service {service_id}: {e}") async def main(): # Initialize the kernel kernel = Kernel() - kernel.add_service(OpenAIChatCompletion(service_id="open_ai", ai_model_id="gpt-3.5-turbo")) + kernel.add_service(OpenAIChatCompletion(service_id="open_ai")) + kernel.add_service(GoogleAIChatCompletion(service_id="google_ai")) # Add the sample plugin if (sample_plugin_path := get_sample_plugin_path()) is None: @@ -79,21 +157,11 @@ async def main(): with tracer.start_as_current_span("main") as current_span: print(f"Trace ID: {current_span.get_span_context().trace_id}") - poem = await kernel.invoke( - function=plugin["ShortPoem"], - arguments=KernelArguments(input="Write a poem about John Doe."), - ) - print(f"Poem:\n{poem}") - - print("\nTranslated poem:") - async for update in kernel.invoke_stream( - function=plugin["Translate"], - arguments=KernelArguments( - input=poem, - language="Italian", - ), - ): - print(update[0].content, end="") + # Run the OpenAI service + await run_service(kernel, plugin_name=plugin.name, service_id="open_ai") + + # Run the GoogleAI service + await run_service(kernel, plugin_name=plugin.name, service_id="google_ai") if __name__ == "__main__": diff --git a/python/semantic_kernel/functions/kernel_function.py b/python/semantic_kernel/functions/kernel_function.py index 62620ea920ce..b5d541a758ad 100644 --- a/python/semantic_kernel/functions/kernel_function.py +++ b/python/semantic_kernel/functions/kernel_function.py @@ -8,7 +8,8 @@ from inspect import isasyncgen, isgenerator from typing import TYPE_CHECKING, Any -from opentelemetry import trace +from opentelemetry import metrics, trace +from opentelemetry.semconv.attributes.error_attributes import ERROR_TYPE from semantic_kernel.filters.filter_types import FilterTypes from semantic_kernel.filters.functions.function_invocation_context import FunctionInvocationContext @@ -38,8 +39,11 @@ from 
semantic_kernel.prompt_template.prompt_template_base import PromptTemplateBase from semantic_kernel.prompt_template.prompt_template_config import PromptTemplateConfig +# Logger, tracer and meter for observability logger: logging.Logger = logging.getLogger(__name__) tracer: trace.Tracer = trace.get_tracer(__name__) +meter: metrics.Meter = metrics.get_meter_provider().get_meter(__name__) +MEASUREMENT_FUNCTION_TAG_NAME: str = "semantic_kernel.function.name" TEMPLATE_FORMAT_MAP = { KERNEL_TEMPLATE_FORMAT_NAME: KernelPromptTemplate, @@ -71,6 +75,17 @@ class KernelFunction(KernelBaseModel): metadata: KernelFunctionMetadata + invocation_duration_histogram: metrics.Histogram = meter.create_histogram( + "semantic_kernel.function.invocation.duration", + unit="s", + description="Measures the duration of a function's execution", + ) + streaming_duration_histogram: metrics.Histogram = meter.create_histogram( + "semantic_kernel.function.streaming.duration", + unit="s", + description="Measures the duration of a function's streaming execution", + ) + @classmethod def from_prompt( cls, @@ -212,6 +227,7 @@ async def invoke( KernelFunctionLogMessages.log_function_invoking(logger, self.fully_qualified_name) KernelFunctionLogMessages.log_function_arguments(logger, arguments) + attributes = {MEASUREMENT_FUNCTION_TAG_NAME: self.fully_qualified_name} starting_time_stamp = time.perf_counter() try: stack = kernel.construct_call_stack( @@ -225,10 +241,11 @@ async def invoke( return function_context.result except Exception as e: - self._handle_exception(current_span, e) + self._handle_exception(current_span, e, attributes) raise e finally: duration = time.perf_counter() - starting_time_stamp + self.invocation_duration_histogram.record(duration, attributes) KernelFunctionLogMessages.log_function_completed(logger, duration) @abstractmethod @@ -270,6 +287,7 @@ async def invoke_stream( KernelFunctionLogMessages.log_function_streaming_invoking(logger, self.fully_qualified_name) 
KernelFunctionLogMessages.log_function_arguments(logger, arguments) + attributes = {MEASUREMENT_FUNCTION_TAG_NAME: self.fully_qualified_name} starting_time_stamp = time.perf_counter() try: stack = kernel.construct_call_stack( @@ -288,10 +306,11 @@ async def invoke_stream( else: yield function_context.result except Exception as e: - self._handle_exception(current_span, e) + self._handle_exception(current_span, e, attributes) raise e finally: duration = time.perf_counter() - starting_time_stamp + self.streaming_duration_histogram.record(duration, attributes) KernelFunctionLogMessages.log_function_streaming_completed(logger, duration) def function_copy(self, plugin_name: str | None = None) -> "KernelFunction": @@ -309,15 +328,18 @@ def function_copy(self, plugin_name: str | None = None) -> "KernelFunction": cop.metadata.plugin_name = plugin_name return cop - def _handle_exception(self, current_span: trace.Span, exception: Exception) -> None: + def _handle_exception(self, current_span: trace.Span, exception: Exception, attributes: dict[str, str]) -> None: """Handle the exception. Args: current_span (trace.Span): The current span. exception (Exception): The exception. + attributes (Attributes): The attributes to be modified. """ + attributes[ERROR_TYPE] = type(exception).__name__ + current_span.record_exception(exception) + current_span.set_attribute(ERROR_TYPE, type(exception).__name__) current_span.set_status(trace.StatusCode.ERROR, description=str(exception)) - current_span.set_attribute("error.type", type(exception).__name__) KernelFunctionLogMessages.log_function_error(logger, exception) diff --git a/python/semantic_kernel/functions/kernel_function_log_messages.py b/python/semantic_kernel/functions/kernel_function_log_messages.py index 834e1c1a2624..a41e7f9adcc4 100644 --- a/python/semantic_kernel/functions/kernel_function_log_messages.py +++ b/python/semantic_kernel/functions/kernel_function_log_messages.py @@ -1,5 +1,6 @@ # Copyright (c) Microsoft. 
All rights reserved. +import logging from logging import Logger from semantic_kernel.functions.function_result import FunctionResult @@ -30,6 +31,9 @@ def log_function_invoked_success(logger: Logger, kernel_function_name: str): @staticmethod def log_function_result_value(logger: Logger, function_result: FunctionResult | None): """Log message when a kernel function result is returned.""" + if not logger.isEnabledFor(logging.DEBUG): + return + if function_result is not None: try: logger.debug("Function result: %s", function_result)