From 8b3f9f9e59ae64896db95cb87360d9a2efb877ab Mon Sep 17 00:00:00 2001 From: shirly121 Date: Tue, 31 Dec 2024 18:36:00 +0800 Subject: [PATCH 1/3] support gremlin http service --- .../gremlin/auth/IrAuthenticationHandler.java | 80 ++++++- .../gremlin/plugin/processor/HttpContext.java | 113 +++++++++ .../gremlin/service/IrHttpGremlinHandler.java | 217 ++++++++++++++++++ .../service/IrWsAndHttpChannelizer.java | 10 +- .../server/handler/HttpHandlerUtils.java | 52 +++++ 5 files changed, 466 insertions(+), 6 deletions(-) create mode 100644 interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/processor/HttpContext.java create mode 100644 interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/service/IrHttpGremlinHandler.java create mode 100644 interactive_engine/compiler/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtils.java diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/auth/IrAuthenticationHandler.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/auth/IrAuthenticationHandler.java index 002b9468b1a9..2b9cf66b2d37 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/auth/IrAuthenticationHandler.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/auth/IrAuthenticationHandler.java @@ -28,6 +28,8 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http.FullHttpMessage; +import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.util.Attribute; import io.netty.util.AttributeMap; @@ -42,6 +44,7 @@ import org.apache.tinkerpop.gremlin.server.auth.Authenticator; import org.apache.tinkerpop.gremlin.server.authz.Authorizer; import org.apache.tinkerpop.gremlin.server.handler.AbstractAuthenticationHandler; +import org.apache.tinkerpop.gremlin.server.handler.HttpHandlerUtils; import org.apache.tinkerpop.gremlin.server.handler.SaslAuthenticationHandler; import org.apache.tinkerpop.gremlin.server.handler.StateKey; import org.slf4j.Logger; @@ -50,6 +53,7 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.nio.charset.Charset; import java.util.Base64; import java.util.HashMap; import java.util.Map; @@ -205,9 +209,70 @@ public void channelRead(final ChannelHandlerContext ctx, final Object msg) throw ctx.writeAndFlush(error); } } + } else if (msg instanceof FullHttpMessage) { // add Authentication for HTTP requests + FullHttpMessage request = (FullHttpMessage) msg; + + if (!authenticator.requireAuthentication()) { + ctx.fireChannelRead(request); + return; + } + + String errorMsg = + "Invalid HTTP Header for Authentication. Expected format: 'Authorization: Basic" + + " '"; + + if (!request.headers().contains("Authorization")) { + sendError(ctx, errorMsg, request); + return; + } + + String authorizationHeader = request.headers().get("Authorization"); + if (!authorizationHeader.startsWith("Basic ")) { + sendError(ctx, errorMsg, request); + return; + } + + String authorization; + byte[] decodedUserPass; + try { + authorization = authorizationHeader.substring("Basic ".length()); + decodedUserPass = BASE64_DECODER.decode(authorization); + } catch (Exception e) { + sendError(ctx, errorMsg, request); + return; + } + + authorization = new String(decodedUserPass, Charset.forName("UTF-8")); + String[] split = authorization.split(":"); + if (split.length != 2) { + sendError( + ctx, + "Invalid username or password after decoding the Base64 Authorization" + + " header.", + request); + return; + } + + Map credentials = new HashMap(); + credentials.put("username", split[0]); + credentials.put("password", split[1]); + String address = ctx.channel().remoteAddress().toString(); + if (address.startsWith("/") && address.length() > 1) { + address = address.substring(1); + } + + credentials.put("address", address); + + try { + AuthenticatedUser user = authenticator.authenticate(credentials); + ctx.channel().attr(StateKey.AUTHENTICATED_USER).set(user); + ctx.fireChannelRead(request); + } catch (AuthenticationException e) { + sendError(ctx, e.getMessage(), request); + } } else { logger.warn( - "{} only processes RequestMessage instances - received {} - channel closing", + "{} received invalid request message {} - channel closing", this.getClass().getSimpleName(), msg.getClass()); ctx.close(); @@ -226,4 +291,17 @@ private InetAddress getRemoteInetAddress(final ChannelHandlerContext ctx) { return ((InetSocketAddress) genericSocketAddr).getAddress(); } + + private void sendError( + final ChannelHandlerContext ctx, String errorMsg, FullHttpMessage request) { + HttpHandlerUtils.sendError(ctx, HttpResponseStatus.UNAUTHORIZED, errorMsg, false); + if (request.refCnt() > 0) { + boolean fullyReleased = request.release(); + if (!fullyReleased) { + logger.warn( + "http request message was not fully released, may cause a" + + " memory leak"); + } + } + } } diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/processor/HttpContext.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/processor/HttpContext.java new file mode 100644 index 000000000000..1c33014d15ad --- /dev/null +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/processor/HttpContext.java @@ -0,0 +1,113 @@ +/* + * + * * Copyright 2020 Alibaba Group Holding Limited. + * * + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package com.alibaba.graphscope.gremlin.plugin.processor; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http.*; + +import org.apache.tinkerpop.gremlin.driver.message.RequestMessage; +import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage; +import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode; +import org.apache.tinkerpop.gremlin.driver.ser.MessageTextSerializer; +import org.apache.tinkerpop.gremlin.driver.ser.SerializationException; +import org.apache.tinkerpop.gremlin.groovy.engine.GremlinExecutor; +import org.apache.tinkerpop.gremlin.server.Context; +import org.apache.tinkerpop.gremlin.server.GraphManager; +import org.apache.tinkerpop.gremlin.server.Settings; +import org.apache.tinkerpop.gremlin.server.handler.HttpHandlerUtils; +import org.javatuples.Pair; + +import java.nio.charset.StandardCharsets; +import java.util.Objects; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Maintain the gremlin execution context for http request. + */ +public class HttpContext extends Context { + private final Pair> serializer; + private final boolean keepAlive; + private final AtomicReference headerSent; + + public HttpContext( + RequestMessage requestMessage, + ChannelHandlerContext ctx, + Settings settings, + GraphManager graphManager, + GremlinExecutor gremlinExecutor, + ScheduledExecutorService scheduledExecutorService, + Pair> serializer, + boolean keepAlive) { + super( + requestMessage, + ctx, + settings, + graphManager, + gremlinExecutor, + scheduledExecutorService); + this.serializer = Objects.requireNonNull(serializer); + this.keepAlive = keepAlive; + this.headerSent = new AtomicReference<>(false); + } + + /** + * serialize the response message to http response and write to http channel. + * @param responseMessage + */ + @Override + public void writeAndFlush(final ResponseMessage responseMessage) { + try { + // send header once + if (!headerSent.compareAndSet(false, true)) { + FullHttpResponse chunkedResponse = + new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); + chunkedResponse.headers().set(HttpHeaderNames.CONTENT_TYPE, serializer.getValue0()); + chunkedResponse + .headers() + .set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED); + this.getChannelHandlerContext().writeAndFlush(chunkedResponse); + } + ByteBuf byteBuf = + Unpooled.wrappedBuffer( + serializer + .getValue1() + .serializeResponseAsString(responseMessage) + .getBytes(StandardCharsets.UTF_8)); + FullHttpResponse response = + new DefaultFullHttpResponse( + HttpVersion.HTTP_1_1, HttpResponseStatus.OK, byteBuf); + ChannelFuture channelFuture = this.getChannelHandlerContext().writeAndFlush(response); + ResponseStatusCode statusCode = responseMessage.getStatus().getCode(); + if (!keepAlive && statusCode.isFinalResponse()) { + channelFuture.addListener(ChannelFutureListener.CLOSE); + } + } catch (SerializationException e) { + HttpHandlerUtils.sendError( + this.getChannelHandlerContext(), + HttpResponseStatus.INTERNAL_SERVER_ERROR, + e.getMessage(), + keepAlive); + } + } +} diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/service/IrHttpGremlinHandler.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/service/IrHttpGremlinHandler.java new file mode 100644 index 000000000000..13fc9fb6998d --- /dev/null +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/service/IrHttpGremlinHandler.java @@ -0,0 +1,217 @@ +/* + * + * * Copyright 2020 Alibaba Group Holding Limited. + * * + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package com.alibaba.graphscope.gremlin.service; + +import com.alibaba.graphscope.gremlin.Utils; +import com.alibaba.graphscope.gremlin.plugin.processor.HttpContext; +import com.alibaba.graphscope.gremlin.plugin.processor.IrOpLoader; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.TooLongFrameException; +import io.netty.handler.codec.http.*; + +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.tinkerpop.gremlin.driver.MessageSerializer; +import org.apache.tinkerpop.gremlin.driver.message.RequestMessage; +import org.apache.tinkerpop.gremlin.driver.ser.MessageTextSerializer; +import org.apache.tinkerpop.gremlin.groovy.engine.GremlinExecutor; +import org.apache.tinkerpop.gremlin.server.GraphManager; +import org.apache.tinkerpop.gremlin.server.OpProcessor; +import org.apache.tinkerpop.gremlin.server.Settings; +import org.apache.tinkerpop.gremlin.server.handler.HttpGremlinEndpointHandler; +import org.apache.tinkerpop.gremlin.server.handler.HttpHandlerUtils; +import org.javatuples.Pair; +import org.javatuples.Quartet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class IrHttpGremlinHandler extends HttpGremlinEndpointHandler { + private static final Logger logger = LoggerFactory.getLogger(HttpGremlinEndpointHandler.class); + private final Map> serializers; + private final GremlinExecutor gremlinExecutor; + private final GraphManager graphManager; + private final Settings settings; + private final Pattern pattern; + + public IrHttpGremlinHandler( + final Map> serializers, + final GremlinExecutor gremlinExecutor, + final GraphManager graphManager, + final Settings settings) { + super(serializers, gremlinExecutor, graphManager, settings); + this.serializers = + Utils.getFieldValue(HttpGremlinEndpointHandler.class, this, "serializers"); + this.gremlinExecutor = + Utils.getFieldValue(HttpGremlinEndpointHandler.class, this, "gremlinExecutor"); + this.graphManager = + Utils.getFieldValue(HttpGremlinEndpointHandler.class, this, "graphManager"); + this.settings = Utils.getFieldValue(HttpGremlinEndpointHandler.class, this, "settings"); + this.pattern = Pattern.compile("(.*);q=(.*)"); + } + + /** + * Convert {@code FullHttpRequest} to {@code RequestMessage}, and process the request by {@code IrStandardOpProcessor} + * @param ctx + * @param msg + */ + @Override + public void channelRead(final ChannelHandlerContext ctx, final Object msg) { + if (msg instanceof FullHttpRequest) { + FullHttpRequest req = (FullHttpRequest) msg; + boolean keepAlive = HttpUtil.isKeepAlive(req); + try { + if ("/favicon.ico".equals(req.uri())) { + HttpHandlerUtils.sendError( + ctx, + HttpResponseStatus.NOT_FOUND, + "Gremlin Server doesn't have a favicon.ico", + keepAlive); + return; + } + + if (HttpUtil.is100ContinueExpected(req)) { + ctx.write( + new DefaultFullHttpResponse( + HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE)); + } + + if (req.method() != HttpMethod.GET && req.method() != HttpMethod.POST) { + HttpHandlerUtils.sendError( + ctx, + HttpResponseStatus.METHOD_NOT_ALLOWED, + HttpResponseStatus.METHOD_NOT_ALLOWED.toString(), + keepAlive); + return; + } + + RequestMessage requestMessage; + try { + requestMessage = createRequestMessage(req); + } catch (Exception e) { + HttpHandlerUtils.sendError( + ctx, HttpResponseStatus.BAD_REQUEST, e.getMessage(), keepAlive); + return; + } + + String acceptString = + Optional.ofNullable(req.headers().get("Accept")).orElse("application/json"); + Pair> serializer = + this.chooseSerializer(acceptString); + if (null == serializer) { + HttpHandlerUtils.sendError( + ctx, + HttpResponseStatus.BAD_REQUEST, + String.format( + "no serializer for requested Accept header: %s", acceptString), + keepAlive); + return; + } + + HttpContext context = + new HttpContext( + requestMessage, + ctx, + this.settings, + this.graphManager, + this.gremlinExecutor, + this.gremlinExecutor.getScheduledExecutorService(), + serializer, + keepAlive); + + OpProcessor opProcessor = IrOpLoader.getProcessor("").get(); + opProcessor.select(context).accept(context); + } catch (Exception e) { + Throwable t = ExceptionUtils.getRootCause(e); + if (t instanceof TooLongFrameException) { + HttpHandlerUtils.sendError( + ctx, + HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE, + t.getMessage() + " - increase the maxContentLength", + false); + } else { + String message = (t != null) ? t.getMessage() : e.getMessage(); + HttpHandlerUtils.sendError( + ctx, HttpResponseStatus.INTERNAL_SERVER_ERROR, message, keepAlive); + } + } finally { + if (req.refCnt() > 0) { + boolean fullyRelease = req.release(); + if (!fullyRelease) { + logger.warn( + "http request message was not fully released, may cause a" + + " memory leak"); + } + } + } + } + } + + private Pair> chooseSerializer(final String acceptString) { + List> ordered = + Stream.of(acceptString.split(",")) + .map( + (mediaType) -> { + Matcher matcher = pattern.matcher(mediaType); + return matcher.matches() + ? Pair.with( + matcher.group(1), + Double.parseDouble(matcher.group(2))) + : Pair.with(mediaType, 1.0); + }) + .sorted( + (o1, o2) -> { + return ((String) o2.getValue0()) + .compareTo((String) o1.getValue0()); + }) + .collect(Collectors.toList()); + Iterator var3 = ordered.iterator(); + + String accept; + do { + if (!var3.hasNext()) { + return null; + } + + Pair p = (Pair) var3.next(); + accept = p.getValue0().equals("*/*") ? "application/json" : p.getValue0(); + } while (!this.serializers.containsKey(accept)); + + return Pair.with(accept, (MessageTextSerializer) this.serializers.get(accept)); + } + + private RequestMessage createRequestMessage(FullHttpRequest httpRequest) { + Quartet, String, Map> req = + HttpHandlerUtils.getRequestArguments(httpRequest); + return RequestMessage.build("eval") + .addArg("gremlin", req.getValue0()) + .addArg("bindings", req.getValue1()) + .addArg("language", req.getValue2()) + .addArg("aliases", req.getValue3()) + .create(); + } +} diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/service/IrWsAndHttpChannelizer.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/service/IrWsAndHttpChannelizer.java index 479587b5bfbc..b81be891efd0 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/service/IrWsAndHttpChannelizer.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/service/IrWsAndHttpChannelizer.java @@ -20,22 +20,22 @@ import org.apache.tinkerpop.gremlin.server.AbstractChannelizer; import org.apache.tinkerpop.gremlin.server.channel.WsAndHttpChannelizer; -import org.apache.tinkerpop.gremlin.server.handler.HttpGremlinEndpointHandler; import org.apache.tinkerpop.gremlin.server.handler.OpSelectorHandler; import org.apache.tinkerpop.gremlin.server.handler.WsAndHttpChannelizerHandler; import org.apache.tinkerpop.gremlin.server.util.ServerGremlinExecutor; // config the channelizer in conf/gremlin-server.yaml to set the IrOpSelectorHandler as the default public class IrWsAndHttpChannelizer extends WsAndHttpChannelizer { - private WsAndHttpChannelizerHandler handler; @Override public void init(ServerGremlinExecutor serverGremlinExecutor) { super.init(serverGremlinExecutor); - this.handler = new WsAndHttpChannelizerHandler(); - this.handler.init( + WsAndHttpChannelizerHandler handler = + Utils.getFieldValue(WsAndHttpChannelizer.class, this, "handler"); + // reset http handler for gremlin request + handler.init( serverGremlinExecutor, - new HttpGremlinEndpointHandler( + new IrHttpGremlinHandler( this.serializers, this.gremlinExecutor, this.graphManager, this.settings)); OpSelectorHandler irOpSelectorHandler = new IrOpSelectorHandler( diff --git a/interactive_engine/compiler/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtils.java b/interactive_engine/compiler/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtils.java new file mode 100644 index 000000000000..926e038047db --- /dev/null +++ b/interactive_engine/compiler/src/main/java/org/apache/tinkerpop/gremlin/server/handler/HttpHandlerUtils.java @@ -0,0 +1,52 @@ +/* + * + * * Copyright 2020 Alibaba Group Holding Limited. + * * + * * Licensed under the Apache License, Version 2.0 (the "License"); + * * you may not use this file except in compliance with the License. + * * You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.tinkerpop.gremlin.server.handler; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.HttpResponseStatus; + +import org.javatuples.Quartet; + +import java.util.Map; +import java.util.Optional; + +public class HttpHandlerUtils { + public static final Quartet, String, Map> + getRequestArguments(final FullHttpRequest request) { + return HttpHandlerUtil.getRequestArguments(request); + } + + public static final void sendError( + final ChannelHandlerContext ctx, + final HttpResponseStatus status, + final String message, + final boolean keepAlive) { + sendError(ctx, status, message, Optional.empty(), keepAlive); + } + + public static final void sendError( + final ChannelHandlerContext ctx, + final HttpResponseStatus status, + final String message, + final Optional t, + final boolean keepAlive) { + HttpHandlerUtil.sendError(ctx, status, message, t, keepAlive); + } +} From bcd5db5504c777fcfa6ffc4dfc53cf558f4385cb Mon Sep 17 00:00:00 2001 From: shirly121 Date: Mon, 6 Jan 2025 15:01:13 +0800 Subject: [PATCH 2/3] skip last empty result in gremlin http response --- .../gremlin/plugin/processor/HttpContext.java | 60 ++++++++++--------- 1 file changed, 32 insertions(+), 28 deletions(-) diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/processor/HttpContext.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/processor/HttpContext.java index 1c33014d15ad..e236c0bf7429 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/processor/HttpContext.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/processor/HttpContext.java @@ -23,13 +23,16 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.http.*; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpVersion; +import org.apache.commons.lang3.ObjectUtils; import org.apache.tinkerpop.gremlin.driver.message.RequestMessage; import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage; import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode; import org.apache.tinkerpop.gremlin.driver.ser.MessageTextSerializer; -import org.apache.tinkerpop.gremlin.driver.ser.SerializationException; import org.apache.tinkerpop.gremlin.groovy.engine.GremlinExecutor; import org.apache.tinkerpop.gremlin.server.Context; import org.apache.tinkerpop.gremlin.server.GraphManager; @@ -48,7 +51,7 @@ public class HttpContext extends Context { private final Pair> serializer; private final boolean keepAlive; - private final AtomicReference headerSent; + private final AtomicReference finalResponse; public HttpContext( RequestMessage requestMessage, @@ -68,7 +71,7 @@ public HttpContext( scheduledExecutorService); this.serializer = Objects.requireNonNull(serializer); this.keepAlive = keepAlive; - this.headerSent = new AtomicReference<>(false); + this.finalResponse = new AtomicReference<>(false); } /** @@ -78,31 +81,32 @@ public HttpContext( @Override public void writeAndFlush(final ResponseMessage responseMessage) { try { - // send header once - if (!headerSent.compareAndSet(false, true)) { - FullHttpResponse chunkedResponse = - new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); - chunkedResponse.headers().set(HttpHeaderNames.CONTENT_TYPE, serializer.getValue0()); - chunkedResponse - .headers() - .set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED); - this.getChannelHandlerContext().writeAndFlush(chunkedResponse); + if (finalResponse.compareAndSet( + false, responseMessage.getStatus().getCode().isFinalResponse())) { + if (responseMessage.getStatus().getCode() == ResponseStatusCode.SUCCESS) { + Object data = responseMessage.getResult().getData(); + if (!keepAlive && ObjectUtils.isEmpty(data)) { + this.getChannelHandlerContext().close(); + return; + } + } + ByteBuf byteBuf = + Unpooled.wrappedBuffer( + serializer + .getValue1() + .serializeResponseAsString(responseMessage) + .getBytes(StandardCharsets.UTF_8)); + FullHttpResponse response = + new DefaultFullHttpResponse( + HttpVersion.HTTP_1_1, HttpResponseStatus.OK, byteBuf); + ChannelFuture channelFuture = + this.getChannelHandlerContext().writeAndFlush(response); + ResponseStatusCode statusCode = responseMessage.getStatus().getCode(); + if (!keepAlive && statusCode.isFinalResponse()) { + channelFuture.addListener(ChannelFutureListener.CLOSE); + } } - ByteBuf byteBuf = - Unpooled.wrappedBuffer( - serializer - .getValue1() - .serializeResponseAsString(responseMessage) - .getBytes(StandardCharsets.UTF_8)); - FullHttpResponse response = - new DefaultFullHttpResponse( - HttpVersion.HTTP_1_1, HttpResponseStatus.OK, byteBuf); - ChannelFuture channelFuture = this.getChannelHandlerContext().writeAndFlush(response); - ResponseStatusCode statusCode = responseMessage.getStatus().getCode(); - if (!keepAlive && statusCode.isFinalResponse()) { - channelFuture.addListener(ChannelFutureListener.CLOSE); - } - } catch (SerializationException e) { + } catch (Exception e) { HttpHandlerUtils.sendError( this.getChannelHandlerContext(), HttpResponseStatus.INTERNAL_SERVER_ERROR, From 088d803d1c0b7e5b2a738d4fc73b50af83474f2b Mon Sep 17 00:00:00 2001 From: shirly121 Date: Mon, 6 Jan 2025 17:41:18 +0800 Subject: [PATCH 3/3] minor fix --- .../graphscope/gremlin/plugin/processor/HttpContext.java | 8 -------- 1 file changed, 8 deletions(-) diff --git a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/processor/HttpContext.java b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/processor/HttpContext.java index e236c0bf7429..be817817b91c 100644 --- a/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/processor/HttpContext.java +++ b/interactive_engine/compiler/src/main/java/com/alibaba/graphscope/gremlin/plugin/processor/HttpContext.java @@ -28,7 +28,6 @@ import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpVersion; -import org.apache.commons.lang3.ObjectUtils; import org.apache.tinkerpop.gremlin.driver.message.RequestMessage; import org.apache.tinkerpop.gremlin.driver.message.ResponseMessage; import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode; @@ -83,13 +82,6 @@ public void writeAndFlush(final ResponseMessage responseMessage) { try { if (finalResponse.compareAndSet( false, responseMessage.getStatus().getCode().isFinalResponse())) { - if (responseMessage.getStatus().getCode() == ResponseStatusCode.SUCCESS) { - Object data = responseMessage.getResult().getData(); - if (!keepAlive && ObjectUtils.isEmpty(data)) { - this.getChannelHandlerContext().close(); - return; - } - } ByteBuf byteBuf = Unpooled.wrappedBuffer( serializer