Skip to content

Commit

Permalink
fix(test): use Sarama transactional producer
Browse files Browse the repository at this point in the history
One of the pieces of #1695 was to use a small jar for publishing
uncommited messages as part of the functional test. Replacing that with
the native Sarama transactional producer

Fixes #1733

Co-authored-by: KJTsanaktsidis <ktsanaktsidis@zendesk.com>
  • Loading branch information
dnwe and KJTsanaktsidis committed May 7, 2021
1 parent 83d633e commit 621d753
Showing 1 changed file with 64 additions and 32 deletions.
96 changes: 64 additions & 32 deletions functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,11 @@ package sarama
import (
"context"
"fmt"
"io"
"log"
"net"
"net/http"
"net/url"
"os"
"os/exec"
"path/filepath"
"strconv"
"strings"
"testing"
Expand All @@ -21,10 +18,6 @@ import (
toxiproxy "github.com/Shopify/toxiproxy/client"
)

const (
uncomittedMsgJar = "https://github.com/FrancoisPoinsot/simplest-uncommitted-msg/releases/download/0.1/simplest-uncommitted-msg-0.1-jar-with-dependencies.jar"
)

var (
testTopicDetails = map[string]*TopicDetail{
"test.1": {
Expand Down Expand Up @@ -340,40 +333,79 @@ func prepareTestTopics(ctx context.Context, env *testEnvironment) error {
}
}

// This is kind of gross, but we don't actually have support for doing transactional publishing
// with sarama, so we need to use a java-based tool to publish uncomitted messages to
// the uncommitted-topic-test-4 topic
jarName := filepath.Base(uncomittedMsgJar)
if _, err := os.Stat(jarName); err != nil {
Logger.Printf("Downloading %s\n", uncomittedMsgJar)
req, err := http.NewRequest("GET", uncomittedMsgJar, nil)
if err != nil {
return fmt.Errorf("failed creating requst for uncomitted msg jar: %w", err)
transactionalID := "transactional.id config"

var coordinator *Broker

// wait until transactional topic is available and then connect to the
// coordinator broker
for coordinator == nil {
if _, err := client.Leader("__transaction_state", 0); err != nil {
time.Sleep(2 * time.Second)
_ = client.RefreshMetadata("__transaction_state")
continue
}
res, err := http.DefaultClient.Do(req)
coordRes, err := controller.FindCoordinator(&FindCoordinatorRequest{
Version: 2,
CoordinatorKey: transactionalID,
CoordinatorType: CoordinatorTransaction,
})
if err != nil {
return fmt.Errorf("failed fetching the uncommitted msg jar: %w", err)
return err
}
defer res.Body.Close()
jarFile, err := os.OpenFile(jarName, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0o644)
if err != nil {
return fmt.Errorf("failed opening the uncomitted msg jar: %w", err)
if coordRes.Err != ErrNoError {
continue
}
defer jarFile.Close()

_, err = io.Copy(jarFile, res.Body)
if err != nil {
return fmt.Errorf("failed writing the uncomitted msg jar: %w", err)
if err := coordRes.Coordinator.Open(client.Config()); err != nil {
return err
}
coordinator = coordRes.Coordinator
break
}

c := exec.Command("java", "-jar", jarName, "-b", env.KafkaBrokerAddrs[0], "-c", "4")
c.Stdout = os.Stdout
c.Stderr = os.Stderr
err = c.Run()
pidRes, err := coordinator.InitProducerID(&InitProducerIDRequest{
TransactionalID: &transactionalID,
TransactionTimeout: 10 * time.Second,
})
if err != nil {
return err
}

_, _ = coordinator.AddPartitionsToTxn(&AddPartitionsToTxnRequest{
TransactionalID: transactionalID,
ProducerID: pidRes.ProducerID,
ProducerEpoch: pidRes.ProducerEpoch,
TopicPartitions: map[string][]int32{
"uncomitted-topic-test-4": {0},
},
})
if err != nil {
return fmt.Errorf("failed running uncomitted msg jar: %w", err)
return err
}

ps := &produceSet{
msgs: make(map[string]map[int32]*partitionSet),
parent: &asyncProducer{
conf: config,
},
producerID: pidRes.ProducerID,
producerEpoch: pidRes.ProducerEpoch,
}
_ = ps.add(&ProducerMessage{
Topic: "uncomitted-topic-test-4",
Partition: 0,
Value: StringEncoder("uncomimtted message"),
})
produceReq := ps.buildRequest()
produceReq.TransactionalID = &transactionalID
_, _ = coordinator.Produce(produceReq)
_, _ = coordinator.EndTxn(&EndTxnRequest{
TransactionalID: transactionalID,
ProducerID: pidRes.ProducerID,
ProducerEpoch: pidRes.ProducerEpoch,
TransactionResult: false,
})

return nil
}

Expand Down

0 comments on commit 621d753

Please sign in to comment.