Skip to content

Commit

Permalink
update per_call_policies & per_retry_policies (#18406)
Browse files Browse the repository at this point in the history
* update dev doc to add per_call_policies & per_retry_policies

* updates

* update
  • Loading branch information
xiangyan99 committed May 4, 2021
1 parent c1b88c2 commit f545697
Show file tree
Hide file tree
Showing 5 changed files with 162 additions and 34 deletions.
18 changes: 16 additions & 2 deletions sdk/core/azure-core/CLIENT_LIBRARY_DEVELOPER.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class FooServiceClient:
```

An end user consuming this SDK may write code like so:

```python
from azure.core.credentials import FooCredentials
from azure.foo import FooServiceClient
Expand Down Expand Up @@ -93,23 +94,27 @@ response = client.get_foo_properties()
| Parameters | Description |
| --- | --- |
| `pipeline` | While `PipelineClient` will create a default pipeline, users can opt to use their own pipeline by passing in a `Pipeline` object. If passed in, the other configurations will be ignored. |
| `policies` | While `PipelineClient` will create a default list of `policies`, users can opt to use their own policies by passing in a `policies` object. If passed in, `config` will be ignored |
| `config` | While `PipelineClient` will create a default `Configuration`, users can opt to use their own configuration by passing in a `Configuration` object. If passed in, it will be used to create a `Pipeline` object. |
| `per_call_policies` | If a default `pipeline` is needed and no `policies` is passed in, `per_call_policies` will be added before the `Retry` policy |
| `per_retry_policies` | If a default `pipeline` is needed and no `policies` is passed in, `per_retry_policies` will be added after the `Retry` policy. If there is no `RetryPolicy` in the pipeline, a `ValueError` will be raised |
| `transport` | While `PipelineClient` will create a default `RequestsTransport`, users can opt to use their own transport by passing in a `RequestsTransport` object. If it is omitted, `PipelineClient` will honor the other described [transport customizations](#transport). |


### Transport

Various combinations of sync/async HTTP libraries as well as alternative event loop implementations are available. Therefore to support the widest range of customer scenarios, we must allow a customer to easily swap out the HTTP transport layer to one of those supported.

The transport is the last node in the pipeline, and adheres to the same basic API as any policy within the pipeline.
The only currently available transport for synchronous pipelines uses the `Requests` library:

```python
from azure.core.pipeline.transport import RequestsTransport
synchronous_transport = RequestsTransport()
```

For asynchronous pipelines a couple of transport options are available. Each of these transports are interchangable depending on whether the user has installed various 3rd party dependencies (i.e. aiohttp or trio), and the user
should easily be able to specify their chosen transport. SDK developers should use the `aiohttp` transport as the default for asynchronous pipelines where the user has not specified an alternative.

```python
from azure.foo.aio import FooServiceClient
from azure.core.pipeline.transport import (
Expand All @@ -131,6 +136,7 @@ response = await client.get_foo_properties()

Some common properties can be configured on all transports. They must be passed
as kwargs arguments while building the transport instance. These include the following properties:

```python
transport = AioHttpTransport(
# The connect and read timeout value. Defaults to 100 seconds.
Expand Down Expand Up @@ -190,6 +196,7 @@ proxy_policy.proxies = {'https': 'http://user:password@10.10.1.10:1180/'}
The HttpRequest and HttpResponse objects represent a generic concept of HTTP request and response constructs and are in no way tied to a particular transport or HTTP library.

The HttpRequest has the following API. It does not vary between transports:

```python
class HttpRequest(object):

Expand Down Expand Up @@ -240,6 +247,7 @@ This is to accomodate how the data is extracted for the object returned by the H
There is also an async flavor: AsyncHttpResponse. This is to allow for the asynchronous streaming of
data from the response.
For example:

```python
from azure.core.pipeline.transport import (
RequestsTransportResponse, # HttpResponse
Expand All @@ -248,10 +256,12 @@ from azure.core.pipeline.transport import (
AsyncioRequestsTransportResponse, # AsyncHttpResponse
)
```

The API for each of these response types is identical, so the consumer of the Response need not know about these
particular types.

The HttpResponse has the following API. It does not vary between transports:

```python
class HttpResponse(object):

Expand Down Expand Up @@ -297,6 +307,7 @@ transport specific and can contain data persisted between pipeline requests (for
pool or "session"), as well as used by the SDK developer to carry arbitrary data through the pipeline.

The API for PipelineRequest and PipelineResponse is as follows:

```python
class PipelineRequest(object):

Expand Down Expand Up @@ -327,6 +338,7 @@ This is a simple abstract class, that can act before the request is done, or aft
- Logging the request and/or response

A SansIOHTTPPolicy should implement one or more of the following methods:

```python
def on_request(self, request):
"""Is executed before sending the request to next policy."""
Expand All @@ -345,6 +357,7 @@ def on_exception(self, request):
SansIOHTTPPolicy methods can be declared as coroutines, but then they can only be used with a AsyncPipeline.

Current provided sans IO policies include:

```python
from azure.core.pipeline.policies import (
HeadersPolicy, # Add custom headers to all requests
Expand All @@ -364,6 +377,7 @@ Some policies are more complex, like retry strategy, and need to have control of
In the current version, they are subclasses of HTTPPolicy or AsyncHTTPPolicy, and can be used only their corresponding synchronous or asynchronous pipeline type.

An HTTPPolicy or AsyncHTTPPolicy must implement the `send` method, and this implementation must in include a call to process the next policy in the pipeline:

```python
class CustomPolicy(HTTPPolicy):

Expand All @@ -384,6 +398,7 @@ class CustomAsyncPolicy(AsyncHTTPPolicy):
```

Currently provided HTTP policies include:

```python
from azure.core.pipeline.policies import (
RetryPolicy,
Expand Down Expand Up @@ -449,7 +464,6 @@ from azure.core.pipeline.policies import (
| | | retry_mode | x | x | Fixed or exponential delay between attemps, default is exponential. |
| | | timeout | x | x | Timeout setting for the operation in seconds, default is `604800s` (7 days). |


### The Pipeline

The pipeline itself represents a chain of policies where the final node in the chain is the HTTP transport.
Expand Down
50 changes: 37 additions & 13 deletions sdk/core/azure-core/azure/core/_pipeline_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
DistributedTracingPolicy,
HttpLoggingPolicy,
RequestIdPolicy,
RetryPolicy,
)
from .pipeline.transport import RequestsTransport

Expand Down Expand Up @@ -111,10 +112,10 @@ def close(self):
def _build_pipeline(self, config, **kwargs): # pylint: disable=no-self-use
transport = kwargs.get('transport')
policies = kwargs.get('policies')
per_call_policies = kwargs.get('per_call_policies', [])
per_retry_policies = kwargs.get('per_retry_policies', [])

if policies is None: # [] is a valid policy list
per_call_policies = kwargs.get('per_call_policies', [])
per_retry_policies = kwargs.get('per_retry_policies', [])
policies = [
RequestIdPolicy(**kwargs),
config.headers_policy,
Expand All @@ -123,24 +124,47 @@ def _build_pipeline(self, config, **kwargs): # pylint: disable=no-self-use
ContentDecodePolicy(**kwargs)
]
if isinstance(per_call_policies, Iterable):
for policy in per_call_policies:
policies.append(policy)
policies.extend(per_call_policies)
else:
policies.append(per_call_policies)

policies = policies + [config.redirect_policy,
config.retry_policy,
config.authentication_policy,
config.custom_hook_policy]
policies.extend([config.redirect_policy,
config.retry_policy,
config.authentication_policy,
config.custom_hook_policy])
if isinstance(per_retry_policies, Iterable):
for policy in per_retry_policies:
policies.append(policy)
policies.extend(per_retry_policies)
else:
policies.append(per_retry_policies)

policies = policies + [config.logging_policy,
DistributedTracingPolicy(**kwargs),
config.http_logging_policy or HttpLoggingPolicy(**kwargs)]
policies.extend([config.logging_policy,
DistributedTracingPolicy(**kwargs),
config.http_logging_policy or HttpLoggingPolicy(**kwargs)])
else:
if isinstance(per_call_policies, Iterable):
per_call_policies_list = list(per_call_policies)
else:
per_call_policies_list = [per_call_policies]
per_call_policies_list.extend(policies)
policies = per_call_policies_list

if isinstance(per_retry_policies, Iterable):
per_retry_policies_list = list(per_retry_policies)
else:
per_retry_policies_list = [per_retry_policies]
if len(per_retry_policies_list) > 0:
index_of_retry = -1
for index, policy in enumerate(policies):
if isinstance(policy, RetryPolicy):
index_of_retry = index
if index_of_retry == -1:
raise ValueError("Failed to add per_retry_policies; "
"no RetryPolicy found in the supplied list of policies. ")
policies_1 = policies[:index_of_retry+1]
policies_2 = policies[index_of_retry+1:]
policies_1.extend(per_retry_policies_list)
policies_1.extend(policies_2)
policies = policies_1

if not transport:
transport = RequestsTransport(**kwargs)
Expand Down
55 changes: 37 additions & 18 deletions sdk/core/azure-core/azure/core/_pipeline_client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
DistributedTracingPolicy,
HttpLoggingPolicy,
RequestIdPolicy,
AsyncRetryPolicy,
)

try:
Expand Down Expand Up @@ -73,7 +74,7 @@ class AsyncPipelineClient(PipelineClientBase):
:keyword per_retry_policies: If specified, the policies will be added into the policy list after RetryPolicy
:paramtype per_retry_policies: Union[AsyncHTTPPolicy, SansIOHTTPPolicy,
list[AsyncHTTPPolicy], list[SansIOHTTPPolicy]]
:keyword AsyncHttpTransport transport: If omitted, AioHttpTransport is used for synchronous transport.
:keyword AsyncHttpTransport transport: If omitted, AioHttpTransport is used for asynchronous transport.
:return: An async pipeline object.
:rtype: ~azure.core.pipeline.AsyncPipeline
Expand Down Expand Up @@ -109,10 +110,10 @@ async def close(self):
def _build_pipeline(self, config, **kwargs): # pylint: disable=no-self-use
transport = kwargs.get('transport')
policies = kwargs.get('policies')
per_call_policies = kwargs.get('per_call_policies', [])
per_retry_policies = kwargs.get('per_retry_policies', [])

if policies is None: # [] is a valid policy list
per_call_policies = kwargs.get('per_call_policies', [])
per_retry_policies = kwargs.get('per_retry_policies', [])
policies = [
RequestIdPolicy(**kwargs),
config.headers_policy,
Expand All @@ -121,28 +122,46 @@ def _build_pipeline(self, config, **kwargs): # pylint: disable=no-self-use
ContentDecodePolicy(**kwargs)
]
if isinstance(per_call_policies, Iterable):
for policy in per_call_policies:
policies.append(policy)
policies.extend(per_call_policies)
else:
policies.append(per_call_policies)

policies = policies + [
config.redirect_policy,
config.retry_policy,
config.authentication_policy,
config.custom_hook_policy
]
policies.extend([config.redirect_policy,
config.retry_policy,
config.authentication_policy,
config.custom_hook_policy])
if isinstance(per_retry_policies, Iterable):
for policy in per_retry_policies:
policies.append(policy)
policies.extend(per_retry_policies)
else:
policies.append(per_retry_policies)

policies = policies + [
config.logging_policy,
DistributedTracingPolicy(**kwargs),
config.http_logging_policy or HttpLoggingPolicy(**kwargs)
]
policies.extend([config.logging_policy,
DistributedTracingPolicy(**kwargs),
config.http_logging_policy or HttpLoggingPolicy(**kwargs)])
else:
if isinstance(per_call_policies, Iterable):
per_call_policies_list = list(per_call_policies)
else:
per_call_policies_list = [per_call_policies]
per_call_policies_list.extend(policies)
policies = per_call_policies_list
if isinstance(per_retry_policies, Iterable):
per_retry_policies_list = list(per_retry_policies)
else:
per_retry_policies_list = [per_retry_policies]
if len(per_retry_policies_list) > 0:
index_of_retry = -1
for index, policy in enumerate(policies):
if isinstance(policy, AsyncRetryPolicy):
index_of_retry = index
if index_of_retry == -1:
raise ValueError("Failed to add per_retry_policies; "
"no RetryPolicy found in the supplied list of policies. ")
policies_1 = policies[:index_of_retry + 1]
policies_2 = policies[index_of_retry + 1:]
policies_1.extend(per_retry_policies_list)
policies_1.extend(policies_2)
policies = policies_1

if not transport:
from .pipeline.transport import AioHttpTransport
Expand Down
38 changes: 37 additions & 1 deletion sdk/core/azure-core/tests/async_tests/test_pipeline_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,12 @@
from azure.core.pipeline.policies import (
SansIOHTTPPolicy,
UserAgentPolicy,
DistributedTracingPolicy,
AsyncRetryPolicy,
AsyncRedirectPolicy,
AsyncHTTPPolicy,
AsyncRetryPolicy,
HttpLoggingPolicy
HttpLoggingPolicy,
)
from azure.core.pipeline.transport import (
AsyncHttpTransport,
Expand Down Expand Up @@ -286,3 +287,38 @@ def send(*args):
pos_retry = policies.index(retry_policy)
assert pos_boo < pos_retry
assert pos_foo > pos_retry

policies = [UserAgentPolicy(),
AsyncRetryPolicy(),
DistributedTracingPolicy()]
client = AsyncPipelineClient(base_url="test", policies=policies, per_call_policies=boo_policy)
actual_policies = client._pipeline._impl_policies
assert boo_policy == actual_policies[0]
client = AsyncPipelineClient(base_url="test", policies=policies, per_call_policies=[boo_policy])
actual_policies = client._pipeline._impl_policies
assert boo_policy == actual_policies[0]

client = AsyncPipelineClient(base_url="test", policies=policies, per_retry_policies=foo_policy)
actual_policies = client._pipeline._impl_policies
assert foo_policy == actual_policies[2]
client = AsyncPipelineClient(base_url="test", policies=policies, per_retry_policies=[foo_policy])
actual_policies = client._pipeline._impl_policies
assert foo_policy == actual_policies[2]

client = AsyncPipelineClient(base_url="test", policies=policies, per_call_policies=boo_policy,
per_retry_policies=[foo_policy])
actual_policies = client._pipeline._impl_policies
assert boo_policy == actual_policies[0]
assert foo_policy == actual_policies[3]
client = AsyncPipelineClient(base_url="test", policies=policies, per_call_policies=[boo_policy],
per_retry_policies=[foo_policy])
actual_policies = client._pipeline._impl_policies
assert boo_policy == actual_policies[0]
assert foo_policy == actual_policies[3]

policies = [UserAgentPolicy(),
DistributedTracingPolicy()]
with pytest.raises(ValueError):
client = AsyncPipelineClient(base_url="test", policies=policies, per_retry_policies=foo_policy)
with pytest.raises(ValueError):
client = AsyncPipelineClient(base_url="test", policies=policies, per_retry_policies=[foo_policy])
35 changes: 35 additions & 0 deletions sdk/core/azure-core/tests/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
from azure.core.pipeline.policies import (
SansIOHTTPPolicy,
UserAgentPolicy,
DistributedTracingPolicy,
RedirectPolicy,
RetryPolicy,
HttpLoggingPolicy,
Expand Down Expand Up @@ -398,6 +399,40 @@ def send(*args):
assert pos_boo < pos_retry
assert pos_foo > pos_retry

policies = [UserAgentPolicy(),
RetryPolicy(),
DistributedTracingPolicy()]
client = PipelineClient(base_url="test", policies=policies, per_call_policies=boo_policy)
actual_policies = client._pipeline._impl_policies
assert boo_policy == actual_policies[0]
client = PipelineClient(base_url="test", policies=policies, per_call_policies=[boo_policy])
actual_policies = client._pipeline._impl_policies
assert boo_policy == actual_policies[0]

client = PipelineClient(base_url="test", policies=policies, per_retry_policies=foo_policy)
actual_policies = client._pipeline._impl_policies
assert foo_policy == actual_policies[2]
client = PipelineClient(base_url="test", policies=policies, per_retry_policies=[foo_policy])
actual_policies = client._pipeline._impl_policies
assert foo_policy == actual_policies[2]

client = PipelineClient(base_url="test", policies=policies, per_call_policies=boo_policy,
per_retry_policies=foo_policy)
actual_policies = client._pipeline._impl_policies
assert boo_policy == actual_policies[0]
assert foo_policy == actual_policies[3]
client = PipelineClient(base_url="test", policies=policies, per_call_policies=[boo_policy],
per_retry_policies=[foo_policy])
actual_policies = client._pipeline._impl_policies
assert boo_policy == actual_policies[0]
assert foo_policy == actual_policies[3]

policies = [UserAgentPolicy(),
DistributedTracingPolicy()]
with pytest.raises(ValueError):
client = PipelineClient(base_url="test", policies=policies, per_retry_policies=foo_policy)
with pytest.raises(ValueError):
client = PipelineClient(base_url="test", policies=policies, per_retry_policies=[foo_policy])

if __name__ == "__main__":
unittest.main()

0 comments on commit f545697

Please sign in to comment.