Skip to content

Commit

Permalink
fix(kafka): Add redpanda testcontainer module (#441)
Browse files Browse the repository at this point in the history
Co-authored-by: Gudjon Ragnar Brynjarsson <gudjon.brynjarsson@controlant.com>
Co-authored-by: Dave Ankin <daveankin@gmail.com>
  • Loading branch information
3 people committed Mar 31, 2024
1 parent 302c73d commit 451d278
Show file tree
Hide file tree
Showing 6 changed files with 154 additions and 7 deletions.
1 change: 1 addition & 0 deletions modules/kafka/README.rst
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
.. autoclass:: testcontainers.kafka.KafkaContainer
.. title:: testcontainers.kafka.KafkaContainer
.. autoclass:: testcontainers.kafka.RedpandaContainer
6 changes: 6 additions & 0 deletions modules/kafka/testcontainers/kafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@
from testcontainers.core.container import DockerContainer
from testcontainers.core.utils import raise_for_deprecated_parameter
from testcontainers.core.waiting_utils import wait_for_logs
from testcontainers.kafka._redpanda import RedpandaContainer

__all__ = [
"KafkaContainer",
"RedpandaContainer",
]


class KafkaContainer(DockerContainer):
Expand Down
82 changes: 82 additions & 0 deletions modules/kafka/testcontainers/kafka/_redpanda.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import tarfile
import time
from io import BytesIO
from textwrap import dedent

from testcontainers.core.container import DockerContainer
from testcontainers.core.waiting_utils import wait_for_logs


class RedpandaContainer(DockerContainer):
"""
Redpanda container.
Example:
.. doctest::
>>> from testcontainers.kafka import RedpandaContainer
>>> with RedpandaContainer() as redpanda:
... connection = redpanda.get_bootstrap_server()
"""

TC_START_SCRIPT = "/tc-start.sh"

def __init__(
self,
image: str = "docker.redpanda.com/redpandadata/redpanda:v23.1.13",
**kwargs,
) -> None:
kwargs["entrypoint"] = "sh"
super().__init__(image, **kwargs)
self.redpanda_port = 9092
self.schema_registry_port = 8081
self.with_exposed_ports(self.redpanda_port, self.schema_registry_port)

def get_bootstrap_server(self) -> str:
host = self.get_container_host_ip()
port = self.get_exposed_port(self.redpanda_port)
return f"{host}:{port}"

def get_schema_registry_address(self) -> str:
host = self.get_container_host_ip()
port = self.get_exposed_port(self.schema_registry_port)
return f"http://{host}:{port}"

def tc_start(self) -> None:
host = self.get_container_host_ip()
port = self.get_exposed_port(self.redpanda_port)

data = (
dedent(
f"""
#!/bin/bash
/usr/bin/rpk redpanda start --mode dev-container --smp 1 --memory 1G \
--kafka-addr PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092 \
--advertise-kafka-addr PLAINTEXT://127.0.0.1:29092,OUTSIDE://{host}:{port}
"""
)
.strip()
.encode("utf-8")
)

self.create_file(data, RedpandaContainer.TC_START_SCRIPT)

def start(self, timeout=10) -> "RedpandaContainer":
script = RedpandaContainer.TC_START_SCRIPT
command = f'-c "while [ ! -f {script} ]; do sleep 0.1; done; sh {script}"'
self.with_command(command)
super().start()
self.tc_start()
wait_for_logs(self, r".*Started Kafka API server.*", timeout=timeout)
return self

def create_file(self, content: bytes, path: str) -> None:
with BytesIO() as archive, tarfile.TarFile(fileobj=archive, mode="w") as tar:
tarinfo = tarfile.TarInfo(name=path)
tarinfo.size = len(content)
tarinfo.mtime = time.time()
tar.addfile(tarinfo, BytesIO(content))
archive.seek(0)
self.get_wrapped_container().put_archive("/", archive)
54 changes: 54 additions & 0 deletions modules/kafka/tests/test_redpanda.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import pytest
from requests import post, get
from json import dumps

from kafka import KafkaConsumer, KafkaProducer, TopicPartition, KafkaAdminClient
from kafka.admin import NewTopic

from testcontainers.kafka import RedpandaContainer


def test_redpanda_producer_consumer():
with RedpandaContainer() as container:
produce_and_consume_message(container)


@pytest.mark.parametrize("version", ["v23.1.13", "v23.3.10"])
def test_redpanda_confluent_version(version):
with RedpandaContainer(image=f"docker.redpanda.com/redpandadata/redpanda:{version}") as container:
produce_and_consume_message(container)


def test_schema_registry():
with RedpandaContainer() as container:
address = container.get_schema_registry_address()
subject_name = "test-subject-value"
url = f"{address}/subjects"

payload = {"schema": dumps({"type": "string"})}
headers = {"Content-Type": "application/vnd.schemaregistry.v1+json"}
create_result = post(f"{url}/{subject_name}/versions", data=dumps(payload), headers=headers)
assert create_result.status_code == 200

result = get(url)
assert result.status_code == 200
assert subject_name in result.json()


def produce_and_consume_message(container):
topic = "test-topic"
bootstrap_server = container.get_bootstrap_server()

admin = KafkaAdminClient(bootstrap_servers=[bootstrap_server])
admin.create_topics([NewTopic(topic, 1, 1)])

producer = KafkaProducer(bootstrap_servers=[bootstrap_server])
future = producer.send(topic, b"verification message")
future.get(timeout=10)
producer.close()

consumer = KafkaConsumer(bootstrap_servers=[bootstrap_server])
tp = TopicPartition(topic, 0)
consumer.assign([tp])
consumer.seek_to_beginning()
assert consumer.end_offsets([tp])[tp] == 1, "Expected exactly one test message to be present on test topic !"
16 changes: 10 additions & 6 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,9 @@ psycopg2-binary = "*"
pg8000 = "*"
sqlalchemy = "*"
psycopg = "*"
kafka-python = "^2.0.2"
cassandra-driver = "*"
pytest-asyncio = "0.23.5"
kafka-python-ng = "^2.2.0"

[[tool.poetry.source]]
name = "PyPI"
Expand Down

0 comments on commit 451d278

Please sign in to comment.