Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: avoid caching outdated discovery upstream nodes #3295

Merged
merged 1 commit into from
Jan 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apisix/balancer.lua
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ local function fetch_health_nodes(upstream, checker)
end

if core.table.nkeys(up_nodes) == 0 then
core.log.warn("all upstream nodes is unhealth, use default")
core.log.warn("all upstream nodes is unhealthy, use default")
for _, node in ipairs(nodes) do
up_nodes[node.host .. ":" .. node.port] = node.weight
end
Expand Down
9 changes: 7 additions & 2 deletions apisix/http/service.lua
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,13 @@ local function filter(service)
return
end

if not service.value.upstream or not service.value.upstream.nodes then
if not service.value.upstream then
return
end

service.value.upstream.parent = service

if not service.value.upstream.nodes then
return
end

Expand Down Expand Up @@ -79,7 +85,6 @@ local function filter(service)
service.value.upstream.nodes = new_nodes
end

service.value.upstream.parent = service
core.log.info("filter service: ", core.json.delay_encode(service))
end

Expand Down
29 changes: 3 additions & 26 deletions apisix/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ local admin_init = require("apisix.admin.init")
local get_var = require("resty.ngxvar").fetch
local router = require("apisix.router")
local set_upstream = require("apisix.upstream").set_by_route
local upstream_util = require("apisix.utils.upstream")
local ipmatcher = require("resty.ipmatcher")
local ngx = ngx
local get_method = ngx.req.get_method
Expand All @@ -32,7 +33,6 @@ local math = math
local error = error
local ipairs = ipairs
local tostring = tostring
local type = type
local ngx_now = ngx.now
local str_byte = string.byte
local str_sub = string.sub
Expand Down Expand Up @@ -237,29 +237,6 @@ local function parse_domain_for_nodes(nodes)
end


local function compare_upstream_node(old_t, new_t)
if type(old_t) ~= "table" then
return false
end

if #new_t ~= #old_t then
return false
end

for i = 1, #new_t do
local new_node = new_t[i]
local old_node = old_t[i]
for _, name in ipairs({"host", "port", "weight"}) do
if new_node[name] ~= old_node[name] then
return false
end
end
end

return true
end


local function parse_domain_in_up(up)
local nodes = up.value.nodes
local new_nodes, err = parse_domain_for_nodes(nodes)
Expand All @@ -268,7 +245,7 @@ local function parse_domain_in_up(up)
end

local old_dns_value = up.dns_value and up.dns_value.nodes
local ok = compare_upstream_node(old_dns_value, new_nodes)
local ok = upstream_util.compare_upstream_node(old_dns_value, new_nodes)
if ok then
return up
end
Expand All @@ -291,7 +268,7 @@ local function parse_domain_in_route(route)
end

local old_dns_value = route.dns_value and route.dns_value.upstream.nodes
local ok = compare_upstream_node(old_dns_value, new_nodes)
local ok = upstream_util.compare_upstream_node(old_dns_value, new_nodes)
if ok then
return route
end
Expand Down
4 changes: 2 additions & 2 deletions apisix/plugin.lua
Original file line number Diff line number Diff line change
Expand Up @@ -418,8 +418,8 @@ end


function _M.merge_service_route(service_conf, route_conf)
core.log.info("service conf: ", core.json.delay_encode(service_conf))
core.log.info(" route conf: ", core.json.delay_encode(route_conf))
core.log.info("service conf: ", core.json.delay_encode(service_conf, true))
core.log.info(" route conf: ", core.json.delay_encode(route_conf, true))

local route_service_key = route_conf.value.id .. "#"
.. route_conf.modifiedIndex .. "#" .. service_conf.modifiedIndex
Expand Down
9 changes: 7 additions & 2 deletions apisix/router.lua
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,13 @@ local function filter(route)
return
end

if not route.value.upstream or not route.value.upstream.nodes then
if not route.value.upstream then
return
end

route.value.upstream.parent = route

if not route.value.upstream.nodes then
return
end

Expand Down Expand Up @@ -64,7 +70,6 @@ local function filter(route)
route.value.upstream.nodes = new_nodes
end

route.value.upstream.parent = route
core.log.info("filter route: ", core.json.delay_encode(route))
end

Expand Down
31 changes: 27 additions & 4 deletions apisix/upstream.lua
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
local require = require
local core = require("apisix.core")
local discovery = require("apisix.discovery.init").discovery
local upstream_util = require("apisix.utils.upstream")
local error = error
local tostring = tostring
local ipairs = ipairs
Expand Down Expand Up @@ -132,9 +133,26 @@ function _M.set_by_route(route, api_ctx)

local dis = discovery[up_conf.discovery_type]
if not dis then
return 500, "discovery " .. up_conf.discovery_type .. "is uninitialized"
return 500, "discovery " .. up_conf.discovery_type .. " is uninitialized"
end
local new_nodes = dis.nodes(up_conf.service_name)
local same = upstream_util.compare_upstream_node(up_conf.nodes, new_nodes)
if not same then
up_conf.nodes = new_nodes
local new_up_conf = core.table.clone(up_conf)
core.log.info("discover new upstream from ", up_conf.service_name, ", type ",
up_conf.discovery_type, ": ",
core.json.delay_encode(new_up_conf, true))

local parent = up_conf.parent
if parent.value.upstream then
-- the up_conf comes from route or service
parent.value.upstream = new_up_conf
else
parent.value = new_up_conf
end
up_conf = new_up_conf
end
up_conf.nodes = dis.nodes(up_conf.service_name)
end

set_directly(api_ctx, up_conf.type .. "#upstream_" .. tostring(up_conf),
Expand Down Expand Up @@ -175,7 +193,13 @@ function _M.init_worker()
item_schema = core.schema.upstream,
filter = function(upstream)
upstream.has_domain = false
if not upstream.value or not upstream.value.nodes then
if not upstream.value then
return
end

upstream.value.parent = upstream

if not upstream.value.nodes then
return
end

Expand Down Expand Up @@ -207,7 +231,6 @@ function _M.init_worker()
upstream.value.nodes = new_nodes
end

upstream.value.parent = upstream
core.log.info("filter upstream: ", core.json.delay_encode(upstream))
end,
})
Expand Down
56 changes: 56 additions & 0 deletions apisix/utils/upstream.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local core = require("apisix.core")
local ipairs = ipairs
local type = type


local _M = {}


local function sort_by_key_host(a, b)
return a.host < b.host
end


function _M.compare_upstream_node(old_t, new_t)
if type(old_t) ~= "table" then
return false
end

if #new_t ~= #old_t then
return false
end

core.table.sort(old_t, sort_by_key_host)
core.table.sort(new_t, sort_by_key_host)

for i = 1, #new_t do
local new_node = new_t[i]
local old_node = old_t[i]
for _, name in ipairs({"host", "port", "weight"}) do
if new_node[name] ~= old_node[name] then
return false
end
end
end

return true
end


return _M
Loading