ducky: test read_replica even if no data is uploaded yet #5620

Merged
merged 1 commit on Jul 28, 2022
82 changes: 26 additions & 56 deletions tests/rptest/tests/read_replica_e2e_test.py
@@ -29,9 +29,7 @@
class TestReadReplicaService(EndToEndTest):
log_segment_size = 1048576  # 1MB
topic_name = "panda-topic"
s3_bucket_name = "panda-bucket"
si_settings = SISettings(
cloud_storage_bucket=s3_bucket_name,
cloud_storage_reconciliation_interval_ms=500,
cloud_storage_max_connections=5,
log_segment_size=log_segment_size,
@@ -51,7 +49,8 @@ def start_second_cluster(self) -> None:
def create_read_replica_topic(self) -> None:
rpk_second_cluster = RpkTool(self.second_cluster)
conf = {
'redpanda.remote.readreplica': self.s3_bucket_name,
'redpanda.remote.readreplica':
self.si_settings.cloud_storage_bucket,
}
rpk_second_cluster.create_topic(self.topic_name, config=conf)

@@ -75,6 +74,18 @@ def start_producer(self) -> None:
message_validator=is_int_with_prefix)
self.producer.start()

def create_read_replica_topic_success(self) -> bool:
    try:
        self.create_read_replica_topic()
    except RpkException as e:
        if "The server experienced an unexpected error when processing the request" in str(e):
            return False
        else:
            raise
    else:
        return True

Review thread on the error-string check:

Contributor:
I know this function is just getting moved, but it would benefit from a comment explaining why "unexpected error" is the expected error in this context -- I'm guessing this is because read replica topic creation doesn't have a suitable Kafka error code and so it uses UNKNOWN_SERVER_ERROR?
(Maybe this was already discussed, but perhaps something like UNKNOWN_TOPIC_OR_PARTITION would help to distinguish this case from true internal errors.)

Contributor Author:
cc @mattschumpert do you have an opinion on the error code when Redpanda can't create a read replica topic because there's no corresponding data in S3?

Reply:
@LenaAn, something like:
'Error creating Remote Read Replica topic: No topic data found for topic {foo} at s3://{bucket_url_with_path}'

Contributor:
@mattschumpert: I think the request here is for a suggestion of a good/relevant Kafka error code.

Reply:
@piyushredpanda, @LenaAn, in this case I suggest:
LOG_DIR_NOT_FOUND | 57 | False | The user-specified log directory is not found in the broker config.
or
KAFKA_STORAGE_ERROR | 56 | True | Disk error when trying to access log file on the disk.
LOG_DIR_NOT_FOUND is actually quite accurate.
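A minimal sketch of what the reviewers are suggesting, assuming Redpanda surfaced a more specific error for this case; the LOG_DIR_NOT_FOUND string below is hypothetical, since the current broker response is the generic "unexpected error" message the test matches on:

def create_read_replica_topic_success(self) -> bool:
    try:
        self.create_read_replica_topic()
    except RpkException as e:
        # Assumption: the broker reports LOG_DIR_NOT_FOUND when the topic
        # manifest is not yet in S3; today it returns the generic
        # "unexpected error" message matched in the diff above.
        if "LOG_DIR_NOT_FOUND" in str(e):
            return False  # data not uploaded yet; wait_until will retry
        raise  # any other failure is a real error, not a retry case
    return True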

@cluster(num_nodes=6)
@matrix(partition_count=[10])
def test_produce_is_forbidden(self, partition_count: int) -> None:
@@ -85,36 +96,20 @@ def test_produce_is_forbidden(self, partition_count: int) -> None:
replication_factor=3)

DefaultClient(self.redpanda).create_topic(spec)
assert self.redpanda and self.redpanda.s3_client

# Make original topic upload data to S3
rpk = RpkTool(self.redpanda)
rpk.alter_topic_config(spec.name, 'redpanda.remote.write', 'true')

self.start_second_cluster()

def create_read_replica_topic_success() -> bool:
try:
self.create_read_replica_topic()
except RpkException as e:
if "The server experienced an unexpected error when processing the request" in str(
e):
return False
else:
raise
else:
return True

assert self.redpanda and self.redpanda.s3_client
# wait until the read replica topic creation succeeds
wait_until(
create_read_replica_topic_success,
timeout_sec=
60, #should be uploaded since cloud_storage_segment_max_upload_interval_sec=5
self.create_read_replica_topic_success,
timeout_sec=300,
backoff_sec=5,
err_msg="Could not create read replica topic. Most likely " +
"because topic manifest is not in S3, in S3 bucket: " +
f"{list(self.redpanda.s3_client.list_objects(self.s3_bucket_name))}"
)
"because topic manifest is not in S3.")

second_rpk = RpkTool(self.second_cluster)
with expect_exception(
@@ -134,6 +129,7 @@ def test_simple_end_to_end(self, partition_count: int,
replication_factor=3)

DefaultClient(self.redpanda).create_topic(spec)
assert self.redpanda and self.redpanda.s3_client # drops optional types

self.start_producer()
wait_until(lambda: self.producer.num_acked > min_records,
@@ -148,42 +144,16 @@
rpk = RpkTool(self.redpanda)
rpk.alter_topic_config(spec.name, 'redpanda.remote.write', 'true')

assert self.redpanda and self.redpanda.s3_client # drops optional types

# Make sure all produced data is uploaded to S3
def s3_has_all_data() -> bool:
# pyright doesn't consider the assert in outer scope
assert self.redpanda and self.redpanda.s3_client
objects = list(
self.redpanda.s3_client.list_objects(self.s3_bucket_name))
total_uploaded = 0
for o in objects:
if o.Key.endswith(
"/manifest.json") and self.topic_name in o.Key:
data = self.redpanda.s3_client.get_object_data(
self.s3_bucket_name, o.Key)
manifest = json.loads(data)
last_upl_offset = manifest['last_offset']
total_uploaded += last_upl_offset
self.logger.info(
f"Found manifest at {o.Key}, last_offset is {last_upl_offset}"
)
self.logger.info(
f"Total uploaded: {total_uploaded}, num_acked: {self.producer.num_acked}"
)
return total_uploaded >= self.producer.num_acked

# Start second cluster and create read replica topic
self.start_second_cluster()
wait_until(
s3_has_all_data,
timeout_sec=
30, #should be uploaded since cloud_storage_segment_max_upload_interval_sec=5
self.create_read_replica_topic_success,
timeout_sec=300,
backoff_sec=5,
err_msg="Not all data is uploaded to S3 bucket: " +
f"{list(self.redpanda.s3_client.list_objects(self.s3_bucket_name))}"
err_msg=
f"Could not create read replica topic. Most likely because topic manifest is not in S3"
)

# Create read replica topic, consume from it and validate
self.start_second_cluster()
self.create_read_replica_topic()
# Consume from read replica topic and validate
self.start_consumer()
self.run_validation()