Merge pull request #1471 from dhermes/bigtable-row-commit
Implementing Row.commit() in Bigtable.
dhermes committed Feb 15, 2016
2 parents 02c5689 + 64b8ef0 commit 9e08e57
Showing 5 changed files with 336 additions and 4 deletions.
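
For orientation (this sketch is not part of the diff): with this change, mutations staged on a Row via methods like set_cell() are sent to the Bigtable service in a single MutateRow request when commit() is called. A minimal usage sketch, assuming an already-constructed gcloud.bigtable Table; the column family 'cf1', the qualifier, and the values below are illustrative only:

from gcloud.bigtable.row import Row

def write_greeting(table):
    # `table` is assumed to be a configured gcloud.bigtable Table whose
    # cluster/client plumbing is already set up; names here are made up.
    row = Row(b'row-key', table)
    row.set_cell(u'cf1', b'greeting', b'hello world')
    result = row.commit()   # sends one MutateRow RPC, then clears mutations
    assert result is None   # without a filter, commit() returns None
    return result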
1 change: 0 additions & 1 deletion gcloud/bigquery/test_table.py
@@ -1,4 +1,3 @@
# pylint: disable=too-many-lines
# Copyright 2015 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
2 changes: 1 addition & 1 deletion gcloud/bigtable/happybase/connection.py
@@ -53,7 +53,7 @@ def _get_cluster(timeout=None):
:rtype: :class:`gcloud.bigtable.cluster.Cluster`
:returns: The unique cluster owned by the project inferred from
the environment.
:raises: :class:`ValueError <exceptions.ValueError>` if their is a failed
:raises: :class:`ValueError <exceptions.ValueError>` if there is a failed
zone or any number of clusters other than one.
"""
client_kwargs = {'admin': True}
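
For context, _get_cluster is the happybase-layer helper whose docstring is corrected above. Judging from the docstring alone, it is presumably used roughly like this (sketch only; the timeout semantics and call site are assumptions, not shown in this diff):

from gcloud.bigtable.happybase.connection import _get_cluster

# Returns the single cluster owned by the environment-inferred project;
# raises ValueError if a zone failed or the project owns more than one cluster.
cluster = _get_cluster(timeout=None)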
108 changes: 108 additions & 0 deletions gcloud/bigtable/row.py
@@ -22,8 +22,11 @@
from gcloud._helpers import _microseconds_from_datetime
from gcloud._helpers import _to_bytes
from gcloud.bigtable._generated import bigtable_data_pb2 as data_pb2
from gcloud.bigtable._generated import (
bigtable_service_messages_pb2 as messages_pb2)


_MAX_MUTATIONS = 100000
_PACK_I64 = struct.Struct('>q').pack


@@ -341,6 +344,111 @@ def delete_cells(self, column_family_id, columns, time_range=None,
# processed without error.
mutations_list.extend(to_append)

def _commit_mutate(self):
"""Makes a ``MutateRow`` API request.
Assumes no filter is set on the :class:`Row` and is meant to be called
by :meth:`commit`.
:raises: :class:`ValueError <exceptions.ValueError>` if the number of
mutations exceeds the ``_MAX_MUTATIONS``.
"""
mutations_list = self._get_mutations()
num_mutations = len(mutations_list)
if num_mutations == 0:
return
if num_mutations > _MAX_MUTATIONS:
raise ValueError('%d total mutations exceed the maximum allowable '
'%d.' % (num_mutations, _MAX_MUTATIONS))
request_pb = messages_pb2.MutateRowRequest(
table_name=self._table.name,
row_key=self._row_key,
mutations=mutations_list,
)
# We expect a `google.protobuf.empty_pb2.Empty`
client = self._table._cluster._client
client._data_stub.MutateRow(request_pb, client.timeout_seconds)

def _commit_check_and_mutate(self):
"""Makes a ``CheckAndMutateRow`` API request.
Assumes a filter is set on the :class:`Row` and is meant to be called
by :meth:`commit`.
:rtype: bool
:returns: Flag indicating if the filter was matched (which also
indicates which set of mutations were applied by the server).
:raises: :class:`ValueError <exceptions.ValueError>` if the number of
mutations exceeds the ``_MAX_MUTATIONS``.
"""
true_mutations = self._get_mutations(state=True)
false_mutations = self._get_mutations(state=False)
num_true_mutations = len(true_mutations)
num_false_mutations = len(false_mutations)
if num_true_mutations == 0 and num_false_mutations == 0:
return
if (num_true_mutations > _MAX_MUTATIONS or
num_false_mutations > _MAX_MUTATIONS):
raise ValueError(
'Exceed the maximum allowable mutations (%d). Had %d true '
'mutations and %d false mutations.' % (
_MAX_MUTATIONS, num_true_mutations, num_false_mutations))

request_pb = messages_pb2.CheckAndMutateRowRequest(
table_name=self._table.name,
row_key=self._row_key,
predicate_filter=self._filter.to_pb(),
true_mutations=true_mutations,
false_mutations=false_mutations,
)
# We expect a `.messages_pb2.CheckAndMutateRowResponse`
client = self._table._cluster._client
resp = client._data_stub.CheckAndMutateRow(
request_pb, client.timeout_seconds)
return resp.predicate_matched

def clear_mutations(self):
"""Removes all currently accumulated mutations on the current row."""
if self._filter is None:
del self._pb_mutations[:]
else:
del self._true_pb_mutations[:]
del self._false_pb_mutations[:]

def commit(self):
"""Makes a ``MutateRow`` or ``CheckAndMutateRow`` API request.
If no mutations have been created in the row, no request is made.
Mutations are applied atomically and in order, meaning that earlier
mutations can be masked / negated by later ones. Cells already present
in the row are left unchanged unless explicitly changed by a mutation.
After committing the accumulated mutations, resets the local
mutations to an empty list.
In the case that a filter is set on the :class:`Row`, the mutations
will be applied conditionally, based on whether the filter matches
any cells in the :class:`Row` or not. (Each method which adds a
mutation has a ``state`` parameter for this purpose.)
:rtype: :class:`bool` or :data:`NoneType <types.NoneType>`
:returns: :data:`None` if there is no filter, otherwise a flag
indicating if the filter was matched (which also
indicates which set of mutations were applied by the server).
:raises: :class:`ValueError <exceptions.ValueError>` if the number of
mutations exceeds the ``_MAX_MUTATIONS``.
"""
if self._filter is None:
result = self._commit_mutate()
else:
result = self._commit_check_and_mutate()

# Reset mutations after committing the request.
self.clear_mutations()

return result


class RowFilter(object):
"""Basic filter to apply to cells in a row.
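
When the Row is constructed with a filter, commit() takes the CheckAndMutateRow path shown above instead: mutations staged with state=True are applied if the filter matches the row, state=False mutations otherwise, and the returned flag mirrors the server's predicate_matched field. A hedged sketch reusing the RowSampleFilter that the tests below exercise; the table, column family, and 0.33 sample fraction are illustrative:

from gcloud.bigtable.row import Row, RowSampleFilter

def conditional_write(table):
    # Assumes `table` is a configured gcloud.bigtable Table.
    row = Row(b'row-key', table, filter_=RowSampleFilter(0.33))
    row.set_cell(u'cf1', b'col', b'value-if-matched', state=True)
    row.set_cell(u'cf1', b'col', b'value-if-not-matched', state=False)
    matched = row.commit()  # sends one CheckAndMutateRow RPC
    # `matched` indicates which bucket of mutations the server applied.
    return matched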
209 changes: 209 additions & 0 deletions gcloud/bigtable/test_row.py
@@ -357,6 +357,194 @@ def test_delete_cells_with_string_columns(self):
)
self.assertEqual(row._pb_mutations, [expected_pb1, expected_pb2])

def test_commit(self):
from google.protobuf import empty_pb2
from gcloud.bigtable._generated import bigtable_data_pb2 as data_pb2
from gcloud.bigtable._generated import (
bigtable_service_messages_pb2 as messages_pb2)
from gcloud.bigtable._testing import _FakeStub

row_key = b'row_key'
table_name = 'projects/more-stuff'
column_family_id = u'column_family_id'
column = b'column'
timeout_seconds = 711
client = _Client(timeout_seconds=timeout_seconds)
table = _Table(table_name, client=client)
row = self._makeOne(row_key, table)

# Create request_pb
value = b'bytes-value'
mutation = data_pb2.Mutation(
set_cell=data_pb2.Mutation.SetCell(
family_name=column_family_id,
column_qualifier=column,
timestamp_micros=-1, # Default value.
value=value,
),
)
request_pb = messages_pb2.MutateRowRequest(
table_name=table_name,
row_key=row_key,
mutations=[mutation],
)

# Create response_pb
response_pb = empty_pb2.Empty()

# Patch the stub used by the API method.
client._data_stub = stub = _FakeStub(response_pb)

# Create expected_result.
expected_result = None # commit() has no return value when no filter.

# Perform the method and check the result.
row.set_cell(column_family_id, column, value)
result = row.commit()
self.assertEqual(result, expected_result)
self.assertEqual(stub.method_calls, [(
'MutateRow',
(request_pb, timeout_seconds),
{},
)])
self.assertEqual(row._pb_mutations, [])
self.assertEqual(row._true_pb_mutations, None)
self.assertEqual(row._false_pb_mutations, None)

def test_commit_too_many_mutations(self):
from gcloud._testing import _Monkey
from gcloud.bigtable import row as MUT

row_key = b'row_key'
table = object()
row = self._makeOne(row_key, table)
row._pb_mutations = [1, 2, 3]
num_mutations = len(row._pb_mutations)
with _Monkey(MUT, _MAX_MUTATIONS=num_mutations - 1):
with self.assertRaises(ValueError):
row.commit()

def test_commit_no_mutations(self):
from gcloud.bigtable._testing import _FakeStub

row_key = b'row_key'
client = _Client()
table = _Table(None, client=client)
row = self._makeOne(row_key, table)
self.assertEqual(row._pb_mutations, [])

# Patch the stub used by the API method.
client._data_stub = stub = _FakeStub()

# Perform the method and check the result.
result = row.commit()
self.assertEqual(result, None)
# Make sure no request was sent.
self.assertEqual(stub.method_calls, [])

def test_commit_with_filter(self):
from gcloud.bigtable._generated import bigtable_data_pb2 as data_pb2
from gcloud.bigtable._generated import (
bigtable_service_messages_pb2 as messages_pb2)
from gcloud.bigtable._testing import _FakeStub
from gcloud.bigtable.row import RowSampleFilter

row_key = b'row_key'
table_name = 'projects/more-stuff'
column_family_id = u'column_family_id'
column = b'column'
timeout_seconds = 262
client = _Client(timeout_seconds=timeout_seconds)
table = _Table(table_name, client=client)
row_filter = RowSampleFilter(0.33)
row = self._makeOne(row_key, table, filter_=row_filter)

# Create request_pb
value1 = b'bytes-value'
mutation1 = data_pb2.Mutation(
set_cell=data_pb2.Mutation.SetCell(
family_name=column_family_id,
column_qualifier=column,
timestamp_micros=-1, # Default value.
value=value1,
),
)
value2 = b'other-bytes'
mutation2 = data_pb2.Mutation(
set_cell=data_pb2.Mutation.SetCell(
family_name=column_family_id,
column_qualifier=column,
timestamp_micros=-1, # Default value.
value=value2,
),
)
request_pb = messages_pb2.CheckAndMutateRowRequest(
table_name=table_name,
row_key=row_key,
predicate_filter=row_filter.to_pb(),
true_mutations=[mutation1],
false_mutations=[mutation2],
)

# Create response_pb
predicate_matched = True
response_pb = messages_pb2.CheckAndMutateRowResponse(
predicate_matched=predicate_matched)

# Patch the stub used by the API method.
client._data_stub = stub = _FakeStub(response_pb)

# Create expected_result.
expected_result = predicate_matched

# Perform the method and check the result.
row.set_cell(column_family_id, column, value1, state=True)
row.set_cell(column_family_id, column, value2, state=False)
result = row.commit()
self.assertEqual(result, expected_result)
self.assertEqual(stub.method_calls, [(
'CheckAndMutateRow',
(request_pb, timeout_seconds),
{},
)])
self.assertEqual(row._pb_mutations, None)
self.assertEqual(row._true_pb_mutations, [])
self.assertEqual(row._false_pb_mutations, [])

def test_commit_with_filter_too_many_mutations(self):
from gcloud._testing import _Monkey
from gcloud.bigtable import row as MUT

row_key = b'row_key'
table = object()
filter_ = object()
row = self._makeOne(row_key, table, filter_=filter_)
row._true_pb_mutations = [1, 2, 3]
num_mutations = len(row._true_pb_mutations)
with _Monkey(MUT, _MAX_MUTATIONS=num_mutations - 1):
with self.assertRaises(ValueError):
row.commit()

def test_commit_with_filter_no_mutations(self):
from gcloud.bigtable._testing import _FakeStub

row_key = b'row_key'
client = _Client()
table = _Table(None, client=client)
filter_ = object()
row = self._makeOne(row_key, table, filter_=filter_)
self.assertEqual(row._true_pb_mutations, [])
self.assertEqual(row._false_pb_mutations, [])

# Patch the stub used by the API method.
client._data_stub = stub = _FakeStub()

# Perform the method and check the result.
result = row.commit()
self.assertEqual(result, None)
# Make sure no request was sent.
self.assertEqual(stub.method_calls, [])


class Test_BoolFilter(unittest2.TestCase):

@@ -1345,3 +1533,24 @@ def test_to_pb_false_only(self):
),
)
self.assertEqual(filter_pb, expected_pb)


class _Client(object):

data_stub = None

def __init__(self, timeout_seconds=None):
self.timeout_seconds = timeout_seconds


class _Cluster(object):

def __init__(self, client=None):
self._client = client


class _Table(object):

def __init__(self, name, client=None):
self.name = name
self._cluster = _Cluster(client)
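
_FakeStub is imported from gcloud.bigtable._testing, which is not part of this diff. Judging only from how the tests use it (optional canned responses passed to the constructor, and assertions against stub.method_calls), it presumably behaves roughly like the recording double sketched below; this is an assumption, not the actual helper:

class _FakeStub(object):
    """Sketch of a recording gRPC-stub double (assumed, not from the diff)."""

    def __init__(self, *responses):
        self.responses = list(responses)
        self.method_calls = []

    def __getattr__(self, name):
        # Attribute access such as stub.MutateRow returns a callable that
        # records (method_name, args, kwargs) and replays the next response.
        def _rpc_method(*args, **kwargs):
            self.method_calls.append((name, args, kwargs))
            if self.responses:
                return self.responses.pop(0)
            return None
        return _rpc_method

Under that assumption, the MutateRow call in test_commit would be recorded as ('MutateRow', (request_pb, timeout_seconds), {}) and answered with the queued empty_pb2.Empty().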