Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(discovery): support endpointslices in kubernetes discovery #10916

Merged
merged 8 commits into from
Feb 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apisix/discovery/kubernetes/informer_factory.lua
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ function _M.new(group, version, kind, plural, namespace)
end

if namespace and namespace ~= "" then
path = path .. "/namespace/" .. namespace
path = path .. "/namespaces/" .. namespace
dongjiang1989 marked this conversation as resolved.
Show resolved Hide resolved
end
path = path .. "/" .. plural

Expand Down
98 changes: 91 additions & 7 deletions apisix/discovery/kubernetes/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,69 @@ local function sort_nodes_cmp(left, right)
return left.port < right.port
end

local function on_endpoint_slices_modified(handle, endpoint)
if handle.namespace_selector and
not handle:namespace_selector(endpoint.metadata.namespace) then
return
end

core.log.debug(core.json.delay_encode(endpoint))
core.table.clear(endpoint_buffer)

local endpointslices = endpoint.endpoints
for _, endpointslice in ipairs(endpointslices or {}) do
if endpointslice.addresses then
local addresses = endpointslices.addresses
for _, port in ipairs(endpoint.ports or {}) do
local port_name
if port.name then
port_name = port.name
elseif port.targetPort then
port_name = tostring(port.targetPort)
else
port_name = tostring(port.port)
end

