Skip to content

Commit

Permalink
Merge pull request #4187 from esl/shapers
Browse files Browse the repository at this point in the history
Migrate from inside shapers to opuntia
  • Loading branch information
chrzaszcz authored Dec 20, 2023
2 parents b29ab06 + 565e636 commit 2235b76
Show file tree
Hide file tree
Showing 18 changed files with 122 additions and 331 deletions.
2 changes: 1 addition & 1 deletion big_tests/tests/mam_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,7 @@ delete_users(Config) ->

increase_limits(Config) ->
Config1 = mongoose_helper:backup_and_set_config(Config, increased_limits()),
rpc_apply(shaper_srv, reset_all_shapers, [host_type()]),
rpc_apply(mongoose_shaper, reset_all_shapers, [host_type()]),
Config1.

increased_limits() ->
Expand Down
6 changes: 3 additions & 3 deletions include/mod_muc_room.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@

-record(activity, {message_time = 0,
presence_time = 0,
message_shaper :: shaper:shaper(),
presence_shaper :: shaper:shaper(),
message_shaper :: mongoose_shaper:shaper(),
presence_shaper :: mongoose_shaper:shaper(),
message,
presence
}).
Expand All @@ -85,7 +85,7 @@
subject_timestamp = <<>>,
just_created = false :: boolean(),
activity = treap:empty() :: treap:treap(),
room_shaper :: shaper:shaper(),
room_shaper :: mongoose_shaper:shaper(),
room_queue = queue:new(),
http_auth_pool = none :: none | mongoose_http_client:pool(),
http_auth_pids = [] :: [pid()],
Expand Down
1 change: 1 addition & 0 deletions rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
{flatlog, "0.1.2"},

