Skip to content

Commit

Permalink
Merge pull request #5620 from LenaAn/fix_test
Browse files Browse the repository at this point in the history
ducky: test read_replica even if no data is uploaded yet
  • Loading branch information
jcsp committed Jul 28, 2022
2 parents a3c1ee2 + 0aa5cb0 commit e83dd5b
Showing 1 changed file with 26 additions and 56 deletions.
82 changes: 26 additions & 56 deletions tests/rptest/tests/read_replica_e2e_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,7 @@
class TestReadReplicaService(EndToEndTest):
log_segment_size = 1048576 # 1 MiB (1024 * 1024 bytes)
topic_name = "panda-topic"
s3_bucket_name = "panda-bucket"
si_settings = SISettings(
cloud_storage_bucket=s3_bucket_name,
cloud_storage_reconciliation_interval_ms=500,
cloud_storage_max_connections=5,
log_segment_size=log_segment_size,
Expand All @@ -51,7 +49,8 @@ def start_second_cluster(self) -> None:
def create_read_replica_topic(self) -> None:
    """Create the read replica topic on the second cluster.

    The topic named ``self.topic_name`` is created through rpk against
    ``self.second_cluster`` with the ``redpanda.remote.readreplica``
    property set to the bucket configured in ``self.si_settings``.
    """
    rpk_second_cluster = RpkTool(self.second_cluster)
    conf = {
        # A dict literal keeps only the last value given for a repeated key,
        # so the property is listed exactly once, with the bucket taken from
        # si_settings.
        'redpanda.remote.readreplica':
        self.si_settings.cloud_storage_bucket,
    }
    rpk_second_cluster.create_topic(self.topic_name, config=conf)

Expand All @@ -75,6 +74,18 @@ def start_producer(self) -> None:
message_validator=is_int_with_prefix)
self.producer.start()

def create_read_replica_topic_success(self) -> bool:
    """Try to create the read replica topic and report whether it worked.

    Intended as a polling predicate (it is passed to ``wait_until``).

    Returns:
        True when the topic was created; False when rpk failed with the
        server's generic "unexpected error" message, so the caller can
        retry.

    Raises:
        RpkException: for any other rpk failure.
    """
    retryable_error = "The server experienced an unexpected error when processing the request"
    try:
        self.create_read_replica_topic()
    except RpkException as err:
        # Only the generic server-side error is treated as "not ready yet";
        # anything else is a real failure and must propagate.
        if retryable_error not in str(err):
            raise
        return False
    return True

@cluster(num_nodes=6)
@matrix(partition_count=[10])
def test_produce_is_forbidden(self, partition_count: int) -> None:
Expand All @@ -85,36 +96,20 @@ def test_produce_is_forbidden(self, partition_count: int) -> None:
replication_factor=3)

DefaultClient(self.redpanda).create_topic(spec)
assert self.redpanda and self.redpanda.s3_client

# Make original topic upload data to S3
rpk = RpkTool(self.redpanda)
rpk.alter_topic_config(spec.name, 'redpanda.remote.write', 'true')

self.start_second_cluster()

def create_read_replica_topic_success() -> bool:
try:
self.create_read_replica_topic()
except RpkException as e:
if "The server experienced an unexpected error when processing the request" in str(
e):
return False
else:
raise
else:
return True

assert self.redpanda and self.redpanda.s3_client
# wait until the read replica topic creation succeeds
wait_until(
create_read_replica_topic_success,
timeout_sec=
60, #should be uploaded since cloud_storage_segment_max_upload_interval_sec=5
self.create_read_replica_topic_success,
timeout_sec=300,
backoff_sec=5,
err_msg="Could not create read replica topic. Most likely " +
"because topic manifest is not in S3, in S3 bucket: " +
f"{list(self.redpanda.s3_client.list_objects(self.s3_bucket_name))}"
)
"because topic manifest is not in S3.")

second_rpk = RpkTool(self.second_cluster)
with expect_exception(
Expand All @@ -134,6 +129,7 @@ def test_simple_end_to_end(self, partition_count: int,
replication_factor=3)

DefaultClient(self.redpanda).create_topic(spec)
assert self.redpanda and self.redpanda.s3_client # drops optional types

self.start_producer()
wait_until(lambda: self.producer.num_acked > min_records,
Expand All @@ -148,42 +144,16 @@ def test_simple_end_to_end(self, partition_count: int,
rpk = RpkTool(self.redpanda)
rpk.alter_topic_config(spec.name, 'redpanda.remote.write', 'true')

assert self.redpanda and self.redpanda.s3_client # drops optional types

# Make sure all produced data is uploaded to S3
def s3_has_all_data() -> bool:
# pyright doesn't consider the assert in outer scope
assert self.redpanda and self.redpanda.s3_client
objects = list(
self.redpanda.s3_client.list_objects(self.s3_bucket_name))
total_uploaded = 0
for o in objects:
if o.Key.endswith(
"/manifest.json") and self.topic_name in o.Key:
data = self.redpanda.s3_client.get_object_data(
self.s3_bucket_name, o.Key)
manifest = json.loads(data)
last_upl_offset = manifest['last_offset']
total_uploaded += last_upl_offset
self.logger.info(
f"Found manifest at {o.Key}, last_offset is {last_upl_offset}"
)
self.logger.info(
f"Total uploaded: {total_uploaded}, num_acked: {self.producer.num_acked}"
)
return total_uploaded >= self.producer.num_acked

# Start second cluster and create read replica topic
self.start_second_cluster()
wait_until(
s3_has_all_data,
timeout_sec=
30, #should be uploaded since cloud_storage_segment_max_upload_interval_sec=5
self.create_read_replica_topic_success,
timeout_sec=300,
backoff_sec=5,
err_msg="Not all data is uploaded to S3 bucket: " +
f"{list(self.redpanda.s3_client.list_objects(self.s3_bucket_name))}"
err_msg=
f"Could not create read replica topic. Most likely because topic manifest is not in S3"
)

# Create read replica topic, consume from it and validate
self.start_second_cluster()
self.create_read_replica_topic()
# Consume from read replica topic and validate
self.start_consumer()
self.run_validation()

0 comments on commit e83dd5b

Please sign in to comment.