Skip to content

Commit

Permalink
Merge pull request #6044 from vbotbuildovich/backport-5389-v22.2.x-580
Browse files Browse the repository at this point in the history
[v22.2.x] tests: update ManyPartitionsTest
  • Loading branch information
jcsp committed Aug 17, 2022
2 parents ba6fbc0 + 0f9365b commit a818ef1
Show file tree
Hide file tree
Showing 12 changed files with 1,191 additions and 293 deletions.
2 changes: 1 addition & 1 deletion src/v/cluster/scheduling/leader_balancer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -755,7 +755,7 @@ ss::future<bool> leader_balancer::do_transfer_remote(reassignment transfer) {
clusterlog.info,
"Leadership transfer of group {} failed with error: {}",
transfer.group,
raft::make_error_code(res.value().result));
raft::make_error_code(res.value().result).message());

co_return false;
}
Expand Down
54 changes: 29 additions & 25 deletions tests/docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,10 @@ RUN apt update && \
echo 'root soft nofile 65535' >> /etc/security/limits.conf && \
echo 'root hard nofile 65535' >> /etc/security/limits.conf

# install go
RUN mkdir -p /usr/local/go/ && \
bash -c 'if [[ $(uname -m) = "aarch64" ]]; then ARCHID="arm64"; else export ARCHID="amd64"; fi && \
curl -sSLf --retry 3 --retry-connrefused --retry-delay 2 https://golang.org/dl/go1.17.linux-${ARCHID}.tar.gz | tar -xz -C /usr/local/go/ --strip 1'
ENV PATH="${PATH}:/usr/local/go/bin"

# Install the OMB tool
RUN git -C /opt clone https://github.com/redpanda-data/openmessaging-benchmark.git && \
cd /opt/openmessaging-benchmark && git reset --hard 43b737c357cde3b418a6aee4c95107d6ef28b8a2 && mvn package

# install kafka binary dependencies, librdkafka dev, kcat and kaf tools
ENV KAFKA_MIRROR="https://s3-us-west-2.amazonaws.com/kafka-packages"
Expand All @@ -95,8 +94,6 @@ RUN mkdir -p "/opt/kafka-2.3.1" && chmod a+rw /opt/kafka-2.3.1 && curl -s "$KAFK
make install && \
cd /opt/librdkafka/tests && \
make build -j$(nproc) && \
go get github.com/birdayz/kaf/cmd/kaf && \
mv /root/go/bin/kaf /usr/local/bin/ && \
mkdir /tmp/kcat && \
curl -SL "https://github.com/edenhill/kcat/archive/1.7.0.tar.gz" | tar -xz --strip-components=1 -C /tmp/kcat && \
cd /tmp/kcat && \
Expand All @@ -105,6 +102,29 @@ RUN mkdir -p "/opt/kafka-2.3.1" && chmod a+rw /opt/kafka-2.3.1 && curl -s "$KAFK
make install && \
ldconfig

# install go
RUN mkdir -p /usr/local/go/ && \
bash -c 'if [[ $(uname -m) = "aarch64" ]]; then ARCHID="arm64"; else export ARCHID="amd64"; fi && \
curl -sSLf --retry 3 --retry-connrefused --retry-delay 2 https://golang.org/dl/go1.17.linux-${ARCHID}.tar.gz | tar -xz -C /usr/local/go/ --strip 1'
ENV PATH="${PATH}:/usr/local/go/bin"

# Install `kaf`
RUN go get github.com/birdayz/kaf/cmd/kaf && \
mv /root/go/bin/kaf /usr/local/bin/

ENV PATH="${PATH}:/usr/local/go/bin"
# Compile and install rust-based workload generator.
# Install & remove compiler in the same step, to avoid bulking
# out the resulting container image.
RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y && \
export PATH="/root/.cargo/bin:${PATH}" && \
git clone https://github.com/redpanda-data/client-swarm.git && \
cd client-swarm && \
git reset --hard 28790f8 && \
cargo build --release && \
cp target/release/client-swarm /usr/local/bin && \
cd .. && rm -rf client-swarm && rm -rf /root/.cargo

# Install golang dependencies for kafka clients such as sarama
RUN git -C /opt clone -b v1.32.0 --single-branch https://github.com/Shopify/sarama.git && \
cd /opt/sarama/examples/interceptors && go mod tidy && go build && \
Expand All @@ -126,8 +146,8 @@ RUN go install github.com/twmb/kcl@v0.8.0 && \

# Install the kgo-verifier tool
RUN git -C /opt clone https://github.com/redpanda-data/kgo-verifier.git && \
cd /opt/kgo-verifier && git reset --hard dd7dce41012af14e62c1db23a0aa88ec6f39a5f1 && \
go mod tidy && go build
cd /opt/kgo-verifier && git reset --hard dbb32effd519334929976ba5bcd725c735d81550 && \
go mod tidy && make

