-
Notifications
You must be signed in to change notification settings - Fork 2.5k
/
consumer.lua
158 lines (124 loc) · 4.58 KB
/
consumer.lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local core = require("apisix.core")
local secret = require("apisix.secret")
local plugin = require("apisix.plugin")
local plugin_checker = require("apisix.plugin").plugin_checker
local error = error
local ipairs = ipairs
local pairs = pairs
local type = type
-- File-level handle for the "/consumers" config object; it stays nil until
-- _M.init_worker() assigns the result of core.config.new("/consumers", ...).
local consumers
-- Module table returned to callers of require().
local _M = {
version = 0.3,
}
-- Cache used by _M.consumers_kv() to keep the per-plugin lookup tables built
-- by create_consume_cache().
-- NOTE(review): ttl is presumably in seconds and count the max number of
-- cached entries -- confirm against core.lrucache.
local lrucache = core.lrucache.new({
ttl = 300, count = 512
})
-- Group every consumer by the "auth" type plugins it has configured.
--
-- Returns a table keyed by plugin name. Each entry has the shape
--   { nodes = { <consumer copy>, ... }, conf_version = <consumers.conf_version> }
-- where every consumer copy additionally carries `consumer_name`,
-- `auth_conf` (that plugin's config for the consumer) and `modifiedIndex`.
-- An empty table is returned when no consumer values are loaded.
local function plugin_consumer()
    local by_plugin = {}

    local items = consumers.values
    if items == nil then
        return by_plugin
    end

    for _, item in ipairs(items) do
        -- entries that are not tables are ignored
        if type(item) == "table" then
            for plugin_name, auth_conf in pairs(item.value.plugins or {}) do
                local plugin_obj = plugin.get(plugin_name)
                if plugin_obj and plugin_obj.type == "auth" then
                    local entry = by_plugin[plugin_name]
                    if not entry then
                        entry = {
                            nodes = {},
                            conf_version = consumers.conf_version
                        }
                        by_plugin[plugin_name] = entry
                    end

                    local node = core.table.clone(item.value)
                    -- Note: the id is the key of the consumer data, i.e. the
                    -- 'username' field in admin
                    node.consumer_name = node.id
                    node.auth_conf = auth_conf
                    node.modifiedIndex = item.modifiedIndex
                    core.log.info("consumer:", core.json.delay_encode(node))

                    core.table.insert(entry.nodes, node)
                end
            end
        end
    end

    return by_plugin
end
-- Look up the consumer grouping for one auth plugin.
--
-- The per-plugin grouping is produced by plugin_consumer() and fetched
-- through core.lrucache.global under the "/consumers" key, versioned by
-- consumers.conf_version.
-- NOTE(review): presumably the grouping is only rebuilt when conf_version
-- changes -- confirm against core.lrucache.global.
--
-- @param plugin_name name of the plugin to look up
-- @return the entry for `plugin_name`, or nil when there is none
function _M.plugin(plugin_name)
    local grouped = core.lrucache.global("/consumers",
                                         consumers.conf_version,
                                         plugin_consumer)
    return grouped[plugin_name]
end
-- Bind the consumer picked by an auth plugin to the request context.
--
-- Sets ctx.consumer, ctx.consumer_name, ctx.consumer_group_id (from
-- consumer.group_id) and ctx.consumer_ver (from conf.conf_version).
--
-- @param ctx      context table that receives the fields
-- @param consumer the chosen consumer table
-- @param conf     table whose `conf_version` is recorded
function _M.attach_consumer(ctx, consumer, conf)
    ctx.consumer = consumer
    ctx.consumer_name, ctx.consumer_group_id =
        consumer.consumer_name, consumer.group_id
    ctx.consumer_ver = conf.conf_version
end
-- Expose the raw consumer list together with its config version.
--
-- @return consumers.values, consumers.conf_version; or nil, nil when the
--         consumers config object has not been created yet
function _M.consumers()
    if consumers then
        return consumers.values, consumers.conf_version
    end

    return nil, nil
end
-- Build a lookup table from an auth attribute value to its consumer.
--
-- Every node in consumers_conf.nodes is cloned, its `auth_conf` is replaced
-- by the result of secret.fetch_secrets(), and the clone is stored under
-- auth_conf[key_attr].
--
-- @param consumers_conf table with a `nodes` array of consumers
-- @param key_attr       name of the auth_conf field used as the lookup key
-- @return table mapping auth_conf[key_attr] -> consumer clone
local function create_consume_cache(consumers_conf, key_attr)
    local by_key = {}

    for _, node in ipairs(consumers_conf.nodes) do
        core.log.info("consumer node: ", core.json.delay_encode(node))

        local copy = core.table.clone(node)
        local raw_conf = copy.auth_conf
        local resolved = secret.fetch_secrets(raw_conf, true, raw_conf, "")
        copy.auth_conf = resolved

        by_key[resolved[key_attr]] = copy
    end

    return by_key
end
-- Return the key -> consumer lookup table for a plugin.
--
-- The table is built by create_consume_cache() and fetched through the
-- module's lrucache under "consumers_key#<plugin_name>", versioned by
-- consumer_conf.conf_version.
--
-- @param plugin_name   plugin name, used to form the cache key
-- @param consumer_conf per-plugin consumer entry (needs `conf_version`;
--                      passed on to create_consume_cache)
-- @param key_attr      auth_conf field used as the lookup key
-- @return the lookup table (only the first value returned by the cache)
function _M.consumers_kv(plugin_name, consumer_conf, key_attr)
    local cache_key = "consumers_key#" .. plugin_name
    -- a distinct local name avoids shadowing the file-level `consumers`
    local kv = lrucache(cache_key, consumer_conf.conf_version,
                        create_consume_cache, consumer_conf, key_attr)
    return kv
end
-- Checker callback registered for consumer items in _M.init_worker():
-- delegates to plugin_checker with the consumer schema type and returns
-- whatever it returns.
local function check_consumer(consumer)
    local schema_type = core.schema.TYPE_CONSUMER
    return plugin_checker(consumer, schema_type)
end
-- Filter callback installed by _M.init_worker() when the config type is not
-- "etcd". The id is expected to equal the username, so it is overwritten
-- with the username here. Items without a `value` are left untouched.
local function filter(consumer)
    local value = consumer.value
    if value then
        value.id = value.username
    end
end
-- Create the "/consumers" config object and store it in the file-level
-- `consumers` variable.
--
-- The config is created with automatic = true, the consumer item schema and
-- check_consumer as checker. When the config type is not "etcd", `filter`
-- is additionally installed.
--
-- Raises an error when core.config.new() does not return an object.
function _M.init_worker()
    local cfg = {
        automatic = true,
        item_schema = core.schema.consumer,
        checker = check_consumer,
    }

    if core.config.type ~= "etcd" then
        cfg.filter = filter
    end

    local err
    consumers, err = core.config.new("/consumers", cfg)
    if not consumers then
        -- error() never returns, so nothing follows it. `err` is guarded so a
        -- missing message cannot turn this into a concatenation error.
        error("failed to create etcd instance for fetching consumers: "
              .. (err or "unknown error"))
    end
end
return _M