From 5e4bba6f8a559d76888db17ab11d2a7badd987cc Mon Sep 17 00:00:00 2001 From: Elena Anyusheva Date: Thu, 28 Apr 2022 19:56:34 +0300 Subject: [PATCH 1/2] ducky: move firewall_blocked to util firewall_blocked will be used also for shadow indexing test --- tests/rptest/tests/archival_test.py | 27 +-------------------------- tests/rptest/util.py | 26 ++++++++++++++++++++++++++ 2 files changed, 27 insertions(+), 26 deletions(-) diff --git a/tests/rptest/tests/archival_test.py b/tests/rptest/tests/archival_test.py index 75be0c5eeaa5..ac73c2d9dcd7 100644 --- a/tests/rptest/tests/archival_test.py +++ b/tests/rptest/tests/archival_test.py @@ -18,6 +18,7 @@ segments_count, produce_until_segments, wait_for_segments_removal, + firewall_blocked, ) from ducktape.mark import matrix @@ -78,32 +79,6 @@ def validate(fn, logger, timeout_sec, backoff_sec=5): ]) -class firewall_blocked: - """Temporary firewall barrier that isolates set of redpanda - nodes from the ip-address""" - def __init__(self, nodes, blocked_port): - self._nodes = nodes - self._port = blocked_port - - def __enter__(self): - """Isolate certain ips from the nodes using firewall rules""" - cmd = [] - cmd.append(f"iptables -A INPUT -p tcp --sport {self._port} -j DROP") - cmd.append(f"iptables -A OUTPUT -p tcp --dport {self._port} -j DROP") - cmd = " && ".join(cmd) - for node in self._nodes: - node.account.ssh_output(cmd, allow_fail=False) - - def __exit__(self, type, value, traceback): - """Remove firewall rules that isolate ips from the nodes""" - cmd = [] - cmd.append(f"iptables -D INPUT -p tcp --sport {self._port} -j DROP") - cmd.append(f"iptables -D OUTPUT -p tcp --dport {self._port} -j DROP") - cmd = " && ".join(cmd) - for node in self._nodes: - node.account.ssh_output(cmd, allow_fail=False) - - def _parse_normalized_segment_path(path, md5, segment_size): """Parse path like 'kafka/panda-topic/1_8/3319-1-v1.log' and return the components - topic: panda-topic, ns: kafka, partition: 1 diff --git a/tests/rptest/util.py b/tests/rptest/util.py index 9cc7cb3485fd..e7b22ba055ba 100644 --- a/tests/rptest/util.py +++ b/tests/rptest/util.py @@ -187,3 +187,29 @@ def inject_remote_script(node, script_name): node.account.copy_to(script_path, remote_path) return remote_path + + +class firewall_blocked: + """Temporary firewall barrier that isolates set of redpanda + nodes from the ip-address""" + def __init__(self, nodes, blocked_port): + self._nodes = nodes + self._port = blocked_port + + def __enter__(self): + """Isolate certain ips from the nodes using firewall rules""" + cmd = [] + cmd.append(f"iptables -A INPUT -p tcp --sport {self._port} -j DROP") + cmd.append(f"iptables -A OUTPUT -p tcp --dport {self._port} -j DROP") + cmd = " && ".join(cmd) + for node in self._nodes: + node.account.ssh_output(cmd, allow_fail=False) + + def __exit__(self, type, value, traceback): + """Remove firewall rules that isolate ips from the nodes""" + cmd = [] + cmd.append(f"iptables -D INPUT -p tcp --sport {self._port} -j DROP") + cmd.append(f"iptables -D OUTPUT -p tcp --dport {self._port} -j DROP") + cmd = " && ".join(cmd) + for node in self._nodes: + node.account.ssh_output(cmd, allow_fail=False) From b9ccdaa8c64d38d3f5edebffed6473ba39569914 Mon Sep 17 00:00:00 2001 From: Elena Anyusheva Date: Thu, 28 Apr 2022 19:57:45 +0300 Subject: [PATCH 2/2] ducky: add si test consuming from blocked s3 When trying to consume from blocked S3, time out error should be returned --- .../tests/shadow_indexing_firewall_test.py | 77 +++++++++++++++++++ 1 file changed, 77 insertions(+) create mode 100644 tests/rptest/tests/shadow_indexing_firewall_test.py diff --git a/tests/rptest/tests/shadow_indexing_firewall_test.py b/tests/rptest/tests/shadow_indexing_firewall_test.py new file mode 100644 index 000000000000..2c75143401dd --- /dev/null +++ b/tests/rptest/tests/shadow_indexing_firewall_test.py @@ -0,0 +1,77 @@ +# Copyright 2021 Redpanda Data, Inc. +# +# Licensed as a Redpanda Enterprise file under the Redpanda Community +# License (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md + +from rptest.services.cluster import cluster +from rptest.tests.redpanda_test import RedpandaTest +from rptest.services.redpanda import SISettings + +from rptest.clients.types import TopicSpec +from rptest.clients.rpk import RpkTool, RpkException +from rptest.util import ( + produce_until_segments, + wait_for_segments_removal, + firewall_blocked, +) + +# Log errors expected when connectivity between redpanda and the S3 +# backend is disrupted +CONNECTION_ERROR_LOGS = [ + "archival - .*Failed to create archivers", + + # e.g. archival - [fiber1] - service.cc:484 - Failed to upload 3 segments out of 4 + r"archival - .*Failed to upload \d+ segments" +] + + +class ShadowIndexingFirewallTest(RedpandaTest): + log_segment_size = 1048576 # 1MB + retention_bytes = 1024 # 1 KB + + s3_topic_name = "panda-topic" + topics = (TopicSpec(name=s3_topic_name, + partition_count=1, + replication_factor=3), ) + + def __init__(self, test_context): + si_settings = SISettings(cloud_storage_reconciliation_interval_ms=500, + cloud_storage_max_connections=5, + log_segment_size=self.log_segment_size) + + super(ShadowIndexingFirewallTest, + self).__init__(test_context=test_context, + si_settings=si_settings) + + self._s3_port = si_settings.cloud_storage_api_endpoint_port + self.rpk = RpkTool(self.redpanda) + + @cluster(num_nodes=3, log_allow_list=CONNECTION_ERROR_LOGS) + def test_consume_from_blocked_s3(self): + produce_until_segments(redpanda=self.redpanda, + topic=self.s3_topic_name, + partition_idx=0, + count=5, + acks=-1) + + self.rpk.alter_topic_config(self.s3_topic_name, + TopicSpec.PROPERTY_RETENTION_BYTES, + self.retention_bytes) + + wait_for_segments_removal(redpanda=self.redpanda, + topic=self.s3_topic_name, + partition_idx=0, + count=4) + """Disconnect redpanda from S3 and try to read starting with offset 0""" + with firewall_blocked(self.redpanda.nodes, self._s3_port): + try: + out = self.rpk.consume(topic=self.s3_topic_name) + except RpkException as e: + assert 'timed out' in e.msg + else: + raise RuntimeError( + f"RPK consume should have timed out, but ran with output: {out}" + )