%%% Stateless libraries
{opuntia, "0.3.0"},
{fast_tls, "1.1.16"},
{fast_scram, "0.5.0"},
{idna, "6.1.1"},
Expand Down
3 changes: 3 additions & 0 deletions rebar.lock
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
{ref,"c71265c00bd7b465b7214770895503383dbe299e"}},
0},
{<<"observer_cli">>,{pkg,<<"observer_cli">>,<<"1.7.4">>},0},
{<<"opuntia">>,{pkg,<<"opuntia">>,<<"0.3.0">>},0},
{<<"p1_utils">>,{pkg,<<"p1_utils">>,<<"1.0.25">>},1},
{<<"pa">>,
{git,"https://github.com/erszcz/pa.git",
Expand Down Expand Up @@ -170,6 +171,7 @@
{<<"mimerl">>, <<"67E2D3F571088D5CFD3E550C383094B47159F3EEE8FFA08E64106CDF5E981BE3">>},
{<<"mysql">>, <<"0E4BBCF701B7D8EA5C3750F220F26323C0FC18B844960988BE196B33A8A9F3C1">>},
{<<"observer_cli">>, <<"3C1BFB6D91BF68F6A3D15F46AE20DA0F7740D363EE5BC041191CE8722A6C4FAE">>},
{<<"opuntia">>, <<"FB365A48C48CEE76C76407FB0B0E13065E6CB23F4918F7C6B084499FAEB1CCDF">>},
{<<"p1_utils">>, <<"2D39B5015A567BBD2CC7033EEB93A7C60D8C84EFE1EF69A3473FAA07FA268187">>},
{<<"parse_trans">>, <<"BB87AC362A03CA674EBB7D9D498F45C03256ADED7214C9101F7035EF44B798C7">>},
{<<"pooler">>, <<"898CD1FA301FC42D4A8ED598CE139B71CA85B54C16AB161152B5CC5FBDCFA1A8">>},
Expand Down Expand Up @@ -233,6 +235,7 @@
{<<"mimerl">>, <<"F278585650AA581986264638EBF698F8BB19DF297F66AD91B18910DFC6E19323">>},
{<<"mysql">>, <<"D473C479C19E5CDE20237458EEAD6673C3C00E0EF84AFD30615AEBBB67FEE7B3">>},
{<<"observer_cli">>, <<"50DE6D95D814F447458BD5D72666A74624EDDB0EF98BDCEE61A0153AAE0865FF">>},
{<<"opuntia">>, <<"AB49DB4DA96D30116C85A6D28AC4D6C4BE9C91799D7E2452C37F92C477B9BB88">>},
{<<"p1_utils">>, <<"9219214428F2C6E5D3187FF8EB9A8783695C2427420BE9A259840E07ADA32847">>},
{<<"parse_trans">>, <<"F99E368830BEA44552224E37E04943A54874F08B8590485DE8D13832B63A2DC3">>},
{<<"pooler">>, <<"058D85C5081289B90E97E4DDDBC3BB5A3B4A19A728AB3BC88C689EFCC36A07C7">>},
Expand Down
6 changes: 3 additions & 3 deletions src/c2s/mongoose_c2s.erl
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
jid :: undefined | jid:jid(),
socket :: undefined | mongoose_c2s_socket:socket(),
parser :: undefined | exml_stream:parser(),
shaper :: undefined | shaper:shaper(),
shaper :: undefined | mongoose_shaper:shaper(),
listener_opts :: undefined | listener_opts(),
state_mod = #{} :: #{module() => term()},
info = #{} :: info()
Expand Down Expand Up @@ -90,7 +90,7 @@ handle_event(internal, {connect, {SocketModule, SocketOpts}}, connect,
StateData = #c2s_data{listener_opts = #{shaper := ShaperName,
max_stanza_size := MaxStanzaSize} = LOpts}) ->
{ok, Parser} = exml_stream:new_parser([{max_child_size, MaxStanzaSize}]),
Shaper = shaper:new(ShaperName),
Shaper = mongoose_shaper:new(ShaperName),
C2SSocket = mongoose_c2s_socket:new(SocketModule, SocketOpts, LOpts),
StateData1 = StateData#c2s_data{socket = C2SSocket, parser = Parser, shaper = Shaper},
{next_state, {wait_for_stream, stream_start}, StateData1, state_timeout(LOpts)};
Expand Down Expand Up @@ -246,7 +246,7 @@ handle_socket_packet(StateData = #c2s_data{parser = Parser}, Packet) ->

-spec handle_socket_elements(data(), [exml:element()], non_neg_integer()) -> fsm_res().
handle_socket_elements(StateData = #c2s_data{shaper = Shaper}, Elements, Size) ->
{NewShaper, Pause} = shaper:update(Shaper, Size),
{NewShaper, Pause} = mongoose_shaper:update(Shaper, Size),
mongoose_metrics:update(global, [data, xmpp, received, xml_stanza_size], Size),
NewStateData = StateData#c2s_data{shaper = NewShaper},
MaybePauseTimeout = maybe_pause(NewStateData, Pause),
Expand Down
4 changes: 2 additions & 2 deletions src/ejabberd_s2s_in.erl
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@

-record(state, {socket :: mongoose_transport:socket_data(),
streamid :: ejabberd_s2s:stream_id(),
shaper :: shaper:shaper(),
shaper :: mongoose_shaper:shaper(),
tls = false :: boolean(),
tls_enabled = false :: boolean(),
tls_required = false :: boolean(),
Expand All @@ -84,7 +84,7 @@
tls_enabled => boolean(),
tls_options => mongoose_tls:options(),
authenticated => boolean(),
shaper => shaper:shaper(),
shaper => mongoose_shaper:shaper(),
domains => [jid:lserver()]}.

-type statename() :: 'stream_established' | 'wait_for_feature_request'.
Expand Down
2 changes: 1 addition & 1 deletion src/ejabberd_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ init([]) ->
SMBackendSupervisor = supervisor_spec(ejabberd_sm_backend_sup),
OutgoingPoolsSupervisor = supervisor_spec(mongoose_wpool_sup),
Listener = supervisor_spec(mongoose_listener_sup),
ShaperSup = supervisor_spec(mongoose_shaper_sup),
ShaperSup = mongoose_shaper:child_spec(),
DomainSup = supervisor_spec(mongoose_domain_sup),
ReceiverSupervisor =
ejabberd_tmp_sup_spec(mongoose_transport_sup, [mongoose_transport_sup, mongoose_transport]),
Expand Down
4 changes: 2 additions & 2 deletions src/mam/mod_mam_muc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
%%%
%%% This module should be started for each host.
%%% Message archivation is not shaped here (use standard support for this).
%%% MAM's IQs are shaped inside {@link shaper_srv}.
%%% MAM's IQs are shaped inside {@link opuntia_srv}.
%%%
%%% Message identifiers (or UIDs in the spec) are generated based on:
%%%
Expand Down Expand Up @@ -225,7 +225,7 @@ room_process_mam_iq(Acc, From, To, IQ, #{host_type := HostType}) ->
case check_action_allowed(HostType, Acc, To#jid.lserver, Action, MucAction, From, To) of
ok ->
case mod_mam_utils:wait_shaper(HostType, To#jid.lserver, MucAction, From) of
ok ->
continue ->
handle_error_iq(Acc, HostType, To, Action,
handle_mam_iq(HostType, Action, From, To, IQ));
{error, max_delay_reached} ->
Expand Down
4 changes: 2 additions & 2 deletions src/mam/mod_mam_pm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
%%%
%%% This module should be started for each host.
%%% Message archivation is not shaped here (use standard support for this).
%%% MAM's IQs are shaped inside {@link shaper_srv}.
%%% MAM's IQs are shaped inside {@link opuntia_srv}.
%%%
%%% Message identifiers (or UIDs in the spec) are generated based on:
%%%
Expand Down Expand Up @@ -142,7 +142,7 @@ process_mam_iq(Acc, From, To, IQ, _Extra) ->
case is_action_allowed(HostType, Action, From, To) of
true ->
case mod_mam_utils:wait_shaper(HostType, To#jid.lserver, Action, From) of
ok ->
continue ->
handle_error_iq(HostType, Acc, To, Action,
handle_mam_iq(Action, From, To, IQ, Acc));
{error, max_delay_reached} ->
Expand Down
14 changes: 8 additions & 6 deletions src/mam/mod_mam_utils.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1022,13 +1022,15 @@ action_to_global_shaper_name(Action) ->
list_to_atom(atom_to_list(Action) ++ "_global_shaper").

-spec wait_shaper(mongooseim:host_type(), jid:server(), mam_iq:action(), jid:jid()) ->
'ok' | {'error', 'max_delay_reached'}.
continue | {error, max_delay_reached}.
wait_shaper(HostType, Host, Action, From) ->
case shaper_srv:wait(HostType, Host, action_to_shaper_name(Action), From, 1) of
ok ->
shaper_srv:wait(HostType, Host, action_to_global_shaper_name(Action), global, 1);
Err ->
Err
case mongoose_shaper:wait(
HostType, Host, action_to_shaper_name(Action), From, 1) of
continue ->
mongoose_shaper:wait(
global, Host, action_to_global_shaper_name(Action), From, 1);
{error, max_delay_reached} ->
{error, max_delay_reached}
end.

%% -----------------------------------------------------------------------
Expand Down
70 changes: 70 additions & 0 deletions src/mongoose_shaper.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
-module(mongoose_shaper).

-export([child_spec/0]).
-export([new/1, update/2, wait/5, reset_all_shapers/1]).
-ignore_xref([reset_all_shapers/1]).

-type shaper() :: opuntia:shaper().
-export_type([shaper/0]).

-spec child_spec() -> supervisor:child_spec().
child_spec() ->
WPoolOpts = [{workers, 10}, {worker, {opuntia_srv, {?MODULE, #{}}}}],
#{id => ?MODULE,
start => {wpool, start_pool, [?MODULE, WPoolOpts]},
restart => permanent,
shutdown => infinity,
type => supervisor}.

-spec new(atom()) -> opuntia:shaper().
new(Name) ->
opuntia:new(get_shaper_config(Name)).

-spec update(opuntia:shaper(), opuntia:tokens()) -> {opuntia:shaper(), opuntia:delay()}.
update(Shaper, Tokens) ->
opuntia:update(Shaper, Tokens).

%% @doc Shapes the caller from executing the action.
-spec wait(HostType :: mongooseim:host_type_or_global(),
Domain :: jid:lserver(),
Action :: atom(),
FromJID :: jid:jid(),
Size :: integer()) -> continue | {error, max_delay_reached}.
wait(HostType, Domain, Action, FromJID, Size) ->
Worker = wpool_pool:hash_worker(?MODULE, FromJID),
Config = get_shaper_config(get_shaper_name(HostType, Domain, Action, FromJID)),
Key = new_key(Domain, Action, FromJID),
opuntia_srv:wait(Worker, Key, Size, Config).

-type key() :: {global | jid:server(), atom(), jid:jid()}.
-spec new_key(jid:server() | global, atom(), jid:jid()) -> key().
new_key(Domain, Action, FromJID) ->
{Domain, Action, FromJID}.

%% @doc Ask all shaper servers to forget current shapers and read settings again
reset_all_shapers(_HostType) ->
[ opuntia_srv:reset_shapers(ProcName) || ProcName <- wpool:get_workers(?MODULE) ],
ok.

-spec get_shaper_name(HostType :: mongooseim:host_type_or_global(),
Domain :: global | jid:server(),
Action :: atom(),
FromJID :: jid:jid()) -> allow | none.
get_shaper_name(HostType, Domain, Action, FromJID) ->
case acl:match_rule(HostType, Domain, Action, FromJID) of
deny -> default_shaper();
Value -> Value
end.

-spec get_shaper_config(atom()) -> number().
get_shaper_config(Name) ->
case mongoose_config:lookup_opt([shaper, Name]) of
{ok, #{max_rate := MaxRatePerSecond}} ->
Rate = MaxRatePerSecond div 1000,
#{bucket_size => MaxRatePerSecond, rate => Rate, start_full => true};
{error, not_found} ->
0
end.

default_shaper() ->
none.
58 changes: 0 additions & 58 deletions src/mongoose_shaper_sup.erl

This file was deleted.

18 changes: 9 additions & 9 deletions src/mongoose_transport.erl
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@

-record(state, {socket :: socket(),
sockmod = gen_tcp :: socket_module(),
shaper_state :: shaper:shaper(),
shaper_state :: mongoose_shaper:shaper(),
dest_pid :: undefined | pid(), %% gen_fsm_compat pid
max_stanza_size :: stanza_size(),
parser :: exml_stream:parser(),
Expand Down Expand Up @@ -164,7 +164,7 @@ send_text(SocketData, Data) ->
#socket_data{sockmod = SockMod, socket = Socket,
connection_type = ConnectionType} = SocketData,
case catch SockMod:send(Socket, Data) of
ok ->
ok ->
update_transport_metrics(byte_size(Data), sent, ConnectionType),
ok;
{error, timeout} ->
Expand Down Expand Up @@ -215,7 +215,7 @@ start_link(Socket, Shaper, Opts) ->
gen_server:start_link(?MODULE, [Socket, Shaper, Opts], []).

init([Socket, Shaper, Opts]) ->
ShaperState = shaper:new(Shaper),
ShaperState = mongoose_shaper:new(Shaper),
#{max_stanza_size := MaxStanzaSize,
hibernate_after := HibernateAfter,
connection_type := ConnectionType} = Opts,
Expand Down Expand Up @@ -267,7 +267,7 @@ handle_call(_Request, _From, State) ->
{reply, ok, State, hibernate_or_timeout(State)}.

handle_cast({change_shaper, Shaper}, State) ->
NewShaperState = shaper:new(Shaper),
NewShaperState = mongoose_shaper:new(Shaper),
NewState = State#state{shaper_state = NewShaperState},
{noreply, NewState, hibernate_or_timeout(NewState)};
handle_cast(close, State) ->
Expand All @@ -277,7 +277,7 @@ handle_cast(_Msg, State) ->

handle_info({tcp, _TCPSocket, Data}, #state{sockmod = gen_tcp} = State) ->
NewState = process_data(Data, State),
{noreply, NewState, hibernate_or_timeout(NewState)};
{noreply, NewState, hibernate_or_timeout(NewState)};
handle_info({Tag, _TCPSocket, Data},
#state{socket = Socket,
sockmod = mongoose_tls} = State) when Tag == tcp; Tag == ssl ->
Expand Down Expand Up @@ -394,21 +394,21 @@ process_data(Data, #state{parser = Parser,
Size = byte_size(Data),
{Events, NewParser} =
case exml_stream:parse(Parser, Data) of
{ok, NParser, Elems} ->
{ok, NParser, Elems} ->
{[wrap_xml_elements_and_update_metrics(E) || E <- Elems], NParser};
{error, Reason} ->
{[{xmlstreamerror, Reason}], Parser}
end,
{NewShaperState, Pause} = shaper:update(ShaperState, Size),
{NewShaperState, Pause} = mongoose_shaper:update(ShaperState, Size),
update_transport_metrics(Size, received, State#state.connection_type),
[gen_fsm_compat:send_event(DestPid, Event) || Event <- Events],
maybe_pause(Pause, State),
State#state{parser = NewParser, shaper_state = NewShaperState}.

wrap_xml_elements_and_update_metrics(#xmlel{} = E) ->
wrap_xml_elements_and_update_metrics(#xmlel{} = E) ->
mongoose_metrics:update(global, [data, xmpp, received, xml_stanza_size], exml:xml_size(E)),
{xmlstreamelement, E};
wrap_xml_elements_and_update_metrics(E) ->
wrap_xml_elements_and_update_metrics(E) ->
mongoose_metrics:update(global, [data, xmpp, received, xml_stanza_size], exml:xml_size(E)),
E.

Expand Down
3 changes: 2 additions & 1 deletion src/mongooseim.app.src
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@
cowboy_swagger,
tomerl,
flatlog,
segmented_cache
segmented_cache,
opuntia
]},
{included_applications, [mnesia, cets]},
{env, []},
Expand Down
Loading

0 comments on commit 2235b76

Please sign in to comment.