Merge pull request #5484 from LenaAn/rr_read_only
kafka: reject produce requests to read replica topics
Lena Anyusheva committed Jul 21, 2022
2 parents c375111 + 58dfc65 commit 333dc52
Showing 2 changed files with 87 additions and 22 deletions.
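With this change, a produce request sent to a topic in read-replica mode is rejected up front with INVALID_TOPIC_EXCEPTION instead of being appended locally. A minimal client-side sketch of the expected behaviour, using the third-party kafka-python library (an assumption for illustration; the PR itself only exercises rpk in the ducktape test below), with placeholder broker address and topic name:

# Illustrative sketch only; kafka-python, the broker address, and the topic
# name are assumptions and not part of this PR.
from kafka import KafkaProducer
from kafka.errors import InvalidTopicError

# Placeholder address of a broker in the read-replica cluster.
producer = KafkaProducer(bootstrap_servers="read-replica-broker:9092")

try:
    # Placeholder topic name; it must be a topic created with
    # redpanda.remote.readreplica pointing at the source bucket.
    producer.send("panda-topic", b"test payload").get(timeout=10)
except InvalidTopicError:
    # Expected after this change: the read replica rejects the write.
    print("produce rejected: topic is a read replica")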
56 changes: 36 additions & 20 deletions src/v/kafka/server/handlers/produce.cc
@@ -25,20 +25,23 @@
#include "model/timestamp.h"
#include "raft/errc.h"
#include "raft/types.h"
#include "ssx/future-util.h"
#include "utils/remote.h"
#include "utils/to_string.h"
#include "vlog.h"

#include <seastar/core/execution_stage.hh>
#include <seastar/core/future.hh>
#include <seastar/core/shared_ptr.hh>
#include <seastar/core/smp.hh>
#include <seastar/util/log.hh>

#include <boost/container_hash/extensions.hpp>
#include <fmt/ostream.h>

#include <chrono>
#include <cstdint>
#include <memory>
#include <string_view>

namespace kafka {
@@ -207,6 +210,22 @@ static partition_produce_stages partition_append(
};
}

ss::future<produce_response::partition> finalize_request_with_error_code(
error_code ec,
std::unique_ptr<ss::promise<>> dispatch,
model::ntp ntp,
ss::shard_id source_shard) {
// submit back to promise source shard
ssx::background = ss::smp::submit_to(
source_shard, [dispatch = std::move(dispatch)]() mutable {
dispatch->set_value();
dispatch.reset();
});
return ss::make_ready_future<produce_response::partition>(
produce_response::partition{
.partition_index = ntp.tp.partition, .error_code = ec});
}

/**
* \brief handle writing to a single topic partition.
*/
Expand Down Expand Up @@ -274,28 +293,25 @@ static partition_produce_stages produce_topic_partition(
cluster::partition_manager& mgr) mutable {
auto partition = mgr.get(ntp);
if (!partition) {
// submit back to promise source shard
(void)ss::smp::submit_to(
source_shard, [dispatch = std::move(dispatch)]() mutable {
dispatch->set_value();
dispatch.reset();
});
return ss::make_ready_future<produce_response::partition>(
produce_response::partition{
.partition_index = ntp.tp.partition,
.error_code = error_code::unknown_topic_or_partition});
return finalize_request_with_error_code(
error_code::unknown_topic_or_partition,
std::move(dispatch),
ntp,
source_shard);
}
if (unlikely(!partition->is_leader())) {
// submit back to promise source shard
(void)ss::smp::submit_to(
source_shard, [dispatch = std::move(dispatch)]() mutable {
dispatch->set_value();
dispatch.reset();
});
return ss::make_ready_future<produce_response::partition>(
produce_response::partition{
.partition_index = ntp.tp.partition,
.error_code = error_code::not_leader_for_partition});
return finalize_request_with_error_code(
error_code::not_leader_for_partition,
std::move(dispatch),
ntp,
source_shard);
}
if (partition->is_read_replica_mode_enabled()) {
return finalize_request_with_error_code(
error_code::invalid_topic_exception,
std::move(dispatch),
ntp,
source_shard);
}
auto stages = partition_append(
ntp.tp.partition,
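The refactor above folds the duplicated "submit back to the promise source shard, then return a ready error response" pattern into finalize_request_with_error_code and adds a third early-exit branch for partitions in read-replica mode. The Kafka protocol error codes a client can now see from these pre-append checks are summarised below; the numeric values come from the Kafka protocol, and the mapping itself is an illustrative sketch rather than code from this commit:

# Kafka protocol error codes returned by produce_topic_partition before any
# append is attempted. Illustrative only; not part of the Redpanda source.
PRE_APPEND_PRODUCE_ERRORS = {
    "UNKNOWN_TOPIC_OR_PARTITION": 3,   # partition not found on this node
    "NOT_LEADER_FOR_PARTITION": 6,     # replica exists but is not the leader
    "INVALID_TOPIC_EXCEPTION": 17,     # new: topic is a read replica, writes rejected
}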
53 changes: 51 additions & 2 deletions tests/rptest/tests/read_replica_e2e_test.py
@@ -10,8 +10,9 @@

from rptest.clients.default import DefaultClient
from rptest.services.redpanda import SISettings
from rptest.clients.rpk import RpkTool
from rptest.clients.rpk import RpkTool, RpkException
from rptest.clients.types import TopicSpec
from rptest.util import expect_exception
from ducktape.mark import matrix

import json
@@ -40,12 +41,13 @@ def __init__(self, test_context):
super(TestReadReplicaService, self).__init__(test_context=test_context)
self.second_cluster = None

def create_read_replica_topic(self):
def start_second_cluster(self):
self.second_cluster = RedpandaService(self.test_context,
num_brokers=3,
si_settings=self.si_settings)
self.second_cluster.start(start_si=False)

def create_read_replica_topic(self):
rpk_second_cluster = RpkTool(self.second_cluster)
conf = {
'redpanda.remote.readreplica': self.s3_bucket_name,
@@ -72,6 +74,52 @@ def start_producer(self):
message_validator=is_int_with_prefix)
self.producer.start()

@cluster(num_nodes=6)
@matrix(partition_count=[10])
def test_produce_is_forbidden(self, partition_count):
# Create original topic
self.start_redpanda(3, si_settings=self.si_settings)
spec = TopicSpec(name=self.topic_name,
partition_count=partition_count,
replication_factor=3)

DefaultClient(self.redpanda).create_topic(spec)

# Make original topic upload data to S3
rpk = RpkTool(self.redpanda)
rpk.alter_topic_config(spec.name, 'redpanda.remote.write', 'true')

self.start_second_cluster()

def create_read_replica_topic_success():
try:
self.create_read_replica_topic()
except RpkException as e:
if "The server experienced an unexpected error when processing the request" in str(
e):
return False
else:
raise
else:
return True

# wait until the read replica topic creation succeeds
wait_until(
create_read_replica_topic_success,
timeout_sec=
60, #should be uploaded since cloud_storage_segment_max_upload_interval_sec=5
backoff_sec=5,
err_msg=
f"Could not create read replica topic. Most likely because topic manifest is not in S3, in S3 bucket: {list(self.redpanda._s3client.list_objects(self.s3_bucket_name))}"
)

second_rpk = RpkTool(self.second_cluster)
with expect_exception(
RpkException, lambda e:
"unable to produce record: INVALID_TOPIC_EXCEPTION: The request attempted to perform an operation on an invalid topic."
in str(e)):
second_rpk.produce(self.topic_name, "", "test payload")

@cluster(num_nodes=8)
@matrix(partition_count=[10], min_records=[10000])
def test_simple_end_to_end(self, partition_count, min_records):
@@ -127,6 +175,7 @@ def s3_has_all_data():
)

# Create read replica topic, consume from it and validate
self.start_second_cluster()
self.create_read_replica_topic()
self.start_consumer()
self.run_validation()
