Switch retry #13264

Merged
10 commits merged on Aug 31, 2020
@@ -50,7 +50,8 @@
StorageRequestHook,
StorageResponseHook,
StorageLoggingPolicy,
StorageHosts, ExponentialRetry,
StorageHosts,
StorageRetryPolicy,
)
from ._error import _process_table_error
from ._models import PartialBatchErrorException
@@ -391,7 +392,7 @@ def create_configuration(**kwargs):
config.headers_policy = StorageHeadersPolicy(**kwargs)
config.user_agent_policy = UserAgentPolicy(sdk_moniker=SDK_MONIKER, **kwargs)
# sdk_moniker="storage-{}/{}".format(kwargs.pop('storage_sdk'), VERSION), **kwargs)
config.retry_policy = kwargs.get("retry_policy") or ExponentialRetry(**kwargs)
config.retry_policy = kwargs.get("retry_policy") or StorageRetryPolicy(**kwargs)
config.logging_policy = StorageLoggingPolicy(**kwargs)
config.proxy_policy = ProxyPolicy(**kwargs)
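For context, create_configuration only falls back to the new StorageRetryPolicy default when the caller does not pass a policy of their own. A minimal sketch of that selection logic, assuming only what the changed line shows (the import path matches the _policies.py file later in this diff; the retry_total=5 override is purely illustrative):

# A minimal sketch of the default-selection logic changed above; it assumes the
# StorageRetryPolicy import path from the _policies.py file shown later in this
# diff, and the retry_total=5 override is only illustrative.
from azure.data.tables._policies import StorageRetryPolicy

def pick_retry_policy(**kwargs):
    # Mirrors the changed line: prefer the caller's policy, else the new default.
    return kwargs.get("retry_policy") or StorageRetryPolicy(**kwargs)

default_policy = pick_retry_policy()
custom_policy = pick_retry_policy(retry_policy=StorageRetryPolicy(retry_total=5))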

80 changes: 57 additions & 23 deletions sdk/tables/azure-data-tables/azure/data/tables/_policies.py
@@ -36,7 +36,8 @@
SansIOHTTPPolicy,
NetworkTraceLoggingPolicy,
HTTPPolicy,
RequestHistory
RequestHistory,
RetryPolicy
)
from azure.core.exceptions import AzureError, ServiceRequestError, ServiceResponseError

@@ -353,18 +354,61 @@ def on_response(self, request, response):
)


class StorageRetryPolicy(HTTPPolicy):
class StorageRetryPolicy(RetryPolicy):
"""
The base class for Exponential and Linear retries containing shared code.
A base class for retry policies for the Table Client and Table Service Client.
"""
def __init__(
self,
initial_backoff=15, # type: int
increment_base=3, # type: int
retry_total=10, # type: int
retry_to_secondary=False, # type: bool
random_jitter_range=3, # type: int
**kwargs # type: Any
):
"""
Build a StorageRetryPolicy object.

def __init__(self, **kwargs):
self.total_retries = kwargs.pop('retry_total', 10)
:param int initial_backoff:
The initial backoff interval, in seconds, for the first retry.
:param int increment_base:
The base, in seconds, to increment the initial_backoff by after the
first retry.
:param int retry_total: The total number of retries.
:param bool retry_to_secondary:
Whether the request should be retried to the secondary endpoint, if able. This should
only be enabled if RA-GRS accounts are used and potentially stale data
can be handled.
:param int random_jitter_range:
A number in seconds indicating the range to jitter/randomize the back-off interval by.
For example, a random_jitter_range of 3 causes a back-off interval of x to vary between x-3 and x+3.
"""
self.initial_backoff = initial_backoff
self.increment_base = increment_base
self.random_jitter_range = random_jitter_range
self.total_retries = retry_total
self.connect_retries = kwargs.pop('retry_connect', 3)
self.read_retries = kwargs.pop('retry_read', 3)
self.status_retries = kwargs.pop('retry_status', 3)
self.retry_to_secondary = kwargs.pop('retry_to_secondary', False)
super(StorageRetryPolicy, self).__init__()
self.retry_to_secondary = retry_to_secondary
super(StorageRetryPolicy, self).__init__(**kwargs)

def get_backoff_time(self, settings):
"""
Calculates how long to sleep before retrying.
:param dict settings: The retry settings produced by configure_retries; only the 'count' key is read here.
:return:
The number of seconds to wait before retrying the request,
or None to indicate no retry should be performed.
:rtype: float or None
"""
random_generator = random.Random()
backoff = self.initial_backoff + (0 if settings['count'] == 0 else pow(self.increment_base, settings['count']))
random_range_start = backoff - self.random_jitter_range if backoff > self.random_jitter_range else 0
random_range_end = backoff + self.random_jitter_range
return random_generator.uniform(random_range_start, random_range_end)
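With the defaults above (initial_backoff=15, increment_base=3, random_jitter_range=3) the formula works out to roughly 15s, 18s, 24s and 42s for the first four retries, each jittered by up to 3 seconds. A standalone sketch of the same arithmetic, with the settings dict reduced to the single 'count' key this method reads:

# A standalone re-statement of the backoff formula above; only the 'count'
# value from the retry settings matters here.
import random

def backoff_for(count, initial_backoff=15, increment_base=3, random_jitter_range=3):
    backoff = initial_backoff + (0 if count == 0 else pow(increment_base, count))
    start = backoff - random_jitter_range if backoff > random_jitter_range else 0
    return random.Random().uniform(start, backoff + random_jitter_range)

for count in range(4):
    # count 0 -> ~15s, 1 -> ~18s, 2 -> ~24s, 3 -> ~42s, each +/- 3s of jitter
    print(count, round(backoff_for(count), 1))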

def _set_next_host_location(self, settings, request): # pylint: disable=no-self-use
"""
@@ -384,7 +428,7 @@ def _set_next_host_location(self, settings, request): # pylint: disable=no-self
updated = url._replace(netloc=settings['hosts'].get(settings['mode']))
request.url = updated.geturl()
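The host swap above is a plain urlparse round trip on the request URL; a self-contained sketch with hypothetical primary and secondary host names standing in for the settings['hosts'] entries:

# A self-contained sketch of the netloc swap performed in _set_next_host_location;
# the host names are hypothetical stand-ins for values from settings['hosts'].
from urllib.parse import urlparse

request_url = "https://myaccount.table.core.windows.net/mytable?query=1"
secondary_host = "myaccount-secondary.table.core.windows.net"

updated = urlparse(request_url)._replace(netloc=secondary_host)
print(updated.geturl())  # https://myaccount-secondary.table.core.windows.net/mytable?query=1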

def configure_retries(self, request): # pylint: disable=no-self-use
def configure_retries(self, request): # pylint: disable=no-self-use, arguments-differ
# type: (...)-> dict
"""
:param Any request:
@@ -414,17 +458,8 @@ def configure_retries(self, request): # pylint: disable=no-self-use
'history': []
}

def get_backoff_time(self, settings, **kwargs): # pylint: disable=unused-argument,no-self-use
""" Formula for computing the current backoff.
Should be calculated by child class.
:param Any settings:
:keyword callable cls: A custom type or function that will be passed the direct response
:rtype: float
"""
return 0

def sleep(self, settings, transport):
# type: (...)->None
def sleep(self, settings, transport): # pylint: disable=arguments-differ
# type: (...) -> None
"""
:param Any settings:
:param Any transport:
Expand All @@ -435,7 +470,7 @@ def sleep(self, settings, transport):
return
transport.sleep(backoff)

def increment(self, settings, request, response=None, error=None, **kwargs): # pylint:disable=W0613
def increment(self, settings, request, response=None, error=None, **kwargs): # pylint:disable=unused-argument, arguments-differ
# type: (...)->None
"""Increment the retry counters.

@@ -565,10 +600,9 @@ def __init__(self, initial_backoff=15, increment_base=3, retry_total=3,
super(ExponentialRetry, self).__init__(
retry_total=retry_total, retry_to_secondary=retry_to_secondary, **kwargs)
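ExponentialRetry and LinearRetry keep the constructor parameters visible in the hunk headers here. A hedged sketch of constructing them and overriding the package default via the retry_policy keyword; the keyword pass-through into create_configuration is shown in the first hunk, while routing it through TableServiceClient is assumed rather than shown in this diff, and the connection string is a fake placeholder:

# A hedged usage sketch: both policies come from the _policies.py module in this
# diff; passing retry_policy through TableServiceClient is assumed based on the
# create_configuration change above, and the connection string is fake.
from azure.data.tables import TableServiceClient
from azure.data.tables._policies import ExponentialRetry, LinearRetry

retry = ExponentialRetry(initial_backoff=10, increment_base=2, retry_total=5)
# retry = LinearRetry(backoff=10, retry_total=5)  # linear alternative

service = TableServiceClient.from_connection_string(
    "DefaultEndpointsProtocol=https;AccountName=example;AccountKey=ZmFrZWtleQ==;EndpointSuffix=core.windows.net",
    retry_policy=retry,
)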

def get_backoff_time(self, settings, **kwargs):
def get_backoff_time(self, settings):
"""
Calculates how long to sleep before retrying.
:param **kwargs:
:param dict settings:
:keyword callable cls: A custom type or function that will be passed the direct response
:return:
@@ -608,7 +642,7 @@ def __init__(self, backoff=15, retry_total=3, retry_to_secondary=False, random_j
super(LinearRetry, self).__init__(
retry_total=retry_total, retry_to_secondary=retry_to_secondary, **kwargs)

def get_backoff_time(self, settings, **kwargs):
def get_backoff_time(self, settings):
"""
Calculates how long to sleep before retrying.

@@ -9,7 +9,7 @@
import logging
from typing import Any, TYPE_CHECKING

from azure.core.pipeline.policies import AsyncHTTPPolicy
from azure.core.pipeline.policies import AsyncHTTPPolicy, AsyncRetryPolicy
from azure.core.exceptions import AzureError

from .._policies import is_retry, StorageRetryPolicy
@@ -78,12 +78,56 @@ async def send(self, request):
request.context['response_callback'] = response_callback
return response

class AsyncStorageRetryPolicy(StorageRetryPolicy):
"""
The base class for Exponential and Linear retries containing shared code.
"""

async def sleep(self, settings, transport): # pylint: disable =W0236
class AsyncStorageRetryPolicy(AsyncRetryPolicy, StorageRetryPolicy):
"""Exponential retry."""

def __init__(self, initial_backoff=15, increment_base=3, retry_total=3,
retry_to_secondary=False, random_jitter_range=3, **kwargs):
'''
Constructs an exponential retry object. The initial_backoff is used for
the first retry. Subsequent retries are retried after initial_backoff +
increment_base^retry_count seconds. For example, by default the first retry
occurs after 15 seconds, the second after (15+3^1) = 18 seconds, and the
third after (15+3^2) = 24 seconds.

:param int initial_backoff:
The initial backoff interval, in seconds, for the first retry.
:param int increment_base:
The base, in seconds, to increment the initial_backoff by after the
first retry.
:param int retry_total:
The total number of retry attempts.
:param bool retry_to_secondary:
Whether the request should be retried to the secondary endpoint, if able. This should
only be enabled if RA-GRS accounts are used and potentially stale data
can be handled.
:param int random_jitter_range:
A number in seconds indicating the range to jitter/randomize the back-off interval by.
For example, a random_jitter_range of 3 causes a back-off interval of x to vary between x-3 and x+3.
'''
self.initial_backoff = initial_backoff
self.increment_base = increment_base
self.random_jitter_range = random_jitter_range
super(AsyncStorageRetryPolicy, self).__init__(
retry_total=retry_total, retry_to_secondary=retry_to_secondary, **kwargs)

def get_backoff_time(self, settings):
"""
Calculates how long to sleep before retrying.

:param dict settings: The retry settings in use; only the 'count' key is read here.
:return:
The number of seconds to wait before retrying the request,
or None to indicate no retry should be performed.
:rtype: float or None
"""
random_generator = random.Random()
backoff = self.initial_backoff + (0 if settings['count'] == 0 else pow(self.increment_base, settings['count']))
random_range_start = backoff - self.random_jitter_range if backoff > self.random_jitter_range else 0
random_range_end = backoff + self.random_jitter_range
return random_generator.uniform(random_range_start, random_range_end)

async def sleep(self, settings, transport): # pylint: disable=W0236, arguments-differ
backoff = self.get_backoff_time(settings)
if not backoff or backoff < 0:
return
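The async policy must sleep without blocking the event loop. A minimal standalone sketch of that pattern, with a stand-in transport object rather than the azure-core async transport:

# A minimal standalone sketch of the non-blocking sleep used by the async policy;
# FakeTransport stands in for the azure-core async transport and simply delegates
# to asyncio.sleep.
import asyncio

class FakeTransport:
    async def sleep(self, seconds):
        await asyncio.sleep(seconds)

async def sleep_before_retry(backoff, transport):
    # Mirrors the guard in AsyncStorageRetryPolicy.sleep: skip zero/negative backoff.
    if not backoff or backoff < 0:
        return
    await transport.sleep(backoff)

asyncio.run(sleep_before_retry(0.1, FakeTransport()))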
@@ -99,7 +143,6 @@ async def send(self, request): # pylint: disable =W0236
if is_retry(response, retry_settings['mode']):
retries_remaining = self.increment(
retry_settings,
request=request.http_request,
response=response.http_response)
if retries_remaining:
await retry_hook(
@@ -111,8 +154,7 @@
continue
break
except AzureError as err:
retries_remaining = self.increment(
retry_settings, request=request.http_request, error=err)
retries_remaining = self.increment(retry_settings, error=err)
if retries_remaining:
await retry_hook(
retry_settings,
@@ -161,11 +203,10 @@ def __init__(self, initial_backoff=15, increment_base=3, retry_total=3,
super(ExponentialRetry, self).__init__(
retry_total=retry_total, retry_to_secondary=retry_to_secondary, **kwargs)

def get_backoff_time(self, settings, **kwargs):
def get_backoff_time(self, settings):
"""
Calculates how long to sleep before retrying.

:param **kwargs:
:return:
An integer indicating how long to wait before retrying the request,
or None to indicate no retry should be performed.
@@ -202,7 +243,7 @@ def __init__(self, backoff=15, retry_total=3, retry_to_secondary=False, random_j
super(LinearRetry, self).__init__(
retry_total=retry_total, retry_to_secondary=retry_to_secondary, **kwargs)

def get_backoff_time(self, settings, **kwargs):
def get_backoff_time(self, settings, **kwargs): # pylint: disable=unused-argument
"""
Calculates how long to sleep before retrying.

18 changes: 9 additions & 9 deletions sdk/tables/azure-data-tables/tests/test_table.py
@@ -15,16 +15,16 @@
timedelta,
)


from azure.data.tables import (
ResourceTypes,
AccountSasPermissions,
TableSasPermissions,
CorsRule,
RetentionPolicy,
UpdateMode,
AccessPolicy,
TableAnalyticsLogging,
ResourceTypes,
AccountSasPermissions,
TableSasPermissions,
CorsRule,
RetentionPolicy,
UpdateMode,
AccessPolicy,
TableAnalyticsLogging,
Metrics
)
from azure.core.pipeline import Pipeline
2 changes: 1 addition & 1 deletion sdk/tables/azure-data-tables/tests/test_table_async.py
@@ -283,7 +283,7 @@ async def test_set_table_acl_with_empty_signed_identifiers(self, resource_group,

@pytest.mark.skip("pending")
@GlobalStorageAccountPreparer()
async def test_set_table_acl_with_empty_signed_identifier(self, resource_group, location, storage_account,
async def test_set_table_acl_with_none_signed_identifier(self, resource_group, location, storage_account,
storage_account_key):
# Arrange
url = self.account_url(storage_account, "table")