Skip to content

Commit

Permalink
Merge pull request #1472 from dhermes/bigtable-row-commit-mods
Browse files Browse the repository at this point in the history
Implementing Bigtable Row.commit_modifications().
  • Loading branch information
dhermes committed Feb 16, 2016
2 parents 9e08e57 + 389f07f commit df3c4e4
Show file tree
Hide file tree
Showing 3 changed files with 368 additions and 1 deletion.
140 changes: 140 additions & 0 deletions gcloud/bigtable/row.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import six

from gcloud._helpers import _datetime_from_microseconds
from gcloud._helpers import _microseconds_from_datetime
from gcloud._helpers import _to_bytes
from gcloud.bigtable._generated import bigtable_data_pb2 as data_pb2
Expand Down Expand Up @@ -449,6 +450,67 @@ def commit(self):

return result

def clear_modification_rules(self):
"""Removes all currently accumulated modifications on current row."""
del self._rule_pb_list[:]

def commit_modifications(self):
"""Makes a ``ReadModifyWriteRow`` API request.
This commits modifications made by :meth:`append_cell_value` and
:meth:`increment_cell_value`. If no modifications were made, makes
no API request and just returns ``{}``.
Modifies a row atomically, reading the latest existing timestamp/value
from the specified columns and writing a new value by appending /
incrementing. The new cell created uses either the current server time
or the highest timestamp of a cell in that column (if it exceeds the
server time).
:rtype: dict
:returns: The new contents of all modified cells. Returned as a
dictionary of column families, each of which holds a
dictionary of columns. Each column contains a list of cells
modified. Each cell is represented with a two-tuple with the
value (in bytes) and the timestamp for the cell. For example:
.. code:: python
{
u'col-fam-id': {
b'col-name1': [
(b'cell-val', datetime.datetime(...)),
(b'cell-val-newer', datetime.datetime(...)),
],
b'col-name2': [
(b'altcol-cell-val', datetime.datetime(...)),
],
},
u'col-fam-id2': {
b'col-name3-but-other-fam': [
(b'foo', datetime.datetime(...)),
],
},
}
"""
if len(self._rule_pb_list) == 0:
return {}
request_pb = messages_pb2.ReadModifyWriteRowRequest(
table_name=self._table.name,
row_key=self._row_key,
rules=self._rule_pb_list,
)
# We expect a `.data_pb2.Row`
client = self._table._cluster._client
row_response = client._data_stub.ReadModifyWriteRow(
request_pb, client.timeout_seconds)

# Reset modifications after commit-ing request.
self.clear_modification_rules()

# NOTE: We expect row_response.key == self._row_key but don't check.
return _parse_rmw_row_response(row_response)


