Skip to content

Commit

Permalink
Added support for Async Generators in TableClient.submit_transaction …
Browse files Browse the repository at this point in the history
…method (#21119)

* added support for async iterables in submit_transcation method

* the warning is a known issue in pylint pylint-dev/pylint#3507

* fixed formatting issues

* Code tweaks + tests

* Fixed grammar

* Improved readability

* Fix for Py27

Co-authored-by: antisch <antisch@microsoft.com>
  • Loading branch information
yashbhutoria and annatisch committed Oct 21, 2021
1 parent c8e7ce8 commit 8eeff8e
Show file tree
Hide file tree
Showing 9 changed files with 653 additions and 38 deletions.
6 changes: 5 additions & 1 deletion sdk/tables/azure-data-tables/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
# Release History

## 12.1.1 (2021-09-08)
## 12.1.1 (Unreleased)

### Bugs Fixed

- Resolved bug where strings couldn't be used instead of enum value for entity Update Mode (#20247).
- Resolved bug where single quote characters in Partition and Row keys were not escaped correctly (#20301).

### Features Added

- Added support for async iterators in `aio.TableClient.submit_transaction (#21083, thank you yashbhutoria).

### Other Changes

- Bumped dependency on `msrest` to `>=0.6.21`
Expand Down
21 changes: 18 additions & 3 deletions sdk/tables/azure-data-tables/azure/data/tables/_table_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@
Dict,
Mapping,
Optional,
List
List,
Tuple
)

from azure.core import MatchConditions

from ._common_conversion import _transform_patch_to_cosmos_post
from ._models import UpdateMode
from ._models import UpdateMode, TransactionOperation
from ._serialize import _get_match_headers, _add_entity_properties, _prepare_key
from ._entity import TableEntity

Expand All @@ -26,8 +27,10 @@
from ._generated import models, AzureTable
from ._generated._configuration import AzureTableConfiguration

EntityType = Union[TableEntity, Mapping[str, Any]]

EntityType = Union[TableEntity, Mapping[str, Any]]
OperationType = Union[TransactionOperation, str]
TransactionOperationType = Union[Tuple[OperationType, EntityType], Tuple[OperationType, EntityType, Mapping[str, Any]]]


class TableBatchOperations(object):
Expand Down Expand Up @@ -91,6 +94,18 @@ def _verify_partition_key(
elif entity["PartitionKey"] != self._partition_key:
raise ValueError("Partition Keys must all be the same")

def add_operation(self, operation):
# type: (TransactionOperationType) -> None
"""Add a single operation to a batch."""
try:
operation_type, entity, kwargs = operation # type: ignore
except ValueError:
operation_type, entity, kwargs = operation[0], operation[1], {} # type: ignore
try:
getattr(self, operation_type.lower())(entity, **kwargs)
except AttributeError:
raise ValueError("Unrecognized operation: {}".format(operation))

def create(
self,
entity, # type: EntityType
Expand Down
36 changes: 17 additions & 19 deletions sdk/tables/azure-data-tables/azure/data/tables/_table_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
# --------------------------------------------------------------------------

import functools
from typing import Optional, Any, TYPE_CHECKING, Union, List, Tuple, Dict, Mapping, Iterable, overload, cast
from typing import Optional, Any, TYPE_CHECKING, Union, List, Dict, Mapping, Iterable, overload, cast
try:
from urllib.parse import urlparse, unquote
except ImportError:
Expand Down Expand Up @@ -34,19 +34,14 @@
from ._base_client import parse_connection_str, TablesBaseClient
from ._serialize import serialize_iso, _parameter_filter_substitution
from ._deserialize import deserialize_iso, _return_headers_and_deserialized
from ._table_batch import TableBatchOperations
from ._table_batch import TableBatchOperations, EntityType, TransactionOperationType
from ._models import (
TableEntityPropertiesPaged,
UpdateMode,
TableAccessPolicy,
TransactionOperation,
TableItem
)

EntityType = Union[TableEntity, Mapping[str, Any]]
OperationType = Union[TransactionOperation, str]
TransactionOperationType = Union[Tuple[OperationType, EntityType], Tuple[OperationType, EntityType, Mapping[str, Any]]]

if TYPE_CHECKING:
from azure.core.credentials import AzureNamedKeyCredential, AzureSasCredential

Expand Down Expand Up @@ -691,10 +686,14 @@ def submit_transaction(
If any one of these operations fails, the entire transaction will be rejected.
:param operations: The list of operations to commit in a transaction. This should be a list of
:param operations: The list of operations to commit in a transaction. This should be an iterable of
tuples containing an operation name, the entity on which to operate, and optionally, a dict of additional
kwargs for that operation.
:type operations: Iterable[Tuple[str, EntityType]]
kwargs for that operation. For example::
- ('upsert', {'PartitionKey': 'A', 'RowKey': 'B'})
- ('upsert', {'PartitionKey': 'A', 'RowKey': 'B'}, {'mode': UpdateMode.REPLACE})
:type operations: Iterable[Tuple[str, TableEntity, Mapping[str, Any]]]
:return: A list of mappings with response metadata for each operation in the transaction.
:rtype: List[Mapping[str, Any]]
:raises: :class:`~azure.data.tables.TableTransactionError`
Expand All @@ -717,13 +716,12 @@ def submit_transaction(
is_cosmos_endpoint=self._cosmos_endpoint,
**kwargs
)
for operation in operations:
try:
operation_kwargs = operation[2] # type: ignore
except IndexError:
operation_kwargs = {}
try:
getattr(batched_requests, operation[0].lower())(operation[1], **operation_kwargs)
except AttributeError:
raise ValueError("Unrecognized operation: {}".format(operation[0]))
try:
for operation in operations:
batched_requests.add_operation(operation)
except TypeError:
raise TypeError(
"The value of 'operations' must be an iterator "
"of Tuples. Please check documentation for correct Tuple format."
)
return self._batch_send(*batched_requests.requests, **kwargs) # type: ignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from .._common_conversion import _transform_patch_to_cosmos_post
from .._models import UpdateMode
from .._entity import TableEntity
from .._table_batch import EntityType
from .._table_batch import EntityType, TransactionOperationType
from .._serialize import (
_prepare_key,
_get_match_headers,
Expand Down Expand Up @@ -68,6 +68,17 @@ def _verify_partition_key(
elif entity["PartitionKey"] != self._partition_key:
raise ValueError("Partition Keys must all be the same")

def add_operation(self, operation: TransactionOperationType) -> None:
"""Add a single operation to a batch."""
try:
operation_type, entity, kwargs = operation # type: ignore
except ValueError:
operation_type, entity, kwargs = *operation, {} # type: ignore
try:
getattr(self, operation_type.lower())(entity, **kwargs)
except AttributeError:
raise ValueError("Unrecognized operation: {}".format(operation))

def create(
self,
entity: EntityType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# license information.
# --------------------------------------------------------------------------
import functools
from typing import List, Union, Any, Optional, Mapping, Iterable, Dict, overload, cast, TYPE_CHECKING
from typing import AsyncIterable, List, Union, Any, Optional, Mapping, Iterable, Dict, overload, cast, TYPE_CHECKING
try:
from urllib.parse import urlparse, unquote
except ImportError:
Expand Down Expand Up @@ -664,17 +664,24 @@ async def upsert_entity(
@distributed_trace_async
async def submit_transaction(
self,
operations: Iterable[TransactionOperationType],
operations: Union[
Iterable[TransactionOperationType], AsyncIterable[TransactionOperationType]
],
**kwargs
) -> List[Mapping[str, Any]]:
"""Commit a list of operations as a single transaction.
If any one of these operations fails, the entire transaction will be rejected.
:param operations: The list of operations to commit in a transaction. This should be a list of
tuples containing an operation name, the entity on which to operate, and optionally, a dict of additional
kwargs for that operation.
:type operations: Iterable[Tuple[str, EntityType]]
:param operations: The list of operations to commit in a transaction. This should be an iterable
(or async iterable) of tuples containing an operation name, the entity on which to operate,
and optionally, a dict of additional kwargs for that operation. For example::
- ('upsert', {'PartitionKey': 'A', 'RowKey': 'B'})
- ('upsert', {'PartitionKey': 'A', 'RowKey': 'B'}, {'mode': UpdateMode.REPLACE})
:type operations:
Union[Iterable[Tuple[str, Entity, Mapping[str, Any]]],AsyncIterable[Tuple[str, Entity, Mapping[str, Any]]]]
:return: A list of mappings with response metadata for each operation in the transaction.
:rtype: List[Mapping[str, Any]]
:raises ~azure.data.tables.TableTransactionError:
Expand All @@ -697,13 +704,17 @@ async def submit_transaction(
is_cosmos_endpoint=self._cosmos_endpoint,
**kwargs
)
for operation in operations:
try:
operation_kwargs = operation[2] # type: ignore
except IndexError:
operation_kwargs = {}
try:
for operation in operations: # type: ignore
batched_requests.add_operation(operation)
except TypeError:
try:
getattr(batched_requests, operation[0].lower())(operation[1], **operation_kwargs)
except AttributeError:
raise ValueError("Unrecognized operation: {}".format(operation))
async for operation in operations: # type: ignore
batched_requests.add_operation(operation)
except TypeError:
raise TypeError(
"The value of 'operations' must be an iterator or async iterator "
"of Tuples. Please check documentation for correct Tuple format."
)

return await self._batch_send(*batched_requests.requests, **kwargs)
Loading

0 comments on commit 8eeff8e

Please sign in to comment.