feat: kafka-logger implemented reuse kafka producer #3429

Merged: 13 commits, Jan 31, 2021
1 change: 1 addition & 0 deletions .github/workflows/centos7-ci.yml
@@ -75,6 +75,7 @@ jobs:
           docker run --name eureka -d -p 8761:8761 --env ENVIRONMENT=apisix --env spring.application.name=apisix-eureka --env server.port=8761 --env eureka.instance.ip-address=127.0.0.1 --env eureka.client.registerWithEureka=true --env eureka.client.fetchRegistry=false --env eureka.client.serviceUrl.defaultZone=http://127.0.0.1:8761/eureka/ bitinit/eureka
           sleep 5
           docker exec -i kafka-server1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper-server:2181 --replication-factor 1 --partitions 1 --topic test2
+          docker exec -i kafka-server1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper-server:2181 --replication-factor 1 --partitions 3 --topic test3
           docker run --rm --name skywalking -d -p 1234:1234 -p 11800:11800 -p 12800:12800 apache/skywalking-oap-server
 
       - name: install dependencies
1 change: 1 addition & 0 deletions .travis/linux_openresty_common_runner.sh
@@ -35,6 +35,7 @@ before_install() {
     docker run --name eureka -d -p 8761:8761 --env ENVIRONMENT=apisix --env spring.application.name=apisix-eureka --env server.port=8761 --env eureka.instance.ip-address=127.0.0.1 --env eureka.client.registerWithEureka=true --env eureka.client.fetchRegistry=false --env eureka.client.serviceUrl.defaultZone=http://127.0.0.1:8761/eureka/ bitinit/eureka
     sleep 5
     docker exec -i kafka-server1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper-server:2181 --replication-factor 1 --partitions 1 --topic test2
+    docker exec -i kafka-server1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper-server:2181 --replication-factor 1 --partitions 3 --topic test3
 
     # start skywalking
     docker run --rm --name skywalking -d -p 1234:1234 -p 11800:11800 -p 12800:12800 apache/skywalking-oap-server
1 change: 1 addition & 0 deletions .travis/linux_tengine_runner.sh
@@ -35,6 +35,7 @@ before_install() {
     docker run --name eureka -d -p 8761:8761 --env ENVIRONMENT=apisix --env spring.application.name=apisix-eureka --env server.port=8761 --env eureka.instance.ip-address=127.0.0.1 --env eureka.client.registerWithEureka=true --env eureka.client.fetchRegistry=false --env eureka.client.serviceUrl.defaultZone=http://127.0.0.1:8761/eureka/ bitinit/eureka
     sleep 5
     docker exec -i kafka-server1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper-server:2181 --replication-factor 1 --partitions 1 --topic test2
+    docker exec -i kafka-server1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper-server:2181 --replication-factor 1 --partitions 3 --topic test3
 }
 
 tengine_install() {
70 changes: 49 additions & 21 deletions apisix/plugins/kafka-logger.lua
@@ -20,14 +20,18 @@ local producer = require ("resty.kafka.producer")
 local batch_processor = require("apisix.utils.batch-processor")
 local pairs = pairs
 local type = type
-local table = table
 local ipairs = ipairs
 local plugin_name = "kafka-logger"
 local stale_timer_running = false
 local timer_at = ngx.timer.at
 local ngx = ngx
 local buffers = {}
 
 
+local lrucache = core.lrucache.new({
+    type = "plugin",
+})
+
 local schema = {
     type = "object",
     properties = {
@@ -66,33 +70,28 @@ function _M.check_schema(conf)
 end
 
 
-local function send_kafka_data(conf, log_message)
-    if core.table.nkeys(conf.broker_list) == 0 then
-        core.log.error("failed to identify the broker specified")
+local function partition_id(sendbuffer, topic, log_message)
+    if not sendbuffer.topics[topic] then
+        core.log.info("current topic in sendbuffer has no message")
         return nil
     end
 
-    local broker_list = {}
-    local broker_config = {}
-
-    for host, port in pairs(conf.broker_list) do
-        if type(host) == 'string'
-            and type(port) == 'number' then
-
-            local broker = {
-                host = host, port = port
-            }
-            table.insert(broker_list,broker)
+    for i, message in pairs(sendbuffer.topics[topic]) do
+        if log_message == message.queue[2] then
+            return i
         end
     end
+end
 
-    broker_config["request_timeout"] = conf.timeout * 1000
 
-    local prod, err = producer:new(broker_list,broker_config)
-    if err then
-        return nil, "failed to identify the broker specified: " .. err
+local function send_kafka_data(conf, log_message, prod)
+    if core.table.nkeys(conf.broker_list) == 0 then
+        core.log.error("failed to identify the broker specified")
     end
 
     local ok, err = prod:send(conf.kafka_topic, conf.key, log_message)
+    core.log.info("partition_id: ", partition_id(prod.sendbuffer,
+                                                 conf.kafka_topic, log_message))
 
     if not ok then
         return nil, "failed to send data to Kafka topic: " .. err
     end
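A note on the helper added above: partition_id() walks the producer's internal send buffer, purely for logging, to recover which partition a message was assigned to. Below is a minimal sketch of the buffer shape it assumes, based on lua-resty-kafka's sendbuffer, where each buffered entry stores the key at queue[1] and the message at queue[2]; the sample values here are invented:

    -- hypothetical sendbuffer contents after sends to topic "test3"
    local sendbuffer = {
        topics = {
            test3 = {
                -- partition id -> buffer whose queue alternates key, message
                [0] = { queue = { "key", "message routed to partition 0" } },
                [1] = { queue = { "key", "message routed to partition 1" } },
            },
        },
    }

    -- partition_id(sendbuffer, "test3", "message routed to partition 1") --> 1

Comparing only message.queue[2], the first buffered message per partition, is enough here because the plugin calls the helper right after sending a single message.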
@@ -118,6 +117,12 @@ local function remove_stale_objects(premature)
 end
 
 
+local function create_producer(broker_list, broker_config)
+    core.log.info("create new kafka producer instance")
+    return producer:new(broker_list, broker_config)
+end
+
+
 function _M.log(conf, ctx)
     local entry
     if conf.meta_format == "origin" then
@@ -141,6 +146,29 @@ function _M.log(conf, ctx)
         return
     end
 
+    -- reuse producer via lrucache to avoid unbalanced partitions of messages in kafka
+    local broker_list = core.table.new(0, core.table.nkeys(conf.broker_list))
+    local broker_config = {}
+
+    for host, port in pairs(conf.broker_list) do
+        if type(host) == 'string'
+            and type(port) == 'number' then
+            local broker = {
+                host = host,
+                port = port
+            }
+            core.table.insert(broker_list, broker)
+        end
+    end
+
+    broker_config["request_timeout"] = conf.timeout * 1000
+
+    local prod, err = core.lrucache.plugin_ctx(lrucache, ctx, nil, create_producer,
+                                               broker_list, broker_config)
+    if err then
+        return nil, "failed to identify the broker specified: " .. err
+    end
+
     -- Generate a function to be executed by the batch processor
     local func = function(entries, batch_max_size)
         local data, err
@@ -158,7 +186,7 @@
         end
 
         core.log.info("send data to kafka: ", data)
-        return send_kafka_data(conf, data)
+        return send_kafka_data(conf, data, prod)
     end
 
     local config = {
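The heart of the patch is that producer:new no longer runs on every batch flush: core.lrucache.plugin_ctx hands back one cached producer per plugin instance, so producer-internal state, notably the correlation id that drives partition selection for unkeyed messages, survives across log calls. Here is a minimal, self-contained sketch of the caching idea in plain Lua (a stand-in for illustration, not the real core.lrucache API):

    -- toy per-key cache standing in for core.lrucache.plugin_ctx
    local cache = {}

    local function plugin_ctx_sketch(key, create, ...)
        local obj = cache[key]
        if not obj then
            obj = create(...)       -- creator runs only on a cache miss
            cache[key] = obj
        end
        return obj
    end

    local created = 0
    local function create_producer_stub()
        created = created + 1
        return { correlation_id = 0 }   -- stands in for producer:new()
    end

    -- two log calls on the same route reuse one producer
    local p1 = plugin_ctx_sketch("route#1", create_producer_stub)
    local p2 = plugin_ctx_sketch("route#1", create_producer_stub)
    assert(p1 == p2 and created == 1)

The real helper also keys on the plugin configuration's version, so editing the route's broker_list or timeout rebuilds the producer instead of reusing a stale one.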
69 changes: 69 additions & 0 deletions t/plugin/kafka-logger.t
@@ -576,3 +576,72 @@ hello world
 --- error_log_like eval
 qr/send data to kafka: \{.*"upstream":"127.0.0.1:1980"/
 --- wait: 2
+
+
+
+=== TEST 17: use the topic with 3 partitions
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                ngx.HTTP_PUT,
+                [[{
+                    "plugins": {
+                        "kafka-logger": {
+                            "broker_list" : {
+                                "127.0.0.1": 9092
+                            },
+                            "kafka_topic" : "test3",
+                            "timeout" : 1,
+                            "batch_max_size": 1,
+                            "include_req_body": false
+                        }
+                    },
+                    "upstream": {
+                        "nodes": {
+                            "127.0.0.1:1980": 1
+                        },
+                        "type": "roundrobin"
+                    },
+                    "uri": "/hello"
+                }]]
+                )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- request
+GET /t
+--- response_body
+passed
+--- no_error_log
+[error]
+
+
+
+=== TEST 18: report log to kafka by different partitions
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            t('/hello', ngx.HTTP_GET)
+            ngx.sleep(0.5)
+            t('/hello', ngx.HTTP_GET)
+            ngx.sleep(0.5)
+            t('/hello', ngx.HTTP_GET)
+            ngx.sleep(0.5)
+        }
+    }
+--- request
+GET /t
+--- timeout: 5s
+--- ignore_response
+--- no_error_log
+[error]
+--- error_log eval
+[qr/partition_id: 1/,
+qr/partition_id: 0/,
+qr/partition_id: 2/]
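Why TEST 18 can expect three distinct partition ids from three requests: batch_max_size is 1, so each request flushes immediately, and because the producer is now reused its correlation id keeps incrementing across flushes. lua-resty-kafka's default partitioner, paraphrased below from memory (treat the exact code as an approximation), maps unkeyed messages to correlation_id % partition_count:

    -- approximate shape of lua-resty-kafka's default partitioner
    local function default_partitioner(key, num_partitions, correlation_id)
        -- unkeyed messages fall back to the producer-wide correlation id
        local id = key and ngx.crc32_short(key) or correlation_id
        return id % num_partitions   -- partition ids start at 0
    end

    -- a reused producer's correlation id keeps growing, so the three test
    -- requests spread across the three partitions of "test3":
    for correlation_id = 1, 3 do
        print(default_partitioner(nil, 3, correlation_id))   --> prints 1, 2, 0
    end

Before this patch a fresh producer was built for every flush, so the counter effectively restarted each time and every message landed on the same partition, which is exactly the imbalance being fixed.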