class RowFilter(object):
"""Basic filter to apply to cells in a row.
Expand Down Expand Up @@ -1192,3 +1254,81 @@ def to_pb(self):
condition_kwargs['false_filter'] = self.false_filter.to_pb()
condition = data_pb2.RowFilter.Condition(**condition_kwargs)
return data_pb2.RowFilter(condition=condition)


def _parse_rmw_row_response(row_response):
"""Parses the response to a ``ReadModifyWriteRow`` request.
:type row_response: :class:`.data_pb2.Row`
:param row_response: The response row (with only modified cells) from a
``ReadModifyWriteRow`` request.
:rtype: dict
:returns: The new contents of all modified cells. Returned as a
dictionary of column families, each of which holds a
dictionary of columns. Each column contains a list of cells
modified. Each cell is represented with a two-tuple with the
value (in bytes) and the timestamp for the cell. For example:
.. code:: python
{
u'col-fam-id': {
b'col-name1': [
(b'cell-val', datetime.datetime(...)),
(b'cell-val-newer', datetime.datetime(...)),
],
b'col-name2': [
(b'altcol-cell-val', datetime.datetime(...)),
],
},
u'col-fam-id2': {
b'col-name3-but-other-fam': [
(b'foo', datetime.datetime(...)),
],
},
}
"""
result = {}
for column_family in row_response.families:
column_family_id, curr_family = _parse_family_pb(column_family)
result[column_family_id] = curr_family
return result


def _parse_family_pb(family_pb):
"""Parses a Family protobuf into a dictionary.
:type family_pb: :class:`._generated.bigtable_data_pb2.Family`
:param family_pb: A protobuf
:rtype: tuple
:returns: A string and dictionary. The string is the name of the
column family and the dictionary has column names (within the
family) as keys and cell lists as values. Each cell is
represented with a two-tuple with the value (in bytes) and the
timestamp for the cell. For example:
.. code:: python
{
b'col-name1': [
(b'cell-val', datetime.datetime(...)),
(b'cell-val-newer', datetime.datetime(...)),
],
b'col-name2': [
(b'altcol-cell-val', datetime.datetime(...)),
],
}
"""
result = {}
for column in family_pb.columns:
result[column.qualifier] = cells = []
for cell in column.cells:
val_pair = (
cell.value,
_datetime_from_microseconds(cell.timestamp_micros),
)
cells.append(val_pair)

return family_pb.name, result
227 changes: 227 additions & 0 deletions gcloud/bigtable/test_row.py
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,88 @@ def test_commit_with_filter_no_mutations(self):
# Make sure no request was sent.
self.assertEqual(stub.method_calls, [])

def test_commit_modifications(self):
from gcloud._testing import _Monkey
from gcloud.bigtable._generated import bigtable_data_pb2 as data_pb2
from gcloud.bigtable._generated import (
bigtable_service_messages_pb2 as messages_pb2)
from gcloud.bigtable._testing import _FakeStub
from gcloud.bigtable import row as MUT

row_key = b'row_key'
table_name = 'projects/more-stuff'
column_family_id = u'column_family_id'
column = b'column'
timeout_seconds = 87
client = _Client(timeout_seconds=timeout_seconds)
table = _Table(table_name, client=client)
row = self._makeOne(row_key, table)

# Create request_pb
value = b'bytes-value'
# We will call row.append_cell_value(COLUMN_FAMILY_ID, COLUMN, value).
request_pb = messages_pb2.ReadModifyWriteRowRequest(
table_name=table_name,
row_key=row_key,
rules=[
data_pb2.ReadModifyWriteRule(
family_name=column_family_id,
column_qualifier=column,
append_value=value,
),
],
)

# Create response_pb
response_pb = object()

# Patch the stub used by the API method.
client._data_stub = stub = _FakeStub(response_pb)

# Create expected_result.
row_responses = []
expected_result = object()

def mock_parse_rmw_row_response(row_response):
row_responses.append(row_response)
return expected_result

# Perform the method and check the result.
with _Monkey(MUT, _parse_rmw_row_response=mock_parse_rmw_row_response):
row.append_cell_value(column_family_id, column, value)
result = row.commit_modifications()

self.assertEqual(result, expected_result)
self.assertEqual(stub.method_calls, [(
'ReadModifyWriteRow',
(request_pb, timeout_seconds),
{},
)])
self.assertEqual(row._pb_mutations, [])
self.assertEqual(row._true_pb_mutations, None)
self.assertEqual(row._false_pb_mutations, None)

self.assertEqual(row_responses, [response_pb])
self.assertEqual(row._rule_pb_list, [])

def test_commit_modifications_no_rules(self):
from gcloud.bigtable._testing import _FakeStub

row_key = b'row_key'
client = _Client()
table = _Table(None, client=client)
row = self._makeOne(row_key, table)
self.assertEqual(row._rule_pb_list, [])

# Patch the stub used by the API method.
client._data_stub = stub = _FakeStub()

# Perform the method and check the result.
result = row.commit_modifications()
self.assertEqual(result, {})
# Make sure no request was sent.
self.assertEqual(stub.method_calls, [])


class Test_BoolFilter(unittest2.TestCase):

Expand Down Expand Up @@ -1535,6 +1617,151 @@ def test_to_pb_false_only(self):
self.assertEqual(filter_pb, expected_pb)


class Test__parse_rmw_row_response(unittest2.TestCase):

def _callFUT(self, row_response):
from gcloud.bigtable.row import _parse_rmw_row_response
return _parse_rmw_row_response(row_response)

def test_it(self):
from gcloud._helpers import _datetime_from_microseconds
from gcloud.bigtable._generated import bigtable_data_pb2 as data_pb2

col_fam1 = u'col-fam-id'
col_fam2 = u'col-fam-id2'
col_name1 = b'col-name1'
col_name2 = b'col-name2'
col_name3 = b'col-name3-but-other-fam'
cell_val1 = b'cell-val'
cell_val2 = b'cell-val-newer'
cell_val3 = b'altcol-cell-val'
cell_val4 = b'foo'

microseconds = 1000871
timestamp = _datetime_from_microseconds(microseconds)
expected_output = {
col_fam1: {
col_name1: [
(cell_val1, timestamp),
(cell_val2, timestamp),
],
col_name2: [
(cell_val3, timestamp),
],
},
col_fam2: {
col_name3: [
(cell_val4, timestamp),
],
},
}
sample_input = data_pb2.Row(
families=[
data_pb2.Family(
name=col_fam1,
columns=[
data_pb2.Column(
qualifier=col_name1,
cells=[
data_pb2.Cell(
value=cell_val1,
timestamp_micros=microseconds,
),
data_pb2.Cell(
value=cell_val2,
timestamp_micros=microseconds,
),
],
),
data_pb2.Column(
qualifier=col_name2,
cells=[
data_pb2.Cell(
value=cell_val3,
timestamp_micros=microseconds,
),
],
),
],
),
data_pb2.Family(
name=col_fam2,
columns=[
data_pb2.Column(
qualifier=col_name3,
cells=[
data_pb2.Cell(
value=cell_val4,
timestamp_micros=microseconds,
),
],
),
],
),
],
)
self.assertEqual(expected_output, self._callFUT(sample_input))


class Test__parse_family_pb(unittest2.TestCase):

def _callFUT(self, family_pb):
from gcloud.bigtable.row import _parse_family_pb
return _parse_family_pb(family_pb)

def test_it(self):
from gcloud._helpers import _datetime_from_microseconds
from gcloud.bigtable._generated import bigtable_data_pb2 as data_pb2

col_fam1 = u'col-fam-id'
col_name1 = b'col-name1'
col_name2 = b'col-name2'
cell_val1 = b'cell-val'
cell_val2 = b'cell-val-newer'
cell_val3 = b'altcol-cell-val'

microseconds = 5554441037
timestamp = _datetime_from_microseconds(microseconds)
expected_dict = {
col_name1: [
(cell_val1, timestamp),
(cell_val2, timestamp),
],
col_name2: [
(cell_val3, timestamp),
],
}
expected_output = (col_fam1, expected_dict)
sample_input = data_pb2.Family(
name=col_fam1,
columns=[
data_pb2.Column(
qualifier=col_name1,
cells=[
data_pb2.Cell(
value=cell_val1,
timestamp_micros=microseconds,
),
data_pb2.Cell(
value=cell_val2,
timestamp_micros=microseconds,
),
],
),
data_pb2.Column(
qualifier=col_name2,
cells=[
data_pb2.Cell(
value=cell_val3,
timestamp_micros=microseconds,
),
],
),
],
)
self.assertEqual(expected_output, self._callFUT(sample_input))


class _Client(object):

data_stub = None
Expand Down
Loading

0 comments on commit df3c4e4

Please sign in to comment.