Skip to content

Commit

Permalink
Merge pull request #4478 from LenaAn/si_blocked_s3
Browse files Browse the repository at this point in the history
ducky: add si test with blocked s3
  • Loading branch information
Lena Anyusheva committed Apr 30, 2022
2 parents 147ccde + b9ccdaa commit aaa9bec
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 26 deletions.
27 changes: 1 addition & 26 deletions tests/rptest/tests/archival_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
segments_count,
produce_until_segments,
wait_for_segments_removal,
firewall_blocked,
)

from ducktape.mark import matrix
Expand Down Expand Up @@ -78,32 +79,6 @@ def validate(fn, logger, timeout_sec, backoff_sec=5):
])


class firewall_blocked:
"""Temporary firewall barrier that isolates set of redpanda
nodes from the ip-address"""
def __init__(self, nodes, blocked_port):
self._nodes = nodes
self._port = blocked_port

def __enter__(self):
"""Isolate certain ips from the nodes using firewall rules"""
cmd = []
cmd.append(f"iptables -A INPUT -p tcp --sport {self._port} -j DROP")
cmd.append(f"iptables -A OUTPUT -p tcp --dport {self._port} -j DROP")
cmd = " && ".join(cmd)
for node in self._nodes:
node.account.ssh_output(cmd, allow_fail=False)

def __exit__(self, type, value, traceback):
"""Remove firewall rules that isolate ips from the nodes"""
cmd = []
cmd.append(f"iptables -D INPUT -p tcp --sport {self._port} -j DROP")
cmd.append(f"iptables -D OUTPUT -p tcp --dport {self._port} -j DROP")
cmd = " && ".join(cmd)
for node in self._nodes:
node.account.ssh_output(cmd, allow_fail=False)


def _parse_normalized_segment_path(path, md5, segment_size):
"""Parse path like 'kafka/panda-topic/1_8/3319-1-v1.log' and
return the components - topic: panda-topic, ns: kafka, partition: 1
Expand Down
77 changes: 77 additions & 0 deletions tests/rptest/tests/shadow_indexing_firewall_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# Copyright 2021 Redpanda Data, Inc.
#
# Licensed as a Redpanda Enterprise file under the Redpanda Community
# License (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md

from rptest.services.cluster import cluster
from rptest.tests.redpanda_test import RedpandaTest
from rptest.services.redpanda import SISettings

from rptest.clients.types import TopicSpec
from rptest.clients.rpk import RpkTool, RpkException
from rptest.util import (
produce_until_segments,
wait_for_segments_removal,
firewall_blocked,
)

# Log errors expected when connectivity between redpanda and the S3
# backend is disrupted
CONNECTION_ERROR_LOGS = [
"archival - .*Failed to create archivers",

# e.g. archival - [fiber1] - service.cc:484 - Failed to upload 3 segments out of 4
r"archival - .*Failed to upload \d+ segments"
]


class ShadowIndexingFirewallTest(RedpandaTest):
log_segment_size = 1048576 # 1MB
retention_bytes = 1024 # 1 KB

s3_topic_name = "panda-topic"
topics = (TopicSpec(name=s3_topic_name,
partition_count=1,
replication_factor=3), )

def __init__(self, test_context):
si_settings = SISettings(cloud_storage_reconciliation_interval_ms=500,
cloud_storage_max_connections=5,
log_segment_size=self.log_segment_size)

super(ShadowIndexingFirewallTest,
self).__init__(test_context=test_context,
si_settings=si_settings)

self._s3_port = si_settings.cloud_storage_api_endpoint_port
self.rpk = RpkTool(self.redpanda)

@cluster(num_nodes=3, log_allow_list=CONNECTION_ERROR_LOGS)
def test_consume_from_blocked_s3(self):
produce_until_segments(redpanda=self.redpanda,
topic=self.s3_topic_name,
partition_idx=0,
count=5,
acks=-1)

self.rpk.alter_topic_config(self.s3_topic_name,
TopicSpec.PROPERTY_RETENTION_BYTES,
self.retention_bytes)

wait_for_segments_removal(redpanda=self.redpanda,
topic=self.s3_topic_name,
partition_idx=0,
count=4)
"""Disconnect redpanda from S3 and try to read starting with offset 0"""
with firewall_blocked(self.redpanda.nodes, self._s3_port):
try:
out = self.rpk.consume(topic=self.s3_topic_name)
except RpkException as e:
assert 'timed out' in e.msg
else:
raise RuntimeError(
f"RPK consume should have timed out, but ran with output: {out}"
)
26 changes: 26 additions & 0 deletions tests/rptest/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,3 +187,29 @@ def inject_remote_script(node, script_name):
node.account.copy_to(script_path, remote_path)

return remote_path


class firewall_blocked:
"""Temporary firewall barrier that isolates set of redpanda
nodes from the ip-address"""
def __init__(self, nodes, blocked_port):
self._nodes = nodes
self._port = blocked_port

def __enter__(self):
"""Isolate certain ips from the nodes using firewall rules"""
cmd = []
cmd.append(f"iptables -A INPUT -p tcp --sport {self._port} -j DROP")
cmd.append(f"iptables -A OUTPUT -p tcp --dport {self._port} -j DROP")
cmd = " && ".join(cmd)
for node in self._nodes:
node.account.ssh_output(cmd, allow_fail=False)

def __exit__(self, type, value, traceback):
"""Remove firewall rules that isolate ips from the nodes"""
cmd = []
cmd.append(f"iptables -D INPUT -p tcp --sport {self._port} -j DROP")
cmd.append(f"iptables -D OUTPUT -p tcp --dport {self._port} -j DROP")
cmd = " && ".join(cmd)
for node in self._nodes:
node.account.ssh_output(cmd, allow_fail=False)

0 comments on commit aaa9bec

Please sign in to comment.