Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use yield_rows for rows() and scan() #37

Merged
merged 9 commits into from
Jul 17, 2018
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pip-log.txt
.coverage
.tox
.cache
.pytest_cache

# Translations
*.mo
Expand Down
35 changes: 12 additions & 23 deletions src/google/cloud/happybase/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,21 +235,17 @@ def rows(self, rows, columns=None, timestamp=None,
# versions == 1 since we only want the latest.
filter_ = _filter_chain_helper(versions=1, timestamp=timestamp,
filters=filters)

partial_rows_data = self._low_level_table.read_rows(filter_=filter_)
rows_generator = self._low_level_table.yield_rows(
filter_=filter_)
# NOTE: We could use max_loops = 1000 or some similar value to ensure
# that the stream isn't open too long.
partial_rows_data.consume_all()

result = []
for row_key in rows:
if row_key not in partial_rows_data.rows:
continue
curr_row_data = partial_rows_data.rows[row_key]
for rowdata in rows_generator:
curr_row_data = rowdata
curr_row_dict = _partial_row_to_dict(
curr_row_data, include_timestamp=include_timestamp)
result.append((row_key, curr_row_dict))

result.append((curr_row_data.row_key, curr_row_dict))
return result

def cells(self, row, column, versions=None, timestamp=None,
Expand Down Expand Up @@ -385,23 +381,16 @@ def scan(self, row_start=None, row_stop=None, row_prefix=None,
row_start, row_stop, filter_chain = _scan_filter_helper(
row_start, row_stop, row_prefix, columns, timestamp, limit, kwargs)

partial_rows_data = self._low_level_table.read_rows(
rows_generator = self._low_level_table.yield_rows(
start_key=row_start, end_key=row_stop,
limit=limit, filter_=filter_chain)

# Mutable copy of data.
rows_dict = partial_rows_data.rows
while True:
try:
partial_rows_data.consume_next()
for row_key in sorted(rows_dict):
curr_row_data = rows_dict.pop(row_key)
# NOTE: We expect len(rows_dict) == 0, but don't check it.
curr_row_dict = _partial_row_to_dict(
curr_row_data, include_timestamp=include_timestamp)
yield (row_key, curr_row_dict)
except StopIteration:
break
for rowdata in rows_generator:
curr_row_data = rowdata
# NOTE: yield_rows() hands back one row at a time, so there is no rows dict left to drain here.
curr_row_dict = _partial_row_to_dict(
curr_row_data, include_timestamp=include_timestamp)
yield (curr_row_data.row_key, curr_row_dict)

This comment was marked as spam.


def put(self, row, data, timestamp=None, wal=_WAL_SENTINEL):
"""Insert data into a row in this table.
Expand Down
20 changes: 6 additions & 14 deletions unit_tests/test_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,6 @@ def test_rows_with_columns(self):
table._low_level_table = _MockLowLevelTable()
rr_result = _MockPartialRowsData()
table._low_level_table.read_rows_result = rr_result
self.assertEqual(rr_result.consume_all_calls, 0)

# Set-up mocks.
fake_col_filter = object()
Expand Down Expand Up @@ -322,7 +321,6 @@ def mock_filter_chain_helper(**kwargs):
self.assertEqual(table._low_level_table.read_rows_calls, [
(read_rows_args, read_rows_kwargs),
])
self.assertEqual(rr_result.consume_all_calls, 1)

self.assertEqual(mock_cols, [(columns,)])
self.assertEqual(mock_rows, [(rows,)])
Expand Down Expand Up @@ -350,7 +348,6 @@ def test_rows_with_results(self):
# Return row1 but not row2
rr_result = _MockPartialRowsData(rows={row_key1: row1})
table._low_level_table.read_rows_result = rr_result
self.assertEqual(rr_result.consume_all_calls, 0)

# Set-up mocks.
fake_rows_filter = object()
Expand Down Expand Up @@ -393,7 +390,6 @@ def mock_cells_to_pairs(*args, **kwargs):
self.assertEqual(table._low_level_table.read_rows_calls, [
(read_rows_args, read_rows_kwargs),
])
self.assertEqual(rr_result.consume_all_calls, 1)

self.assertEqual(mock_rows, [(rows,)])
expected_kwargs = {
Expand Down Expand Up @@ -649,8 +645,6 @@ def mock_filter_chain_helper(**kwargs):
self.assertEqual(table._low_level_table.read_rows_calls, [
(read_rows_args, read_rows_kwargs),
])
self.assertEqual(rr_result.consume_next_calls,
rr_result.iterations + 1)

if columns is not None:
self.assertEqual(mock_columns, [(columns,)])
Expand Down Expand Up @@ -1472,10 +1466,13 @@ def read_row(self, *args, **kwargs):
self.read_row_calls.append((args, kwargs))
return self.read_row_result

def read_rows(self, *args, **kwargs):
def yield_rows(self, *args, **kwargs):
self.read_rows_calls.append((args, kwargs))
return self.read_rows_result

self.read_rows_result.consume_all()
rows_dict = self.read_rows_result.rows
for row_key in sorted(rows_dict):
curr_row_data = rows_dict.pop(row_key)
yield curr_row_data

class _MockLowLevelRow(object):

Expand Down Expand Up @@ -1528,8 +1525,3 @@ def __init__(self, rows=None, iterations=0):

def consume_all(self):
self.consume_all_calls += 1

This comment was marked as spam.


def consume_next(self):
self.consume_next_calls += 1
if self.consume_next_calls > self.iterations:
raise StopIteration