Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Exporter] Support redirect response in exporter #20489

Merged
merged 3 commits into from
Sep 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Release History

**Features**
- Support stamp specific redirect in exporters
([#20489](https://github.com/Azure/azure-sdk-for-python/pull/20489))

**Breaking Changes**
- Change exporter OT to AI mapping fields following common schema
([#20445](https://github.com/Azure/azure-sdk-for-python/pull/20445))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@
import tempfile
from enum import Enum
from typing import List, Any
from urllib.parse import urlparse

from opentelemetry.sdk.trace.export import SpanExportResult

from azure.core.exceptions import HttpResponseError, ServiceRequestError
from azure.core.pipeline.policies import ContentDecodePolicy, HttpLoggingPolicy, RequestIdPolicy
from azure.core.pipeline.policies import ContentDecodePolicy, HttpLoggingPolicy, RedirectPolicy, RequestIdPolicy
from azure.monitor.opentelemetry.exporter._generated import AzureMonitorClient
from azure.monitor.opentelemetry.exporter._generated._configuration import AzureMonitorClientConfiguration
from azure.monitor.opentelemetry.exporter._generated.models import TelemetryItem
Expand Down Expand Up @@ -43,6 +44,7 @@ def __init__(self, **kwargs: Any) -> None:
self._instrumentation_key = parsed_connection_string.instrumentation_key
self._timeout = 10.0 # networking timeout in seconds
self._api_version = kwargs.get('api_version') or _SERVICE_API_LATEST
self._consecutive_redirects = 0 # To prevent circular redirects

temp_suffix = self._instrumentation_key or ""
default_storage_path = os.path.join(
Expand All @@ -56,7 +58,8 @@ def __init__(self, **kwargs: Any) -> None:
config.user_agent_policy,
config.proxy_policy,
ContentDecodePolicy(**kwargs),
config.redirect_policy,
# Handle redirects in exporter, set new endpoint if redirected
RedirectPolicy(permit_redirects=False),
config.retry_policy,
config.authentication_policy,
config.custom_hook_policy,
Expand Down Expand Up @@ -100,6 +103,7 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult:
try:
track_response = self.client.track(envelopes)
if not track_response.errors:
self._consecutive_redirects = 0
logger.info("Transmission succeeded: Item received: %s. Items accepted: %s",
track_response.items_received, track_response.items_accepted)
return ExportResult.SUCCESS
Expand All @@ -120,11 +124,33 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult:
envelopes_to_store = [x.as_dict()
for x in resend_envelopes]
self.storage.put(envelopes_to_store)
self._consecutive_redirects = 0
return ExportResult.FAILED_RETRYABLE

except HttpResponseError as response_error:
if _is_retryable_code(response_error.status_code):
return ExportResult.FAILED_RETRYABLE
if _is_redirect_code(response_error.status_code):
self._consecutive_redirects = self._consecutive_redirects + 1
if self._consecutive_redirects < self.client._config.redirect_policy.max_redirects: # pylint: disable=W0212
if response_error.response and response_error.response.headers:
location = response_error.response.headers.get("location")
if location:
url = urlparse(location)
if url.scheme and url.netloc:
# Change the host to the new redirected host
self.client._config.host = "{}://{}".format(url.scheme, url.netloc) # pylint: disable=W0212
# Attempt to export again
return self._transmit(envelopes)
logger.error(
"Error parsing redirect information."
)
return ExportResult.FAILED_NOT_RETRYABLE
logger.error(
"Error sending telemetry because of circular redirects." \
"Please check the integrity of your connection string."
)
return ExportResult.FAILED_NOT_RETRYABLE
return ExportResult.FAILED_NOT_RETRYABLE
except ServiceRequestError as request_error:
# Errors when we're fairly sure that the server did not receive the
Expand All @@ -140,9 +166,20 @@ def _transmit(self, envelopes: List[TelemetryItem]) -> ExportResult:
return ExportResult.FAILED_NOT_RETRYABLE
return ExportResult.FAILED_NOT_RETRYABLE
# No spans to export
self._consecutive_redirects = 0
return ExportResult.SUCCESS


def _is_redirect_code(response_code: int) -> bool:
"""
Determine if response is a redirect response.
"""
return bool(response_code in(
307, # Temporary redirect
308, # Permanent redirect
))


def _is_retryable_code(response_code: int) -> bool:
"""
Determine if response is retryable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@
from opentelemetry.sdk.trace.export import SpanExportResult

from azure.core.exceptions import HttpResponseError, ServiceRequestError
from azure.core.pipeline.transport import HttpResponse
from azure.monitor.opentelemetry.exporter.export._base import (
BaseExporter,
ExportResult,
get_trace_export_result,
)
from azure.monitor.opentelemetry.exporter._generated.models import TelemetryItem
from azure.monitor.opentelemetry.exporter._generated import AzureMonitorClient
from azure.monitor.opentelemetry.exporter._generated.models import TelemetryItem, TrackResponse


def throw(exc_type, *args, **kwargs):
Expand Down Expand Up @@ -115,44 +117,54 @@ def test_transmit_from_storage_lease_failure(self, requests_mock):
self._base._transmit_from_storage()
self.assertTrue(self._base.storage.get())

def test_transmit_request_timeout(self):
with mock.patch("requests.Session.request", throw(requests.Timeout)):
result = self._base._transmit(self._envelopes_to_export)
self.assertEqual(result, ExportResult.FAILED_RETRYABLE)

def test_transmit_http_error_retryable(self):
with mock.patch("azure.monitor.opentelemetry.exporter.export._base._is_retryable_code") as m:
m.return_value = True
with mock.patch("requests.Session.request", throw(HttpResponseError)):
with mock.patch.object(AzureMonitorClient, 'track', throw(HttpResponseError)):
result = self._base._transmit(self._envelopes_to_export)
self.assertEqual(result, ExportResult.FAILED_RETRYABLE)

def test_transmit_http_error_retryable(self):
def test_transmit_http_error_not_retryable(self):
with mock.patch("azure.monitor.opentelemetry.exporter.export._base._is_retryable_code") as m:
m.return_value = False
with mock.patch("requests.Session.request", throw(HttpResponseError)):
with mock.patch.object(AzureMonitorClient, 'track', throw(HttpResponseError)):
result = self._base._transmit(self._envelopes_to_export)
self.assertEqual(result, ExportResult.FAILED_NOT_RETRYABLE)

def test_transmit_http_error_redirect(self):
response = HttpResponse(None, None)
response.status_code = 307
response.headers = {"location":"https://example.com"}
prev_redirects = self._base.client._config.redirect_policy.max_redirects
self._base.client._config.redirect_policy.max_redirects = 2
prev_host = self._base.client._config.host
error = HttpResponseError(response=response)
with mock.patch.object(AzureMonitorClient, 'track') as post:
post.side_effect = error
result = self._base._transmit(self._envelopes_to_export)
self.assertEqual(result, ExportResult.FAILED_NOT_RETRYABLE)
self.assertEqual(post.call_count, 2)
self.assertEqual(self._base.client._config.host, "https://example.com")
self._base.client._config.redirect_policy.max_redirects = prev_redirects
self._base.client._config.host = prev_host

def test_transmit_request_error(self):
with mock.patch("requests.Session.request", throw(ServiceRequestError, message="error")):
with mock.patch.object(AzureMonitorClient, 'track', throw(ServiceRequestError, message="error")):
result = self._base._transmit(self._envelopes_to_export)
self.assertEqual(result, ExportResult.FAILED_RETRYABLE)

def test_transmit_request_exception(self):
with mock.patch("requests.Session.request", throw(Exception)):
with mock.patch.object(AzureMonitorClient, 'track', throw(Exception)):
result = self._base._transmit(self._envelopes_to_export)
self.assertEqual(result, ExportResult.FAILED_NOT_RETRYABLE)

def test_transmission_200(self):
with mock.patch("requests.Session.request") as post:
post.return_value = MockResponse(200, json.dumps(
{
"itemsReceived": 1,
"itemsAccepted": 1,
"errors": [],
}
), reason="OK", content="")
with mock.patch.object(AzureMonitorClient, 'track') as post:
post.return_value = TrackResponse(
items_received=1,
items_accepted=1,
errors=[],
)
result = self._base._transmit(self._envelopes_to_export)
self.assertEqual(result, ExportResult.SUCCESS)

Expand Down