From 68735dcb2325f815b88e1881bfa6cf82b3c86d53 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Tue, 17 Dec 2024 11:25:09 +0200 Subject: [PATCH] Fixes #12646 - CompleteListener may be invoked twice. (#12647) Fixed by capturing the HttpChannel before other code could disassociate it from the HttpExchange. Signed-off-by: Simone Bordet --- .../jetty/client/transport/HttpExchange.java | 52 +++--- .../jetty/client/HttpClientFailureTest.java | 154 +++++++++++------- 2 files changed, 128 insertions(+), 78 deletions(-) diff --git a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpExchange.java b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpExchange.java index 0028e6c3a4bd..b0c34c390635 100644 --- a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpExchange.java +++ b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpExchange.java @@ -69,7 +69,7 @@ public HttpRequest getRequest() public Throwable getRequestFailure() { - try (AutoLock l = lock.lock()) + try (AutoLock ignored = lock.lock()) { return requestFailure; } @@ -87,7 +87,7 @@ public HttpResponse getResponse() public Throwable getResponseFailure() { - try (AutoLock l = lock.lock()) + try (AutoLock ignored = lock.lock()) { return responseFailure; } @@ -110,7 +110,7 @@ boolean associate(HttpChannel channel) { boolean result = false; boolean abort = false; - try (AutoLock l = lock.lock()) + try (AutoLock ignored = lock.lock()) { // Only associate if the exchange state is initial, // as the exchange could be already failed. @@ -134,7 +134,7 @@ boolean associate(HttpChannel channel) void disassociate(HttpChannel channel) { boolean abort = false; - try (AutoLock l = lock.lock()) + try (AutoLock ignored = lock.lock()) { if (_channel != channel || requestState != State.TERMINATED || responseState != State.TERMINATED) abort = true; @@ -147,7 +147,7 @@ void disassociate(HttpChannel channel) private HttpChannel getHttpChannel() { - try (AutoLock l = lock.lock()) + try (AutoLock ignored = lock.lock()) { return _channel; } @@ -155,14 +155,15 @@ private HttpChannel getHttpChannel() public boolean requestComplete(Throwable failure) { - try (AutoLock l = lock.lock()) + try (AutoLock ignored = lock.lock()) { - return completeRequest(failure); + return lockedCompleteRequest(failure); } } - private boolean completeRequest(Throwable failure) + private boolean lockedCompleteRequest(Throwable failure) { + assert lock.isHeldByCurrentThread(); if (requestState == State.PENDING) { requestState = State.COMPLETED; @@ -174,7 +175,7 @@ private boolean completeRequest(Throwable failure) public boolean isResponseComplete() { - try (AutoLock l = lock.lock()) + try (AutoLock ignored = lock.lock()) { return responseState == State.COMPLETED; } @@ -182,14 +183,15 @@ public boolean isResponseComplete() public boolean responseComplete(Throwable failure) { - try (AutoLock l = lock.lock()) + try (AutoLock ignored = lock.lock()) { - return completeResponse(failure); + return lockedCompleteResponse(failure); } } - private boolean completeResponse(Throwable failure) + private boolean lockedCompleteResponse(Throwable failure) { + assert lock.isHeldByCurrentThread(); if (responseState == State.PENDING) { responseState = State.COMPLETED; @@ -202,7 +204,7 @@ private boolean completeResponse(Throwable failure) public Result terminateRequest() { Result result = null; - try (AutoLock l = lock.lock()) + try (AutoLock ignored = lock.lock()) { if (requestState == State.COMPLETED) requestState = State.TERMINATED; @@ -219,7 +221,7 @@ public Result terminateRequest() public Result terminateResponse() { Result result = null; - try (AutoLock l = lock.lock()) + try (AutoLock ignored = lock.lock()) { if (responseState == State.COMPLETED) responseState = State.TERMINATED; @@ -235,7 +237,7 @@ public Result terminateResponse() boolean isResponseCompleteOrTerminated() { - try (AutoLock l = lock.lock()) + try (AutoLock ignored = lock.lock()) { return responseState == State.COMPLETED || responseState == State.TERMINATED; } @@ -245,12 +247,14 @@ public void abort(Throwable failure, Promise promise) { // Atomically change the state of this exchange to be completed. // This will avoid that this exchange can be associated to a channel. + HttpChannel channel; boolean abortRequest; boolean abortResponse; - try (AutoLock l = lock.lock()) + try (AutoLock ignored = lock.lock()) { - abortRequest = completeRequest(failure); - abortResponse = completeResponse(failure); + channel = _channel; + abortRequest = lockedCompleteRequest(failure); + abortResponse = lockedCompleteResponse(failure); } if (!abortRequest && !abortResponse) @@ -268,7 +272,12 @@ public void abort(Throwable failure, Promise promise) // request content, notify them of the failure. Request.Content body = request.getBody(); if (abortRequest && body != null) + { + // This may eventually complete the request, + // and if the response is already completed + // also invoke the Response.CompleteListeners. body.fail(failure); + } // Case #1: exchange was in the destination queue. if (destination.remove(this)) @@ -280,8 +289,7 @@ public void abort(Throwable failure, Promise promise) return; } - HttpChannel channel = getHttpChannel(); - if (channel == null) + if (channel == null && abortRequest) { // Case #2: exchange was not yet associated. // Because this exchange is failed, when associate() is called @@ -309,7 +317,7 @@ private void notifyFailureComplete(Throwable failure) public void resetResponse() { - try (AutoLock l = lock.lock()) + try (AutoLock ignored = lock.lock()) { responseState = State.PENDING; responseFailure = null; @@ -327,7 +335,7 @@ public void proceed(Runnable proceedAction, Throwable failure) @Override public String toString() { - try (AutoLock l = lock.lock()) + try (AutoLock ignored = lock.lock()) { return String.format("%s@%x{req=%s[%s/%s] res=%s[%s/%s]}", HttpExchange.class.getSimpleName(), diff --git a/jetty-core/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientFailureTest.java b/jetty-core/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientFailureTest.java index 99f97ce18a73..e3d290060dce 100644 --- a/jetty-core/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientFailureTest.java +++ b/jetty-core/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientFailureTest.java @@ -19,20 +19,29 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.eclipse.jetty.client.transport.HttpClientTransportOverHTTP; import org.eclipse.jetty.client.transport.HttpDestination; import org.eclipse.jetty.client.transport.internal.HttpConnectionOverHTTP; +import org.eclipse.jetty.http.HttpMethod; +import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.component.LifeCycle; import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; +import static org.awaitility.Awaitility.await; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -41,7 +50,6 @@ public class HttpClientFailureTest { private Server server; private ServerConnector connector; - private HttpClient client; private void startServer(Handler handler) throws Exception { @@ -57,10 +65,7 @@ private void startServer(Handler handler) throws Exception @AfterEach public void dispose() throws Exception { - if (server != null) - server.stop(); - if (client != null) - client.stop(); + LifeCycle.stop(server); } @Test @@ -68,19 +73,21 @@ public void testFailureBeforeRequestCommit() throws Exception { startServer(new EmptyServerHandler()); - client = new HttpClient(new HttpClientTransportOverHTTP(1)); - client.start(); - - Request request = client.newRequest("localhost", connector.getLocalPort()) - .onRequestHeaders(r -> r.getConnection().close()) - .timeout(5, TimeUnit.SECONDS); - assertThrows(ExecutionException.class, request::send); - - HttpDestination destination = (HttpDestination)client.resolveDestination(request); - DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool(); - assertEquals(0, connectionPool.getConnectionCount()); - assertEquals(0, connectionPool.getActiveConnections().size()); - assertEquals(0, connectionPool.getIdleConnections().size()); + try (HttpClient client = new HttpClient(new HttpClientTransportOverHTTP(1))) + { + client.start(); + + Request request = client.newRequest("localhost", connector.getLocalPort()) + .onRequestHeaders(r -> r.getConnection().close()) + .timeout(5, TimeUnit.SECONDS); + assertThrows(ExecutionException.class, request::send); + + HttpDestination destination = (HttpDestination)client.resolveDestination(request); + DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool(); + assertEquals(0, connectionPool.getConnectionCount()); + assertEquals(0, connectionPool.getActiveConnections().size()); + assertEquals(0, connectionPool.getIdleConnections().size()); + } } @Test @@ -89,7 +96,7 @@ public void testFailureAfterRequestCommit() throws Exception startServer(new EmptyServerHandler()); AtomicReference connectionRef = new AtomicReference<>(); - client = new HttpClient(new HttpClientTransportOverHTTP(1) + try (HttpClient client = new HttpClient(new HttpClientTransportOverHTTP(1) { @Override public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map context) throws IOException @@ -98,48 +105,83 @@ public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map - { - connectionRef.get().getEndPoint().close(); - commitLatch.countDown(); - }) - .body(content) - .idleTimeout(2, TimeUnit.SECONDS) - .send(result -> + })) + { + client.start(); + + CountDownLatch commitLatch = new CountDownLatch(1); + CountDownLatch completeLatch = new CountDownLatch(1); + AsyncRequestContent content = new AsyncRequestContent(); + client.newRequest("localhost", connector.getLocalPort()) + .onRequestCommit(request -> + { + connectionRef.get().getEndPoint().close(); + commitLatch.countDown(); + }) + .body(content) + .idleTimeout(2, TimeUnit.SECONDS) + .send(result -> + { + if (result.isFailed()) + completeLatch.countDown(); + }); + + assertTrue(commitLatch.await(5, TimeUnit.SECONDS)); + + // The first chunk will be read but its write will fail. + content.write(ByteBuffer.allocate(1024), Callback.NOOP); + + // The second chunk is failed because the content is failed. + CountDownLatch contentLatch = new CountDownLatch(1); + content.write(ByteBuffer.allocate(1024), new Callback() { - if (result.isFailed()) - completeLatch.countDown(); + @Override + public void failed(Throwable x) + { + contentLatch.countDown(); + } }); - assertTrue(commitLatch.await(5, TimeUnit.SECONDS)); - - // The first chunk will be read but its write will fail. - content.write(ByteBuffer.allocate(1024), Callback.NOOP); + assertTrue(contentLatch.await(5, TimeUnit.SECONDS)); + assertTrue(completeLatch.await(5, TimeUnit.SECONDS)); - // The second chunk is failed because the content is failed. - CountDownLatch contentLatch = new CountDownLatch(1); - content.write(ByteBuffer.allocate(1024), new Callback() - { - @Override - public void failed(Throwable x) - { - contentLatch.countDown(); - } - }); + DuplexConnectionPool connectionPool = (DuplexConnectionPool)connectionRef.get().getHttpDestination().getConnectionPool(); + assertEquals(0, connectionPool.getConnectionCount()); + assertEquals(0, connectionPool.getActiveConnections().size()); + assertEquals(0, connectionPool.getIdleConnections().size()); + } + } - assertTrue(contentLatch.await(5, TimeUnit.SECONDS)); - assertTrue(completeLatch.await(5, TimeUnit.SECONDS)); + @Test + public void testPendingRequestContentThenTotalTimeout() throws Exception + { + startServer(new EmptyServerHandler()); - DuplexConnectionPool connectionPool = (DuplexConnectionPool)connectionRef.get().getHttpDestination().getConnectionPool(); - assertEquals(0, connectionPool.getConnectionCount()); - assertEquals(0, connectionPool.getActiveConnections().size()); - assertEquals(0, connectionPool.getIdleConnections().size()); + try (HttpClient client = new HttpClient(new HttpClientTransportOverHTTP(1))) + { + client.start(); + + long timeout = 1000; + AsyncRequestContent content = new AsyncRequestContent(); + AtomicInteger completed = new AtomicInteger(); + CountDownLatch resultLatch = new CountDownLatch(1); + client.newRequest("localhost", connector.getLocalPort()) + .method(HttpMethod.POST) + .body(content) + .timeout(timeout, TimeUnit.MILLISECONDS) + .send(result -> + { + // This is invoked only when the total timeout elapses. + completed.incrementAndGet(); + assertThat(result.getRequestFailure(), notNullValue()); + assertThat(result.getResponseFailure(), nullValue()); + assertThat(result.getResponse().getStatus(), is(HttpStatus.OK_200)); + resultLatch.countDown(); + }); + + assertTrue(resultLatch.await(2 * timeout, TimeUnit.MILLISECONDS)); + // Verify that the CompleteListener is invoked only once. + await().during(1, TimeUnit.SECONDS).atMost(5, TimeUnit.SECONDS).until(completed::get, is(1)); + } } }