Skip to content

Commit

Permalink
fix: avoid caching outdated discovery upstream nodes
Browse files Browse the repository at this point in the history
Fix #2369
Fix #1838

Signed-off-by: spacewander <spacewanderlzx@gmail.com>
  • Loading branch information
spacewander committed Jan 14, 2021
1 parent 3e19b06 commit 5b0f51e
Show file tree
Hide file tree
Showing 6 changed files with 382 additions and 29 deletions.
2 changes: 1 addition & 1 deletion apisix/balancer.lua
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ local function fetch_health_nodes(upstream, checker)
end

if core.table.nkeys(up_nodes) == 0 then
core.log.warn("all upstream nodes is unhealth, use default")
core.log.warn("all upstream nodes is unhealthy, use default")
for _, node in ipairs(nodes) do
up_nodes[node.host .. ":" .. node.port] = node.weight
end
Expand Down
29 changes: 3 additions & 26 deletions apisix/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ local admin_init = require("apisix.admin.init")
local get_var = require("resty.ngxvar").fetch
local router = require("apisix.router")
local set_upstream = require("apisix.upstream").set_by_route
local upstream_util = require("apisix.utils.upstream")
local ipmatcher = require("resty.ipmatcher")
local ngx = ngx
local get_method = ngx.req.get_method
Expand All @@ -32,7 +33,6 @@ local math = math
local error = error
local ipairs = ipairs
local tostring = tostring
local type = type
local ngx_now = ngx.now
local str_byte = string.byte
local str_sub = string.sub
Expand Down Expand Up @@ -237,29 +237,6 @@ local function parse_domain_for_nodes(nodes)
end


local function compare_upstream_node(old_t, new_t)
if type(old_t) ~= "table" then
return false
end

if #new_t ~= #old_t then
return false
end

for i = 1, #new_t do
local new_node = new_t[i]
local old_node = old_t[i]
for _, name in ipairs({"host", "port", "weight"}) do
if new_node[name] ~= old_node[name] then
return false
end
end
end

return true
end


local function parse_domain_in_up(up)
local nodes = up.value.nodes
local new_nodes, err = parse_domain_for_nodes(nodes)
Expand All @@ -268,7 +245,7 @@ local function parse_domain_in_up(up)
end

local old_dns_value = up.dns_value and up.dns_value.nodes
local ok = compare_upstream_node(old_dns_value, new_nodes)
local ok = upstream_util.compare_upstream_node(old_dns_value, new_nodes)
if ok then
return up
end
Expand All @@ -291,7 +268,7 @@ local function parse_domain_in_route(route)
end

local old_dns_value = route.dns_value and route.dns_value.upstream.nodes
local ok = compare_upstream_node(old_dns_value, new_nodes)
local ok = upstream_util.compare_upstream_node(old_dns_value, new_nodes)
if ok then
return route
end
Expand Down
14 changes: 12 additions & 2 deletions apisix/upstream.lua
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
local require = require
local core = require("apisix.core")
local discovery = require("apisix.discovery.init").discovery
local upstream_util = require("apisix.utils.upstream")
local error = error
local tostring = tostring
local ipairs = ipairs
Expand Down Expand Up @@ -144,9 +145,18 @@ function _M.set_by_route(route, api_ctx)

local dis = discovery[up_conf.discovery_type]
if not dis then
return 500, "discovery " .. up_conf.discovery_type .. "is uninitialized"
return 500, "discovery " .. up_conf.discovery_type .. " is uninitialized"
end
local new_nodes = dis.nodes(up_conf.service_name)
local same = upstream_util.compare_upstream_node(up_conf.nodes, new_nodes)
if not same then
up_conf.nodes = new_nodes
local new_up_conf = core.table.clone(up_conf)
core.log.info("discover new upstream from ", up_conf.service_name, ", type ",
up_conf.discovery_type, ": ",
core.json.delay_encode(new_up_conf))
up_conf = new_up_conf
end
up_conf.nodes = dis.nodes(up_conf.service_name)
end