if endpointslice.conditions and endpointslice.condition.ready then
local nodes = endpoint_buffer[port_name]
if nodes == nil then
nodes = core.table.new(0, #endpointslices * #addresses)
endpoint_buffer[port_name] = nodes
end

for _, address in ipairs(endpointslices.addresses) do
core.table.insert(nodes, {
host = address.ip,
port = port.port,
weight = handle.default_weight
})
end
end
end
end
end

for _, ports in pairs(endpoint_buffer) do
for _, nodes in pairs(ports) do
core.table.sort(nodes, sort_nodes_cmp)
end
end
local endpoint_key = endpoint.metadata.namespace .. "/" .. endpoint.metadata.name
local endpoint_content = core.json.encode(endpoint_buffer, true)
local endpoint_version = ngx.crc32_long(endpoint_content)

local _, err
_, err = handle.endpoint_dict:safe_set(endpoint_key .. "#version", endpoint_version)
if err then
core.log.error("set endpoint version into discovery DICT failed, ", err)
return
end
_, err = handle.endpoint_dict:safe_set(endpoint_key, endpoint_content)
if err then
core.log.error("set endpoint into discovery DICT failed, ", err)
handle.endpoint_dict:delete(endpoint_key .. "#version")
end
end

local function on_endpoint_modified(handle, endpoint)
if handle.namespace_selector and
Expand Down Expand Up @@ -367,8 +430,13 @@ local function single_mode_init(conf)
end

local default_weight = conf.default_weight

local endpoints_informer, err = informer_factory.new("", "v1", "Endpoints", "endpoints", "")
local endpoints_informer, err
if conf.watch_endpoint_slices_schema then
endpoints_informer, err = informer_factory.new("discovery.k8s.io", "v1",
"EndpointSlice", "endpointslices", "")
else
endpoints_informer, err = informer_factory.new("", "v1", "Endpoints", "endpoints", "")
end
if err then
error(err)
return
Expand All @@ -377,8 +445,13 @@ local function single_mode_init(conf)
setup_namespace_selector(conf, endpoints_informer)
setup_label_selector(conf, endpoints_informer)

endpoints_informer.on_added = on_endpoint_modified
endpoints_informer.on_modified = on_endpoint_modified
if conf.watch_endpoint_slices_schema then
endpoints_informer.on_added = on_endpoint_slices_modified
endpoints_informer.on_modified = on_endpoint_slices_modified
else
endpoints_informer.on_added = on_endpoint_modified
endpoints_informer.on_modified = on_endpoint_modified
end
endpoints_informer.on_deleted = on_endpoint_deleted
endpoints_informer.pre_list = pre_list
endpoints_informer.post_list = post_list
Expand Down Expand Up @@ -463,7 +536,13 @@ local function multiple_mode_init(confs)

local default_weight = conf.default_weight

local endpoints_informer, err = informer_factory.new("", "v1", "Endpoints", "endpoints", "")
local endpoints_informer, err
if conf.watch_endpoint_slices_schema then
endpoints_informer, err = informer_factory.new("discovery.k8s.io", "v1",
"EndpointSlice", "endpointslices", "")
else
endpoints_informer, err = informer_factory.new("", "v1", "Endpoints", "endpoints", "")
end
if err then
error(err)
return
Expand All @@ -472,8 +551,13 @@ local function multiple_mode_init(confs)
setup_namespace_selector(conf, endpoints_informer)
setup_label_selector(conf, endpoints_informer)

endpoints_informer.on_added = on_endpoint_modified
endpoints_informer.on_modified = on_endpoint_modified
if conf.watch_endpoint_slices_schema then
endpoints_informer.on_added = on_endpoint_slices_modified
endpoints_informer.on_modified = on_endpoint_slices_modified
else
endpoints_informer.on_added = on_endpoint_modified
endpoints_informer.on_modified = on_endpoint_modified
end
endpoints_informer.on_deleted = on_endpoint_deleted
endpoints_informer.pre_list = pre_list
endpoints_informer.post_list = post_list
Expand Down
7 changes: 7 additions & 0 deletions apisix/discovery/kubernetes/schema.lua
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@ local shared_size_schema = {
default = "1m",
}

local watch_endpoint_slices_schema = {
type = "boolean",
default = false,
}

return {
anyOf = {
{
Expand Down Expand Up @@ -160,6 +165,7 @@ return {
label_selector = label_selector_schema,
default_weight = default_weight_schema,
shared_size = shared_size_schema,
watch_endpoint_slices = watch_endpoint_slices_schema,
},
},
{
Expand Down Expand Up @@ -202,6 +208,7 @@ return {
label_selector = label_selector_schema,
default_weight = default_weight_schema,
shared_size = shared_size_schema,
watch_endpoint_slices = watch_endpoint_slices_schema,
},
required = { "id", "service", "client" }
},
Expand Down
8 changes: 7 additions & 1 deletion docs/en/latest/discovery/kubernetes.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ discovery:

# reserved lua shared memory size,1m memory can store about 1000 pieces of endpoint
shared_size: 1m #default 1m

# if watch_endpoint_slices setting true, watch apiserver with endpointslices instead of endpoints
watch_endpoint_slices: false #default false
```

If the Kubernetes service discovery runs inside a pod, you can use minimal configuration:
Expand Down Expand Up @@ -220,6 +223,9 @@ discovery:

# reserved lua shared memory size,1m memory can store about 1000 pieces of endpoint
shared_size: 1m #default 1m

# if watch_endpoint_slices setting true, watch apiserver with endpointslices instead of endpoints
watch_endpoint_slices: false #default false
```

Multi-Kubernetes service discovery does not fill default values for service and client fields, you need to fill them according to the cluster configuration.
Expand Down Expand Up @@ -312,7 +318,7 @@ metadata:
name: apisix-test
rules:
- apiGroups: [ "" ]
resources: [ endpoints ]
resources: [ endpoints,endpointslices ]
verbs: [ get,list,watch ]
---

Expand Down
8 changes: 7 additions & 1 deletion docs/zh/latest/discovery/kubernetes.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ discovery:

# reserved lua shared memory size, 1m memory can store about 1000 pieces of endpoint
shared_size: 1m #default 1m

# if watch_endpoint_slices setting true, watch apiserver with endpointslices instead of endpoints
watch_endpoint_slices: false #default false
```

如果 Kubernetes 服务发现运行在 Pod 内,你可以使用如下最简配置:
Expand Down Expand Up @@ -219,6 +222,9 @@ discovery:

# reserved lua shared memory size,1m memory can store about 1000 pieces of endpoint
shared_size: 1m #default 1m

# if watch_endpoint_slices setting true, watch apiserver with endpointslices instead of endpoints
watch_endpoint_slices: false #default false
```

多集群模式 Kubernetes 服务发现没有为 `service` 和 `client` 域填充默认值,你需要根据集群配置情况自行填充。
Expand Down Expand Up @@ -310,7 +316,7 @@ metadata:
name: apisix-test
rules:
- apiGroups: [ "" ]
resources: [ endpoints ]
resources: [ endpoints,endpointslices ]
verbs: [ get,list,watch ]
---

Expand Down
42 changes: 42 additions & 0 deletions t/kubernetes/discovery/kubernetes.t
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ GET /compare
"client": {
"token": "${KUBERNETES_CLIENT_TOKEN}"
},
"watch_endpoint_slices": false,
dongjiang1989 marked this conversation as resolved.
Show resolved Hide resolved
"shared_size": "1m",
"default_weight": 50
}
Expand Down Expand Up @@ -162,6 +163,7 @@ GET /compare
"client": {
"token": "${KUBERNETES_CLIENT_TOKEN}"
},
"watch_endpoint_slices": false,
"shared_size": "1m",
"default_weight": 50
}
Expand Down Expand Up @@ -198,6 +200,7 @@ GET /compare
"client": {
"token": "${KUBERNETES_CLIENT_TOKEN}"
},
"watch_endpoint_slices": false,
"shared_size": "2m",
"default_weight": 50
}
Expand Down Expand Up @@ -232,6 +235,7 @@ GET /compare
"client": {
"token": "${KUBERNETES_CLIENT_TOKEN}"
},
"watch_endpoint_slices": false,
"shared_size": "1m",
"default_weight": 33
}
Expand Down Expand Up @@ -283,6 +287,7 @@ GET /compare
"client": {
"token": "${KUBERNETES_CLIENT_TOKEN}"
},
"watch_endpoint_slices": false,
"default_weight": 50,
"shared_size": "1m"
},
Expand All @@ -296,6 +301,7 @@ GET /compare
"client": {
"token": "${KUBERNETES_CLIENT_TOKEN}"
},
"watch_endpoint_slices": false,
"default_weight": 33,
"shared_size": "2m"
}
Expand All @@ -304,3 +310,39 @@ GET /compare
Content-type: application/json
--- response_body
true



=== TEST 6: set watch_endpoint_slices true and use kubernetes endpointslices api
--- yaml_config
apisix:
node_listen: 1984
deployment:
role: data_plane
role_data_plane:
config_provider: yaml
discovery:
kubernetes:
client:
token: ${KUBERNETES_CLIENT_TOKEN}
default_weight: 33
watch_endpoint_slices: true
--- request
GET /compare
{
"service": {
"schema": "https",
"host": "${KUBERNETES_SERVICE_HOST}",
"port": "${KUBERNETES_SERVICE_PORT}"
},
"client": {
"token": "${KUBERNETES_CLIENT_TOKEN}"
},
"watch_endpoint_slices": true,
"shared_size": "1m",
"default_weight": 33
}
--- more_headers
Content-type: application/json
--- response_body
true
Loading
Loading