instrument obj store methods for traces and spans #1223

Draft · wants to merge 17 commits into base: local-telemetry-agent

5 changes: 0 additions & 5 deletions runhouse/main.py
@@ -476,11 +476,6 @@ def _print_status(status_data: dict, current_cluster: Cluster) -> None:
cluster_name, style=f"link {cluster_link_in_den_ui} white"
)
console.print(cluster_name_hyperlink)
<<<<<<< HEAD

has_cuda: bool = cluster_config.get("has_cuda")
=======
>>>>>>> 8560211c (status cmd prints cluster name as hyperlink to den (#1226))

has_cuda: bool = cluster_config.get("has_cuda")

12 changes: 0 additions & 12 deletions runhouse/resources/hardware/cluster.py
@@ -343,18 +343,8 @@ def endpoint(self, external: bool = False):
external (bool, optional): If ``True``, will only return the external url, and will return ``None``
otherwise (e.g. if a tunnel is required). If set to ``False``, will either return the external url
if it exists, or will set up the connection (based on connection_type) and return the internal url
<<<<<<< HEAD
<<<<<<< HEAD
(including the local connected port rather than the sever port). If cluster is not up, returns
`None``. (Default: ``False``)
=======
(including the local connected port rather than the sever port). If cluster is not up, returns None.
(Default: ``False``)
>>>>>>> c2289235 (Update docstrings (#1230))
=======
(including the local connected port rather than the sever port). If cluster is not up, returns
`None``. (Default: ``False``)
>>>>>>> bbbd6411 (More docstrings cleanup (#1232))
"""
if not self.address or self.on_this_cluster():
return None
@@ -631,7 +621,6 @@ def put(self, key: str, obj: Any, env: str = None):
obj (Any): Object to put in the object store
env (str, optional): Env of the object store to put the object in. (Default: ``None``)
"""

if self.on_this_cluster():
return obj_store.put(key, obj, env=env)
return self.call_client_method(
@@ -820,7 +809,6 @@ def connect_server_client(self, force_reconnect=False):
system=self,
)


def status(self, send_to_den: bool = False):
"""Load the status of the Runhouse daemon running on a cluster.

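For context, a minimal usage sketch of the two Cluster APIs whose docstrings are touched above; the cluster name and stored object are illustrative, not part of this PR:

```python
import runhouse as rh

# A minimal sketch, assuming a cluster already saved as "my-cluster".
cluster = rh.cluster(name="my-cluster")

# endpoint() returns the external URL if one exists, otherwise sets up the
# connection and returns the internal URL (or None if the cluster is not up).
url = cluster.endpoint(external=False)

# put() stores an object in the cluster's object store under the given key.
cluster.put("my_key", {"a": 1})
```
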
8 changes: 0 additions & 8 deletions runhouse/servers/cluster_servlet.py
@@ -86,14 +86,6 @@ async def __init__(
threading.Lock()
) # will be used when self.gpu_metrics will be updated by different threads.

if self.cluster_config.get("has_cuda"):
logger.debug("Creating _periodic_gpu_check thread.")
collect_gpu_thread = threading.Thread(
target=self._periodic_gpu_check, daemon=True
)
collect_gpu_thread.start()


if self.cluster_config.get("has_cuda"):
logger.debug("Creating _periodic_gpu_check thread.")
collect_gpu_thread = threading.Thread(
156 changes: 153 additions & 3 deletions runhouse/servers/obj_store.py
@@ -1,6 +1,7 @@
import asyncio
import copy
import inspect
import json
import logging
import os
import time
@@ -10,12 +11,19 @@
from typing import Any, Dict, List, Optional, Set, Union

import ray

from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

from pydantic import BaseModel

import runhouse as rh
from runhouse.logger import get_logger

from runhouse.rns.defaults import req_ctx
from runhouse.rns.utils.api import ResourceVisibility
from runhouse.rns.utils.api import generate_uuid, ResourceVisibility
from runhouse.utils import (
arun_in_thread,
generate_default_name,
@@ -119,6 +127,86 @@ async def wrapper(self, *args, **kwargs):
return wrapper


def trace_method():
def decorator(func):
@wraps(func)
async def wrapper(self, *args, **kwargs):
tracer = self.tracer
if tracer is None:
return await func(self, *args, **kwargs)

span_id = kwargs.pop("span_id", None) or generate_uuid()
span_name = f"{self.__class__.__name__}.{func.__name__}"
func_name = func.__name__
cluster_config = self.cluster_config

try:
from opentelemetry.trace.status import Status, StatusCode

with tracer.start_as_current_span(span_name) as span:
try:
span.set_attribute("function_name", func_name)
span.set_attribute("span_id", span_id)
for k, v in cluster_config.items():
if isinstance(v, rh.Resource):
v = {
"name": v.name,
"rns_address": v.rns_address,
"resource_type": v.RESOURCE_TYPE,
}
span_key = f"rh.{k}"
try:
json_value = json.dumps(v)
span.set_attribute(span_key, json_value)
except TypeError:
# If the value is not JSON serializable, convert it to a string
span.set_attribute(span_key, str(v))

# Set attributes for arguments
sig = inspect.signature(func)
param_names = list(sig.parameters.keys())
# Ignore the 'self' arg
for i, arg in enumerate(args, start=1):
if i < len(param_names):
span.set_attribute(param_names[i], str(arg))
for arg_name, arg_value in kwargs.items():
span.set_attribute(arg_name, str(arg_value))

# Manually add log to the span
span.add_event(f"Starting execution for func: {func_name}")

result = await func(self, *args, **kwargs)

# Add another log event for successful execution
span.add_event(f"Finished execution for func: {func_name}")
span.set_status(Status(StatusCode.OK))
return result

except Exception as e:
# Log the exception in the span
span.add_event(
"Exception occurred",
{
"exception.type": type(e).__name__,
"exception.message": str(e),
},
)
span.record_exception(e)
span.set_status(Status(StatusCode.ERROR, str(e)))
                        raise

except Exception as e:
# Catch any OpenTelemetry-related exceptions
logger.warning(f"OpenTelemetry error in {func_name}: {e}")

# Execute the function without tracing
return await func(self, *args, **kwargs)

return wrapper

return decorator


class ObjStore:
"""Class to handle internal IPC and storage for Runhouse.

@@ -148,10 +236,71 @@ def __init__(self):
self.cluster_config: Optional[Dict[str, Any]] = None
self.imported_modules = {}
self.installed_envs = {} # TODO: consider deleting it?
self._kv_store: Dict[Any, Any] = None
self.env_servlet_cache = {}
self.active_function_calls = {}
self._kv_store: Dict[Any, Any] = None
self._enable_observability = None
self._telemetry_agent = None
self._tracer = None

##############################################
# Telemetry
##############################################
@property
def enable_observability(self):
if self._enable_observability is None:
# If cluster config is None don't enable - means the cluster is not fully set up or in logged out scenario
self._enable_observability = (
self.cluster_config.get("enable_observability", True)
if self.cluster_config
else False
)
return self._enable_observability

@property
def telemetry_agent(self):
if (
rh.here.on_this_cluster()
and self.enable_observability
and self._telemetry_agent is None
):
self._telemetry_agent = self._initialize_telemetry_agent()
return self._telemetry_agent

@property
def tracer(self):
if self.telemetry_agent and self._tracer is None:
self._tracer = self._initialize_tracer()
return self._tracer

def _initialize_telemetry_agent(self):
from runhouse.servers.telemetry import TelemetryAgentExporter

try:
ta = TelemetryAgentExporter()
ta.start()
return ta

except Exception as e:
logger.warning(f"Failed to start telemetry agent: {e}")
return None

def _initialize_tracer(self):
trace.set_tracer_provider(TracerProvider())
tracer = trace.get_tracer(__name__)

# Export to local agent, which handles sending to the backend collector
span_processor = BatchSpanProcessor(
OTLPSpanExporter(
endpoint=f"localhost:{self.telemetry_agent.agent_config.grpc_port}",
insecure=True,
)
)
trace.get_tracer_provider().add_span_processor(span_processor)

return tracer

# ----------------------------------------------------
async def ainitialize(
self,
servlet_name: Optional[str] = None,
@@ -765,15 +914,16 @@ def get_local(self, key: Any, default: Optional[Any] = None, remote: bool = Fals
raise KeyError(f"No local store exists; key {key} not found.")
return default

@trace_method()
async def aget(
self,
key: Any,
serialization: Optional[str] = None,
remote: bool = False,
default: Optional[Any] = None,
request_id: Optional[str] = None,
):
env_servlet_name_containing_key = await self.aget_env_servlet_name_for_key(key)

if not env_servlet_name_containing_key:
if default == KeyError:
raise KeyError(f"No local store exists; key {key} not found.")
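To illustrate the decorator added above, a small sketch of calling a traced method; the helper name and key are illustrative, and tracing only activates when `obj_store.tracer` is configured:

```python
from runhouse.servers.obj_store import ObjStore

async def fetch_with_trace(obj_store: ObjStore, key: str):
    # When obj_store.tracer is configured, this call runs inside a span named
    # "ObjStore.aget" carrying attributes for the function name, a generated
    # (or caller-supplied) span_id, the rh.* cluster config values, and the
    # stringified call arguments, plus start/finish events. When tracing is
    # off, the wrapper simply awaits the undecorated method.
    return await obj_store.aget(key, default=None)
```
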
54 changes: 47 additions & 7 deletions runhouse/servers/telemetry/telemetry_agent.py
@@ -5,6 +5,8 @@
import tarfile
import time
import urllib

from builtins import bool
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
@@ -22,6 +24,8 @@
TELEMETRY_COLLECTOR_HOST,
TELEMETRY_COLLECTOR_STATUS_URL,
)
from runhouse.globals import configs

from runhouse.logger import get_logger

logger = get_logger(__name__)
@@ -85,7 +89,7 @@ def log_dir(self) -> Path:
@property
def executable_path(self) -> str:
"""Path to the otel binary."""
return f"{self.bin_dir}/otelcol"
return f"{self.bin_dir}/otelcol-contrib"

@property
def local_config_path(self) -> str:
@@ -97,6 +101,21 @@ def agent_status_url(self) -> str:
"""Health check URL of the local agent."""
return f"http://localhost:{self.agent_config.health_check_port}"

@classmethod
def auth_token(cls):
# Use the token saved in the local config file (~/.rh/config.yaml)
# Note: this will be the cluster token for the cluster owner
cluster_token = configs.token
if cluster_token is None:
raise ValueError(
"No cluster token found, cannot configure telemetry agent auth"
)
return cluster_token

@classmethod
def request_headers(cls):
return {"authorization": f"Bearer {cls.auth_token()}"}

def _setup_directories(self):
# Note: use paths that are local to the user's home directory and won't necessitate root access on the cluster
for dir_name in ["bin", "config", "logs"]:
@@ -113,15 +132,30 @@ def _get_verbosity(self) -> str:

def _create_default_config(self):
"""Base config for the local agent, which forwards the collected telemetry data to the collector backend."""
# Use insecure connection if the collector endpoint is not HTTPS (ex: localhost)
collector_endpoint = self.collector_config.endpoint

# Use insecure connection if the collector is not secured with HTTPS (ex: running on localhost)
insecure = False if TELEMETRY_COLLECTOR_HOST in collector_endpoint else True
service_extensions = ["health_check"]

# Note: the receiver does not have any auth enabled, as the agent will be running on localhost
# The auth is configured on the "exporter", since auth is required to send data to the collector
auth_extension = {}
if not insecure:
auth_extension = {
"bearertokenauth/withscheme": {
"scheme": "Bearer",
"token": TelemetryAgentExporter.auth_token(),
}
}
service_extensions.append("bearertokenauth/withscheme")

otel_config = {
"extensions": {
"health_check": {
"endpoint": f"0.0.0.0:{self.agent_config.health_check_port}"
}
},
**auth_extension,
},
"receivers": {
"otlp": {
@@ -137,10 +171,15 @@
"otlp/grpc": {
"endpoint": collector_endpoint,
"tls": {"insecure": insecure},
**(
{"auth": {"authenticator": "bearertokenauth/withscheme"}}
if not insecure
else {}
),
},
},
"service": {
"extensions": ["health_check"],
"extensions": service_extensions,
"pipelines": {
"traces": {
"receivers": ["otlp"],
@@ -189,8 +228,8 @@ def _generate_install_url(self):
else:
raise ValueError(f"Unsupported system: {system}")

otel_version = self.agent_config.otel_version
binary_url = f"https://github.com/open-telemetry/opentelemetry-collector-releases/releases/download/v{otel_version}/otelcol_{otel_version}_{system}_{arch}.tar.gz"
# Note: version used by agent must match the collector version
binary_url = f"https://github.com/open-telemetry/opentelemetry-collector-releases/releases/download/v0.108.0/otelcol-contrib_0.108.0_{system}_{arch}.tar.gz"

return binary_url

@@ -261,6 +300,7 @@ def install(self):
logger.debug("Installing OTel agent")
try:
install_url = self._generate_install_url()

logger.debug(f"Downloading OTel agent from url: {install_url}")

# Download and extract
@@ -300,7 +340,7 @@ def start(self, force_reinstall=False, reload_config=False, timeout=10):
self.install()

if self.is_up() and not reload_config:
logger.info("Otel agent is already running")
logger.debug("Otel agent is already running")
return True

try:
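A short sketch of how the exporter pieces changed above fit together at runtime; the health-check request is illustrative, and a cluster token is assumed to exist in ~/.rh/config.yaml:

```python
import requests

from runhouse.servers.telemetry import TelemetryAgentExporter

# A minimal sketch, assuming a cluster token is available locally.
exporter = TelemetryAgentExporter()
exporter.start()  # installs otelcol-contrib 0.108.0 if needed and launches the local agent

# The agent exposes a local health-check endpoint via the health_check extension.
resp = requests.get(exporter.agent_status_url)
print(resp.status_code)

# Outbound data to the secured collector attaches the cluster token as a bearer
# token, mirroring the bearertokenauth/withscheme extension in the generated config.
headers = TelemetryAgentExporter.request_headers()
```
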