Skip to content

Commit

Permalink
tests: uses action injection in e2e SI test
Browse files Browse the repository at this point in the history
  • Loading branch information
abhijat committed May 5, 2022
1 parent 96c1d65 commit 3d6686f
Showing 1 changed file with 67 additions and 29 deletions.
96 changes: 67 additions & 29 deletions tests/rptest/tests/e2e_shadow_indexing_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,70 +7,74 @@
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0

from rptest.services.cluster import cluster
from rptest.services.redpanda import SISettings
import os
import random

from ducktape.tests.test import TestContext
from ducktape.utils.util import wait_until

from rptest.clients.kafka_cli_tools import KafkaCliTools
from rptest.clients.rpk import RpkTool
from rptest.clients.types import TopicSpec
from rptest.services.redpanda import RedpandaService, SISettings
from rptest.util import Scale
from rptest.services.action_injector import random_process_kills
from rptest.services.cluster import cluster
from rptest.services.franz_go_verifiable_services import FranzGoVerifiableProducer, FranzGoVerifiableRandomConsumer
from rptest.services.franz_go_verifiable_services import ServiceStatus, await_minimum_produced_records
from rptest.services.redpanda import RedpandaService, CHAOS_LOG_ALLOW_LIST
from rptest.services.redpanda import SISettings
from rptest.tests.end_to_end import EndToEndTest
from rptest.tests.prealloc_nodes import PreallocNodesTest
from rptest.util import Scale
from rptest.util import (
produce_until_segments,
wait_for_segments_removal,
)

from rptest.services.franz_go_verifiable_services import FranzGoVerifiableProducer, FranzGoVerifiableRandomConsumer
from rptest.services.franz_go_verifiable_services import ServiceStatus, await_minimum_produced_records
from rptest.tests.prealloc_nodes import PreallocNodesTest
from rptest.clients.rpk import RpkTool
from ducktape.utils.util import wait_until
from ducktape.tests.test import TestContext

import os
import random


class EndToEndShadowIndexingTest(EndToEndTest):
class EndToEndShadowIndexingBase(EndToEndTest):
segment_size = 1048576 # 1 Mb
s3_topic_name = "panda-topic"

num_brokers = 3

topics = (TopicSpec(
name=s3_topic_name,
partition_count=1,
replication_factor=3,
), )

def __init__(self, test_context):
super(EndToEndShadowIndexingTest,
def __init__(self, test_context, extra_rp_conf=None):
super(EndToEndShadowIndexingBase,
self).__init__(test_context=test_context)

self.test_context = test_context
self.topic = EndToEndShadowIndexingTest.s3_topic_name

si_settings = SISettings(
self.si_settings = SISettings(
cloud_storage_reconciliation_interval_ms=500,
cloud_storage_max_connections=5,
log_segment_size=EndToEndShadowIndexingTest.segment_size, # 1MB
)
self.s3_bucket_name = si_settings.cloud_storage_bucket

si_settings.load_context(self.logger, test_context)

self.s3_bucket_name = self.si_settings.cloud_storage_bucket
self.si_settings.load_context(self.logger, test_context)
self.scale = Scale(test_context)
self.redpanda = RedpandaService(
context=test_context,
num_brokers=3,
si_settings=si_settings,
)

self.redpanda = RedpandaService(context=self.test_context,
num_brokers=self.num_brokers,
si_settings=self.si_settings,
extra_rp_conf=extra_rp_conf)
self.kafka_tools = KafkaCliTools(self.redpanda)

def setUp(self):
self.redpanda.start()
for topic in EndToEndShadowIndexingTest.topics:
for topic in EndToEndShadowIndexingBase.topics:
self.kafka_tools.create_topic(topic)

def tearDown(self):
self.s3_client.empty_bucket(self.s3_bucket_name)


class EndToEndShadowIndexingTest(EndToEndShadowIndexingBase):
@cluster(num_nodes=5)
def test_write(self):
"""Write at least 10 segments, set retention policy to leave only 5
Expand All @@ -95,11 +99,45 @@ def test_write(self):
topic=self.topic,
partition_idx=0,
count=6)

self.start_consumer()
self.run_validation()


class EndToEndShadowIndexingTestWithDisruptions(EndToEndShadowIndexingBase):
def __init__(self, test_context):
super().__init__(test_context,
extra_rp_conf={
'default_topic_replications': self.num_brokers,
})

@cluster(num_nodes=5, log_allow_list=CHAOS_LOG_ALLOW_LIST)
def test_write_with_node_failures(self):
self.start_producer()
produce_until_segments(
redpanda=self.redpanda,
topic=self.topic,
partition_idx=0,
count=10,
)

self.kafka_tools.alter_topic_config(
self.topic,
{
TopicSpec.PROPERTY_RETENTION_BYTES:
5 * EndToEndShadowIndexingTest.segment_size
},
)

with random_process_kills(self.redpanda) as ctx:
wait_for_segments_removal(redpanda=self.redpanda,
topic=self.topic,
partition_idx=0,
count=6)
self.start_consumer()
self.run_validation()
ctx.assert_actions_triggered()


class ShadowIndexingWhileBusyTest(PreallocNodesTest):
# With SI enabled, run common operations against a cluster
# while the system is under load (busy).
Expand Down

0 comments on commit 3d6686f

Please sign in to comment.