Skip to content

Commit

Permalink
Merge pull request #309 from rabbitmq/set-default-props_to_return-bas…
Browse files Browse the repository at this point in the history
…ed-on-effective-machine-version

khepri_machine: Request `delete_reason` if effective machine version >= 2 only
  • Loading branch information
dumbbell authored Dec 17, 2024
2 parents 0db160f + e3d959c commit 8e5703e
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 26 deletions.
68 changes: 51 additions & 17 deletions src/khepri_machine.erl
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@
%% </li>
%% <li>Moved the expiration of dedups to the `tick' aux effect (see {@link
%% handle_aux/5}). This also introduces a new command `#drop_dedups{}'.</li>
%% <li>Added the `delete_reason' to the list of properties that can be
%% returned. It is returned by default if the effective machine version is 2 or
%% more.</li>
%% </ul>
%% </td>
%% </tr>
Expand Down Expand Up @@ -98,8 +101,8 @@
%% For internal use only.
-export([clear_cache/1,
ack_triggers_execution/2,
split_query_options/1,
split_command_options/1,
split_query_options/2,
split_command_options/2,
split_put_options/1,
insert_or_update_node/6,
delete_matching_nodes/4,
Expand All @@ -118,7 +121,8 @@
get_projections/1,
has_projection/2,
get_metrics/1,
get_dedups/1]).
get_dedups/1,
get_store_id/1]).

-ifdef(TEST).
-export([do_process_sync_command/3,
Expand Down Expand Up @@ -283,7 +287,7 @@ fold(StoreId, PathPattern, Fun, Acc, Options)
is_function(Fun, 3) ->
PathPattern1 = khepri_path:from_string(PathPattern),
khepri_path:ensure_is_valid(PathPattern1),
{QueryOptions, TreeOptions} = split_query_options(Options),
{QueryOptions, TreeOptions} = split_query_options(StoreId, Options),
Query = fun(State) ->
Tree = get_tree(State),
try
Expand Down Expand Up @@ -348,7 +352,8 @@ put(StoreId, PathPattern, Payload, Options)
PathPattern1 = khepri_path:from_string(PathPattern),
khepri_path:ensure_is_valid(PathPattern1),
Payload1 = khepri_payload:prepare(Payload),
{CommandOptions, TreeAndPutOptions} = split_command_options(Options),
{CommandOptions, TreeAndPutOptions} = split_command_options(
StoreId, Options),
Command = #put{path = PathPattern1,
payload = Payload1,
options = TreeAndPutOptions},
Expand Down Expand Up @@ -376,7 +381,7 @@ put(_StoreId, PathPattern, Payload, _Options) ->
delete(StoreId, PathPattern, Options) when ?IS_KHEPRI_STORE_ID(StoreId) ->
PathPattern1 = khepri_path:from_string(PathPattern),
khepri_path:ensure_is_valid(PathPattern1),
{CommandOptions, TreeOptions} = split_command_options(Options),
{CommandOptions, TreeOptions} = split_command_options(StoreId, Options),
%% TODO: Ensure `PutOptions' are not set this map.
Command = #delete{path = PathPattern1,
options = TreeOptions},
Expand Down Expand Up @@ -722,14 +727,16 @@ get_projections_state(StoreId, Options)
end,
process_query(StoreId, Query, Options).

-spec split_query_options(Options) -> {QueryOptions, TreeOptions} when
-spec split_query_options(StoreId, Options) ->
{QueryOptions, TreeOptions} when
StoreId :: khepri:store_id(),
Options :: QueryOptions | TreeOptions,
QueryOptions :: khepri:query_options(),
TreeOptions :: khepri:tree_options().
%% @private

