Skip to content

Commit

Permalink
feat: defer conn setting to handshake
Browse files Browse the repository at this point in the history
- remove QUIC_SETTINGS from acceptor
  • Loading branch information
qzhuyan committed Dec 9, 2024
1 parent 13f7b58 commit 44064b0
Show file tree
Hide file tree
Showing 8 changed files with 93 additions and 51 deletions.
51 changes: 25 additions & 26 deletions c_src/quicer_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -899,18 +899,12 @@ async_accept2(ErlNifEnv *env,
const ERL_NIF_TERM argv[])
{
ERL_NIF_TERM listener = argv[0];
ERL_NIF_TERM conn_opts = argv[1];
QuicerListenerCTX *l_ctx = NULL;
ERL_NIF_TERM active_val = ATOM_TRUE;
if (!enif_get_resource(env, listener, ctx_listener_t, (void **)&l_ctx))
{
return ERROR_TUPLE_2(ATOM_BADARG);
}

// Set parm active is optional
enif_get_map_value(
env, conn_opts, ATOM_QUIC_STREAM_OPTS_ACTIVE, &active_val);

ACCEPTOR *acceptor = AcceptorAlloc();
if (!acceptor)
{
Expand All @@ -923,24 +917,11 @@ async_accept2(ErlNifEnv *env,
return ERROR_TUPLE_2(ATOM_BAD_PID);
}

if (!set_owner_recv_mode(acceptor, env, active_val))
{
AcceptorDestroy(acceptor);
return ERROR_TUPLE_2(ATOM_BADARG);
}

if (!create_settings(env, &conn_opts, &acceptor->Settings))
{
AcceptorDestroy(acceptor);
return ERROR_TUPLE_2(ATOM_PARAM_ERROR);
}

AcceptorEnqueue(l_ctx->acceptor_queue, acceptor);

assert(enif_is_process_alive(env, &(acceptor->Pid)));

ERL_NIF_TERM listenHandle = enif_make_resource(env, l_ctx);
return SUCCESS(listenHandle);
return SUCCESS(listener);
}

ERL_NIF_TERM
Expand Down Expand Up @@ -1080,7 +1061,7 @@ get_conn_rid1(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[])
}

QUIC_STATUS
continue_connection_handshake(QuicerConnCTX *c_ctx)
continue_connection_handshake(QuicerConnCTX *c_ctx, QUIC_SETTINGS *Settings)
{
QUIC_STATUS Status = QUIC_STATUS_SUCCESS;

Expand All @@ -1104,20 +1085,28 @@ continue_connection_handshake(QuicerConnCTX *c_ctx)
Status = MsQuic->SetParam(c_ctx->Connection,
QUIC_PARAM_CONN_SETTINGS,
sizeof(QUIC_SETTINGS),
&c_ctx->owner->Settings);
Settings);
return Status;
}

ERL_NIF_TERM
async_handshake_1(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[])
async_handshake_2(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[])

{
QuicerConnCTX *c_ctx;
QUIC_STATUS Status = QUIC_STATUS_SUCCESS;
ERL_NIF_TERM res = ATOM_OK;
CXPLAT_FRE_ASSERT(argc == 1);
CXPLAT_FRE_ASSERT(argc == 2);
ERL_NIF_TERM econn = argv[0];
ERL_NIF_TERM econn_opts = argv[1];
QUIC_SETTINGS Settings = { 0 };
ERL_NIF_TERM active_val = ATOM_TRUE;

if (!enif_get_resource(env, argv[0], ctx_connection_t, (void **)&c_ctx))
// Set parm active is optional
enif_get_map_value(
env, econn_opts, ATOM_QUIC_STREAM_OPTS_ACTIVE, &active_val);

if (!enif_get_resource(env, econn, ctx_connection_t, (void **)&c_ctx))
{
return ERROR_TUPLE_2(ATOM_BADARG);
}
Expand All @@ -1129,11 +1118,21 @@ async_handshake_1(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[])
return ERROR_TUPLE_2(ATOM_CLOSED);
}

if (QUIC_FAILED(Status = continue_connection_handshake(c_ctx)))
if (!create_settings(env, &econn_opts, &Settings))
{
return ERROR_TUPLE_2(ATOM_PARAM_ERROR);
}

if (QUIC_FAILED(Status = continue_connection_handshake(c_ctx, &Settings)))
{
res = ERROR_TUPLE_2(ATOM_STATUS(Status));
}

if (!set_owner_recv_mode(c_ctx->owner, env, active_val))
{
return ERROR_TUPLE_2(ATOM_BADARG);
}

put_conn_handle(c_ctx);
return res;
}
Expand Down
5 changes: 3 additions & 2 deletions c_src/quicer_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,10 @@ ERL_NIF_TERM
get_conn_rid1(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]);

