Skip to content

Commit

Permalink
Merge pull request #9816 from michael-redpanda/expand-serde-coverage
Browse files Browse the repository at this point in the history
Expand serde coverage
  • Loading branch information
michael-redpanda committed Apr 10, 2023
2 parents 93e239a + 8c3e3ec commit 1ceb333
Show file tree
Hide file tree
Showing 24 changed files with 2,002 additions and 41 deletions.
5 changes: 5 additions & 0 deletions .clang-format
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,8 @@ AlwaysBreakAfterDefinitionReturnType: None
AlwaysBreakAfterReturnType: None
BreakBeforeBinaryOperators: All
BreakBeforeBraces: Attach
---
Language: Proto
BasedOnStyle: Google
ColumnLimit: 80
IndentWidth: 4
2 changes: 1 addition & 1 deletion tests/docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ RUN /opt/ducktape-deps.sh install_kafka_streams_examples
# Install the OMB tool
RUN /opt/ducktape-deps.sh install_omb


# Install Kafka Java test clients / workloads
RUN mkdir -p /opt/redpanda-tests/java
COPY --chown=0:0 tests/java/e2e-verifiers /opt/redpanda-tests/java/e2e-verifiers
COPY --chown=0:0 tests/java/verifiers /opt/redpanda-tests/java/verifiers
COPY --chown=0:0 tests/java/kafka-serde /opt/redpanda-tests/java/kafka-serde
RUN /opt/ducktape-deps.sh install_java_test_clients

# - install distro-packaged depedencies
Expand Down
21 changes: 18 additions & 3 deletions tests/docker/ducktape-deps.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@ function install_java_client_deps() {
build-essential \
default-jdk \
git \
maven
maven \
cmake \
curl

install_protobuf_compiler
}

function install_system_deps() {
Expand All @@ -17,9 +21,7 @@ function install_system_deps() {
bind9-utils \
bind9-dnsutils \
bsdmainutils \
curl \
dmidecode \
cmake \
krb5-admin-server \
krb5-kdc \
krb5-user \
Expand Down Expand Up @@ -171,12 +173,25 @@ function install_arroyo() {
function install_java_test_clients() {
mvn clean package --batch-mode --file /opt/redpanda-tests/java/e2e-verifiers --define buildDir=/opt/e2e-verifiers
mvn clean package --batch-mode --file /opt/redpanda-tests/java/verifiers --define buildDir=/opt/verifiers
mvn clean package --batch-mode --file /opt/redpanda-tests/java/kafka-serde --define buildDir=/opt/kafka-serde
}

function install_go_test_clients() {
cd /opt/redpanda-tests/go/sarama/produce_test
go mod tidy
go build

cd /opt/redpanda-tests/go/go-kafka-serde
GOPATH=${HOME}/go make clean all
}

function install_protobuf_compiler() {
mkdir /tmp/protobuf
curl -SL "https://vectorized-public.s3.amazonaws.com/dependencies/protobuf-cpp-3.21.8.tar.gz" | tar --no-same-owner -xz --strip-components=1 -C /tmp/protobuf
cd /tmp/protobuf
cmake -Dprotobuf_BUILD_TESTS=OFF -DCMAKE_INSTALL_PREFIX=/usr
make -j$(nproc)
make install
}

$@
19 changes: 19 additions & 0 deletions tests/go/go-kafka-serde/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
export PATH := $(PATH):$(GOPATH)/bin

all: payload.pb.go go-kafka-serde

clean:
rm -rf payload.pb.go go-kafka-serde

go-kafka-serde: payload.pb.go
go build

PROTOC_GEN_GO := $(GOPATH)/bin/protoc-gen-go

$(PROTOC_GEN_GO):
go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.30.0

payload.pb.go: payload.proto | $(PROTOC_GEN_GO)
protoc --proto_path=. --go_out=. --go_opt=Mpayload.proto=. payload.proto

compile: payload.pb.go
44 changes: 44 additions & 0 deletions tests/go/go-kafka-serde/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Golang Kafka Serde Client

This directory contains a client that can be used to validate the operation of Redpanda's Schema Registry via
Confluent's Golang client.

This application will create either a simple AVRO or PROTOBUF schema, serialize a number of messages

## Building

### Pre-requisites

* Protobuf Compiler
* Golang compiler

### Build Instructions

It is important to set `$GOPATH` either within your environment or on the command line when you run `make`:

```shell
GOPATH=$HOME/go make clean all
```

## Usage

```shell
❯ ./go-kafka-serde -help
Usage of ./go-kafka-serde:
-brokers string
comma delimited list of brokers (default "localhost:9092")
-consumer-group string
Consumer group to use (default "db5b01db-7f48-4d7d-8ba6-eb512788341e")
-count int
Number of messages to produce and consume (default 1)
-debug
Enable verbose logging
-protocol string
Protocol to use. Must be AVRO or PROTOBUF (default "AVRO")
-schema-registry string
URL of schema registry (default "http://127.0.0.1:8081")
-security string
Security settings
-topic string
topic to produce/consume from (default "3aa75595-1222-4daa-b407-afe43c8496d5")
```
76 changes: 76 additions & 0 deletions tests/go/go-kafka-serde/avro-serializer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright 2023 Redpanda Data, Inc.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package main

import (
log "github.com/sirupsen/logrus"

"github.com/confluentinc/confluent-kafka-go/v2/schemaregistry"
"github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde"
"github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde/avro"
)

type AvroSerializer struct {
ser *avro.GenericSerializer
deser *avro.GenericDeserializer
}

type payload struct {
Value int `json:"val"`
}

func NewAvroSerializer(srClient *schemaregistry.Client) (s *AvroSerializer, err error) {
ser, err := avro.NewGenericSerializer(*srClient, serde.ValueSerde, avro.NewSerializerConfig())

if err != nil {
return nil, err
}

deser, err := avro.NewGenericDeserializer(*srClient, serde.ValueSerde, avro.NewDeserializerConfig())

if err != nil {
return nil, err
}

s = new(AvroSerializer)

s.ser = ser
s.deser = deser

return s, nil
}

func (s *AvroSerializer) CreateSerializedData(msgNum int, topic *string) ([]byte, error) {
log.Debugf("Creating message with value %v", msgNum)
val := payload{
Value: msgNum,
}

return s.ser.Serialize(*topic, &val)
}

func (s *AvroSerializer) DeserializeAndCheck(msgNum int, topic *string, buf []byte) (valid bool, err error) {
val := payload{}
err = s.deser.DeserializeInto(*topic, buf, &val)

if err != nil {
return false, err
}

valid = (msgNum == val.Value)

if !valid {
log.Errorf("Mismatch: %v != %v", msgNum, val.Value)
}

return valid, nil
}
Loading

0 comments on commit 1ceb333

Please sign in to comment.