Skip to content

Commit

Permalink
Merge pull request #90 from liu-cong/body
Browse files Browse the repository at this point in the history
Add response body handler
  • Loading branch information
k8s-ci-robot authored Dec 17, 2024
2 parents e0080b9 + e907f60 commit 7732351
Show file tree
Hide file tree
Showing 7 changed files with 163 additions and 13 deletions.
2 changes: 1 addition & 1 deletion pkg/ext-proc/handlers/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces
}

func HandleRequestHeaders(reqCtx *RequestContext, req *extProcPb.ProcessingRequest) *extProcPb.ProcessingResponse {
klog.V(3).Info("--- In RequestHeaders processing ...")
klog.V(3).Info("Handling request headers ...")
r := req.Request
h := r.(*extProcPb.ProcessingRequest_RequestHeaders)
klog.V(3).Infof("Headers: %+v\n", h)
Expand Down
58 changes: 58 additions & 0 deletions pkg/ext-proc/handlers/response.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package handlers

import (
"encoding/json"
"fmt"

configPb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
klog "k8s.io/klog/v2"
Expand Down Expand Up @@ -33,3 +36,58 @@ func (s *Server) HandleResponseHeaders(reqCtx *RequestContext, req *extProcPb.Pr
}
return resp, nil
}

// HandleResponseBody parses response body to update information such as number of completion tokens.
// Example response
/*
{
"id": "cmpl-573498d260f2423f9e42817bbba3743a",
"object": "text_completion",
"created": 1732563765,
"model": "meta-llama/Llama-2-7b-hf",
"choices": [
{
"index": 0,
"text": " Chronicle\nThe San Francisco Chronicle has a new book review section, and it's a good one. The reviews are short, but they're well-written and well-informed. The Chronicle's book review section is a good place to start if you're looking for a good book review.\nThe Chronicle's book review section is a good place to start if you're looking for a good book review. The Chronicle's book review section",
"logprobs": null,
"finish_reason": "length",
"stop_reason": null,
"prompt_logprobs": null
}
],
"usage": {
"prompt_tokens": 11,
"total_tokens": 111,
"completion_tokens": 100
}
}*/
func (s *Server) HandleResponseBody(reqCtx *RequestContext, req *extProcPb.ProcessingRequest) (*extProcPb.ProcessingResponse, error) {
klog.V(3).Info("Processing HandleResponseBody")
body := req.Request.(*extProcPb.ProcessingRequest_ResponseBody)

res := Response{}
if err := json.Unmarshal(body.ResponseBody.Body, &res); err != nil {
return nil, fmt.Errorf("unmarshaling response body: %v", err)
}
reqCtx.Response = res
klog.V(3).Infof("Response: %+v", res)

resp := &extProcPb.ProcessingResponse{
Response: &extProcPb.ProcessingResponse_ResponseBody{
ResponseBody: &extProcPb.BodyResponse{
Response: &extProcPb.CommonResponse{},
},
},
}
return resp, nil
}

type Response struct {
Usage Usage `json:"usage"`
}

type Usage struct {
PromptTokens int `json:"prompt_tokens"`
CompletionTokens int `json:"completion_tokens"`
TotalTokens int `json:"total_tokens"`
}
87 changes: 87 additions & 0 deletions pkg/ext-proc/handlers/response_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package handlers

import (
"testing"

extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
"github.com/google/go-cmp/cmp"
)

const (
body = `
{
"id": "cmpl-573498d260f2423f9e42817bbba3743a",
"object": "text_completion",
"created": 1732563765,
"model": "meta-llama/Llama-2-7b-hf",
"choices": [
{
"index": 0,
"text": " Chronicle\nThe San Francisco Chronicle has a new book review section, and it's a good one. The reviews are short, but they're well-written and well-informed. The Chronicle's book review section is a good place to start if you're looking for a good book review.\nThe Chronicle's book review section is a good place to start if you're looking for a good book review. The Chronicle's book review section",
"logprobs": null,
"finish_reason": "length",
"stop_reason": null,
"prompt_logprobs": null
}
],
"usage": {
"prompt_tokens": 11,
"total_tokens": 111,
"completion_tokens": 100
}
}
`
)

func TestHandleResponseBody(t *testing.T) {
tests := []struct {
name string
req *extProcPb.ProcessingRequest_ResponseBody
want Response
wantErr bool
}{
{
name: "success",
req: &extProcPb.ProcessingRequest_ResponseBody{
ResponseBody: &extProcPb.HttpBody{
Body: []byte(body),
},
},
want: Response{
Usage: Usage{
PromptTokens: 11,
TotalTokens: 111,
CompletionTokens: 100,
},
},
},
{
name: "malformed response",
req: &extProcPb.ProcessingRequest_ResponseBody{
ResponseBody: &extProcPb.HttpBody{
Body: []byte("malformed json"),
},
},
wantErr: true,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
server := &Server{}
reqCtx := &RequestContext{}
_, err := server.HandleResponseBody(reqCtx, &extProcPb.ProcessingRequest{Request: test.req})

if err != nil {
if !test.wantErr {
t.Fatalf("HandleResponseBody returned unexpected error: %v, want %v", err, test.wantErr)
}
return
}

if diff := cmp.Diff(test.want, reqCtx.Response); diff != "" {
t.Errorf("HandleResponseBody returned unexpected response, diff(-want, +got): %v", diff)
}
})
}
}
14 changes: 9 additions & 5 deletions pkg/ext-proc/handlers/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type Server struct {
}

