diff --git a/pkg/common/config/config.go b/pkg/common/config/config.go index 95f4a864e2..50cca34603 100644 --- a/pkg/common/config/config.go +++ b/pkg/common/config/config.go @@ -85,10 +85,12 @@ type configStruct struct { } `yaml:"redis"` Kafka struct { - Username string `yaml:"username"` - Password string `yaml:"password"` - Addr []string `yaml:"addr"` - TLS *struct { + Username string `yaml:"username"` + Password string `yaml:"password"` + ProducerAck string `yaml:"producerAck"` + CompressType string `yaml:"compressType"` + Addr []string `yaml:"addr"` + TLS *struct { CACrt string `yaml:"caCrt"` ClientCrt string `yaml:"clientCrt"` ClientKey string `yaml:"clientKey"` diff --git a/pkg/common/kafka/producer.go b/pkg/common/kafka/producer.go index 4a52d2befc..227c986573 100644 --- a/pkg/common/kafka/producer.go +++ b/pkg/common/kafka/producer.go @@ -15,8 +15,10 @@ package kafka import ( + "bytes" "context" "errors" + "strings" "time" "github.com/OpenIMSDK/protocol/constant" @@ -51,8 +53,23 @@ func NewKafkaProducer(addr []string, topic string) *Producer { p.config = sarama.NewConfig() // Instantiate a sarama Config p.config.Producer.Return.Successes = true // Whether to enable the successes channel to be notified after the message is sent successfully p.config.Producer.Return.Errors = true - p.config.Producer.RequiredAcks = sarama.WaitForAll // Set producer Message Reply level 0 1 all p.config.Producer.Partitioner = sarama.NewHashPartitioner // Set the hash-key automatic hash partition. When sending a message, you must specify the key value of the message. If there is no key, the partition will be selected randomly + + var producerAck = sarama.WaitForAll // default: WaitForAll + switch strings.ToLower(config.Config.Kafka.ProducerAck) { + case "no_response": + producerAck = sarama.NoResponse + case "wait_for_local": + producerAck = sarama.WaitForLocal + case "wait_for_all": + producerAck = sarama.WaitForAll + } + p.config.Producer.RequiredAcks = producerAck + + var compress = sarama.CompressionNone // default: no compress + _ = compress.UnmarshalText(bytes.ToLower([]byte(config.Config.Kafka.CompressType))) + p.config.Producer.Compression = compress + if config.Config.Kafka.Username != "" && config.Config.Kafka.Password != "" { p.config.Net.SASL.Enable = true p.config.Net.SASL.User = config.Config.Kafka.Username