set_directly(api_ctx, up_conf.type .. "#upstream_" .. tostring(up_conf),
Expand Down
56 changes: 56 additions & 0 deletions apisix/utils/upstream.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local core = require("apisix.core")
local ipairs = ipairs
local type = type


local _M = {}


local function sort_by_key_host(a, b)
return a.host < b.host
end


function _M.compare_upstream_node(old_t, new_t)
if type(old_t) ~= "table" then
return false
end

if #new_t ~= #old_t then
return false
end

core.table.sort(old_t, sort_by_key_host)
core.table.sort(new_t, sort_by_key_host)

for i = 1, #new_t do
local new_node = new_t[i]
local old_node = old_t[i]
for _, name in ipairs({"host", "port", "weight"}) do
if new_node[name] ~= old_node[name] then
return false
end
end
end

return true
end


return _M
157 changes: 157 additions & 0 deletions t/node/healthcheck-discovery.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

use t::APISIX 'no_plan';

master_on();
repeat_each(1);
no_root_location();
no_shuffle();

add_block_preprocessor(sub {
my ($block) = @_;

if (!$block->yaml_config) {
my $yaml_config = <<_EOC_;
apisix:
node_listen: 1984
config_center: yaml
enable_admin: false
_EOC_

$block->set_value("yaml_config", $yaml_config);
}

if ($block->apisix_yaml) {
my $upstream = <<_EOC_;
upstreams:
- service_name: mock
discovery_type: mock
type: roundrobin
id: 1
checks:
active:
http_path: "/status"
host: 127.0.0.1
port: 1988
healthy:
interval: 1
successes: 1
unhealthy:
interval: 1
http_failures: 1
#END
_EOC_

$block->set_value("apisix_yaml", $block->apisix_yaml . $upstream);
}

if (!$block->request) {
$block->set_value("request", "GET /t");
}

if ((!defined $block->error_log) && (!defined $block->no_error_log)) {
$block->set_value("no_error_log", "[error]");
}
});

run_tests();

__DATA__
=== TEST 1: sanity
--- apisix_yaml
routes:
-
uris:
- /hello
upstream_id: 1
--- config
location /t {
content_by_lua_block {
local discovery = require("apisix.discovery.init").discovery
discovery.mock = {
nodes = function()
return {
{host = "127.0.0.1", port = 1980, weight = 1},
{host = "0.0.0.0", port = 1980, weight = 1},
}
end
}
local http = require "resty.http"
local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello"
local httpc = http.new()
local res, err = httpc:request_uri(uri, {method = "GET", keepalive = false})
ngx.sleep(0.5)
ngx.say(res.status)
}
}
--- grep_error_log eval
qr/unhealthy TCP increment \(1\/2\) for '127.0.0.1\([^)]+\)'/
--- grep_error_log_out
unhealthy TCP increment (1/2) for '127.0.0.1(127.0.0.1:1988)'
unhealthy TCP increment (1/2) for '127.0.0.1(0.0.0.0:1988)'
=== TEST 2: create new checker when nodes changed
--- ONLY
--- apisix_yaml
routes:
-
uris:
- /hello
upstream_id: 1
--- config
location /t {
content_by_lua_block {
local discovery = require("apisix.discovery.init").discovery
discovery.mock = {
nodes = function()
return {
{host = "127.0.0.1", port = 1980, weight = 1},
{host = "0.0.0.0", port = 1980, weight = 1},
}
end
}
local http = require "resty.http"
local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello"
local httpc = http.new()
local res, err = httpc:request_uri(uri, {method = "GET", keepalive = false})
ngx.sleep(0.5)
discovery.mock = {
nodes = function()
return {
{host = "127.0.0.1", port = 1980, weight = 1},
{host = "127.0.0.2", port = 1980, weight = 1},
{host = "127.0.0.3", port = 1980, weight = 1},
}
end
}
local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello"
local httpc = http.new()
local res, err = httpc:request_uri(uri, {method = "GET", keepalive = false})
ngx.say(res.status)
}
}
--- grep_error_log eval
qr/create new checker: table/
--- grep_error_log_out
create new checker: table
Loading

0 comments on commit 5b0f51e

Please sign in to comment.