type Scheduler interface {
Schedule(b *scheduling.LLMRequest) (targetPod *backend.Pod, err error)
Schedule(b *scheduling.LLMRequest) (targetPod backend.Pod, err error)
}

// PodProvider is an interface to provide set of pods in the backend and information such as metrics.
Expand Down Expand Up @@ -77,13 +77,16 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
switch v := req.Request.(type) {
case *extProcPb.ProcessingRequest_RequestHeaders:
resp = HandleRequestHeaders(reqCtx, req)
klog.V(3).Infof("Request context after HandleRequestHeaders: %v", reqCtx)
klog.V(3).Infof("Request context after HandleRequestHeaders: %+v", reqCtx)
case *extProcPb.ProcessingRequest_RequestBody:
resp, err = s.HandleRequestBody(reqCtx, req)
klog.V(3).Infof("Request context after HandleRequestBody: %v", reqCtx)
klog.V(3).Infof("Request context after HandleRequestBody: %+v", reqCtx)
case *extProcPb.ProcessingRequest_ResponseHeaders:
resp, err = s.HandleResponseHeaders(reqCtx, req)
klog.V(3).Infof("Request context after HandleResponseHeaders: %v", reqCtx)
klog.V(3).Infof("Request context after HandleResponseHeaders: %+v", reqCtx)
case *extProcPb.ProcessingRequest_ResponseBody:
resp, err = s.HandleResponseBody(reqCtx, req)
klog.V(3).Infof("Request context after HandleResponseBody: %+v", reqCtx)
default:
klog.Errorf("Unknown Request type %+v", v)
return status.Error(codes.Unknown, "unknown request type")
Expand Down Expand Up @@ -119,6 +122,7 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {

// RequestContext stores context information during the life time of an HTTP request.
type RequestContext struct {
TargetPod *backend.Pod
TargetPod backend.Pod
Model string
Response Response
}
6 changes: 3 additions & 3 deletions pkg/ext-proc/scheduling/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,9 @@ func leastKVCacheFilterFunc(req *LLMRequest, pods []*backend.PodMetrics) ([]*bac
type podPredicate func(req *LLMRequest, pod *backend.PodMetrics) bool

// We consider serving an adapter low cost it the adapter is active in the model server, or the
// model server has room to load the adapter. The lowLoRACostPredicate ensures weak affinity by spreading the
// load of a LoRA adapter across multiple pods, avoiding "pinning" all requests to a single pod.
// This gave good performance in our initial benchmarking results in the scenario where # of lora slots > # of lora adapters.
// model server has room to load the adapter. The lowLoRACostPredicate ensures weak affinity by spreading the
// load of a LoRA adapter across multiple pods, avoiding "pinning" all requests to a single pod.
// This gave good performance in our initial benchmarking results in the scenario where # of lora slots > # of lora adapters.
func lowLoRACostPredicate(req *LLMRequest, pod *backend.PodMetrics) bool {
_, ok := pod.ActiveModels[req.ResolvedTargetModel]
return ok || len(pod.ActiveModels) < pod.MaxActiveModels
Expand Down
6 changes: 3 additions & 3 deletions pkg/ext-proc/scheduling/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,13 +110,13 @@ type PodMetricsProvider interface {
}

// Schedule finds the target pod based on metrics and the requested lora adapter.
func (s *Scheduler) Schedule(req *LLMRequest) (targetPod *backend.Pod, err error) {
func (s *Scheduler) Schedule(req *LLMRequest) (targetPod backend.Pod, err error) {
klog.V(3).Infof("request: %v; metrics: %+v", req, s.podMetricsProvider.AllPodMetrics())
pods, err := s.filter.Filter(req, s.podMetricsProvider.AllPodMetrics())
if err != nil || len(pods) == 0 {
return nil, fmt.Errorf("failed to apply filter, resulted %v pods, this should never happen: %w", len(pods), err)
return backend.Pod{}, fmt.Errorf("failed to apply filter, resulted %v pods, this should never happen: %w", len(pods), err)
}
klog.V(3).Infof("Going to randomly select a pod from the candidates: %+v", pods)
i := rand.Intn(len(pods))
return &pods[i].Pod, nil
return pods[i].Pod, nil
}
3 changes: 2 additions & 1 deletion pkg/manifests/ext_proc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ spec:
processingMode:
request:
body: Buffered
response: {}
response:
body: Buffered
# The timeouts are likely not needed here. We can experiment with removing/tuning them slowly.
# The connection limits are more important and will cause the opaque: ext_proc_gRPC_error_14 error in Envoy GW if not configured correctly.
messageTimeout: 1000s
Expand Down

0 comments on commit 7732351

Please sign in to comment.