ERL_NIF_TERM
async_handshake_1(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]);
async_handshake_2(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]);

QUIC_STATUS continue_connection_handshake(QuicerConnCTX *c_ctx);
QUIC_STATUS continue_connection_handshake(QuicerConnCTX *c_ctx,
QUIC_SETTINGS *settings);

ERL_NIF_TERM
peercert1(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]);
Expand Down
2 changes: 1 addition & 1 deletion c_src/quicer_nif.c
Original file line number Diff line number Diff line change
Expand Up @@ -1746,7 +1746,7 @@ static ErlNifFunc nif_funcs[] = {
{ "open_connection", 1, open_connectionX, 0},
{ "async_connect", 3, async_connect3, 0},
{ "async_accept", 2, async_accept2, 0},
{ "async_handshake", 1, async_handshake_1, 0},
{ "async_handshake", 2, async_handshake_2, 0},
{ "async_shutdown_connection", 3, shutdown_connection3, 0},
{ "async_accept_stream", 2, async_accept_stream2, 0},
{ "start_stream", 2, async_start_stream2, 0},
Expand Down
1 change: 0 additions & 1 deletion c_src/quicer_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ typedef struct ACCEPTOR
ErlNifPid Pid;
ACCEPTOR_RECV_MODE active;
uint16_t active_count; /* counter for active_n */
QUIC_SETTINGS Settings;
void *reserved1;
void *reserved2;
void *reserved3;
Expand Down
21 changes: 16 additions & 5 deletions src/quicer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@
async_connect/3,
handshake/1,
handshake/2,
handshake/3,
async_handshake/1,
async_handshake/2,
accept/2,
accept/3,
async_accept/2,
Expand Down Expand Up @@ -445,16 +447,21 @@ async_connect(Host, Port, Opts) when is_map(Opts) ->
{ok, connection_handle()}
| {error, any()}.
handshake(Conn) ->
handshake(Conn, 5000).
handshake(Conn, #{}, 5000).

-spec handshake(connection_handle(), timeout()) ->
{ok, connection_handle()} | {error, any()}.
handshake(Conn, Timeout) ->
handshake(Conn, #{}, Timeout).

%% @doc Complete TLS handshake after accepted a Connection
%% @see handshake/2
%% @see async_handshake/1
-spec handshake(connection_handle(), timeout()) ->
-spec handshake(connection_handle(), conn_opts(), timeout()) ->
{ok, connection_handle()}
| {error, any()}.
handshake(Conn, Timeout) ->
case async_handshake(Conn) of
handshake(Conn, ConnOpts, Timeout) ->
case async_handshake(Conn, ConnOpts) of
{error, _} = E ->
E;
ok ->
Expand All @@ -472,7 +479,11 @@ handshake(Conn, Timeout) ->
%% @see handshake/2
-spec async_handshake(connection_handle()) -> ok | {error, any()}.
async_handshake(Conn) ->
quicer_nif:async_handshake(Conn).
quicer_nif:async_handshake(Conn, #{}).

-spec async_handshake(connection_handle(), conn_opts()) -> ok | {error, any()}.
async_handshake(Conn, ConnOpts) ->
quicer_nif:async_handshake(Conn, ConnOpts).

%% @doc Accept new Connection (Server)
%%
Expand Down
56 changes: 44 additions & 12 deletions src/quicer_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -217,12 +217,12 @@ start_link(undefined, Listener, {_LOpts, COpts, _SOpts} = Opts, Sup) when is_map
start_link(CallbackModule, Listener, Opts, Sup) ->
gen_server:start_link(?MODULE, [CallbackModule, Listener, Opts, Sup], []).

-define(DEFAULT_SPAWN_OPTS, [{spawn_opt, [link]}]).
-define(DEFAULT_ACCEPTOR_START_OPTS, [{spawn_opt, [link]}]).
%% @doc start acceptor with shared Listener confs
start_acceptor(ListenHandle, Tab, Sup) when is_pid(Sup) ->
[{c_opts, #{conn_callback := CallbackModule} = Conf}] = ets:lookup(Tab, c_opts),
StartOpts = maps:get(spawn_opts, Conf, ?DEFAULT_SPAWN_OPTS),
gen_server:start(?MODULE, [CallbackModule, ListenHandle, Tab, Sup], StartOpts).
StartOpts = maps:get(proc_start_opts, Conf, ?DEFAULT_ACCEPTOR_START_OPTS),
gen_server:start_link(?MODULE, [CallbackModule, ListenHandle, Tab, Sup], StartOpts).

-spec get_cb_state(ConnPid :: pid()) -> cb_state() | {error, any()}.
get_cb_state(ConnPid) ->
Expand Down Expand Up @@ -285,7 +285,8 @@ init([CallbackModule, {Host, Port}, {COpts, SOpts}]) when
callback => CallbackModule,
conn_opts => COpts,
stream_opts => SOpts,
sup => undefined
sup => undefined,
is_server => false
},
{ok, Conn} = quicer:async_connect(Host, Port, COpts),
State1 = State0#{conn := Conn},
Expand All @@ -298,7 +299,7 @@ init([CallbackModule, {Host, Port}, {COpts, SOpts}]) when
Other ->
Other
end;
%% For Server
%% For Server, deprecating since v0.2
init([CallbackModule, Listener, {LOpts, COpts, SOpts}, Sup]) when is_list(COpts) ->
init([CallbackModule, Listener, {LOpts, maps:from_list(COpts), SOpts}, Sup]);
init([CallbackModule, Listener, {_LOpts, COpts, SOpts}, Sup]) when CallbackModule =/= undefined ->
Expand All @@ -309,7 +310,8 @@ init([CallbackModule, Listener, {_LOpts, COpts, SOpts}, Sup]) when CallbackModul
callback => CallbackModule,
conn_opts => COpts,
stream_opts => SOpts,
sup => Sup
sup => Sup,
is_server => true
},
%% Async Acceptor
{ok, Listener} = quicer_nif:async_accept(Listener, COpts),
Expand All @@ -322,11 +324,26 @@ init([CallbackModule, Listener, {_LOpts, COpts, SOpts}, Sup]) when CallbackModul
Other ->
Other
end;
init([CallbackModule, Listener, ConfTab, Sup]) ->
[{l_opts, LOpts}] = ets:lookup(ConfTab, l_opts),
[{c_opts, COpts}] = ets:lookup(ConfTab, c_opts),
[{s_opts, SOpts}] = ets:lookup(ConfTab, s_opts),
init([CallbackModule, Listener, {LOpts, COpts, SOpts}, Sup]).
%% For Acceptor since v0.2
init([_, _, _ConfTab, _] = Args) ->
acceptor_init(Args).
acceptor_init([CallbackModule, Listener, ConfTab, Sup]) ->
process_flag(trap_exit, true),
State0 = #{
listener => Listener,
callback => CallbackModule,
conf_tab => ConfTab,
sup => Sup,
is_server => true,
conn => undefined,
conn_opts => undefined,
stream_opts => undefined,
callback_state => undefined
},
%% Async Acceptor
NOOP = #{},
{ok, Listener} = quicer_nif:async_accept(Listener, NOOP),
{ok, State0}.

%%--------------------------------------------------------------------
%% @private
Expand Down Expand Up @@ -402,8 +419,23 @@ handle_cast(_Request, State) ->
| {stop, Reason :: normal | term(), NewState :: term()}.
handle_info(
{quic, new_conn, C, Props},
#{callback := M, sup := Sup, callback_state := CBState} = State
#{callback := M, sup := Sup, callback_state := undefined, conf_tab := ConfTab} = State
) ->
%% deferred init
COpts = ets:lookup_element(ConfTab, c_opts, 2),
SOpts = ets:lookup_element(ConfTab, s_opts, 2),
Sup =/= undefined andalso (catch supervisor:start_child(Sup, [Sup])),
case M:init(COpts#{stream_opts => SOpts}) of
{ok, CBState0} ->
default_cb_ret(M:new_conn(C, Props, CBState0), State#{conn := C});
Other ->
Other
end;
%% deprecating this clause
handle_info(
{quic, new_conn, C, Props},
#{callback := M, sup := Sup, callback_state := CBState} = State
) when is_map(CBState) ->
?tp_ignore_side_effects_in_prod(debug, #{
module => ?MODULE, conn => C, props => Props, event => new_conn
}),
Expand Down
6 changes: 3 additions & 3 deletions src/quicer_nif.erl
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
close_listener/1,
async_connect/3,
async_accept/2,
async_handshake/1,
async_handshake/2,
async_shutdown_connection/3,
async_accept_stream/2,
start_stream/2,
Expand Down Expand Up @@ -272,9 +272,9 @@ async_connect(_Host, _Port, _Opts) ->
async_accept(_Listener, _Opts) ->
erlang:nif_error(nif_library_not_loaded).

-spec async_handshake(connection_handle()) ->
-spec async_handshake(connection_handle(), conn_opts()) ->
ok | {error, badarg | atom_reason()}.
async_handshake(_Connection) ->
async_handshake(_Connection, _ConnOpts) ->
erlang:nif_error(nif_library_not_loaded).

-spec async_shutdown_connection(connection_handle(), conn_shutdown_flag(), app_errno()) ->
Expand Down
2 changes: 1 addition & 1 deletion test/quicer_snb_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ tc_slow_conn(Config) ->
},
#{
?snk_kind := debug,
function := "async_handshake_1",
function := "async_handshake_2",
tag := "start",
mark := 0,
resource_id := _Rid
Expand Down

0 comments on commit 44064b0

Please sign in to comment.