From b67fbea6ab19f047af565a1d553fe3b33107ff78 Mon Sep 17 00:00:00 2001
From: Kevin Zheng
Date: Fri, 2 Aug 2024 16:40:44 +0000
Subject: [PATCH 1/5] test: Added system test for query offset issue

---
 tests/system/test_query.py               | 35 ++++++++++
 tests/system/utils/clear_datastore.py    | 18 ++---
 tests/system/utils/populate_datastore.py | 89 +++++++++++++++++++++++-
 3 files changed, 133 insertions(+), 9 deletions(-)
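
A minimal sketch of the invariant the new test asserts (illustrative only,
not part of the diff; the client setup is assumed, and PropertyFilter is the
google.cloud.datastore.query filter class already used by these tests):

    query = client.query(kind="Mergejoin", namespace="MergejoinNamespace")
    query.add_filter(filter=PropertyFilter("a", "=", 1))
    query.add_filter(filter=PropertyFilter("b", "=", 1))
    # 14 entities match: 7 at each end of the ~20k-entity dataset. A fetch
    # with offset=k must return exactly 14 - k results; with the
    # skipped_cursor bug, the query restarted from the beginning and
    # returned duplicates instead.
    assert len(list(query.fetch(offset=8))) == 6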
") if response.lower() == "y": - for kind in kinds: - remove_kind(kind, client) + for namespace in ["", "LargeCharacterEntity", "MergejoinNamespace"]: + client = datastore.Client(database=database, namespace=namespace) + for kind in kinds: + remove_kind(kind, client) else: print_func("Doing nothing.") diff --git a/tests/system/utils/populate_datastore.py b/tests/system/utils/populate_datastore.py index 9077241f..62077b39 100644 --- a/tests/system/utils/populate_datastore.py +++ b/tests/system/utils/populate_datastore.py @@ -58,6 +58,11 @@ LARGE_CHARACTER_NAMESPACE = "LargeCharacterEntity" LARGE_CHARACTER_KIND = "LargeCharacter" +MERGEJOIN_QUERY_NUM_RESULTS = 7 +MERGEJOIN_DATASET_INTERMEDIATE_OBJECTS = 20000 +MERGEJOIN_DATASET_NAMESPACE = "MergejoinNamespace" +MERGEJOIN_DATASET_KIND = "Mergejoin" + def get_system_test_db(): return os.getenv("SYSTEM_TESTS_DATABASE") or "system-tests-named-db" @@ -179,12 +184,87 @@ def add_timestamp_keys(client=None): batch.put(entity) +def add_mergejoin_dataset_entities(client=None): + """ + Dataset to account for one bug that was seen in https://github.com/googleapis/python-datastore/issues/547 + The root cause of this is us setting a subsequent query's start_cursor to skipped_cursor instead of end_cursor. + In niche scenarios involving mergejoins, skipped_cursor becomes empty and the query starts back from the beginning, + returning duplicate items. + + This bug is able to be reproduced with a dataset shown in b/352377540, with 7 items of a=1, b=1 + followed by 20k items of alternating a=1, b=0 and a=0, b=1, then 7 more a=1, b=1, then querying for all + items with a=1, b=1 and an offset of 8. + """ + client.namespace = MERGEJOIN_DATASET_NAMESPACE + + # Query used for all tests + page_query = client.query( + kind=MERGEJOIN_DATASET_KIND, namespace=MERGEJOIN_DATASET_NAMESPACE + ) + + def create_entity(id, a, b): + key = client.key(MERGEJOIN_DATASET_KIND, id) + entity = datastore.Entity(key=key) + entity["a"] = a + entity["b"] = b + return entity + + def put_objects(count): + id = 1 + curr_intermediate_entries = 0 + + # Can only do 500 operations in a transaction with an overall + # size limit. + ENTITIES_TO_BATCH = 500 + + with client.transaction() as xact: + for _ in range(0, MERGEJOIN_QUERY_NUM_RESULTS): + entity = create_entity(id, 1, 1) + id += 1 + xact.put(entity) + + while curr_intermediate_entries < count - MERGEJOIN_QUERY_NUM_RESULTS: + start = curr_intermediate_entries + end = min(curr_intermediate_entries + ENTITIES_TO_BATCH, count) + with client.transaction() as xact: + # The name/ID for the new entity + for i in range(start, end): + if id % 2: + entity = create_entity(id, 0, 1) + else: + entity = create_entity(id, 1, 0) + id += 1 + + # Saves the entity + xact.put(entity) + curr_intermediate_entries += ENTITIES_TO_BATCH + + with client.transaction() as xact: + for _ in range(0, MERGEJOIN_QUERY_NUM_RESULTS): + entity = create_entity(id, 1, 1) + id += 1 + xact.put(entity) + + + # If anything exists in this namespace, delete it, since we need to + # set up something very specific. 
+    for offset in range(0, expected_total + 1):
+        iterator = query.fetch(offset=offset)
+        num_entities = len([e for e in iterator])
+        assert num_entities == expected_total - offset
+
+
 @pytest.mark.parametrize("database_id", [None, _helpers.TEST_DATABASE], indirect=True)
 def test_query_add_property_filter(ancestor_query, database_id):
     query = ancestor_query
diff --git a/tests/system/utils/clear_datastore.py b/tests/system/utils/clear_datastore.py
index 2082bce7..8f0ca752 100644
--- a/tests/system/utils/clear_datastore.py
+++ b/tests/system/utils/clear_datastore.py
@@ -31,6 +31,8 @@
     "Post",
     "uuid_key",
     "timestamp_key",
+    "LargeCharacter",
+    "Mergejoin"
 )
 TRANSACTION_MAX_GROUPS = 5
 MAX_DEL_ENTITIES = 500
@@ -90,23 +92,23 @@ def remove_all_entities(client):
 
 
 def run(database):
-    client = datastore.Client(database=database)
-
     kinds = sys.argv[1:]
 
     if len(kinds) == 0:
         kinds = ALL_KINDS
 
     print_func(
-        "This command will remove all entities from the database "
-        + database
-        + " for the following kinds:"
-    )
+            "This command will remove all entities from the database "
+            + database
+            + " for the following kinds:"
+        )
     print_func("\n".join("- " + val for val in kinds))
     response = input("Is this OK [y/n]? ")
 
     if response.lower() == "y":
-        for kind in kinds:
-            remove_kind(kind, client)
+        for namespace in ["", "LargeCharacterEntity", "MergejoinNamespace"]:
+            client = datastore.Client(database=database, namespace=namespace)
+            for kind in kinds:
+                remove_kind(kind, client)
     else:
         print_func("Doing nothing.")
diff --git a/tests/system/utils/populate_datastore.py b/tests/system/utils/populate_datastore.py
index 9077241f..62077b39 100644
--- a/tests/system/utils/populate_datastore.py
+++ b/tests/system/utils/populate_datastore.py
@@ -58,6 +58,11 @@
 LARGE_CHARACTER_NAMESPACE = "LargeCharacterEntity"
 LARGE_CHARACTER_KIND = "LargeCharacter"
 
+MERGEJOIN_QUERY_NUM_RESULTS = 7
+MERGEJOIN_DATASET_INTERMEDIATE_OBJECTS = 20000
+MERGEJOIN_DATASET_NAMESPACE = "MergejoinNamespace"
+MERGEJOIN_DATASET_KIND = "Mergejoin"
+
 
 def get_system_test_db():
     return os.getenv("SYSTEM_TESTS_DATABASE") or "system-tests-named-db"
@@ -179,12 +184,88 @@ def add_timestamp_keys(client=None):
             batch.put(entity)
 
 
+def add_mergejoin_dataset_entities(client=None):
+    """
+    Dataset that reproduces a bug seen in https://github.com/googleapis/python-datastore/issues/547
+    The root cause is that we set a subsequent query's start_cursor to skipped_cursor instead of end_cursor.
+    In niche scenarios involving mergejoins, skipped_cursor becomes empty and the query starts back from the beginning,
+    returning duplicate items. 
+
+    This bug can be reproduced with the dataset described in b/352377540: 7 items of a=1, b=1,
+    followed by 20k items alternating between a=1, b=0 and a=0, b=1, then 7 more of a=1, b=1, then querying for all
+    items with a=1, b=1 and an offset of 8.
+    """
+    client.namespace = MERGEJOIN_DATASET_NAMESPACE
+
+    # Query used for all tests
+    page_query = client.query(
+        kind=MERGEJOIN_DATASET_KIND, namespace=MERGEJOIN_DATASET_NAMESPACE
+    )
+
+    def create_entity(id, a, b):
+        key = client.key(MERGEJOIN_DATASET_KIND, id)
+        entity = datastore.Entity(key=key)
+        entity["a"] = a
+        entity["b"] = b
+        return entity
+
+    def put_objects(count):
+        id = 1
+        curr_intermediate_entries = 0
+
+        # Can only do 500 operations in a transaction with an overall
+        # size limit.
+        ENTITIES_TO_BATCH = 500
+
+        with client.transaction() as xact:
+            for _ in range(0, MERGEJOIN_QUERY_NUM_RESULTS):
+                entity = create_entity(id, 1, 1)
+                id += 1
+                xact.put(entity)
+    
+        while curr_intermediate_entries < count - MERGEJOIN_QUERY_NUM_RESULTS:
+            start = curr_intermediate_entries
+            end = min(curr_intermediate_entries + ENTITIES_TO_BATCH, count)
+            with client.transaction() as xact:
+                # The name/ID for the new entity
+                for i in range(start, end):
+                    if id % 2:
+                        entity = create_entity(id, 0, 1)
+                    else:
+                        entity = create_entity(id, 1, 0)
+                    id += 1
+
+                    # Saves the entity
+                    xact.put(entity)
+            curr_intermediate_entries += ENTITIES_TO_BATCH
+    
+        with client.transaction() as xact:
+            for _ in range(0, MERGEJOIN_QUERY_NUM_RESULTS):
+                entity = create_entity(id, 1, 1)
+                id += 1
+                xact.put(entity)
+
+
+    # If anything exists in this namespace, delete it, since we need to
+    # set up something very specific.
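+    # Any leftover entities would skew the exact counts asserted by test_mergejoin_query.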
+    all_entities = [e for e in page_query.fetch()]
+    if len(all_entities) > 0:
+        # Cleanup Collection if not an exact match
+        while all_entities:
+            entities = all_entities[:500]
+            all_entities = all_entities[500:]
+            client.delete_multi([e.key for e in entities])
+    # Put objects
+    put_objects(MERGEJOIN_DATASET_INTERMEDIATE_OBJECTS)
+
+
 def run(database):
     client = datastore.Client(database=database)
 
     flags = sys.argv[1:]
 
     if len(flags) == 0:
-        flags = ["--characters", "--uuid", "--timestamps"]
+        flags = ["--characters", "--uuid", "--timestamps", "--large-characters", "--mergejoin"]
 
     if "--characters" in flags:
         add_characters(client)
@@ -194,6 +275,12 @@ def run(database):
 
     if "--timestamps" in flags:
         add_timestamp_keys(client)
+    
+    if "--large-characters" in flags:
+        add_large_character_entities(client)
+    
+    if "--mergejoin" in flags:
+        add_mergejoin_dataset_entities(client)
 
 
 def main():

From cbda773785e5c72dba377bbbd03c268419315e08 Mon Sep 17 00:00:00 2001
From: Kevin Zheng
Date: Fri, 2 Aug 2024 17:14:48 +0000
Subject: [PATCH 2/5] linting

---
 tests/system/test_query.py               |  6 +++---
 tests/system/utils/clear_datastore.py    | 10 +++++-----
 tests/system/utils/populate_datastore.py | 19 ++++++++++++-------
 3 files changed, 20 insertions(+), 15 deletions(-)

diff --git a/tests/system/test_query.py b/tests/system/test_query.py
index e9528f50..325bcf83 100644
--- a/tests/system/test_query.py
+++ b/tests/system/test_query.py
@@ -362,7 +362,7 @@ def mergejoin_query(mergejoin_query_client):
     # Use the client for this test instead of the global.
     return mergejoin_query_client.query(
         kind=populate_datastore.MERGEJOIN_DATASET_KIND,
-        namespace=populate_datastore.MERGEJOIN_DATASET_NAMESPACE
+        namespace=populate_datastore.MERGEJOIN_DATASET_NAMESPACE,
     )
 
 
@@ -407,8 +407,8 @@ def test_large_query(large_query, limit, offset, expected, database_id):
 
 def test_mergejoin_query(mergejoin_query):
     query = mergejoin_query
-    query.add_filter(filter=PropertyFilter('a', '=', 1))
-    query.add_filter(filter=PropertyFilter('b', '=', 1))
+    query.add_filter(filter=PropertyFilter("a", "=", 1))
+    query.add_filter(filter=PropertyFilter("b", "=", 1))
 
     # There should be 2 * MERGEJOIN_QUERY_NUM_RESULTS results total
     expected_total = 2 * populate_datastore.MERGEJOIN_QUERY_NUM_RESULTS
diff --git a/tests/system/utils/clear_datastore.py b/tests/system/utils/clear_datastore.py
index 8f0ca752..05a63b31 100644
--- a/tests/system/utils/clear_datastore.py
+++ b/tests/system/utils/clear_datastore.py
@@ -32,7 +32,7 @@
     "uuid_key",
     "timestamp_key",
     "LargeCharacter",
-    "Mergejoin"
+    "Mergejoin",
 )
 TRANSACTION_MAX_GROUPS = 5
 MAX_DEL_ENTITIES = 500
@@ -97,10 +97,10 @@ def run(database):
     if len(kinds) == 0:
         kinds = ALL_KINDS
 
     print_func(
-            "This command will remove all entities from the database "
-            + database
-            + " for the following kinds:"
-        )
+        "This command will remove all entities from the database "
+        + database
+        + " for the following kinds:"
+    )
     print_func("\n".join("- " + val for val in kinds))
     response = input("Is this OK [y/n]? ")
diff --git a/tests/system/utils/populate_datastore.py b/tests/system/utils/populate_datastore.py
index 62077b39..0eea15fb 100644
--- a/tests/system/utils/populate_datastore.py
+++ b/tests/system/utils/populate_datastore.py
@@ -189,7 +189,7 @@ def add_mergejoin_dataset_entities(client=None):
     Dataset that reproduces a bug seen in https://github.com/googleapis/python-datastore/issues/547
     The root cause is that we set a subsequent query's start_cursor to skipped_cursor instead of end_cursor.
     In niche scenarios involving mergejoins, skipped_cursor becomes empty and the query starts back from the beginning,
-    returning duplicate items. 
+    returning duplicate items.
 
     This bug can be reproduced with the dataset described in b/352377540: 7 items of a=1, b=1,
     followed by 20k items alternating between a=1, b=0 and a=0, b=1, then 7 more of a=1, b=1, then querying for all
@@ -222,7 +222,7 @@ def put_objects(count):
                 entity = create_entity(id, 1, 1)
                 id += 1
                 xact.put(entity)
-    
+
         while curr_intermediate_entries < count - MERGEJOIN_QUERY_NUM_RESULTS:
             start = curr_intermediate_entries
             end = min(curr_intermediate_entries + ENTITIES_TO_BATCH, count)
@@ -238,14 +238,13 @@ def put_objects(count):
                     # Saves the entity
                     xact.put(entity)
             curr_intermediate_entries += ENTITIES_TO_BATCH
-    
+
         with client.transaction() as xact:
             for _ in range(0, MERGEJOIN_QUERY_NUM_RESULTS):
                 entity = create_entity(id, 1, 1)
                 id += 1
                 xact.put(entity)
-
 
     # If anything exists in this namespace, delete it, since we need to
     # set up something very specific.
     # Any leftover entities would skew the exact counts asserted by test_mergejoin_query.
@@ -265,7 +264,13 @@ def run(database):
     flags = sys.argv[1:]
 
     if len(flags) == 0:
-        flags = ["--characters", "--uuid", "--timestamps", "--large-characters", "--mergejoin"]
+        flags = [
+            "--characters",
+            "--uuid",
+            "--timestamps",
+            "--large-characters",
+            "--mergejoin",
+        ]
 
     if "--characters" in flags:
         add_characters(client)
@@ -275,10 +280,10 @@ def run(database):
 
     if "--timestamps" in flags:
         add_timestamp_keys(client)
-    
+
     if "--large-characters" in flags:
         add_large_character_entities(client)
-    
+
     if "--mergejoin" in flags:
         add_mergejoin_dataset_entities(client)
 

From 3603fc55304e3716c40e1d267c0897970c15b964 Mon Sep 17 00:00:00 2001
From: Owl Bot
Date: Fri, 2 Aug 2024 17:17:03 +0000
Subject: [PATCH 3/5] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20po?=
 =?UTF-8?q?st-processor?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

---
 tests/system/test_query.py               |  6 +++---
 tests/system/utils/clear_datastore.py    | 10 +++++-----
 tests/system/utils/populate_datastore.py | 19 ++++++++++++-------
 3 files changed, 20 insertions(+), 15 deletions(-)

diff --git a/tests/system/test_query.py b/tests/system/test_query.py
index e9528f50..325bcf83 100644
--- a/tests/system/test_query.py
+++ b/tests/system/test_query.py
@@ -362,7 +362,7 @@ def mergejoin_query(mergejoin_query_client):
     # Use the client for this test instead of the global.
     return mergejoin_query_client.query(
         kind=populate_datastore.MERGEJOIN_DATASET_KIND,
-        namespace=populate_datastore.MERGEJOIN_DATASET_NAMESPACE
+        namespace=populate_datastore.MERGEJOIN_DATASET_NAMESPACE,
     )
 
 
@@ -407,8 +407,8 @@ def test_large_query(large_query, limit, offset, expected, database_id):
 
 def test_mergejoin_query(mergejoin_query):
     query = mergejoin_query
-    query.add_filter(filter=PropertyFilter('a', '=', 1))
-    query.add_filter(filter=PropertyFilter('b', '=', 1))
+    query.add_filter(filter=PropertyFilter("a", "=", 1))
+    query.add_filter(filter=PropertyFilter("b", "=", 1))
 
     # There should be 2 * MERGEJOIN_QUERY_NUM_RESULTS results total
     expected_total = 2 * populate_datastore.MERGEJOIN_QUERY_NUM_RESULTS
diff --git a/tests/system/utils/clear_datastore.py b/tests/system/utils/clear_datastore.py
index 8f0ca752..05a63b31 100644
--- a/tests/system/utils/clear_datastore.py
+++ b/tests/system/utils/clear_datastore.py
@@ -32,7 +32,7 @@
     "uuid_key",
     "timestamp_key",
     "LargeCharacter",
-    "Mergejoin"
+    "Mergejoin",
 )
 TRANSACTION_MAX_GROUPS = 5
 MAX_DEL_ENTITIES = 500
@@ -97,10 +97,10 @@ def run(database):
     if len(kinds) == 0:
         kinds = ALL_KINDS
 
     print_func(
-            "This command will remove all entities from the database "
-            + database
-            + " for the following kinds:"
-        )
+        "This command will remove all entities from the database "
+        + database
+        + " for the following kinds:"
+    )
     print_func("\n".join("- " + val for val in kinds))
     response = input("Is this OK [y/n]? ")
diff --git a/tests/system/utils/populate_datastore.py b/tests/system/utils/populate_datastore.py
index 62077b39..0eea15fb 100644
--- a/tests/system/utils/populate_datastore.py
+++ b/tests/system/utils/populate_datastore.py
@@ -189,7 +189,7 @@ def add_mergejoin_dataset_entities(client=None):
     Dataset that reproduces a bug seen in https://github.com/googleapis/python-datastore/issues/547
     The root cause is that we set a subsequent query's start_cursor to skipped_cursor instead of end_cursor.
     In niche scenarios involving mergejoins, skipped_cursor becomes empty and the query starts back from the beginning,
-    returning duplicate items. 
+    returning duplicate items.
 
     This bug can be reproduced with the dataset described in b/352377540: 7 items of a=1, b=1,
     followed by 20k items alternating between a=1, b=0 and a=0, b=1, then 7 more of a=1, b=1, then querying for all
@@ -222,7 +222,7 @@ def put_objects(count):
                 entity = create_entity(id, 1, 1)
                 id += 1
                 xact.put(entity)
-    
+
         while curr_intermediate_entries < count - MERGEJOIN_QUERY_NUM_RESULTS:
             start = curr_intermediate_entries
             end = min(curr_intermediate_entries + ENTITIES_TO_BATCH, count)
@@ -238,14 +238,13 @@ def put_objects(count):
                     # Saves the entity
                     xact.put(entity)
             curr_intermediate_entries += ENTITIES_TO_BATCH
-    
+
         with client.transaction() as xact:
             for _ in range(0, MERGEJOIN_QUERY_NUM_RESULTS):
                 entity = create_entity(id, 1, 1)
                 id += 1
                 xact.put(entity)
-
 
     # If anything exists in this namespace, delete it, since we need to
     # set up something very specific.
     # Any leftover entities would skew the exact counts asserted by test_mergejoin_query.
@@ -265,7 +264,13 @@ def run(database):
     flags = sys.argv[1:]
 
     if len(flags) == 0:
-        flags = ["--characters", "--uuid", "--timestamps", "--large-characters", "--mergejoin"]
+        flags = [
+            "--characters",
+            "--uuid",
+            "--timestamps",
+            "--large-characters",
+            "--mergejoin",
+        ]
 
     if "--characters" in flags:
         add_characters(client)
@@ -275,10 +280,10 @@ def run(database):
 
     if "--timestamps" in flags:
         add_timestamp_keys(client)
-    
+
     if "--large-characters" in flags:
         add_large_character_entities(client)
-    
+
     if "--mergejoin" in flags:
         add_mergejoin_dataset_entities(client)
 

From 46ace77778cc525082d84de332628627284095f0 Mon Sep 17 00:00:00 2001
From: Kevin Zheng
Date: Fri, 2 Aug 2024 17:36:30 +0000
Subject: [PATCH 4/5] fixed test

---
 tests/system/test_query.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/tests/system/test_query.py b/tests/system/test_query.py
index 325bcf83..3b375519 100644
--- a/tests/system/test_query.py
+++ b/tests/system/test_query.py
@@ -405,7 +405,8 @@ def test_large_query(large_query, limit, offset, expected, database_id):
     assert len(entities) == expected
 
 
-def test_mergejoin_query(mergejoin_query):
+@pytest.mark.parametrize("database_id", [None, _helpers.TEST_DATABASE], indirect=True)
+def test_mergejoin_query(mergejoin_query, database_id):
     query = mergejoin_query
     query.add_filter(filter=PropertyFilter("a", "=", 1))
     query.add_filter(filter=PropertyFilter("b", "=", 1))

From 3e0e91a409e520ea2b221c26fa41d4755e676c56 Mon Sep 17 00:00:00 2001
From: Kevin Zheng
Date: Tue, 6 Aug 2024 20:00:04 +0000
Subject: [PATCH 5/5] Removed testing the default database

---
 tests/system/test_query.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/system/test_query.py b/tests/system/test_query.py
index 3b375519..b9574789 100644
--- a/tests/system/test_query.py
+++ b/tests/system/test_query.py
@@ -405,7 +405,7 @@ def test_large_query(large_query, limit, offset, expected, database_id):
     assert len(entities) == expected
 
 
-@pytest.mark.parametrize("database_id", [None, _helpers.TEST_DATABASE], indirect=True)
+@pytest.mark.parametrize("database_id", [_helpers.TEST_DATABASE], indirect=True)
 def test_mergejoin_query(mergejoin_query, database_id):
     query = mergejoin_query
     query.add_filter(filter=PropertyFilter("a", "=", 1))