Skip to content

Commit

Permalink
rabbit_db_*: Handle breaking change in khepri adv API return type
Browse files Browse the repository at this point in the history
All callers of `khepri_adv` and `khepri_tx_adv` need updates to handle
the now consistent return type of `khepri:node_props_map()` in Khepri
0.17.

We don't need any compatibility code to handle "either the old return
type or the new return type" because the translation is done entirely
in the "client side" code in Khepri - meaning that the return value from
the Ra server is the same but it is translated differently by the
functions in `khepri_adv` and `khepri_tx_adv`.
  • Loading branch information
the-mikedavis committed Dec 23, 2024
1 parent 56c447e commit d4c49f2
Show file tree
Hide file tree
Showing 11 changed files with 138 additions and 126 deletions.
48 changes: 32 additions & 16 deletions deps/rabbit/src/rabbit_db_binding.erl
Original file line number Diff line number Diff line change
Expand Up @@ -837,17 +837,25 @@ delete_all_for_exchange_in_khepri(X = #exchange{name = XName}, OnlyDurable, Remo
end,
{deleted, X, Bindings, delete_for_destination_in_khepri(XName, OnlyDurable)}.

delete_for_source_in_khepri(#resource{virtual_host = VHost, name = Name}) ->
Path = khepri_route_path(
VHost,
Name,
_Kind = ?KHEPRI_WILDCARD_STAR,
_DstName = ?KHEPRI_WILDCARD_STAR,
_RoutingKey = #if_has_data{}),
{ok, Bindings} = khepri_tx_adv:delete_many(Path),
maps:fold(fun(_P, #{data := Set}, Acc) ->
sets:to_list(Set) ++ Acc
end, [], Bindings).
delete_for_source_in_khepri(#resource{virtual_host = VHost, name = SrcName}) ->
Pattern = khepri_route_path(
VHost,
SrcName,
?KHEPRI_WILDCARD_STAR, %% Kind
?KHEPRI_WILDCARD_STAR, %% DstName
#if_has_data{}), %% RoutingKey
{ok, Bindings} = khepri_tx_adv:delete_many(Pattern),
maps:fold(
fun(Path, Props, Acc) ->
case {Path, Props} of
{?RABBITMQ_KHEPRI_ROUTE_PATH(
VHost, SrcName, _Kind, _Name, _RoutingKey),
#{data := Set}} ->
sets:to_list(Set) ++ Acc;
{_, _} ->
Acc
end
end, [], Bindings).

%% -------------------------------------------------------------------
%% delete_for_destination_in_mnesia().
Expand Down Expand Up @@ -892,14 +900,22 @@ delete_for_destination_in_mnesia(DstName, OnlyDurable, Fun) ->
delete_for_destination_in_khepri(#resource{virtual_host = VHost, kind = Kind, name = Name}, OnlyDurable) ->
Pattern = khepri_route_path(
VHost,
_SrcName = ?KHEPRI_WILDCARD_STAR,
?KHEPRI_WILDCARD_STAR, %% SrcName
Kind,
Name,
_RoutingKey = ?KHEPRI_WILDCARD_STAR),
?KHEPRI_WILDCARD_STAR), %% RoutingKey
{ok, BindingsMap} = khepri_tx_adv:delete_many(Pattern),
Bindings = maps:fold(fun(_, #{data := Set}, Acc) ->
sets:to_list(Set) ++ Acc
end, [], BindingsMap),
Bindings = maps:fold(
fun(Path, Props, Acc) ->
case {Path, Props} of
{?RABBITMQ_KHEPRI_ROUTE_PATH(
VHost, _SrcName, Kind, Name, _RoutingKey),
#{data := Set}} ->
sets:to_list(Set) ++ Acc;
{_, _} ->
Acc
end
end, [], BindingsMap),
rabbit_binding:group_bindings_fold(fun maybe_auto_delete_exchange_in_khepri/4,
lists:keysort(#binding.source, Bindings), OnlyDurable).

Expand Down
26 changes: 16 additions & 10 deletions deps/rabbit/src/rabbit_db_exchange.erl
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ update_in_khepri(XName, Fun) ->
Path = khepri_exchange_path(XName),
Ret1 = rabbit_khepri:adv_get(Path),
case Ret1 of
{ok, #{data := X, payload_version := Vsn}} ->
{ok, #{Path := #{data := X, payload_version := Vsn}}} ->
X1 = Fun(X),
UpdatePath =
khepri_path:combine_with_conditions(
Expand Down Expand Up @@ -534,8 +534,7 @@ next_serial_in_khepri(XName) ->
Path = khepri_exchange_serial_path(XName),
Ret1 = rabbit_khepri:adv_get(Path),
case Ret1 of
{ok, #{data := Serial,
payload_version := Vsn}} ->
{ok, #{Path := #{data := Serial, payload_version := Vsn}}} ->
UpdatePath =
khepri_path:combine_with_conditions(
Path, [#if_payload_version{version = Vsn}]),
Expand Down Expand Up @@ -711,13 +710,20 @@ delete_all_in_khepri_tx(VHostName) ->
{ok, NodeProps} = khepri_tx_adv:delete_many(Pattern),
Deletions =
maps:fold(
fun(_Path, #{data := X}, Deletions) ->
{deleted, #exchange{name = XName}, Bindings, XDeletions} =
rabbit_db_binding:delete_all_for_exchange_in_khepri(
X, false, true),
Deletions1 = rabbit_binding:add_deletion(
XName, X, deleted, Bindings, XDeletions),
rabbit_binding:combine_deletions(Deletions, Deletions1)
fun(Path, Props, Deletions) ->
case {Path, Props} of
{?RABBITMQ_KHEPRI_EXCHANGE_PATH(VHostName, _),
#{data := X}} ->
{deleted,
#exchange{name = XName}, Bindings, XDeletions} =
rabbit_db_binding:delete_all_for_exchange_in_khepri(
X, false, true),
Deletions1 = rabbit_binding:add_deletion(
XName, X, deleted, Bindings, XDeletions),
rabbit_binding:combine_deletions(Deletions, Deletions1);
{_, _} ->
Deletions
end
end, rabbit_binding:new_deletions(), NodeProps),
{ok, Deletions}.

Expand Down
4 changes: 2 additions & 2 deletions deps/rabbit/src/rabbit_db_msup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ create_or_update_in_khepri(Group, Overall, Delegate, ChildSpec, Id) ->
mirroring_pid = Overall,
childspec = ChildSpec},
case rabbit_khepri:adv_get(Path) of
{ok, #{data := #mirrored_sup_childspec{mirroring_pid = Pid},
payload_version := Vsn}} ->
{ok, #{Path := #{data := #mirrored_sup_childspec{mirroring_pid = Pid},
payload_version := Vsn}}} ->
case Overall of
Pid ->
Delegate;
Expand Down
46 changes: 24 additions & 22 deletions deps/rabbit/src/rabbit_db_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ delete_in_khepri(QueueName, OnlyDurable) ->
fun () ->
Path = khepri_queue_path(QueueName),
case khepri_tx_adv:delete(Path) of
{ok, #{data := _}} ->
{ok, #{Path := #{data := _}}} ->
%% we want to execute some things, as decided by rabbit_exchange,
%% after the transaction.
rabbit_db_binding:delete_for_destination_in_khepri(QueueName, OnlyDurable);
Expand Down Expand Up @@ -606,7 +606,7 @@ update_in_khepri(QName, Fun) ->
Path = khepri_queue_path(QName),
Ret1 = rabbit_khepri:adv_get(Path),
case Ret1 of
{ok, #{data := Q, payload_version := Vsn}} ->
{ok, #{Path := #{data := Q, payload_version := Vsn}}} ->
UpdatePath = khepri_path:combine_with_conditions(
Path, [#if_payload_version{version = Vsn}]),
Q1 = Fun(Q),
Expand Down Expand Up @@ -657,7 +657,7 @@ update_decorators_in_khepri(QName, Decorators) ->
Path = khepri_queue_path(QName),
Ret1 = rabbit_khepri:adv_get(Path),
case Ret1 of
{ok, #{data := Q1, payload_version := Vsn}} ->
{ok, #{Path := #{data := Q1, payload_version := Vsn}}} ->
Q2 = amqqueue:set_decorators(Q1, Decorators),
UpdatePath = khepri_path:combine_with_conditions(
Path, [#if_payload_version{version = Vsn}]),
Expand Down Expand Up @@ -1075,15 +1075,12 @@ delete_transient_in_khepri(FilterFun) ->
case rabbit_khepri:adv_get_many(PathPattern) of
{ok, Props} ->
Qs = maps:fold(
fun(Path0, #{data := Q, payload_version := Vsn}, Acc)
fun(Path, #{data := Q, payload_version := Vsn}, Acc)
when ?is_amqqueue(Q) ->
case FilterFun(Q) of
true ->
Path = khepri_path:combine_with_conditions(
Path0,
[#if_payload_version{version = Vsn}]),
QName = amqqueue:get_name(Q),
[{Path, QName} | Acc];
[{Path, Vsn, QName} | Acc];
false ->
Acc
end
Expand All @@ -1102,20 +1099,7 @@ do_delete_transient_queues_in_khepri([], _FilterFun) ->
do_delete_transient_queues_in_khepri(Qs, FilterFun) ->
Res = rabbit_khepri:transaction(
fun() ->
rabbit_misc:fold_while_ok(
fun({Path, QName}, Acc) ->
%% Also see `delete_in_khepri/2'.
case khepri_tx_adv:delete(Path) of
{ok, #{data := _}} ->
Deletions = rabbit_db_binding:delete_for_destination_in_khepri(
QName, false),
{ok, [{QName, Deletions} | Acc]};
{ok, _} ->
{ok, Acc};
{error, _} = Error ->
Error
end
end, [], Qs)
do_delete_transient_queues_in_khepri_tx(Qs, [])
end),
case Res of
{ok, Items} ->
Expand All @@ -1129,6 +1113,24 @@ do_delete_transient_queues_in_khepri(Qs, FilterFun) ->
Error
end.

do_delete_transient_queues_in_khepri_tx([], Acc) ->
{ok, Acc};
do_delete_transient_queues_in_khepri_tx([{Path, Vsn, QName} | Rest], Acc) ->
%% Also see `delete_in_khepri/2'.
VersionedPath = khepri_path:combine_with_conditions(
Path, [#if_payload_version{version = Vsn}]),
case khepri_tx_adv:delete(VersionedPath) of
{ok, #{Path := #{data := _}}} ->
Deletions = rabbit_db_binding:delete_for_destination_in_khepri(
QName, false),
Acc1 = [{QName, Deletions} | Acc],
do_delete_transient_queues_in_khepri_tx(Rest, Acc1);
{ok, _} ->
do_delete_transient_queues_in_khepri_tx(Rest, Acc);
{error, _} = Error ->
Error
end.

%% -------------------------------------------------------------------
%% foreach_transient().
%% -------------------------------------------------------------------
Expand Down
26 changes: 19 additions & 7 deletions deps/rabbit/src/rabbit_db_rtparams.erl
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ set_in_khepri(Key, Term) ->
Record = #runtime_parameters{key = Key,
value = Term},
case rabbit_khepri:adv_put(Path, Record) of
{ok, #{data := Params}} ->
{ok, #{Path := #{data := Params}}} ->
{old, Params#runtime_parameters.value};
{ok, _} ->
new
Expand Down Expand Up @@ -114,7 +114,7 @@ set_in_khepri_tx(Key, Term) ->
Record = #runtime_parameters{key = Key,
value = Term},
case khepri_tx_adv:put(Path, Record) of
{ok, #{data := Params}} ->
{ok, #{Path := #{data := Params}}} ->
{old, Params#runtime_parameters.value};
{ok, _} ->
new
Expand Down Expand Up @@ -347,11 +347,23 @@ delete_vhost_in_mnesia_tx(VHostName) ->
<- mnesia:match_object(?MNESIA_TABLE, Match, read)].

delete_vhost_in_khepri(VHostName) ->
Path = khepri_vhost_rp_path(
VHostName, ?KHEPRI_WILDCARD_STAR, ?KHEPRI_WILDCARD_STAR),
case rabbit_khepri:adv_delete_many(Path) of
{ok, Props} ->
{ok, rabbit_khepri:collect_payloads(Props)};
Pattern = khepri_vhost_rp_path(
VHostName, ?KHEPRI_WILDCARD_STAR, ?KHEPRI_WILDCARD_STAR),
case rabbit_khepri:adv_delete_many(Pattern) of
{ok, NodePropsMap} ->
RTParams =
maps:fold(
fun(Path, Props, Acc) ->
case {Path, Props} of
{?RABBITMQ_KHEPRI_VHOST_RUNTIME_PARAM_PATH(
VHostName, _, _),
#{data := RTParam}} ->
[RTParam | Acc];
{_, _} ->
Acc
end
end, [], NodePropsMap),
{ok, RTParams};
{error, _} = Err ->
Err
end.
Expand Down
46 changes: 34 additions & 12 deletions deps/rabbit/src/rabbit_db_user.erl
Original file line number Diff line number Diff line change
Expand Up @@ -628,20 +628,42 @@ clear_all_permissions_for_vhost_in_mnesia(VHostName) ->
clear_all_permissions_for_vhost_in_khepri(VHostName) ->
rabbit_khepri:transaction(
fun() ->
UserPermissionsPath = khepri_user_permission_path(
?KHEPRI_WILDCARD_STAR, VHostName),
TopicPermissionsPath = khepri_topic_permission_path(
?KHEPRI_WILDCARD_STAR, VHostName,
?KHEPRI_WILDCARD_STAR),
{ok, UserProps} = khepri_tx_adv:delete_many(UserPermissionsPath),
{ok, TopicProps} = khepri_tx_adv:delete_many(
TopicPermissionsPath),
Deletions = rabbit_khepri:collect_payloads(
TopicProps,
rabbit_khepri:collect_payloads(UserProps)),
{ok, Deletions}
clear_all_permissions_for_vhost_in_khepri_tx(VHostName)
end, rw, #{timeout => infinity}).

clear_all_permissions_for_vhost_in_khepri_tx(VHostName) ->
UserPermissionsPattern = khepri_user_permission_path(
?KHEPRI_WILDCARD_STAR, VHostName),
TopicPermissionsPattern = khepri_topic_permission_path(
?KHEPRI_WILDCARD_STAR, VHostName,
?KHEPRI_WILDCARD_STAR),
{ok, UserNodePropsMap} = khepri_tx_adv:delete_many(UserPermissionsPattern),
{ok, TopicNodePropsMap} = khepri_tx_adv:delete_many(
TopicPermissionsPattern),
Deletions0 =
maps:fold(
fun(Path, Props, Acc) ->
case {Path, Props} of
{?RABBITMQ_KHEPRI_USER_PERMISSION_PATH(VHostName, _),
#{data := Permission}} ->
[Permission | Acc];
{_, _} ->
Acc
end
end, [], UserNodePropsMap),
Deletions1 =
maps:fold(
fun(Path, Props, Acc) ->
case {Path, Props} of
{?RABBITMQ_KHEPRI_TOPIC_PERMISSION_PATH(VHostName, _, _),
#{data := Permission}} ->
[Permission | Acc];
{_, _} ->
Acc
end
end, Deletions0, TopicNodePropsMap),
{ok, Deletions1}.

%% -------------------------------------------------------------------
%% get_topic_permissions().
%% -------------------------------------------------------------------
Expand Down
6 changes: 3 additions & 3 deletions deps/rabbit/src/rabbit_db_vhost.erl
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ merge_metadata_in_khepri(VHostName, Metadata) ->
Path = khepri_vhost_path(VHostName),
Ret1 = rabbit_khepri:adv_get(Path),
case Ret1 of
{ok, #{data := VHost0, payload_version := DVersion}} ->
{ok, #{Path := #{data := VHost0, payload_version := DVersion}}} ->
VHost = vhost:merge_metadata(VHost0, Metadata),
rabbit_log:debug("Updating a virtual host record ~p", [VHost]),
Path1 = khepri_path:combine_with_conditions(
Expand Down Expand Up @@ -411,10 +411,10 @@ update_in_mnesia_tx(VHostName, UpdateFun)
update_in_khepri(VHostName, UpdateFun) ->
Path = khepri_vhost_path(VHostName),
case rabbit_khepri:adv_get(Path) of
{ok, #{data := V, payload_version := DVersion}} ->
{ok, #{Path := #{data := V, payload_version := Vsn}}} ->
V1 = UpdateFun(V),
Path1 = khepri_path:combine_with_conditions(
Path, [#if_payload_version{version = DVersion}]),
Path, [#if_payload_version{version = Vsn}]),
case rabbit_khepri:put(Path1, V1) of
ok ->
V1;
Expand Down
Loading

0 comments on commit d4c49f2

Please sign in to comment.