Skip to content

Commit

Permalink
Merged branch 'jetty-12.0.x' into 'jetty-12.1.x'.
Browse files Browse the repository at this point in the history
Signed-off-by: Simone Bordet <[email protected]>
  • Loading branch information
sbordet committed Dec 17, 2024
2 parents b817ba8 + 68735dc commit 4d6049e
Show file tree
Hide file tree
Showing 2 changed files with 128 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public HttpRequest getRequest()

public Throwable getRequestFailure()
{
try (AutoLock l = lock.lock())
try (AutoLock ignored = lock.lock())
{
return requestFailure;
}
Expand All @@ -87,7 +87,7 @@ public HttpResponse getResponse()

public Throwable getResponseFailure()
{
try (AutoLock l = lock.lock())
try (AutoLock ignored = lock.lock())
{
return responseFailure;
}
Expand All @@ -110,7 +110,7 @@ boolean associate(HttpChannel channel)
{
boolean result = false;
boolean abort = false;
try (AutoLock l = lock.lock())
try (AutoLock ignored = lock.lock())
{
// Only associate if the exchange state is initial,
// as the exchange could be already failed.
Expand All @@ -134,7 +134,7 @@ boolean associate(HttpChannel channel)
void disassociate(HttpChannel channel)
{
boolean abort = false;
try (AutoLock l = lock.lock())
try (AutoLock ignored = lock.lock())
{
if (_channel != channel || requestState != State.TERMINATED || responseState != State.TERMINATED)
abort = true;
Expand All @@ -147,22 +147,23 @@ void disassociate(HttpChannel channel)

private HttpChannel getHttpChannel()
{
try (AutoLock l = lock.lock())
try (AutoLock ignored = lock.lock())
{
return _channel;
}
}

public boolean requestComplete(Throwable failure)
{
try (AutoLock l = lock.lock())
try (AutoLock ignored = lock.lock())
{
return completeRequest(failure);
return lockedCompleteRequest(failure);
}
}

private boolean completeRequest(Throwable failure)
private boolean lockedCompleteRequest(Throwable failure)
{
assert lock.isHeldByCurrentThread();
if (requestState == State.PENDING)
{
requestState = State.COMPLETED;
Expand All @@ -174,22 +175,23 @@ private boolean completeRequest(Throwable failure)

public boolean isResponseComplete()
{
try (AutoLock l = lock.lock())
try (AutoLock ignored = lock.lock())
{
return responseState == State.COMPLETED;
}
}

public boolean responseComplete(Throwable failure)
{
try (AutoLock l = lock.lock())
try (AutoLock ignored = lock.lock())
{
return completeResponse(failure);
return lockedCompleteResponse(failure);
}
}

private boolean completeResponse(Throwable failure)
private boolean lockedCompleteResponse(Throwable failure)
{
assert lock.isHeldByCurrentThread();
if (responseState == State.PENDING)
{
responseState = State.COMPLETED;
Expand All @@ -202,7 +204,7 @@ private boolean completeResponse(Throwable failure)
public Result terminateRequest()
{
Result result = null;
try (AutoLock l = lock.lock())
try (AutoLock ignored = lock.lock())
{
if (requestState == State.COMPLETED)
requestState = State.TERMINATED;
Expand All @@ -219,7 +221,7 @@ public Result terminateRequest()
public Result terminateResponse()
{
Result result = null;
try (AutoLock l = lock.lock())
try (AutoLock ignored = lock.lock())
{
if (responseState == State.COMPLETED)
responseState = State.TERMINATED;
Expand All @@ -235,7 +237,7 @@ public Result terminateResponse()

boolean isResponseCompleteOrTerminated()
{
try (AutoLock l = lock.lock())
try (AutoLock ignored = lock.lock())
{
return responseState == State.COMPLETED || responseState == State.TERMINATED;
}
Expand All @@ -245,12 +247,14 @@ public void abort(Throwable failure, Promise<Boolean> promise)
{
// Atomically change the state of this exchange to be completed.
// This will avoid that this exchange can be associated to a channel.
HttpChannel channel;
boolean abortRequest;
boolean abortResponse;
try (AutoLock l = lock.lock())
try (AutoLock ignored = lock.lock())
{
abortRequest = completeRequest(failure);
abortResponse = completeResponse(failure);
channel = _channel;
abortRequest = lockedCompleteRequest(failure);
abortResponse = lockedCompleteResponse(failure);
}

if (!abortRequest && !abortResponse)
Expand All @@ -268,7 +272,12 @@ public void abort(Throwable failure, Promise<Boolean> promise)
// request content, notify them of the failure.
Request.Content body = request.getBody();
if (abortRequest && body != null)
{
// This may eventually complete the request,
// and if the response is already completed
// also invoke the Response.CompleteListeners.
body.fail(failure);
}

// Case #1: exchange was in the destination queue.
if (destination.remove(this))
Expand All @@ -280,8 +289,7 @@ public void abort(Throwable failure, Promise<Boolean> promise)
return;
}

HttpChannel channel = getHttpChannel();
if (channel == null)
if (channel == null && abortRequest)
{
// Case #2: exchange was not yet associated.
// Because this exchange is failed, when associate() is called
Expand Down Expand Up @@ -309,7 +317,7 @@ private void notifyFailureComplete(Throwable failure)

public void resetResponse()
{
try (AutoLock l = lock.lock())
try (AutoLock ignored = lock.lock())
{
responseState = State.PENDING;
responseFailure = null;
Expand All @@ -327,7 +335,7 @@ public void proceed(Runnable proceedAction, Throwable failure)
@Override
public String toString()
{
try (AutoLock l = lock.lock())
try (AutoLock ignored = lock.lock())
{
return String.format("%s@%x{req=%s[%s/%s] res=%s[%s/%s]}",
HttpExchange.class.getSimpleName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,29 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import org.eclipse.jetty.client.transport.HttpClientTransportOverHTTP;
import org.eclipse.jetty.client.transport.HttpDestination;
import org.eclipse.jetty.client.transport.internal.HttpConnectionOverHTTP;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;

import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand All @@ -41,7 +50,6 @@ public class HttpClientFailureTest
{
private Server server;
private ServerConnector connector;
private HttpClient client;

private void startServer(Handler handler) throws Exception
{
Expand All @@ -57,30 +65,29 @@ private void startServer(Handler handler) throws Exception
@AfterEach
public void dispose() throws Exception
{
if (server != null)
server.stop();
if (client != null)
client.stop();
LifeCycle.stop(server);
}

@Test
public void testFailureBeforeRequestCommit() throws Exception
{
startServer(new EmptyServerHandler());

client = new HttpClient(new HttpClientTransportOverHTTP(1));
client.start();

Request request = client.newRequest("localhost", connector.getLocalPort())
.onRequestHeaders(r -> r.getConnection().close())
.timeout(5, TimeUnit.SECONDS);
assertThrows(ExecutionException.class, request::send);

HttpDestination destination = (HttpDestination)client.resolveDestination(request);
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
assertEquals(0, connectionPool.getConnectionCount());
assertEquals(0, connectionPool.getActiveConnections().size());
assertEquals(0, connectionPool.getIdleConnections().size());
try (HttpClient client = new HttpClient(new HttpClientTransportOverHTTP(1)))
{
client.start();

Request request = client.newRequest("localhost", connector.getLocalPort())
.onRequestHeaders(r -> r.getConnection().close())
.timeout(5, TimeUnit.SECONDS);
assertThrows(ExecutionException.class, request::send);

HttpDestination destination = (HttpDestination)client.resolveDestination(request);
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
assertEquals(0, connectionPool.getConnectionCount());
assertEquals(0, connectionPool.getActiveConnections().size());
assertEquals(0, connectionPool.getIdleConnections().size());
}
}

@Test
Expand All @@ -89,7 +96,7 @@ public void testFailureAfterRequestCommit() throws Exception
startServer(new EmptyServerHandler());

AtomicReference<HttpConnectionOverHTTP> connectionRef = new AtomicReference<>();
client = new HttpClient(new HttpClientTransportOverHTTP(1)
try (HttpClient client = new HttpClient(new HttpClientTransportOverHTTP(1)
{
@Override
public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map<String, Object> context) throws IOException
Expand All @@ -98,48 +105,83 @@ public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map<Stri
connectionRef.set(connection);
return connection;
}
});
client.start();

CountDownLatch commitLatch = new CountDownLatch(1);
CountDownLatch completeLatch = new CountDownLatch(1);
AsyncRequestContent content = new AsyncRequestContent();
client.newRequest("localhost", connector.getLocalPort())
.onRequestCommit(request ->
{
connectionRef.get().getEndPoint().close();
commitLatch.countDown();
})
.body(content)
.idleTimeout(2, TimeUnit.SECONDS)
.send(result ->
}))
{
client.start();

CountDownLatch commitLatch = new CountDownLatch(1);
CountDownLatch completeLatch = new CountDownLatch(1);
AsyncRequestContent content = new AsyncRequestContent();
client.newRequest("localhost", connector.getLocalPort())
.onRequestCommit(request ->
{
connectionRef.get().getEndPoint().close();
commitLatch.countDown();
})
.body(content)
.idleTimeout(2, TimeUnit.SECONDS)
.send(result ->
{
if (result.isFailed())
completeLatch.countDown();
});

assertTrue(commitLatch.await(5, TimeUnit.SECONDS));

// The first chunk will be read but its write will fail.
content.write(ByteBuffer.allocate(1024), Callback.NOOP);

// The second chunk is failed because the content is failed.
CountDownLatch contentLatch = new CountDownLatch(1);
content.write(ByteBuffer.allocate(1024), new Callback()
{
if (result.isFailed())
completeLatch.countDown();
@Override
public void failed(Throwable x)
{
contentLatch.countDown();
}
});

assertTrue(commitLatch.await(5, TimeUnit.SECONDS));

// The first chunk will be read but its write will fail.
content.write(ByteBuffer.allocate(1024), Callback.NOOP);
assertTrue(contentLatch.await(5, TimeUnit.SECONDS));
assertTrue(completeLatch.await(5, TimeUnit.SECONDS));

// The second chunk is failed because the content is failed.
CountDownLatch contentLatch = new CountDownLatch(1);
content.write(ByteBuffer.allocate(1024), new Callback()
{
@Override
public void failed(Throwable x)
{
contentLatch.countDown();
}
});
DuplexConnectionPool connectionPool = (DuplexConnectionPool)connectionRef.get().getHttpDestination().getConnectionPool();
assertEquals(0, connectionPool.getConnectionCount());
assertEquals(0, connectionPool.getActiveConnections().size());
assertEquals(0, connectionPool.getIdleConnections().size());
}
}

assertTrue(contentLatch.await(5, TimeUnit.SECONDS));
assertTrue(completeLatch.await(5, TimeUnit.SECONDS));
@Test
public void testPendingRequestContentThenTotalTimeout() throws Exception
{
startServer(new EmptyServerHandler());

DuplexConnectionPool connectionPool = (DuplexConnectionPool)connectionRef.get().getHttpDestination().getConnectionPool();
assertEquals(0, connectionPool.getConnectionCount());
assertEquals(0, connectionPool.getActiveConnections().size());
assertEquals(0, connectionPool.getIdleConnections().size());
try (HttpClient client = new HttpClient(new HttpClientTransportOverHTTP(1)))
{
client.start();

long timeout = 1000;
AsyncRequestContent content = new AsyncRequestContent();
AtomicInteger completed = new AtomicInteger();
CountDownLatch resultLatch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
.method(HttpMethod.POST)
.body(content)
.timeout(timeout, TimeUnit.MILLISECONDS)
.send(result ->
{
// This is invoked only when the total timeout elapses.
completed.incrementAndGet();
assertThat(result.getRequestFailure(), notNullValue());
assertThat(result.getResponseFailure(), nullValue());
assertThat(result.getResponse().getStatus(), is(HttpStatus.OK_200));
resultLatch.countDown();
});

assertTrue(resultLatch.await(2 * timeout, TimeUnit.MILLISECONDS));
// Verify that the CompleteListener is invoked only once.
await().during(1, TimeUnit.SECONDS).atMost(5, TimeUnit.SECONDS).until(completed::get, is(1));
}
}
}

0 comments on commit 4d6049e

Please sign in to comment.