From 363bfb915dbd4485b36e9b9058875aab15f42e6d Mon Sep 17 00:00:00 2001 From: "Padmanabh Gupta (from Dev Box)" Date: Mon, 23 Dec 2024 11:28:58 +0530 Subject: [PATCH] segregate TLS and non TLS paths --- .../Embedded/EmbeddedNetworkHandler.cs | 4 +- .../BDN.benchmark/Network/BasicOperations.cs | 4 +- .../BDN.benchmark/Network/NetworkBase.cs | 5 +- .../Network/RawStringOperations.cs | 40 ++++++------ libs/common/Networking/NetworkHandler.cs | 64 ++++++++++--------- .../Networking/TcpNetworkHandlerBase.cs | 36 ++++++++++- 6 files changed, 91 insertions(+), 62 deletions(-) diff --git a/benchmark/BDN.benchmark/Embedded/EmbeddedNetworkHandler.cs b/benchmark/BDN.benchmark/Embedded/EmbeddedNetworkHandler.cs index 0dc367cd7f..a58a5e44bc 100644 --- a/benchmark/BDN.benchmark/Embedded/EmbeddedNetworkHandler.cs +++ b/benchmark/BDN.benchmark/Embedded/EmbeddedNetworkHandler.cs @@ -25,12 +25,12 @@ public override void Dispose() public override bool TryClose() => throw new NotImplementedException(); - public async ValueTask Send(Request request) + public void Send(Request request) { networkReceiveBuffer = request.buffer; unsafe { networkReceiveBufferPtr = request.bufferPtr; } - await OnNetworkReceiveAsync(request.buffer.Length); + OnNetworkReceive(request.buffer.Length); Debug.Assert(networkBytesRead == 0); Debug.Assert(networkReadHead == 0); diff --git a/benchmark/BDN.benchmark/Network/BasicOperations.cs b/benchmark/BDN.benchmark/Network/BasicOperations.cs index deda2d4955..ebd537e042 100644 --- a/benchmark/BDN.benchmark/Network/BasicOperations.cs +++ b/benchmark/BDN.benchmark/Network/BasicOperations.cs @@ -22,9 +22,9 @@ public override void GlobalSetup() } [Benchmark] - public async ValueTask InlinePing() + public void InlinePing() { - await Send(ping); + Send(ping); } } } \ No newline at end of file diff --git a/benchmark/BDN.benchmark/Network/NetworkBase.cs b/benchmark/BDN.benchmark/Network/NetworkBase.cs index 8f34e5f0e6..b08feb658b 100644 --- a/benchmark/BDN.benchmark/Network/NetworkBase.cs +++ b/benchmark/BDN.benchmark/Network/NetworkBase.cs @@ -64,10 +64,7 @@ public virtual void GlobalCleanup() server.Dispose(); } - protected ValueTask Send(Request request) - { - return networkHandler.Send(request); - } + protected void Send(Request request) => networkHandler.Send(request); protected unsafe void SetupOperation(ref Request request, ReadOnlySpan operation, int batchSize = batchSize) { diff --git a/benchmark/BDN.benchmark/Network/RawStringOperations.cs b/benchmark/BDN.benchmark/Network/RawStringOperations.cs index 5eb6e3eec4..326194c3de 100644 --- a/benchmark/BDN.benchmark/Network/RawStringOperations.cs +++ b/benchmark/BDN.benchmark/Network/RawStringOperations.cs @@ -65,63 +65,63 @@ public override void GlobalSetup() } [Benchmark] - public async ValueTask Set() + public void Set() { - await Send(set); + Send(set); } [Benchmark] - public async ValueTask SetEx() + public void SetEx() { - await Send(setex); + Send(setex); } [Benchmark] - public async ValueTask SetNx() + public void SetNx() { - await Send(setnx); + Send(setnx); } [Benchmark] - public async ValueTask SetXx() + public void SetXx() { - await Send(setxx); + Send(setxx); } [Benchmark] - public async ValueTask GetFound() + public void GetFound() { - await Send(getf); + Send(getf); } [Benchmark] - public async ValueTask GetNotFound() + public void GetNotFound() { - await Send(getnf); + Send(getnf); } [Benchmark] - public async ValueTask Increment() + public void Increment() { - await Send(incr); + Send(incr); } [Benchmark] - public async ValueTask Decrement() + public void Decrement() { - await Send(decr); + Send(decr); } [Benchmark] - public async ValueTask IncrementBy() + public void IncrementBy() { - await Send(incrby); + Send(incrby); } [Benchmark] - public async ValueTask DecrementBy() + public void DecrementBy() { - await Send(decrby); + Send(decrby); } } } \ No newline at end of file diff --git a/libs/common/Networking/NetworkHandler.cs b/libs/common/Networking/NetworkHandler.cs index 91fabc2a01..6cd6cbc56b 100644 --- a/libs/common/Networking/NetworkHandler.cs +++ b/libs/common/Networking/NetworkHandler.cs @@ -267,11 +267,38 @@ async Task AuthenticateAsClientAsync(SslClientAuthenticationOptions sslClientOpt } } + public void OnNetworkReceive(int bytesTransferred) + { + networkBytesRead += bytesTransferred; + transportReceiveBuffer = networkReceiveBuffer; + unsafe + { + transportReceiveBufferPtr = networkReceiveBufferPtr; + } + transportBytesRead = networkBytesRead; + + // We do not have an active read task, so we will process on the network thread + Process(); + EndTransformNetworkToTransport(); + UpdateNetworkBuffers(); + } + + private void UpdateNetworkBuffers() + { + // Shift network buffer after processing is done + if (networkReadHead > 0) + ShiftNetworkReceiveBuffer(); + + // Double network buffer if out of space after processing is complete + if (networkBytesRead == networkReceiveBuffer.Length) + DoubleNetworkReceiveBuffer(); + } + /// /// On network receive /// /// Number of bytes transferred - public async ValueTask OnNetworkReceiveAsync(int bytesTransferred) + public async ValueTask OnNetworkReceiveWithTLS(int bytesTransferred) { // Wait for SslStream async processing to complete, if any (e.g., authentication phase) while (readerStatus == TlsReaderStatus.Active) @@ -283,26 +310,10 @@ public async ValueTask OnNetworkReceiveAsync(int bytesTransferred) switch (readerStatus) { case TlsReaderStatus.Rest: - // Synchronously try to process the received data - if (sslStream == null) - { - transportReceiveBuffer = networkReceiveBuffer; - unsafe - { - transportReceiveBufferPtr = networkReceiveBufferPtr; - } - transportBytesRead = networkBytesRead; - - // We do not have an active read task, so we will process on the network thread - Process(); - } - else - { - readerStatus = TlsReaderStatus.Active; - Read(); - while (readerStatus == TlsReaderStatus.Active) - await expectingData.WaitAsync(cancellationTokenSource.Token).ConfigureAwait(false); - } + readerStatus = TlsReaderStatus.Active; + Read(); + while (readerStatus == TlsReaderStatus.Active) + await expectingData.WaitAsync(cancellationTokenSource.Token).ConfigureAwait(false); break; case TlsReaderStatus.Waiting: // We have a ReadAsync task waiting for new data, set it to active status @@ -320,16 +331,7 @@ public async ValueTask OnNetworkReceiveAsync(int bytesTransferred) } Debug.Assert(readerStatus != TlsReaderStatus.Active); - - EndTransformNetworkToTransport(); - - // Shift network buffer after processing is done - if (networkReadHead > 0) - ShiftNetworkReceiveBuffer(); - - // Double network buffer if out of space after processing is complete - if (networkBytesRead == networkReceiveBuffer.Length) - DoubleNetworkReceiveBuffer(); + UpdateNetworkBuffers(); } [MethodImpl(MethodImplOptions.NoInlining)] diff --git a/libs/common/Networking/TcpNetworkHandlerBase.cs b/libs/common/Networking/TcpNetworkHandlerBase.cs index 2399a5b8ce..1d79bef227 100644 --- a/libs/common/Networking/TcpNetworkHandlerBase.cs +++ b/libs/common/Networking/TcpNetworkHandlerBase.cs @@ -26,6 +26,7 @@ public abstract class TcpNetworkHandlerBase : Netwo readonly string remoteEndpoint; readonly string localEndpoint; int closeRequested; + readonly bool useTLS; /// /// Constructor @@ -39,7 +40,7 @@ public TcpNetworkHandlerBase(TServerHook serverHook, TNetworkSender networkSende remoteEndpoint = socket.RemoteEndPoint is IPEndPoint remote ? $"{remote.Address}:{remote.Port}" : ""; localEndpoint = socket.LocalEndPoint is IPEndPoint local ? $"{local.Address}:{local.Port}" : ""; - + this.useTLS = useTLS; AllocateNetworkReceiveBuffer(); } @@ -137,7 +138,36 @@ void Dispose(SocketAsyncEventArgs e) void RecvEventArg_Completed(object sender, SocketAsyncEventArgs e) { // Complete receive event and release thread while we process data async - _ = HandleReceiveAsync(sender, e); + if (this.useTLS) + { + _ = HandleReceiveAsync(sender, e); + } + else + { + HandleReceiveSync(sender, e); + } + } + + private void HandleReceiveSync(object sender, SocketAsyncEventArgs e) + { + try + { + do + { + if (e.BytesTransferred == 0 || e.SocketError != SocketError.Success || serverHook.Disposed) + { + // No more things to receive + Dispose(e); + break; + } + OnNetworkReceive(e.BytesTransferred); + e.SetBuffer(networkReceiveBuffer, networkBytesRead, networkReceiveBuffer.Length - networkBytesRead); + } while (!e.AcceptSocket.ReceiveAsync(e)); + } + catch (Exception ex) + { + HandleReceiveFailure(ex, e); + } } private async ValueTask HandleReceiveAsync(object sender, SocketAsyncEventArgs e) @@ -152,7 +182,7 @@ private async ValueTask HandleReceiveAsync(object sender, SocketAsyncEventArgs e Dispose(e); break; } - var receiveTask = OnNetworkReceiveAsync(e.BytesTransferred); + var receiveTask = OnNetworkReceiveWithTLS(e.BytesTransferred); if (!receiveTask.IsCompletedSuccessfully) { await receiveTask;