split_query_options(Options) ->
Options1 = set_default_options(Options),
split_query_options(StoreId, Options) ->
Options1 = set_default_options(StoreId, Options),
maps:fold(
fun
(Option, Value, {Q, T}) when
Expand All @@ -748,15 +755,16 @@ split_query_options(Options) ->
{Q, T1}
end, {#{}, #{}}, Options1).

-spec split_command_options(Options) ->
-spec split_command_options(StoreId, Options) ->
{CommandOptions, TreeAndPutOptions} when
StoreId :: khepri:store_id(),
Options :: CommandOptions | TreeAndPutOptions,
CommandOptions :: khepri:command_options(),
TreeAndPutOptions :: khepri:tree_options() | khepri:put_options().
%% @private

split_command_options(Options) ->
Options1 = set_default_options(Options),
split_command_options(StoreId, Options) ->
Options1 = set_default_options(StoreId, Options),
maps:fold(
fun
(Option, Value, {C, TP}) when
Expand Down Expand Up @@ -799,7 +807,7 @@ split_put_options(TreeAndPutOptions) ->
{T1, P}
end, {#{}, #{}}, TreeAndPutOptions).

set_default_options(Options) ->
set_default_options(StoreId, Options) ->
%% By default, return payload-related properties. The caller can set
%% `props_to_return' to an empty map to get a minimal return value.
Options1 = case Options of
Expand All @@ -808,7 +816,21 @@ set_default_options(Options) ->
_ ->
Options#{props_to_return => ?DEFAULT_PROPS_TO_RETURN}
end,
Options1.
%% We need to remove `delete_reason' from the list if the whole cluster is
%% still using a machine version that doesn't know about it. Otherwise old
%% versions of Khepri will crash when gathering the properties.
PropsToReturn0 = maps:get(props_to_return, Options1),
PropsToReturn1 = case effective_version(StoreId) of
{ok, EffectiveMacVer} when EffectiveMacVer >= 2 ->
PropsToReturn0;
_ ->
%% `delete_reason' was added in machine version
%% 2. Also, previous versions didn't ignore
%% unknown props_to_return and crashed.
PropsToReturn0 -- [delete_reason]
end,
Options2 = Options1#{props_to_return => PropsToReturn1},
Options2.

-spec process_command(StoreId, Command, Options) -> Ret when
StoreId :: khepri:store_id(),
Expand Down Expand Up @@ -1214,11 +1236,12 @@ can_skip_fence_preliminary_query(StoreId) ->

-spec init(Params) -> State when
Params :: machine_init_args(),
State :: state().
State :: khepri_machine_v0:state().
%% @private

init(Params) ->
%% Initialize the state.
%% Initialize the state. This function always returns the oldest supported
%% state format.
State = khepri_machine_v0:init(Params),

%% Create initial "schema" if provided.
Expand Down Expand Up @@ -1742,7 +1765,7 @@ which_module(0) -> ?MODULE.
EffectiveMacVer :: ra_machine:version().
%% @doc Returns the effective state machine version of the local Ra server.

effective_version(StoreId) ->
effective_version(StoreId) when ?IS_KHEPRI_STORE_ID(StoreId) ->
ThisNode = node(),
RaServer = khepri_cluster:node_to_member(StoreId, ThisNode),
case ra_counters:counters(RaServer, [effective_machine_version]) of
Expand Down Expand Up @@ -2392,6 +2415,17 @@ set_dedups(#khepri_machine{} = State, Dedups) ->
set_dedups(State, _Dedups) ->
State.

-spec get_store_id(State) -> StoreId when
State :: khepri_machine:state(),
StoreId :: khepri:store_id().
%% @doc Returns the store ID from the given state.
%%
%% @private

get_store_id(State) ->
#config{store_id = StoreId} = get_config(State),
StoreId.

-ifdef(TEST).
-spec make_virgin_state(Params) -> State when
Params :: khepri_machine:machine_init_args(),
Expand Down
8 changes: 6 additions & 2 deletions src/khepri_tx.erl
Original file line number Diff line number Diff line change
Expand Up @@ -466,9 +466,11 @@ count(PathPattern) ->
count(PathPattern, Options) ->
PathPattern1 = khepri_tx_adv:path_from_string(PathPattern),
{State, _SideEffects} = khepri_tx_adv:get_tx_state(),
StoreId = khepri_machine:get_store_id(State),
Tree = khepri_machine:get_tree(State),
Fun = fun khepri_tree:count_node_cb/3,
{_QueryOptions, TreeOptions} = khepri_machine:split_query_options(Options),
{_QueryOptions, TreeOptions} =
khepri_machine:split_query_options(StoreId, Options),
TreeOptions1 = TreeOptions#{expect_specific_node => false},
Ret = khepri_tree:fold(Tree, PathPattern1, Fun, 0, TreeOptions1),
case Ret of
Expand Down Expand Up @@ -517,8 +519,10 @@ fold(PathPattern, Fun, Acc) ->
fold(PathPattern, Fun, Acc, Options) ->
PathPattern1 = khepri_tx_adv:path_from_string(PathPattern),
{State, _SideEffects} = khepri_tx_adv:get_tx_state(),
StoreId = khepri_machine:get_store_id(State),
Tree = khepri_machine:get_tree(State),
{_QueryOptions, TreeOptions} = khepri_machine:split_query_options(Options),
{_QueryOptions, TreeOptions} =
khepri_machine:split_query_options(StoreId, Options),
TreeOptions1 = TreeOptions#{expect_specific_node => false},
Ret = khepri_tree:fold(Tree, PathPattern1, Fun, Acc, TreeOptions1),
case Ret of
Expand Down
20 changes: 13 additions & 7 deletions src/khepri_tx_adv.erl
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,11 @@ get_many(PathPattern, Options) ->

do_get_many(PathPattern, Fun, Acc, Options) ->
PathPattern1 = path_from_string(PathPattern),
{_QueryOptions, TreeOptions} = khepri_machine:split_query_options(Options),
{State, _SideEffects} = get_tx_state(),
StoreId = khepri_machine:get_store_id(State),
Tree = khepri_machine:get_tree(State),
{_QueryOptions, TreeOptions} =
khepri_machine:split_query_options(StoreId, Options),
Ret = khepri_tree:fold(Tree, PathPattern1, Fun, Acc, TreeOptions),
case Ret of
{error, ?khepri_exception(_, _) = Exception} ->
Expand Down Expand Up @@ -210,14 +212,16 @@ put_many(PathPattern, Data, Options) ->
ensure_updates_are_allowed(),
PathPattern1 = path_from_string(PathPattern),
Payload1 = khepri_payload:wrap(Data),
{State, _SideEffects} = get_tx_state(),
StoreId = khepri_machine:get_store_id(State),
{_CommandOptions, TreeAndPutOptions} =
khepri_machine:split_command_options(Options),
khepri_machine:split_command_options(StoreId, Options),
{TreeOptions, PutOptions} =
khepri_machine:split_put_options(TreeAndPutOptions),
%% TODO: Ensure `CommandOptions' is unset.
Fun = fun(State, SideEffects) ->
Fun = fun(State1, SideEffects) ->
khepri_machine:insert_or_update_node(
State, PathPattern1, Payload1, PutOptions, TreeOptions,
State1, PathPattern1, Payload1, PutOptions, TreeOptions,
SideEffects)
end,
handle_state_for_call(Fun).
Expand Down Expand Up @@ -401,13 +405,15 @@ delete_many(PathPattern) ->
delete_many(PathPattern, Options) ->
ensure_updates_are_allowed(),
PathPattern1 = path_from_string(PathPattern),
{State, _SideEffects} = get_tx_state(),
StoreId = khepri_machine:get_store_id(State),
{_CommandOptions, TreeOptions} =
khepri_machine:split_command_options(Options),
khepri_machine:split_command_options(StoreId, Options),
%% TODO: Ensure `CommandOptions' is empty and `TreeOptions' doesn't
%% contains put options.
Fun = fun(State, SideEffects) ->
Fun = fun(State1, SideEffects) ->
khepri_machine:delete_matching_nodes(
State, PathPattern1, TreeOptions, SideEffects)
State1, PathPattern1, TreeOptions, SideEffects)
end,
handle_state_for_call(Fun).

Expand Down

0 comments on commit 8e5703e

Please sign in to comment.