feat: ai-proxy plugin #11499

Merged (87 commits), Sep 17, 2024

Changes from 79 commits

Commits
b98a48f
feat: ai proxy plugin
shreemaan-abhishek Aug 16, 2024
8188ae4
remove subrequest
shreemaan-abhishek Aug 16, 2024
7b83b3a
fix diff test
shreemaan-abhishek Aug 16, 2024
97cafa5
long line fix
shreemaan-abhishek Aug 16, 2024
35b1787
completions typo in consts
shreemaan-abhishek Aug 16, 2024
e18caef
license
shreemaan-abhishek Aug 16, 2024
28f06ae
plugins.t fix
shreemaan-abhishek Aug 16, 2024
82f9692
handle empty req body problem
shreemaan-abhishek Aug 18, 2024
0577e8e
auth schema fix
shreemaan-abhishek Aug 18, 2024
e5f00f7
scheme and method
shreemaan-abhishek Aug 18, 2024
c307b04
auth and model.name required
shreemaan-abhishek Aug 18, 2024
ef4cf84
scheme in lua code forgot to commit
shreemaan-abhishek Aug 18, 2024
4bf6bd2
tests
shreemaan-abhishek Aug 18, 2024
42adfd1
lint
shreemaan-abhishek Aug 18, 2024
0af00ae
add docs
shreemaan-abhishek Aug 20, 2024
aff56a0
options merger test
shreemaan-abhishek Aug 20, 2024
f25f21a
fix encryption mode comment
shreemaan-abhishek Aug 20, 2024
d2d253e
fix(lint): local ngx
shreemaan-abhishek Aug 20, 2024
58ca8a7
fix lint
shreemaan-abhishek Aug 21, 2024
f146f20
index to json
shreemaan-abhishek Aug 21, 2024
2317aa8
reindex
shreemaan-abhishek Aug 21, 2024
3ac0fe5
unsupported provider
shreemaan-abhishek Aug 22, 2024
6e31cfe
remove , nil
shreemaan-abhishek Aug 22, 2024
e302360
move to core.request
shreemaan-abhishek Aug 22, 2024
83f2197
update empty body test
shreemaan-abhishek Aug 22, 2024
bcc21cb
fix way to set upstream
shreemaan-abhishek Aug 22, 2024
10a07c1
add log
shreemaan-abhishek Aug 22, 2024
7220c08
response_streaming -> stream
shreemaan-abhishek Aug 22, 2024
a4afb30
refactor override schema
shreemaan-abhishek Aug 22, 2024
6248005
content type update
shreemaan-abhishek Aug 22, 2024
e88683c
remove completions
shreemaan-abhishek Aug 22, 2024
7d9c075
source -> type
shreemaan-abhishek Aug 22, 2024
9823570
fix lint
shreemaan-abhishek Aug 22, 2024
94d00f4
or -> and
shreemaan-abhishek Aug 23, 2024
b24e439
core.utils -> resolver
shreemaan-abhishek Aug 23, 2024
5ca70f3
global pcall
shreemaan-abhishek Aug 23, 2024
284ad76
rname test file
shreemaan-abhishek Aug 23, 2024
6baa7d1
use has_prefix
shreemaan-abhishek Aug 23, 2024
bdab563
fix upstream handling
shreemaan-abhishek Aug 23, 2024
2d0a7a1
dont modify tfsp
shreemaan-abhishek Aug 23, 2024
530448f
subrequest
shreemaan-abhishek Aug 27, 2024
ed11fa4
subrequest log check
shreemaan-abhishek Aug 27, 2024
cba307a
unused var
shreemaan-abhishek Aug 27, 2024
3febd29
http version check
shreemaan-abhishek Aug 28, 2024
e566a37
reindex
shreemaan-abhishek Aug 30, 2024
4c13cef
Merge branch 'master' of github.com:apache/apisix into ai-proxy
shreemaan-abhishek Aug 30, 2024
28c9c4d
Revert "subrequest"
shreemaan-abhishek Sep 3, 2024
780561d
Revert "subrequest log check"
shreemaan-abhishek Sep 3, 2024
4ffdd85
use httpc to request LLM
shreemaan-abhishek Sep 3, 2024
0521f89
pass through test
shreemaan-abhishek Sep 3, 2024
bd8309b
clean up
shreemaan-abhishek Sep 3, 2024
6f5d158
don't handle upstream when proxy_buffering
shreemaan-abhishek Sep 4, 2024
d4f3a5a
test scheme fix
shreemaan-abhishek Sep 4, 2024
1bfeac9
test path fix
shreemaan-abhishek Sep 4, 2024
25975c0
support SSE in tests
shreemaan-abhishek Sep 4, 2024
7c77ed6
cleanup
shreemaan-abhishek Sep 4, 2024
fa46abe
CLEANUP
shreemaan-abhishek Sep 4, 2024
45e4f98
cleanup
shreemaan-abhishek Sep 4, 2024
99af867
use `endpoint` instead of several config fields
shreemaan-abhishek Sep 4, 2024
11aef59
cleanup
shreemaan-abhishek Sep 4, 2024
4eb0ff4
`delayed_access` -> `disable_proxy_buffering_access_phase`
shreemaan-abhishek Sep 4, 2024
92ebd9d
cleanup
shreemaan-abhishek Sep 4, 2024
c0dc59e
scalable auth schema
shreemaan-abhishek Sep 4, 2024
99cf3a4
fix lint
shreemaan-abhishek Sep 5, 2024
243b5f5
remove body as auth param
shreemaan-abhishek Sep 5, 2024
217b5af
query param auth test
shreemaan-abhishek Sep 5, 2024
6661a0e
fix test
shreemaan-abhishek Sep 5, 2024
1c00e2c
fix lint
shreemaan-abhishek Sep 5, 2024
37cdd7b
remove forgotten header appender part
shreemaan-abhishek Sep 5, 2024
1062cc2
remove subrequest 😩
shreemaan-abhishek Sep 5, 2024
7b23623
Revert "remove subrequest 😩"
shreemaan-abhishek Sep 5, 2024
4278fd5
use encode_args instead of custom func
shreemaan-abhishek Sep 5, 2024
d915292
cleanup
shreemaan-abhishek Sep 5, 2024
8c3bcb3
clean upstream.lua
shreemaan-abhishek Sep 5, 2024
bcca3f2
optimize
shreemaan-abhishek Sep 5, 2024
929cbb1
fix lint
shreemaan-abhishek Sep 6, 2024
e149170
pass data to upstream in streaming way
shreemaan-abhishek Sep 6, 2024
d346d38
Reapply "remove subrequest 😩"
shreemaan-abhishek Sep 9, 2024
3429e03
update doc
shreemaan-abhishek Sep 9, 2024
7c08290
Apply suggestions from code review
shreemaan-abhishek Sep 10, 2024
977cf68
code review
shreemaan-abhishek Sep 10, 2024
073b3f8
code review
shreemaan-abhishek Sep 11, 2024
5831108
code review
shreemaan-abhishek Sep 11, 2024
ab4c37e
update priority
shreemaan-abhishek Sep 12, 2024
cdd37db
update priority
shreemaan-abhishek Sep 12, 2024
e0e3e15
fix test
shreemaan-abhishek Sep 12, 2024
f6768e4
Merge branch 'master' of github.com:apache/apisix into ai-proxy
shreemaan-abhishek Sep 12, 2024
6 changes: 6 additions & 0 deletions Makefile
@@ -374,6 +374,12 @@ install: runtime
$(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/utils
$(ENV_INSTALL) apisix/utils/*.lua $(ENV_INST_LUADIR)/apisix/utils/

$(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/plugins/ai-proxy
$(ENV_INSTALL) apisix/plugins/ai-proxy/*.lua $(ENV_INST_LUADIR)/apisix/plugins/ai-proxy

$(ENV_INSTALL) -d $(ENV_INST_LUADIR)/apisix/plugins/ai-proxy/drivers
$(ENV_INSTALL) apisix/plugins/ai-proxy/drivers/*.lua $(ENV_INST_LUADIR)/apisix/plugins/ai-proxy/drivers

$(ENV_INSTALL) bin/apisix $(ENV_INST_BINDIR)/apisix


1 change: 1 addition & 0 deletions apisix/cli/config.lua
@@ -219,6 +219,7 @@ local _M = {
"proxy-rewrite",
"workflow",
"api-breaker",
"ai-proxy",
"limit-conn",
"limit-count",
"limit-req",
16 changes: 16 additions & 0 deletions apisix/core/request.lua
@@ -21,6 +21,7 @@

local lfs = require("lfs")
local log = require("apisix.core.log")
local json = require("apisix.core.json")
local io = require("apisix.core.io")
local req_add_header
if ngx.config.subsystem == "http" then
@@ -334,6 +335,21 @@ function _M.get_body(max_size, ctx)
end


function _M.get_request_body_table()
Contributor:
Decode the body according to the Content-Type, or rename the method, e.g. get_json_request_body_table?

local body, err = _M.get_body()
if not body then
return nil, { message = "could not get body: " .. (err or "request body is empty") }
end

local body_tab, err = json.decode(body)
if not body_tab then
return nil, { message = "could not get parse JSON request body: " .. err }
end

return body_tab
end


function _M.get_scheme(ctx)
if not ctx then
ctx = ngx.ctx.api_ctx
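The review comment above suggests decoding by Content-Type or renaming the helper. For illustration only, here is a minimal sketch of what such a Content-Type-aware variant could look like inside apisix/core/request.lua; the name get_json_request_body_table and the plain-Lua prefix check are assumptions, not code from this PR:

-- Hypothetical sketch, not part of this PR: a Content-Type-aware variant of
-- get_request_body_table along the lines of the review comment, reusing the
-- json module and the _M.get_body/_M.header helpers already in this file.
local CT_JSON = "application/json"

function _M.get_json_request_body_table(ctx)
    -- reject anything that is not declared as JSON up front
    local ct = _M.header(ctx, "Content-Type") or CT_JSON
    if ct:sub(1, #CT_JSON) ~= CT_JSON then
        return nil, { message = "unsupported content-type: " .. ct }
    end

    local body, err = _M.get_body()
    if not body then
        return nil, { message = "could not get body: " .. (err or "request body is empty") }
    end

    local body_tab, err = json.decode(body)
    if not body_tab then
        return nil, { message = "could not parse JSON request body: " .. err }
    end

    return body_tab
end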
1 change: 1 addition & 0 deletions apisix/init.lua
@@ -726,6 +726,7 @@ function _M.http_access_phase()
end



Contributor:
If there are no substantive changes, please do not change the code style.

function _M.dubbo_access_phase()
ngx.ctx = fetch_ctx()
end
136 changes: 136 additions & 0 deletions apisix/plugins/ai-proxy.lua
@@ -0,0 +1,136 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local core = require("apisix.core")
local schema = require("apisix.plugins.ai-proxy.schema")
local require = require
local pcall = pcall
local ngx_req = ngx.req
local ngx_print = ngx.print
local ngx_flush = ngx.flush

local plugin_name = "ai-proxy"
local _M = {
version = 0.5,
priority = 1004,
name = plugin_name,
schema = schema,
}


function _M.check_schema(conf)
local ai_driver = pcall(require, "apisix.plugins.ai-proxy.drivers." .. conf.model.provider)
if not ai_driver then
return false, "provider: " .. conf.model.provider .. " is not supported."
end
return core.schema.check(schema.plugin_schema, conf)
end


local CONTENT_TYPE_JSON = "application/json"


local function keepalive_or_close(conf, httpc)
if conf.set_keepalive then
httpc:set_keepalive(10000, 100)
return
end
httpc:close()
end


function _M.access(conf, ctx)
local ct = core.request.header(ctx, "Content-Type") or CONTENT_TYPE_JSON
if not core.string.has_prefix(ct, CONTENT_TYPE_JSON) then
return 400, "unsupported content-type: " .. ct
end

local request_table, err = core.request.get_request_body_table()
Contributor:
new naming style?

request_table, count_num, flag_boolean?

Contributor Author:
I don't understand what you mean.

Contributor:
Although adding a suffix makes it more readable, it is not the style used in the project.

Contributor Author:
oh I got it now 😂 . request_table makes it more readable and it can be understood that it's not a string.

if not request_table then
return 400, err
end

local ok, err = core.schema.check(schema.chat_request_schema, request_table)
if not ok then
return 400, "request format doesn't match schema: " .. err
end

if conf.model.name then
request_table.model = conf.model.name
end

if core.table.try_read_attr(conf, "model", "options", "stream") then
request_table.stream = true
end

local ai_driver = require("apisix.plugins.ai-proxy.drivers." .. conf.model.provider)
local res, err, httpc = ai_driver.request(conf, request_table, ctx)
if not res then
core.log.error("failed to send request to AI service: ", err)
return 500
Contributor:
use the predefined Ngx constants?

ngx.HTTP_NOT_ACCEPTABLE = 406
ngx.HTTP_REQUEST_TIMEOUT = 408
ngx.HTTP_CONFLICT = 409
ngx.HTTP_GONE = 410
ngx.HTTP_UPGRADE_REQUIRED = 426
ngx.HTTP_TOO_MANY_REQUESTS = 429
ngx.HTTP_CLOSE = 444
ngx.HTTP_ILLEGAL = 451
ngx.HTTP_INTERNAL_SERVER_ERROR = 500
ngx.HTTP_METHOD_NOT_IMPLEMENTED = 501
ngx.HTTP_BAD_GATEWAY = 502
ngx.HTTP_SERVICE_UNAVAILABLE = 503
ngx.HTTP_GATEWAY_TIMEOUT = 504
ngx.HTTP_VERSION_NOT_SUPPORTED = 505
ngx.HTTP_INSUFFICIENT_STORAGE = 507

end

local body_reader = res.body_reader
if not body_reader then
core.log.error("LLM sent no response body")
return 500
Contributor:
ditto

end

if conf.passthrough then
ngx_req.init_body()
while true do
local chunk, err = body_reader() -- will read chunk by chunk
if err then
core.log.error("failed to read response chunk: ", err)
break
end
if not chunk then
break
end
ngx_req.append_body(chunk)
end
ngx_req.finish_body()
keepalive_or_close(conf, httpc)
return
end

if core.table.try_read_attr(conf, "model", "options", "stream") then
Contributor:
Suggested change
if core.table.try_read_attr(conf, "model", "options", "stream") then
if request_table.stream then

while true do
local chunk, err = body_reader() -- will read chunk by chunk
if err then
core.log.error("failed to read response chunk: ", err)
break
end
if not chunk then
break
end
ngx_print(chunk)
ngx_flush(true)
end
keepalive_or_close(conf, httpc)
return
else
local res_body, err = res:read_body()
Contributor:
also using body_reader, ngx_print * N + ngx_flush * 1?

Contributor Author:
I don't understand what you mean.

Contributor:
The purpose is to save the intermediate buffering process and directly
write the response body to the response buffer by calling ngx.print
multiple times. Finally, call ngx.flush once more.

But,

$ restydoc -s ngx.print

Please note that both "ngx.print" and ngx.say will always invoke the
whole Nginx output body filter chain, which is an expensive operation.
So be careful when calling either of these two in a tight loop; buffer
the data yourself in Lua and save the calls.

Contributor Author:
got it, I think we don't need ngx_print * N + ngx_flush * 1 as reading the response body all at once is favourable for "stream": false

Contributor:
Hi @shreemaan-abhishek Is it resolved?

Contributor Author:
yes

if not res_body then
core.log.error("failed to read response body: ", err)
return 500
end
keepalive_or_close(conf, httpc)
return res.status, res_body
Contributor:
Suggested change
return res.status, res_body
httpc:set_keepalive(10000, 100)
return res.status, res_body

also keepalive before returning?

Contributor Author:
yes, thank you.

end
end

return _M
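Two of the review suggestions in this file, returning the predefined ngx status constants instead of a bare 500 and releasing the connection before returning, could be folded together roughly as below. This is an illustrative sketch only, not code from the PR; the helper name read_whole_response is made up:

-- Hypothetical sketch, not part of this PR: the non-streaming read path using
-- ngx.HTTP_INTERNAL_SERVER_ERROR and releasing the connection on every exit.
local core = require("apisix.core")

local HTTP_INTERNAL_SERVER_ERROR = ngx.HTTP_INTERNAL_SERVER_ERROR  -- 500

local function read_whole_response(conf, httpc, res, keepalive_or_close)
    local res_body, err = res:read_body()
    -- return the connection to the pool (or close it) before leaving
    keepalive_or_close(conf, httpc)
    if not res_body then
        core.log.error("failed to read response body: ", err)
        return HTTP_INTERNAL_SERVER_ERROR
    end
    return res.status, res_body
end

return read_whole_response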
92 changes: 92 additions & 0 deletions apisix/plugins/ai-proxy/drivers/openai.lua
@@ -0,0 +1,92 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local _M = {}

local core = require("apisix.core")
local http = require("resty.http")
local url = require("socket.url")

local pairs = pairs
local type = type

-- globals
local DEFAULT_HOST = "api.openai.com"
local DEFAULT_PORT = 443


function _M.request(conf, request_table, ctx)
local httpc, err = http.new()
if not httpc then
return nil, "failed to create http client to send request to LLM server: " .. err
end
httpc:set_timeout(conf.timeout)

local endpoint = core.table.try_read_attr(conf, "override", "endpoint")
local parsed_url
if endpoint then
parsed_url = url.parse(endpoint)
end

local ok, err = httpc:connect({
scheme = parsed_url.scheme or "https",
host = parsed_url.host or DEFAULT_HOST,
port = parsed_url.port or DEFAULT_PORT,
ssl_verify = conf.ssl_verify,
ssl_server_name = parsed_url.host or DEFAULT_HOST,
pool_size = conf.keepalive and conf.keepalive_pool,
})

if not ok then
return nil, "failed to connect to LLM server: " .. err
end

local query_params = ""
if conf.auth.query and type(conf.auth.query) == "table" then
Contributor:
Suggested change
if conf.auth.query and type(conf.auth.query) == "table" then
if type(conf.auth.query) == "table" then

query_params = core.string.encode_args(conf.auth.query)
Contributor:
FYI, from lua-resty-http, params.query already supports a table type:

local function _format_request(self, params)
    local version = params.version
    local headers = params.headers or {}

    local query = params.query or ""
    if type(query) == "table" then
        query = "?" .. ngx_encode_args(query)
    elseif query ~= "" and str_sub(query, 1, 1) ~= "?" then
        query = "?" .. query
    end

    -- Initialize request
    local req = {
        str_upper(params.method),
        " ",
        self.path_prefix or "",
        params.path,
        query,
        HTTP[version],
        -- Pre-allocate slots for minimum headers and carriage return.
        "",
        "",
        "",
    }
    local c = 7 -- req table index it's faster to do this inline vs table.insert

    -- Append headers
    for key, values in pairs(headers) do
        key = tostring(key)

        if type(values) == "table" then
            for _, value in pairs(values) do
                req[c] = key .. ": " .. tostring(value) .. "\r\n"
                c = c + 1
            end

        else
            req[c] = key .. ": " .. tostring(values) .. "\r\n"
            c = c + 1
        end
    end

    -- Close headers
    req[c] = "\r\n"

    return tbl_concat(req)
end

if query_params and query_params ~= "" then
query_params = "?" .. query_params
end
end

local path = (parsed_url.path or "/v1/chat/completions") .. query_params
Contributor:
Suggested change
local path = (parsed_url.path or "/v1/chat/completions") .. query_params
local path = (parsed_url.path or DEFAULT_URI) .. query_params


local headers = (conf.auth.header or {})
headers["Content-Type"] = "application/json"
local params = {
method = "POST",
headers = headers,
keepalive = conf.keepalive,
ssl_verify = conf.ssl_verify,
path = path,
}

if conf.model.options then
for opt, val in pairs(conf.model.options) do
request_table[opt] = val
end
end
params.body = core.json.encode(request_table)

local res, err = httpc:request(params)
Contributor:
Replace with httpc:request_uri to avoid invoking close or keepalive elsewhere?

#11450

Contributor Author:
request_uri doesn't allow streaming responses.

Contributor:
> Replace with httpc:request_uri to avoid invoking close or keepalive elsewhere?
>
> #11450

You need to support streaming chunks outside, so httpc:request is used here.
But remember to call httpc:close when conf.keepalive is false.

if not res then
return 500, "failed to send request to LLM server: " .. err
Contributor:
Suggested change
return 500, "failed to send request to LLM server: " .. err
return internal_server_error, "failed to send request to LLM server: " .. err

end

return res, nil, httpc
end

return _M
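As the FYI comment above notes, lua-resty-http already encodes a table-typed query itself, so the driver could hand conf.auth.query straight to the client instead of building the query string by hand. A rough sketch under that assumption follows; request_uri is used purely to keep the example short, while the PR keeps httpc:request because it needs the streaming body_reader:

-- Hypothetical sketch, not part of this PR: letting lua-resty-http encode the
-- auth query table itself. request_uri buffers the whole response, so it only
-- fits the non-streaming case.
local http = require("resty.http")
local core = require("apisix.core")

local DEFAULT_ENDPOINT = "https://api.openai.com/v1/chat/completions"

local function request_llm(conf, request_table)
    local httpc = http.new()
    httpc:set_timeout(conf.timeout)

    local headers = conf.auth.header or {}
    headers["Content-Type"] = "application/json"

    local endpoint = conf.override and conf.override.endpoint or DEFAULT_ENDPOINT
    local res, err = httpc:request_uri(endpoint, {
        method = "POST",
        query = conf.auth.query,   -- a table is encoded by lua-resty-http itself
        headers = headers,
        body = core.json.encode(request_table),
        ssl_verify = conf.ssl_verify,
    })
    if not res then
        return nil, "failed to send request to LLM server: " .. err
    end
    return res
end

return request_llm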