Skip to content

Commit

Permalink
ducky: test read_replica even if not data is uploaded yet
Browse files Browse the repository at this point in the history
Client may create a read replica topic when not all data is in S3 yet.
We should test this scenario.
  • Loading branch information
Elena Anyusheva committed Jul 26, 2022
1 parent 128386d commit 6eb292e
Showing 1 changed file with 22 additions and 44 deletions.
66 changes: 22 additions & 44 deletions tests/rptest/tests/read_replica_e2e_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,7 @@
class TestReadReplicaService(EndToEndTest):
log_segment_size = 1048576 # 5MB
topic_name = "panda-topic"
s3_bucket_name = "panda-bucket"
si_settings = SISettings(
cloud_storage_bucket=s3_bucket_name,
cloud_storage_reconciliation_interval_ms=500,
cloud_storage_max_connections=5,
log_segment_size=log_segment_size,
Expand All @@ -50,7 +48,8 @@ def start_second_cluster(self):
def create_read_replica_topic(self):
rpk_second_cluster = RpkTool(self.second_cluster)
conf = {
'redpanda.remote.readreplica': self.s3_bucket_name,
'redpanda.remote.readreplica':
self.si_settings.cloud_storage_bucket,
}
rpk_second_cluster.create_topic(self.topic_name, config=conf)

Expand All @@ -74,6 +73,18 @@ def start_producer(self):
message_validator=is_int_with_prefix)
self.producer.start()

def create_read_replica_topic_success(self):
try:
self.create_read_replica_topic()
except RpkException as e:
if "The server experienced an unexpected error when processing the request" in str(
e):
return False
else:
raise
else:
return True

@cluster(num_nodes=6)
@matrix(partition_count=[10])
def test_produce_is_forbidden(self, partition_count):
Expand All @@ -91,26 +102,14 @@ def test_produce_is_forbidden(self, partition_count):

self.start_second_cluster()

def create_read_replica_topic_success():
try:
self.create_read_replica_topic()
except RpkException as e:
if "The server experienced an unexpected error when processing the request" in str(
e):
return False
else:
raise
else:
return True

# wait until the read replica topic creation succeeds
wait_until(
create_read_replica_topic_success,
self.create_read_replica_topic_success,
timeout_sec=
60, #should be uploaded since cloud_storage_segment_max_upload_interval_sec=5
backoff_sec=5,
err_msg=
f"Could not create read replica topic. Most likely because topic manifest is not in S3, in S3 bucket: {list(self.redpanda._s3client.list_objects(self.s3_bucket_name))}"
f"Could not create read replica topic. Most likely because topic manifest is not in S3, in S3 bucket: {list(self.redpanda._s3client.list_objects(self.si_settings.cloud_storage_bucket))}"
)

second_rpk = RpkTool(self.second_cluster)
Expand Down Expand Up @@ -144,38 +143,17 @@ def test_simple_end_to_end(self, partition_count, min_records):
rpk = RpkTool(self.redpanda)
rpk.alter_topic_config(spec.name, 'redpanda.remote.write', 'true')

# Make sure all produced data is uploaded to S3
def s3_has_all_data():
objects = list(
self.redpanda._s3client.list_objects(self.s3_bucket_name))
total_uploaded = 0
for o in objects:
if o.Key.endswith(
"/manifest.json") and self.topic_name in o.Key:
data = self.redpanda._s3client.get_object_data(
self.s3_bucket_name, o.Key)
manifest = json.loads(data)
last_upl_offset = manifest['last_offset']
total_uploaded += last_upl_offset
self.logger.info(
f"Found manifest at {o.Key}, last_offset is {last_upl_offset}"
)
self.logger.info(
f"Total uploaded: {total_uploaded}, num_acked: {self.producer.num_acked}"
)
return total_uploaded >= self.producer.num_acked

# Start second cluster and create read replica topic
self.start_second_cluster()
wait_until(
s3_has_all_data,
self.create_read_replica_topic_success,
timeout_sec=
30, #should be uploaded since cloud_storage_segment_max_upload_interval_sec=5
300, #should be uploaded since cloud_storage_segment_max_upload_interval_sec=5
backoff_sec=5,
err_msg=
f"Not all data is uploaded to S3 bucket, is S3 bucket: {list(self.redpanda._s3client.list_objects(self.s3_bucket_name))}"
f"Not all data is uploaded to S3 bucket, is S3 bucket: {list(self.redpanda._s3client.list_objects(self.si_settings.cloud_storage_bucket))}"
)

# Create read replica topic, consume from it and validate
self.start_second_cluster()
self.create_read_replica_topic()
# Consume from read replica topic and validate
self.start_consumer()
self.run_validation()

0 comments on commit 6eb292e

Please sign in to comment.