diff --git a/gcloud/bigtable/row.py b/gcloud/bigtable/row.py
index c9252daaa362..d090cb071620 100644
--- a/gcloud/bigtable/row.py
+++ b/gcloud/bigtable/row.py
@@ -19,6 +19,7 @@
 
 import six
 
+from gcloud._helpers import _datetime_from_microseconds
 from gcloud._helpers import _microseconds_from_datetime
 from gcloud._helpers import _to_bytes
 from gcloud.bigtable._generated import bigtable_data_pb2 as data_pb2
@@ -449,6 +450,67 @@ def commit(self):
 
         return result
 
+    def clear_modification_rules(self):
+        """Removes all currently accumulated modifications on current row."""
+        del self._rule_pb_list[:]
+
+    def commit_modifications(self):
+        """Makes a ``ReadModifyWriteRow`` API request.
+
+        This commits modifications made by :meth:`append_cell_value` and
+        :meth:`increment_cell_value`. If no modifications were made, makes
+        no API request and just returns ``{}``.
+
+        Modifies a row atomically, reading the latest existing timestamp/value
+        from the specified columns and writing a new value by appending /
+        incrementing. The new cell created uses either the current server time
+        or the highest timestamp of a cell in that column (if it exceeds the
+        server time).
+
+        :rtype: dict
+        :returns: The new contents of all modified cells. Returned as a
+                  dictionary of column families, each of which holds a
+                  dictionary of columns. Each column contains a list of cells
+                  modified. Each cell is represented with a two-tuple with the
+                  value (in bytes) and the timestamp for the cell. For example:
+
+                  .. code:: python
+
+                      {
+                          u'col-fam-id': {
+                              b'col-name1': [
+                                  (b'cell-val', datetime.datetime(...)),
+                                  (b'cell-val-newer', datetime.datetime(...)),
+                              ],
+                              b'col-name2': [
+                                  (b'altcol-cell-val', datetime.datetime(...)),
+                              ],
+                          },
+                          u'col-fam-id2': {
+                              b'col-name3-but-other-fam': [
+                                  (b'foo', datetime.datetime(...)),
+                              ],
+                          },
+                      }
+        """
+        if len(self._rule_pb_list) == 0:
+            return {}
+        request_pb = messages_pb2.ReadModifyWriteRowRequest(
+            table_name=self._table.name,
+            row_key=self._row_key,
+            rules=self._rule_pb_list,
+        )
+        # We expect a `.data_pb2.Row`
+        client = self._table._cluster._client
+        row_response = client._data_stub.ReadModifyWriteRow(
+            request_pb, client.timeout_seconds)
+
+        # Reset modifications after commit-ing request.
+        self.clear_modification_rules()
+
+        # NOTE: We expect row_response.key == self._row_key but don't check.
+        return _parse_rmw_row_response(row_response)
+
 
 class RowFilter(object):
     """Basic filter to apply to cells in a row.
@@ -1192,3 +1254,81 @@ def to_pb(self):
             condition_kwargs['false_filter'] = self.false_filter.to_pb()
         condition = data_pb2.RowFilter.Condition(**condition_kwargs)
         return data_pb2.RowFilter(condition=condition)
+
+
+def _parse_rmw_row_response(row_response):
+    """Parses the response to a ``ReadModifyWriteRow`` request.
+
+    :type row_response: :class:`.data_pb2.Row`
+    :param row_response: The response row (with only modified cells) from a
+                         ``ReadModifyWriteRow`` request.
+
+    :rtype: dict
+    :returns: The new contents of all modified cells. Returned as a
+              dictionary of column families, each of which holds a
+              dictionary of columns. Each column contains a list of cells
+              modified. Each cell is represented with a two-tuple with the
+              value (in bytes) and the timestamp for the cell. For example:
+
+              .. code:: python
+
+                  {
+                      u'col-fam-id': {
+                          b'col-name1': [
+                              (b'cell-val', datetime.datetime(...)),
+                              (b'cell-val-newer', datetime.datetime(...)),
+                          ],
+                          b'col-name2': [
+                              (b'altcol-cell-val', datetime.datetime(...)),
+                          ],
+                      },
+                      u'col-fam-id2': {
+                          b'col-name3-but-other-fam': [
+                              (b'foo', datetime.datetime(...)),
+                          ],
+                      },
+                  }
+    """
+    result = {}
+    for column_family in row_response.families:
+        column_family_id, curr_family = _parse_family_pb(column_family)
+        result[column_family_id] = curr_family
+    return result
+
+
+def _parse_family_pb(family_pb):
+    """Parses a Family protobuf into a dictionary.
+
+    :type family_pb: :class:`._generated.bigtable_data_pb2.Family`
+    :param family_pb: A protobuf
+
+    :rtype: tuple
+    :returns: A string and dictionary. The string is the name of the
+              column family and the dictionary has column names (within the
+              family) as keys and cell lists as values. Each cell is
+              represented with a two-tuple with the value (in bytes) and the
+              timestamp for the cell. For example:
+
+              .. code:: python
+
+                  {
+                      b'col-name1': [
+                          (b'cell-val', datetime.datetime(...)),
+                          (b'cell-val-newer', datetime.datetime(...)),
+                      ],
+                      b'col-name2': [
+                          (b'altcol-cell-val', datetime.datetime(...)),
+                      ],
+                  }
+    """
+    result = {}
+    for column in family_pb.columns:
+        result[column.qualifier] = cells = []
+        for cell in column.cells:
+            val_pair = (
+                cell.value,
+                _datetime_from_microseconds(cell.timestamp_micros),
+            )
+            cells.append(val_pair)
+
+    return family_pb.name, result
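Note: the snippet below is not part of the patch; it is a minimal usage sketch of the ``Row`` methods added above. The table setup is an assumption (only ``append_cell_value``, ``increment_cell_value``, ``commit_modifications`` and ``clear_modification_rules`` come from this module, and ``increment_cell_value``'s exact signature is inferred from its docstring reference rather than shown in this diff).

.. code:: python

    # Sketch only -- assumes `table` is an already-configured
    # gcloud.bigtable Table and that rows are obtained from it via a
    # hypothetical table.row() factory.
    row = table.row(b'row-key')

    # Accumulate ReadModifyWrite rules locally; nothing is sent yet.
    row.append_cell_value(u'col-fam-id', b'col-name1', b'-suffix')
    row.increment_cell_value(u'col-fam-id', b'counter', 1)

    # One ReadModifyWriteRow RPC; returns the modified cells as
    # {family_id: {column: [(value_bytes, timestamp), ...]}} and resets
    # the accumulated rules afterwards.
    modified_cells = row.commit_modifications()

    # Alternatively, drop any pending rules without sending a request:
    # row.clear_modification_rules()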
diff --git a/gcloud/bigtable/test_row.py b/gcloud/bigtable/test_row.py
index a0f7493d8c88..71a62763bd32 100644
--- a/gcloud/bigtable/test_row.py
+++ b/gcloud/bigtable/test_row.py
@@ -545,6 +545,88 @@ def test_commit_with_filter_no_mutations(self):
         # Make sure no request was sent.
         self.assertEqual(stub.method_calls, [])
 
+    def test_commit_modifications(self):
+        from gcloud._testing import _Monkey
+        from gcloud.bigtable._generated import bigtable_data_pb2 as data_pb2
+        from gcloud.bigtable._generated import (
+            bigtable_service_messages_pb2 as messages_pb2)
+        from gcloud.bigtable._testing import _FakeStub
+        from gcloud.bigtable import row as MUT
+
+        row_key = b'row_key'
+        table_name = 'projects/more-stuff'
+        column_family_id = u'column_family_id'
+        column = b'column'
+        timeout_seconds = 87
+        client = _Client(timeout_seconds=timeout_seconds)
+        table = _Table(table_name, client=client)
+        row = self._makeOne(row_key, table)
+
+        # Create request_pb
+        value = b'bytes-value'
+        # We will call row.append_cell_value(COLUMN_FAMILY_ID, COLUMN, value).
+        request_pb = messages_pb2.ReadModifyWriteRowRequest(
+            table_name=table_name,
+            row_key=row_key,
+            rules=[
+                data_pb2.ReadModifyWriteRule(
+                    family_name=column_family_id,
+                    column_qualifier=column,
+                    append_value=value,
+                ),
+            ],
+        )
+
+        # Create response_pb
+        response_pb = object()
+
+        # Patch the stub used by the API method.
+        client._data_stub = stub = _FakeStub(response_pb)
+
+        # Create expected_result.
+        row_responses = []
+        expected_result = object()
+
+        def mock_parse_rmw_row_response(row_response):
+            row_responses.append(row_response)
+            return expected_result
+
+        # Perform the method and check the result.
+        with _Monkey(MUT, _parse_rmw_row_response=mock_parse_rmw_row_response):
+            row.append_cell_value(column_family_id, column, value)
+            result = row.commit_modifications()
+
+        self.assertEqual(result, expected_result)
+        self.assertEqual(stub.method_calls, [(
+            'ReadModifyWriteRow',
+            (request_pb, timeout_seconds),
+            {},
+        )])
+        self.assertEqual(row._pb_mutations, [])
+        self.assertEqual(row._true_pb_mutations, None)
+        self.assertEqual(row._false_pb_mutations, None)
+
+        self.assertEqual(row_responses, [response_pb])
+        self.assertEqual(row._rule_pb_list, [])
+
+    def test_commit_modifications_no_rules(self):
+        from gcloud.bigtable._testing import _FakeStub
+
+        row_key = b'row_key'
+        client = _Client()
+        table = _Table(None, client=client)
+        row = self._makeOne(row_key, table)
+        self.assertEqual(row._rule_pb_list, [])
+
+        # Patch the stub used by the API method.
+        client._data_stub = stub = _FakeStub()
+
+        # Perform the method and check the result.
+        result = row.commit_modifications()
+        self.assertEqual(result, {})
+        # Make sure no request was sent.
+        self.assertEqual(stub.method_calls, [])
+
 
 class Test_BoolFilter(unittest2.TestCase):
 
@@ -1535,6 +1617,151 @@ def test_to_pb_false_only(self):
         self.assertEqual(filter_pb, expected_pb)
 
 
+class Test__parse_rmw_row_response(unittest2.TestCase):
+
+    def _callFUT(self, row_response):
+        from gcloud.bigtable.row import _parse_rmw_row_response
+        return _parse_rmw_row_response(row_response)
+
+    def test_it(self):
+        from gcloud._helpers import _datetime_from_microseconds
+        from gcloud.bigtable._generated import bigtable_data_pb2 as data_pb2
+
+        col_fam1 = u'col-fam-id'
+        col_fam2 = u'col-fam-id2'
+        col_name1 = b'col-name1'
+        col_name2 = b'col-name2'
+        col_name3 = b'col-name3-but-other-fam'
+        cell_val1 = b'cell-val'
+        cell_val2 = b'cell-val-newer'
+        cell_val3 = b'altcol-cell-val'
+        cell_val4 = b'foo'
+
+        microseconds = 1000871
+        timestamp = _datetime_from_microseconds(microseconds)
+        expected_output = {
+            col_fam1: {
+                col_name1: [
+                    (cell_val1, timestamp),
+                    (cell_val2, timestamp),
+                ],
+                col_name2: [
+                    (cell_val3, timestamp),
+                ],
+            },
+            col_fam2: {
+                col_name3: [
+                    (cell_val4, timestamp),
+                ],
+            },
+        }
+        sample_input = data_pb2.Row(
+            families=[
+                data_pb2.Family(
+                    name=col_fam1,
+                    columns=[
+                        data_pb2.Column(
+                            qualifier=col_name1,
+                            cells=[
+                                data_pb2.Cell(
+                                    value=cell_val1,
+                                    timestamp_micros=microseconds,
+                                ),
+                                data_pb2.Cell(
+                                    value=cell_val2,
+                                    timestamp_micros=microseconds,
+                                ),
+                            ],
+                        ),
+                        data_pb2.Column(
+                            qualifier=col_name2,
+                            cells=[
+                                data_pb2.Cell(
+                                    value=cell_val3,
+                                    timestamp_micros=microseconds,
+                                ),
+                            ],
+                        ),
+                    ],
+                ),
+                data_pb2.Family(
+                    name=col_fam2,
+                    columns=[
+                        data_pb2.Column(
+                            qualifier=col_name3,
+                            cells=[
+                                data_pb2.Cell(
+                                    value=cell_val4,
+                                    timestamp_micros=microseconds,
+                                ),
+                            ],
+                        ),
+                    ],
+                ),
+            ],
+        )
+        self.assertEqual(expected_output, self._callFUT(sample_input))
+
+
+class Test__parse_family_pb(unittest2.TestCase):
+
+    def _callFUT(self, family_pb):
+        from gcloud.bigtable.row import _parse_family_pb
+        return _parse_family_pb(family_pb)
+
+    def test_it(self):
+        from gcloud._helpers import _datetime_from_microseconds
+        from gcloud.bigtable._generated import bigtable_data_pb2 as data_pb2
+
+        col_fam1 = u'col-fam-id'
+        col_name1 = b'col-name1'
+        col_name2 = b'col-name2'
+        cell_val1 = b'cell-val'
+        cell_val2 = b'cell-val-newer'
+        cell_val3 = b'altcol-cell-val'
+
+        microseconds = 5554441037
+        timestamp = _datetime_from_microseconds(microseconds)
+        expected_dict = {
+            col_name1: [
+                (cell_val1, timestamp),
+                (cell_val2, timestamp),
+            ],
+            col_name2: [
+                (cell_val3, timestamp),
+            ],
+        }
+        expected_output = (col_fam1, expected_dict)
+        sample_input = data_pb2.Family(
+            name=col_fam1,
+            columns=[
+                data_pb2.Column(
+                    qualifier=col_name1,
+                    cells=[
+                        data_pb2.Cell(
+                            value=cell_val1,
+                            timestamp_micros=microseconds,
+                        ),
+                        data_pb2.Cell(
+                            value=cell_val2,
+                            timestamp_micros=microseconds,
+                        ),
+                    ],
+                ),
+                data_pb2.Column(
+                    qualifier=col_name2,
+                    cells=[
+                        data_pb2.Cell(
+                            value=cell_val3,
+                            timestamp_micros=microseconds,
+                        ),
+                    ],
+                ),
+            ],
+        )
+        self.assertEqual(expected_output, self._callFUT(sample_input))
+
+
 class _Client(object):
 
     data_stub = None
diff --git a/scripts/run_pylint.py b/scripts/run_pylint.py
index 8f566b46a0f8..f78da26e59e9 100644
--- a/scripts/run_pylint.py
+++ b/scripts/run_pylint.py
@@ -65,7 +65,7 @@
 }
 TEST_RC_REPLACEMENTS = {
     'FORMAT': {
-        'max-module-lines': 1700,
+        'max-module-lines': 1800,
     },
 }
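For completeness, a hedged sketch of consuming the dictionary that ``commit_modifications()`` returns (the same shape ``_parse_rmw_row_response()`` builds); ``modified_cells`` is assumed to be the return value from the sketch after the first file above, and is not part of the patch.

.. code:: python

    # Flatten the documented {family_id: {column: [(value, timestamp), ...]}}
    # structure into (family, column, value, timestamp) tuples.
    flat_cells = []
    for family_id, columns in modified_cells.items():
        for column, cells in columns.items():
            for value, timestamp in cells:
                # `value` is bytes; `timestamp` is the datetime produced by
                # _datetime_from_microseconds() from the cell's
                # timestamp_micros field.
                flat_cells.append((family_id, column, value, timestamp))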