# Expose port 8080 for any http examples within clients
EXPOSE 8080
Expand All @@ -142,22 +162,6 @@ COPY --chown=0:0 tests/setup.py /root/tests/
RUN python3 -m pip install --upgrade --force pip && \
python3 -m pip install --force --no-cache-dir -e /root/tests/

# Install the OMB tool
RUN git -C /opt clone https://github.com/redpanda-data/openmessaging-benchmark.git && \
cd /opt/openmessaging-benchmark && git reset --hard 43b737c357cde3b418a6aee4c95107d6ef28b8a2 && mvn package

# Compile and install rust-based workload generator.
# Install & remove compiler in the same step, to avoid bulking
# out the resulting container image.
RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y && \
export PATH="/root/.cargo/bin:${PATH}" && \
git clone https://github.com/redpanda-data/client-swarm.git && \
cd client-swarm && \
git reset --hard 28790f8 && \
cargo build --release && \
cp target/release/client-swarm /usr/local/bin && \
cd .. && rm -rf client-swarm && rm -rf /root/.cargo

# Seastar address to line utility
RUN apt update && \
apt install -y \
Expand Down
69 changes: 57 additions & 12 deletions tests/rptest/clients/rpk.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,26 +226,54 @@ def produce(self,
assert m, f"Reported offset not found in: {out}"
return int(m.group(1))

def describe_topic(self, topic):
def describe_topic(self, topic: str, tolerant: bool = False):
"""
By default this will omit any partitions which do not have full
metadata in the response: this means that if we are unlucky and a
partition returns NOT_LEADER due to a leadership transfer while
we query offsets, it will be missing. To be more forgiving, pass
tolerant=True
:param topic: topic name
:param tolerant: if true, RpkPartition results may be included with some
fields set to None, as long as the leader field is present.
:return:
"""
cmd = ['describe', topic, '-p']
output = self._run_topic(cmd)
if "not found" in output:
raise Exception(f"Topic not found: {topic}")
lines = output.splitlines()
lines = output.splitlines()[1:]

def partition_line(line):
m = re.match(
r" *(?P<id>\d+) +(?P<leader>\d+) +(?P<epoch>\d+) +\[(?P<replicas>.+?)\] +(?P<logstart>\d+?) +(?P<hw>\d+) *",
line)
if m == None:
if m is None and tolerant:
m = re.match(r" *(?P<id>\d+) +(?P<leader>\d+) .*", line)
if m is None:
self._redpanda.logger.info(f"No match on '{line}'")
return None

return RpkPartition(id=int(m.group('id')),
leader=int(m.group('leader')),
leader_epoch=None,
replicas=None,
hw=None,
start_offset=None)

elif m is None:
return None
replicas = list(map(lambda r: int(r), m.group('replicas').split()))
return RpkPartition(id=int(m.group('id')),
leader=int(m.group('leader')),
leader_epoch=int(m.group('epoch')),
replicas=replicas,
hw=int(m.group('hw')),
start_offset=int(m.group("logstart")))
elif m:
replicas = list(
map(lambda r: int(r),
m.group('replicas').split()))
return RpkPartition(id=int(m.group('id')),
leader=int(m.group('leader')),
leader_epoch=int(m.group('epoch')),
replicas=replicas,
hw=int(m.group('hw')),
start_offset=int(m.group("logstart")))

return filter(None, map(partition_line, lines))

Expand Down Expand Up @@ -311,7 +339,7 @@ def group_seek_to(self, group, to):
cmd = ["seek", group, "--to", to]
self._run_group(cmd)

def group_describe(self, group):
def group_describe(self, group, summary=False):
def parse_field(field_name, string):
pattern = re.compile(f" *{field_name} +(?P<value>.+)")
m = pattern.match(string)
Expand All @@ -333,6 +361,15 @@ def check_lines(lines):
# We should wait until server will get information about it.
if line.find('UNKNOWN_TOPIC_OR_PARTITION') != -1:
return False

# Leadership movements are underway
if 'NOT_LEADER_FOR_PARTITION' in line:
return False

# Cluster not ready yet
if 'unknown broker' in line:
return False

return True

def parse_partition(string):
Expand Down Expand Up @@ -367,13 +404,21 @@ def parse_partition(string):
host=m['host'])

def try_describe_group(group):
cmd = ["describe", group]
if summary:
cmd = ["describe", "-s", group]
else:
cmd = ["describe", group]

try:
out = self._run_group(cmd)
except RpkException as e:
if "COORDINATOR_NOT_AVAILABLE" in e.msg:
# Transient, return None to retry
return None
elif "Kafka replied that group" in e.msg:
# Transient, return None to retry
# e.g. Kafka replied that group repeat01 has broker coordinator 8, but did not reply with that broker in the broker list
return None
else:
raise

Expand Down
Loading

0 comments on commit a818ef1

Please sign in to comment.