Skip to content

Commit

Permalink
test
Browse files Browse the repository at this point in the history
  • Loading branch information
shreemaan-abhishek committed Apr 10, 2024
1 parent 18842f9 commit af2749b
Showing 1 changed file with 112 additions and 0 deletions.
112 changes: 112 additions & 0 deletions t/pubsub/kafka.t
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ failed to initialize pubsub module, err: bad "upgrade" request header: nil
# script that prepares the CI environment
location /t {
content_by_lua_block {
local pb = require("pb")
local lib_pubsub = require("lib.pubsub")
local test_pubsub = lib_pubsub.new_ws("ws://127.0.0.1:1984/kafka")
local data = {
Expand Down Expand Up @@ -235,6 +236,8 @@ failed to initialize pubsub module, err: bad "upgrade" request header: nil
}
for i = 1, #data do
-- force clear state
pb.state(nil)
local data = test_pubsub:send_recv_ws_binary(data[i])
if data.error_resp then
ngx.say(data.sequence..data.error_resp.message)
Expand Down Expand Up @@ -367,3 +370,112 @@ qr/self[- ]signed certificate/
}
--- response_body
0offset: 30
=== TEST 3: hit route (Kafka)
--- config
# The messages used in this test are produced in the linux-ci-init-service.sh
# script that prepares the CI environment
location /t {
content_by_lua_block {
local lib_pubsub = require("lib.pubsub")
local test_pubsub = lib_pubsub.new_ws("ws://127.0.0.1:1984/kafka")
local data = {
{
sequence = 0,
cmd_kafka_list_offset = {
topic = "not-exist",
partition = 0,
timestamp = -1,
},
},
{
sequence = 1,
cmd_kafka_fetch = {
topic = "not-exist",
partition = 0,
offset = 0,
},
},
{
-- Query first message offset
sequence = 2,
cmd_kafka_list_offset = {
topic = "test-consumer",
partition = 0,
timestamp = -2,
},
},
{
-- Query last message offset
sequence = 3,
cmd_kafka_list_offset = {
topic = "test-consumer",
partition = 0,
timestamp = -1,
},
},
{
-- Query by timestamp, 9999999999999 later than the
-- production time of any message
sequence = 4,
cmd_kafka_list_offset = {
topic = "test-consumer",
partition = 0,
timestamp = "9999999999999",
},
},
{
-- Query by timestamp, 1500000000000 ms earlier than the
-- production time of any message
sequence = 5,
cmd_kafka_list_offset = {
topic = "test-consumer",
partition = 0,
timestamp = "1500000000000",
},
},
{
sequence = 6,
cmd_kafka_fetch = {
topic = "test-consumer",
partition = 0,
offset = 14,
},
},
{
sequence = 7,
cmd_kafka_fetch = {
topic = "test-consumer",
partition = 0,
offset = 999,
},
},
}
for i = 1, #data do
local data = test_pubsub:send_recv_ws_binary(data[i])
if data.error_resp then
ngx.say(data.sequence..data.error_resp.message)
end
if data.kafka_list_offset_resp then
ngx.say(data.sequence.."offset: "..data.kafka_list_offset_resp.offset)
end
if data.kafka_fetch_resp then
ngx.say(data.sequence.."offset: "..data.kafka_fetch_resp.messages[1].offset..
" msg: "..data.kafka_fetch_resp.messages[1].value)
end
end
test_pubsub:close_ws()
}
}
--- response_body
0failed to list offset, topic: not-exist, partition: 0, err: not found topic
1failed to fetch message, topic: not-exist, partition: 0, err: not found topic
2offset: 0
3offset: 30
4offset: -1
5offset: 0
6offset: 14 msg: testmsg15
7failed to fetch message, topic: test-consumer, partition: 0, err: OFFSET_OUT_OF_RANGE

0 comments on commit af2749b

Please sign in to comment.