test: Added system test for query offset issue #557

Merged
merged 7 commits into from Aug 7, 2024

Changes from all commits
34 changes: 34 additions & 0 deletions tests/system/test_query.py
@@ -337,6 +337,17 @@ def large_query_client(datastore_client):
return large_query_client


@pytest.fixture(scope="session")
def mergejoin_query_client(datastore_client):
mergejoin_query_client = _helpers.clone_client(
datastore_client,
namespace=populate_datastore.MERGEJOIN_DATASET_NAMESPACE,
)
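    # Populating the mergejoin dataset (~20k entities) is slow, which is
    # presumably why this fixture is session-scoped rather than per-test.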
populate_datastore.add_mergejoin_dataset_entities(client=mergejoin_query_client)

return mergejoin_query_client


@pytest.fixture(scope="function")
def large_query(large_query_client):
# Use the client for this test instead of the global.
@@ -346,6 +357,15 @@ def large_query(large_query_client):
)


@pytest.fixture(scope="function")
def mergejoin_query(mergejoin_query_client):
# Use the client for this test instead of the global.
return mergejoin_query_client.query(
kind=populate_datastore.MERGEJOIN_DATASET_KIND,
namespace=populate_datastore.MERGEJOIN_DATASET_NAMESPACE,
)


@pytest.mark.parametrize(
"limit,offset,expected",
[
@@ -385,6 +405,20 @@ def test_large_query(large_query, limit, offset, expected, database_id):
assert len(entities) == expected


@pytest.mark.parametrize("database_id", [_helpers.TEST_DATABASE], indirect=True)
def test_mergejoin_query(mergejoin_query, database_id):
query = mergejoin_query
query.add_filter(filter=PropertyFilter("a", "=", 1))
query.add_filter(filter=PropertyFilter("b", "=", 1))

# There should be 2 * MERGEJOIN_QUERY_NUM_RESULTS results total
expected_total = 2 * populate_datastore.MERGEJOIN_QUERY_NUM_RESULTS
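    # Sweep every offset from 0 through expected_total; the report in
    # b/352377540 reproduced the duplicate-results bug at offset 8.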
for offset in range(0, expected_total + 1):
iterator = query.fetch(offset=offset)
num_entities = len([e for e in iterator])
assert num_entities == expected_total - offset


@pytest.mark.parametrize("database_id", [None, _helpers.TEST_DATABASE], indirect=True)
def test_query_add_property_filter(ancestor_query, database_id):
query = ancestor_query
10 changes: 6 additions & 4 deletions tests/system/utils/clear_datastore.py
@@ -31,6 +31,8 @@
"Post",
"uuid_key",
"timestamp_key",
"LargeCharacter",
"Mergejoin",
)
TRANSACTION_MAX_GROUPS = 5
MAX_DEL_ENTITIES = 500
@@ -90,12 +92,10 @@ def remove_all_entities(client):


def run(database):
kinds = sys.argv[1:]

if len(kinds) == 0:
kinds = ALL_KINDS

print_func(
"This command will remove all entities from the database "
+ database
@@ -105,8 +105,10 @@ def run(database):
response = input("Is this OK [y/n]? ")

if response.lower() == "y":
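        # Entities live in the default namespace plus the namespaces used by
        # populate_datastore.py ("LargeCharacterEntity" and "MergejoinNamespace").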
for namespace in ["", "LargeCharacterEntity", "MergejoinNamespace"]:
client = datastore.Client(database=database, namespace=namespace)
for kind in kinds:
remove_kind(kind, client)

else:
print_func("Doing nothing.")
93 changes: 92 additions & 1 deletion tests/system/utils/populate_datastore.py
@@ -58,6 +58,11 @@
LARGE_CHARACTER_NAMESPACE = "LargeCharacterEntity"
LARGE_CHARACTER_KIND = "LargeCharacter"

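# Shape of the mergejoin dataset: MERGEJOIN_QUERY_NUM_RESULTS matching entities
# at each end, with roughly MERGEJOIN_DATASET_INTERMEDIATE_OBJECTS alternating
# non-matching entities in between (see add_mergejoin_dataset_entities below).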
MERGEJOIN_QUERY_NUM_RESULTS = 7
MERGEJOIN_DATASET_INTERMEDIATE_OBJECTS = 20000
MERGEJOIN_DATASET_NAMESPACE = "MergejoinNamespace"
MERGEJOIN_DATASET_KIND = "Mergejoin"


def get_system_test_db():
return os.getenv("SYSTEM_TESTS_DATABASE") or "system-tests-named-db"
@@ -179,12 +184,92 @@ def add_timestamp_keys(client=None):
batch.put(entity)


def add_mergejoin_dataset_entities(client=None):
"""
    Dataset for reproducing the bug seen in https://github.com/googleapis/python-datastore/issues/547.
    The root cause was setting a subsequent query's start_cursor to skipped_cursor instead of end_cursor.
    In niche scenarios involving merge joins, skipped_cursor becomes empty and the query restarts from
    the beginning, returning duplicate items.

    The bug can be reproduced with the dataset described in b/352377540: 7 items with a=1, b=1,
    followed by 20k items alternating between a=1, b=0 and a=0, b=1, then 7 more items with a=1, b=1,
    queried for all items with a=1, b=1 using an offset of 8.
"""
client.namespace = MERGEJOIN_DATASET_NAMESPACE
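
    # A minimal sketch of the failing pattern, for illustration only
    # (it mirrors test_mergejoin_query in tests/system/test_query.py):
    #
    #     query = client.query(kind=MERGEJOIN_DATASET_KIND)
    #     query.add_filter(filter=PropertyFilter("a", "=", 1))
    #     query.add_filter(filter=PropertyFilter("b", "=", 1))
    #     entities = list(query.fetch(offset=8))
    #     # Expect 2 * MERGEJOIN_QUERY_NUM_RESULTS - 8 == 6 entities;
    #     # duplicates could be returned before the fix.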

    # Query used to check the namespace's existing contents
page_query = client.query(
kind=MERGEJOIN_DATASET_KIND, namespace=MERGEJOIN_DATASET_NAMESPACE
)

def create_entity(id, a, b):
key = client.key(MERGEJOIN_DATASET_KIND, id)
entity = datastore.Entity(key=key)
entity["a"] = a
entity["b"] = b
return entity

def put_objects(count):
id = 1
curr_intermediate_entries = 0

        # A transaction is limited to 500 mutations and is also subject
        # to an overall request size limit.
ENTITIES_TO_BATCH = 500

with client.transaction() as xact:
for _ in range(0, MERGEJOIN_QUERY_NUM_RESULTS):
entity = create_entity(id, 1, 1)
id += 1
xact.put(entity)

while curr_intermediate_entries < count - MERGEJOIN_QUERY_NUM_RESULTS:
start = curr_intermediate_entries
end = min(curr_intermediate_entries + ENTITIES_TO_BATCH, count)
with client.transaction() as xact:
            # Alternate intermediate entities between (a=0, b=1) and
            # (a=1, b=0), based on the parity of the id.
for i in range(start, end):
if id % 2:
entity = create_entity(id, 0, 1)
else:
entity = create_entity(id, 1, 0)
id += 1

# Saves the entity
xact.put(entity)
curr_intermediate_entries += ENTITIES_TO_BATCH

with client.transaction() as xact:
for _ in range(0, MERGEJOIN_QUERY_NUM_RESULTS):
entity = create_entity(id, 1, 1)
id += 1
xact.put(entity)

# If anything exists in this namespace, delete it, since we need to
# set up something very specific.
all_entities = [e for e in page_query.fetch()]
if len(all_entities) > 0:
        # Delete in batches of 500, the per-request mutation limit.
while all_entities:
entities = all_entities[:500]
all_entities = all_entities[500:]
client.delete_multi([e.key for e in entities])
# Put objects
put_objects(MERGEJOIN_DATASET_INTERMEDIATE_OBJECTS)


def run(database):
client = datastore.Client(database=database)
flags = sys.argv[1:]

if len(flags) == 0:
flags = [
"--characters",
"--uuid",
"--timestamps",
"--large-characters",
"--mergejoin",
]

if "--characters" in flags:
add_characters(client)
@@ -195,6 +280,12 @@ def run(database):
if "--timestamps" in flags:
add_timestamp_keys(client)

if "--large-characters" in flags:
add_large_character_entities(client)

if "--mergejoin" in flags:
add_mergejoin_dataset_entities(client)


def main():
for database in ["", get_system_test_db()]: