diff --git a/apisix/balancer.lua b/apisix/balancer.lua index ccf7d62026fa4..219eb0c4f3c09 100644 --- a/apisix/balancer.lua +++ b/apisix/balancer.lua @@ -67,7 +67,7 @@ local function fetch_health_nodes(upstream, checker) end if core.table.nkeys(up_nodes) == 0 then - core.log.warn("all upstream nodes is unhealth, use default") + core.log.warn("all upstream nodes is unhealthy, use default") for _, node in ipairs(nodes) do up_nodes[node.host .. ":" .. node.port] = node.weight end diff --git a/apisix/init.lua b/apisix/init.lua index bdbde96aef0de..af465887ff39e 100644 --- a/apisix/init.lua +++ b/apisix/init.lua @@ -24,6 +24,7 @@ local admin_init = require("apisix.admin.init") local get_var = require("resty.ngxvar").fetch local router = require("apisix.router") local set_upstream = require("apisix.upstream").set_by_route +local upstream_util = require("apisix.utils.upstream") local ipmatcher = require("resty.ipmatcher") local ngx = ngx local get_method = ngx.req.get_method @@ -32,7 +33,6 @@ local math = math local error = error local ipairs = ipairs local tostring = tostring -local type = type local ngx_now = ngx.now local str_byte = string.byte local str_sub = string.sub @@ -237,29 +237,6 @@ local function parse_domain_for_nodes(nodes) end -local function compare_upstream_node(old_t, new_t) - if type(old_t) ~= "table" then - return false - end - - if #new_t ~= #old_t then - return false - end - - for i = 1, #new_t do - local new_node = new_t[i] - local old_node = old_t[i] - for _, name in ipairs({"host", "port", "weight"}) do - if new_node[name] ~= old_node[name] then - return false - end - end - end - - return true -end - - local function parse_domain_in_up(up) local nodes = up.value.nodes local new_nodes, err = parse_domain_for_nodes(nodes) @@ -268,7 +245,7 @@ local function parse_domain_in_up(up) end local old_dns_value = up.dns_value and up.dns_value.nodes - local ok = compare_upstream_node(old_dns_value, new_nodes) + local ok = upstream_util.compare_upstream_node(old_dns_value, new_nodes) if ok then return up end @@ -291,7 +268,7 @@ local function parse_domain_in_route(route) end local old_dns_value = route.dns_value and route.dns_value.upstream.nodes - local ok = compare_upstream_node(old_dns_value, new_nodes) + local ok = upstream_util.compare_upstream_node(old_dns_value, new_nodes) if ok then return route end diff --git a/apisix/upstream.lua b/apisix/upstream.lua index 096f81e78430c..96fcdfaa5a123 100644 --- a/apisix/upstream.lua +++ b/apisix/upstream.lua @@ -17,6 +17,7 @@ local require = require local core = require("apisix.core") local discovery = require("apisix.discovery.init").discovery +local upstream_util = require("apisix.utils.upstream") local error = error local tostring = tostring local ipairs = ipairs @@ -144,9 +145,18 @@ function _M.set_by_route(route, api_ctx) local dis = discovery[up_conf.discovery_type] if not dis then - return 500, "discovery " .. up_conf.discovery_type .. "is uninitialized" + return 500, "discovery " .. up_conf.discovery_type .. " is uninitialized" + end + local new_nodes = dis.nodes(up_conf.service_name) + local same = upstream_util.compare_upstream_node(up_conf.nodes, new_nodes) + if not same then + up_conf.nodes = new_nodes + local new_up_conf = core.table.clone(up_conf) + core.log.info("discover new upstream from ", up_conf.service_name, ", type ", + up_conf.discovery_type, ": ", + core.json.delay_encode(new_up_conf)) + up_conf = new_up_conf end - up_conf.nodes = dis.nodes(up_conf.service_name) end set_directly(api_ctx, up_conf.type .. "#upstream_" .. tostring(up_conf), diff --git a/apisix/utils/upstream.lua b/apisix/utils/upstream.lua new file mode 100644 index 0000000000000..346789aa1f665 --- /dev/null +++ b/apisix/utils/upstream.lua @@ -0,0 +1,56 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- +local core = require("apisix.core") +local ipairs = ipairs +local type = type + + +local _M = {} + + +local function sort_by_key_host(a, b) + return a.host < b.host +end + + +function _M.compare_upstream_node(old_t, new_t) + if type(old_t) ~= "table" then + return false + end + + if #new_t ~= #old_t then + return false + end + + core.table.sort(old_t, sort_by_key_host) + core.table.sort(new_t, sort_by_key_host) + + for i = 1, #new_t do + local new_node = new_t[i] + local old_node = old_t[i] + for _, name in ipairs({"host", "port", "weight"}) do + if new_node[name] ~= old_node[name] then + return false + end + end + end + + return true +end + + +return _M diff --git a/t/node/healthcheck-discovery.t b/t/node/healthcheck-discovery.t new file mode 100644 index 0000000000000..14515b35843dc --- /dev/null +++ b/t/node/healthcheck-discovery.t @@ -0,0 +1,157 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +use t::APISIX 'no_plan'; + +master_on(); +repeat_each(1); +no_root_location(); +no_shuffle(); + +add_block_preprocessor(sub { + my ($block) = @_; + + if (!$block->yaml_config) { + my $yaml_config = <<_EOC_; +apisix: + node_listen: 1984 + config_center: yaml + enable_admin: false +_EOC_ + + $block->set_value("yaml_config", $yaml_config); + } + + if ($block->apisix_yaml) { + my $upstream = <<_EOC_; +upstreams: + - service_name: mock + discovery_type: mock + type: roundrobin + id: 1 + checks: + active: + http_path: "/status" + host: 127.0.0.1 + port: 1988 + healthy: + interval: 1 + successes: 1 + unhealthy: + interval: 1 + http_failures: 1 +#END +_EOC_ + + $block->set_value("apisix_yaml", $block->apisix_yaml . $upstream); + } + + if (!$block->request) { + $block->set_value("request", "GET /t"); + } + + if ((!defined $block->error_log) && (!defined $block->no_error_log)) { + $block->set_value("no_error_log", "[error]"); + } +}); + +run_tests(); + +__DATA__ + +=== TEST 1: sanity +--- apisix_yaml +routes: + - + uris: + - /hello + upstream_id: 1 +--- config + location /t { + content_by_lua_block { + local discovery = require("apisix.discovery.init").discovery + discovery.mock = { + nodes = function() + return { + {host = "127.0.0.1", port = 1980, weight = 1}, + {host = "0.0.0.0", port = 1980, weight = 1}, + } + end + } + local http = require "resty.http" + local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello" + local httpc = http.new() + local res, err = httpc:request_uri(uri, {method = "GET", keepalive = false}) + + ngx.sleep(0.5) + + ngx.say(res.status) + } + } +--- grep_error_log eval +qr/unhealthy TCP increment \(1\/2\) for '127.0.0.1\([^)]+\)'/ +--- grep_error_log_out +unhealthy TCP increment (1/2) for '127.0.0.1(127.0.0.1:1988)' +unhealthy TCP increment (1/2) for '127.0.0.1(0.0.0.0:1988)' + + + +=== TEST 2: create new checker when nodes changed +--- ONLY +--- apisix_yaml +routes: + - + uris: + - /hello + upstream_id: 1 +--- config + location /t { + content_by_lua_block { + local discovery = require("apisix.discovery.init").discovery + discovery.mock = { + nodes = function() + return { + {host = "127.0.0.1", port = 1980, weight = 1}, + {host = "0.0.0.0", port = 1980, weight = 1}, + } + end + } + local http = require "resty.http" + local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello" + local httpc = http.new() + local res, err = httpc:request_uri(uri, {method = "GET", keepalive = false}) + ngx.sleep(0.5) + + discovery.mock = { + nodes = function() + return { + {host = "127.0.0.1", port = 1980, weight = 1}, + {host = "127.0.0.2", port = 1980, weight = 1}, + {host = "127.0.0.3", port = 1980, weight = 1}, + } + end + } + local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello" + local httpc = http.new() + local res, err = httpc:request_uri(uri, {method = "GET", keepalive = false}) + ngx.say(res.status) + } + } +--- grep_error_log eval +qr/create new checker: table/ +--- grep_error_log_out +create new checker: table diff --git a/t/node/upstream-discovery.t b/t/node/upstream-discovery.t new file mode 100644 index 0000000000000..ca6882316a9f3 --- /dev/null +++ b/t/node/upstream-discovery.t @@ -0,0 +1,153 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +use t::APISIX 'no_plan'; + +master_on(); +repeat_each(1); +no_root_location(); +no_shuffle(); + +add_block_preprocessor(sub { + my ($block) = @_; + + if (!$block->yaml_config) { + my $yaml_config = <<_EOC_; +apisix: + node_listen: 1984 + config_center: yaml + enable_admin: false +_EOC_ + + $block->set_value("yaml_config", $yaml_config); + } + + if ($block->apisix_yaml) { + my $upstream = <<_EOC_; +upstreams: + - service_name: mock + discovery_type: mock + type: roundrobin + id: 1 +#END +_EOC_ + + $block->set_value("apisix_yaml", $block->apisix_yaml . $upstream); + } + + if (!$block->request) { + $block->set_value("request", "GET /t"); + } + + if ((!defined $block->error_log) && (!defined $block->no_error_log)) { + $block->set_value("no_error_log", "[error]"); + } +}); + +run_tests(); + +__DATA__ + +=== TEST 1: create new server picker when nodes change +--- apisix_yaml +routes: + - + uris: + - /hello + upstream_id: 1 +--- config + location /t { + content_by_lua_block { + local discovery = require("apisix.discovery.init").discovery + discovery.mock = { + nodes = function() + return { + {host = "127.0.0.1", port = 1980, weight = 1}, + {host = "0.0.0.0", port = 1980, weight = 1}, + } + end + } + local http = require "resty.http" + local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello" + local httpc = http.new() + local res, err = httpc:request_uri(uri, {method = "GET", keepalive = false}) + ngx.say(res.status) + + discovery.mock = { + nodes = function() + return { + {host = "127.0.0.2", port = 1980, weight = 1}, + {host = "127.0.0.3", port = 1980, weight = 1}, + } + end + } + local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello" + local httpc = http.new() + local res, err = httpc:request_uri(uri, {method = "GET", keepalive = false}) + } + } +--- grep_error_log eval +qr/create_obj_fun\(\): upstream nodes:/ +--- grep_error_log_out +create_obj_fun(): upstream nodes: +create_obj_fun(): upstream nodes: + + + +=== TEST 2: don't create new server picker if nodes don't change +--- apisix_yaml +routes: + - + uris: + - /hello + upstream_id: 1 +--- config + location /t { + content_by_lua_block { + local discovery = require("apisix.discovery.init").discovery + discovery.mock = { + nodes = function() + return { + {host = "127.0.0.1", port = 1980, weight = 1}, + {host = "0.0.0.0", port = 1980, weight = 1}, + } + end + } + local http = require "resty.http" + local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello" + local httpc = http.new() + local res, err = httpc:request_uri(uri, {method = "GET", keepalive = false}) + ngx.say(res.status) + + discovery.mock = { + nodes = function() + return { + {host = "0.0.0.0", port = 1980, weight = 1}, + {host = "127.0.0.1", port = 1980, weight = 1}, + } + end + } + local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello" + local httpc = http.new() + local res, err = httpc:request_uri(uri, {method = "GET", keepalive = false}) + } + } +--- grep_error_log eval +qr/create_obj_fun\(\): upstream nodes:/ +--- grep_error_log_out +create_obj_fun(): upstream nodes: +create_obj_fun(): upstream nodes: