Skip to content

Commit

Permalink
feat: support passing different host headers in multiple nodes (#4208)
Browse files Browse the repository at this point in the history
Fix #2620
Fix #4197
Signed-off-by: spacewander <spacewanderlzx@gmail.com>
  • Loading branch information
spacewander committed May 13, 2021
1 parent d30fe36 commit 045c351
Show file tree
Hide file tree
Showing 5 changed files with 218 additions and 42 deletions.
97 changes: 73 additions & 24 deletions apisix/balancer.lua
Original file line number Diff line number Diff line change
Expand Up @@ -98,16 +98,29 @@ local function create_server_picker(upstream, checker)
end

if picker then
local nodes = upstream.nodes
local addr_to_domain = {}
for _, node in ipairs(nodes) do
if node.domain then
local addr = node.host .. ":" .. node.port
addr_to_domain[addr] = node.domain
end
end

local up_nodes = fetch_health_nodes(upstream, checker)

if #up_nodes._priority_index > 1 then
core.log.info("upstream nodes: ", core.json.delay_encode(up_nodes))
return priority_balancer.new(up_nodes, upstream, picker)
local server_picker = priority_balancer.new(up_nodes, upstream, picker)
server_picker.addr_to_domain = addr_to_domain
return server_picker
end

core.log.info("upstream nodes: ",
core.json.delay_encode(up_nodes[up_nodes._priority_index[1]]))
return picker.new(up_nodes[up_nodes._priority_index[1]], upstream)
local server_picker = picker.new(up_nodes[up_nodes._priority_index[1]], upstream)
server_picker.addr_to_domain = addr_to_domain
return server_picker
end

return nil, "invalid balancer type: " .. upstream.type, 0
Expand All @@ -120,11 +133,9 @@ local function parse_addr(addr)
end


local function pick_server(route, ctx)
core.log.info("route: ", core.json.delay_encode(route, true))
core.log.info("ctx: ", core.json.delay_encode(ctx, true))
-- set_balancer_opts will be called in balancer phase and before any tries
local function set_balancer_opts(ctx)
local up_conf = ctx.upstream_conf

if up_conf.timeout then
local timeout = up_conf.timeout
local ok, err = set_timeouts(timeout.connect, timeout.send,
Expand All @@ -134,6 +145,30 @@ local function pick_server(route, ctx)
end
end

local retries = up_conf.retries
if not retries or retries < 0 then
retries = #up_conf.nodes - 1
end

if retries > 0 then
local ok, err = set_more_tries(retries)
if not ok then
core.log.error("could not set upstream retries: ", err)
elseif err then
core.log.warn("could not set upstream retries: ", err)
end
end
end


-- pick_server will be called:
-- 1. in the access phase so that we can set headers according to the picked server
-- 2. each time we need to retry upstream
local function pick_server(route, ctx)
core.log.info("route: ", core.json.delay_encode(route, true))
core.log.info("ctx: ", core.json.delay_encode(ctx, true))
local up_conf = ctx.upstream_conf

local nodes_count = #up_conf.nodes
if nodes_count == 1 then
local node = up_conf.nodes[1]
Expand Down Expand Up @@ -168,17 +203,6 @@ local function pick_server(route, ctx)
end
end

if ctx.balancer_try_count == 1 then
local retries = up_conf.retries
if not retries or retries < 0 then
retries = #up_conf.nodes - 1
end

if retries > 0 then
set_more_tries(retries)
end
end

if checker then
version = version .. "#" .. checker.status_ver
end
Expand All @@ -200,15 +224,18 @@ local function pick_server(route, ctx)
end
ctx.balancer_server = server

local domain = server_picker.addr_to_domain[server]
local res, err = lrucache_addr(server, nil, parse_addr, server)
ctx.balancer_ip = res.host
ctx.balancer_port = res.port
-- core.log.info("cached balancer peer host: ", host, ":", port)
if err then
core.log.error("failed to parse server addr: ", server, " err: ", err)
return core.response.exit(502)
end

res.domain = domain
ctx.balancer_ip = res.host
ctx.balancer_port = res.port
ctx.server_picker = server_picker

return res
end

Expand All @@ -218,10 +245,32 @@ _M.pick_server = pick_server


function _M.run(route, ctx)
local server, err = pick_server(route, ctx)
if not server then
core.log.error("failed to pick server: ", err)
return core.response.exit(502)
local server, err

if ctx.picked_server then
-- use the server picked in the access phase
server = ctx.picked_server
ctx.picked_server = nil

set_balancer_opts(ctx)

else
-- retry
server, err = pick_server(route, ctx)
if not server then
core.log.error("failed to pick server: ", err)
return core.response.exit(502)
end

local pass_host = ctx.pass_host
if pass_host == "node" and balancer.recreate_request then
local host = server.domain or server.host
if host ~= ctx.var.upstream_host then
-- retried node has a different host
ctx.var.upstream_host = host
balancer.recreate_request()
end
end
end

core.log.info("proxy request to ", server.host, ":", server.port)
Expand Down
43 changes: 26 additions & 17 deletions apisix/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ local set_upstream = apisix_upstream.set_by_route
local upstream_util = require("apisix.utils.upstream")
local ctxdump = require("resty.ctxdump")
local ipmatcher = require("resty.ipmatcher")
local ngx_balancer = require("ngx.balancer")
local ngx = ngx
local get_method = ngx.req.get_method
local ngx_exit = ngx.exit
Expand Down Expand Up @@ -106,7 +107,7 @@ function _M.http_init_worker()
discovery.init_worker()
end
require("apisix.balancer").init_worker()
load_balancer = require("apisix.balancer").run
load_balancer = require("apisix.balancer")
require("apisix.admin.init").init_worker()

require("apisix.timers").init_worker()
Expand Down Expand Up @@ -228,7 +229,7 @@ local function parse_domain_in_route(route)
end


local function set_upstream_host(api_ctx)
local function set_upstream_host(api_ctx, picked_server)
local pass_host = api_ctx.pass_host or "pass"
if pass_host == "pass" then
return
Expand All @@ -239,21 +240,13 @@ local function set_upstream_host(api_ctx)
return
end

-- only support single node for `node` mode currently
local host
local up_conf = api_ctx.upstream_conf
local nodes_count = up_conf.nodes and #up_conf.nodes or 0
if nodes_count == 1 then
local node = up_conf.nodes[1]
if node.domain and #node.domain > 0 then
host = node.domain
else
host = node.host
end
end

if host then
api_ctx.var.upstream_host = host
api_ctx.var.upstream_host = node.domain or node.host
elseif picked_server.domain and ngx_balancer.recreate_request then
api_ctx.var.upstream_host = picked_server.domain
end
end

Expand Down Expand Up @@ -473,7 +466,15 @@ function _M.http_access_phase()
core.response.exit(code)
end

set_upstream_host(api_ctx)
local server, err = load_balancer.pick_server(route, api_ctx)
if not server then
core.log.error("failed to pick server: ", err)
return core.response.exit(502)
end

api_ctx.picked_server = server

set_upstream_host(api_ctx, server)

ngx_var.ctx_ref = ctxdump.stash_ngx_ctx()
local up_scheme = api_ctx.upstream_scheme
Expand Down Expand Up @@ -665,7 +666,7 @@ function _M.http_balancer_phase()
return core.response.exit(500)
end

load_balancer(api_ctx.matched_route, api_ctx)
load_balancer.run(api_ctx.matched_route, api_ctx)
end


Expand Down Expand Up @@ -763,7 +764,7 @@ function _M.stream_init_worker()
core.config.init_worker()
end

load_balancer = require("apisix.balancer").run
load_balancer = require("apisix.balancer")

local_conf = core.config.local_conf()
end
Expand Down Expand Up @@ -815,6 +816,14 @@ function _M.stream_preread_phase()
core.log.error("failed to set upstream: ", err)
return ngx_exit(1)
end

local server, err = load_balancer.pick_server(matched_route, api_ctx)
if not server then
core.log.error("failed to pick server: ", err)
return ngx_exit(1)
end

api_ctx.picked_server = server
end


Expand All @@ -826,7 +835,7 @@ function _M.stream_balancer_phase()
return ngx_exit(1)
end

load_balancer(api_ctx.matched_route, api_ctx)
load_balancer.run(api_ctx.matched_route, api_ctx)
end


Expand Down
3 changes: 2 additions & 1 deletion apisix/upstream.lua
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ local core = require("apisix.core")
local discovery = require("apisix.discovery.init").discovery
local upstream_util = require("apisix.utils.upstream")
local apisix_ssl = require("apisix.ssl")
local balancer = require("ngx.balancer")
local error = error
local tostring = tostring
local ipairs = ipairs
Expand Down Expand Up @@ -385,7 +386,7 @@ local function check_upstream_conf(in_dp, conf)
end

if conf.pass_host == "node" and conf.nodes and
core.table.nkeys(conf.nodes) ~= 1
not balancer.recreate_request and core.table.nkeys(conf.nodes) ~= 1
then
return false, "only support single node for `node` mode currently"
end
Expand Down
1 change: 1 addition & 0 deletions t/admin/upstream.t
Original file line number Diff line number Diff line change
Expand Up @@ -1602,6 +1602,7 @@ GET /t
}
--- request
GET /t
--- skip_nginx: 5: > 1.19.0
--- error_code: 400
--- no_error_log
[error]
Expand Down
Loading

0 comments on commit 045c351

Please sign in to comment.