Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(clustering/rpc): rpc batching on concentrator #14055

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 95 additions & 47 deletions kong/clustering/rpc/concentrator.lua
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ local queue = require("kong.clustering.rpc.queue")
local cjson = require("cjson")
local jsonrpc = require("kong.clustering.rpc.json_rpc_v2")
local rpc_utils = require("kong.clustering.rpc.utils")
local isarray = require("table.isarray")
local isempty = require("table.isempty")
local tb_insert = table.insert


local setmetatable = setmetatable
Expand Down Expand Up @@ -90,6 +93,67 @@ local function enqueue_notifications(notifications, notifications_queue)
end


function _M:process_one_response(payload)
assert(payload.jsonrpc == jsonrpc.VERSION)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to handle errors here. It could be an error of the opponent thus we should not fail in this case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now this PR just refactored and moved the original code, kept the logic, we could change it later.

local payload_id = payload.id

-- response
local cb = self.interest[payload_id]
self.interest[payload_id] = nil -- edge trigger only once

if not cb then
ngx_log(ngx_WARN, "[rpc] no interest for concentrator response id: ", payload_id, ", dropping it")
return
end

local res, err = cb(payload)
if not res then
ngx_log(ngx_WARN, "[rpc] concentrator response interest handler failed: id: ",
payload_id, ", err: ", err)
end
end


function _M:process_one_request(target_id, reply_to, payload, collection)
local payload_id = payload.id
StarlightIbuki marked this conversation as resolved.
Show resolved Hide resolved

local res, err = self.manager:_local_call(target_id, payload.method,
payload.params, not payload_id)

-- notification has no callback or id
if not payload_id then
ngx_log(ngx_DEBUG, "[rpc] notification has no response")
return
end

if res then
-- call success
res, err = self:_enqueue_rpc_response(reply_to, {
jsonrpc = jsonrpc.VERSION,
id = payload_id,
result = res,
}, collection)
if not res then
ngx_log(ngx_WARN, "[rpc] unable to enqueue RPC call result: ", err)
end

else
-- call failure
res, err = self:_enqueue_rpc_response(reply_to, {
jsonrpc = jsonrpc.VERSION,
id = payload_id,
error = {
code = jsonrpc.SERVER_ERROR,
message = tostring(err),
}
}, collection)
if not res then
ngx_log(ngx_WARN, "[rpc] unable to enqueue RPC error: ", err)
end
end
end


function _M:_event_loop(lconn)
local notifications_queue = queue.new(4096)
local rpc_resp_channel_name = RESP_CHANNEL_PREFIX .. self.worker_id
Expand All @@ -116,21 +180,16 @@ function _M:_event_loop(lconn)
if n.channel == rpc_resp_channel_name then
-- an response for a previous RPC call we asked for
local payload = cjson_decode(n.payload)
assert(payload.jsonrpc == jsonrpc.VERSION)

-- response
local cb = self.interest[payload.id]
self.interest[payload.id] = nil -- edge trigger only once

if cb then
local res, err = cb(payload)
if not res then
ngx_log(ngx_WARN, "[rpc] concentrator response interest handler failed: id: ",
payload.id, ", err: ", err)
end
if not isarray(payload) then
StarlightIbuki marked this conversation as resolved.
Show resolved Hide resolved
-- one rpc response
self:process_one_response(payload)

else
ngx_log(ngx_WARN, "[rpc] no interest for concentrator response id: ", payload.id, ", dropping it")
-- batch rpc response
for _, v in ipairs(payload) do
self:process_one_response(v)
end
end

else
Expand All @@ -153,45 +212,29 @@ function _M:_event_loop(lconn)
local reply_to = assert(call.reply_to,
"unknown requester for RPC")

local res, err = self.manager:_local_call(target_id, payload.method,
payload.params, not payload.id)
if not isarray(payload) then
-- one rpc call
self:process_one_request(target_id, reply_to, payload)

-- notification has no callback or id
if not payload.id then
ngx_log(ngx_DEBUG, "[rpc] notification has no response")
goto continue
end
else
local collection = {}

if res then
-- call success
res, err = self:_enqueue_rpc_response(reply_to, {
jsonrpc = jsonrpc.VERSION,
id = payload.id,
result = res,
})
if not res then
ngx_log(ngx_WARN, "[rpc] unable to enqueue RPC call result: ", err)
-- batching rpc call
for _, v in ipairs(payload) do
self:process_one_request(target_id, reply_to, v, collection)
end

else
-- call failure
res, err = self:_enqueue_rpc_response(reply_to, {
jsonrpc = jsonrpc.VERSION,
id = payload.id,
error = {
code = jsonrpc.SERVER_ERROR,
message = tostring(err),
}
})
if not res then
ngx_log(ngx_WARN, "[rpc] unable to enqueue RPC error: ", err)
if not isempty(collection) then
local res, err = self:_enqueue_rpc_response(reply_to, collection)
if not res then
ngx_log(ngx_WARN, "[rpc] unable to enqueue RPC call result: ", err)
end
end
end
end -- if not isarray(payload)

::continue::
end
end
end
end -- for _, call
end -- if n.channel == rpc_resp_channel_name
end -- while true

local res, err = lconn:wait_for_notification()
if not res then
Expand All @@ -217,7 +260,7 @@ function _M:_event_loop(lconn)
else
notifications_queue:push(res)
end
end
end -- while not exiting()
end


Expand Down Expand Up @@ -270,7 +313,12 @@ end


-- enqueue a RPC response from CP worker with ID worker_id
function _M:_enqueue_rpc_response(worker_id, payload)
function _M:_enqueue_rpc_response(worker_id, payload, collection)
if collection then
tb_insert(collection, payload)
return
end

local sql = string_format("SELECT pg_notify(%s, %s);",
self.db.connector:escape_literal(RESP_CHANNEL_PREFIX .. worker_id),
self.db.connector:escape_literal(cjson_encode(payload)))
Expand Down
Loading