Skip to content

Commit

Permalink
Updating grpc handler to attempt to wait for state change before clos…
Browse files Browse the repository at this point in the history
…ing GRPC connections.

This should address part 1 of #807 and #912
  • Loading branch information
nathanejohnson committed Nov 18, 2022
1 parent 87c4da1 commit daae223
Show file tree
Hide file tree
Showing 14 changed files with 88 additions and 85 deletions.
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ type Proxy struct {
AuthSchemes map[string]AuthScheme
GRPCMaxRxMsgSize int
GRPCMaxTxMsgSize int
GRPCGShutdownTimeout time.Duration
}

type STSHeader struct {
Expand Down
25 changes: 13 additions & 12 deletions config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,19 @@ var defaultConfig = &Config{
},
},
Proxy: Proxy{
MaxConn: 10000,
Strategy: "rnd",
Matcher: "prefix",
NoRouteStatus: 404,
DialTimeout: 30 * time.Second,
FlushInterval: time.Second,
GlobalFlushInterval: 0,
LocalIP: LocalIPString(),
AuthSchemes: map[string]AuthScheme{},
IdleConnTimeout: 15 * time.Second,
GRPCMaxRxMsgSize: 4 * 1024 * 1024, // 4M
GRPCMaxTxMsgSize: 4 * 1024 * 1024, // 4M
MaxConn: 10000,
Strategy: "rnd",
Matcher: "prefix",
NoRouteStatus: 404,
DialTimeout: 30 * time.Second,
FlushInterval: time.Second,
GlobalFlushInterval: 0,
LocalIP: LocalIPString(),
AuthSchemes: map[string]AuthScheme{},
IdleConnTimeout: 15 * time.Second,
GRPCMaxRxMsgSize: 4 * 1024 * 1024, // 4M
GRPCMaxTxMsgSize: 4 * 1024 * 1024, // 4M
GRPCGShutdownTimeout: time.Second * 2,
},
Registry: Registry{
Backend: "consul",
Expand Down
4 changes: 2 additions & 2 deletions config/kvslice.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ import (

// parseKVSlice parses a configuration string in the form
//
// key=val;key=val,key=val;key=val
// key=val;key=val,key=val;key=val
//
// into a list of string maps. maps are separated by comma and key/value
// pairs within a map are separated by semicolons. The first key/value
// pair of a map can omit the key and its value will be stored under the
// empty key. This allows support of legacy configuration formats which
// are
//
// val;opt1=val1;opt2=val2;...
// val;opt1=val1;opt2=val2;...
func parseKVSlice(in string) ([]map[string]string, error) {
var keyOrFirstVal string
maps := []map[string]string{}
Expand Down
1 change: 1 addition & 0 deletions config/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ func load(cmdline, environ, envprefix []string, props *properties.Properties) (c
f.BoolVar(&cfg.Proxy.STSHeader.Preload, "proxy.header.sts.preload", defaultConfig.Proxy.STSHeader.Preload, "direct HSTS to pass the preload directive")
f.IntVar(&cfg.Proxy.GRPCMaxRxMsgSize, "proxy.grpcmaxrxmsgsize", defaultConfig.Proxy.GRPCMaxRxMsgSize, "max grpc receive message size (in bytes)")
f.IntVar(&cfg.Proxy.GRPCMaxTxMsgSize, "proxy.grpcmaxtxmsgsize", defaultConfig.Proxy.GRPCMaxTxMsgSize, "max grpc transmit message size (in bytes)")
f.DurationVar(&cfg.Proxy.GRPCGShutdownTimeout, "proxy.grpcshutdowntimeout", defaultConfig.Proxy.GRPCGShutdownTimeout, "amount of time to wait for graceful shutdown of grpc backend")
f.StringVar(&gzipContentTypesValue, "proxy.gzip.contenttype", defaultValues.GZIPContentTypesValue, "regexp of content types to compress")
f.StringVar(&listenerValue, "proxy.addr", defaultValues.ListenerValue, "listener config")
f.StringVar(&certSourcesValue, "proxy.cs", defaultValues.CertSourcesValue, "certificate sources")
Expand Down
37 changes: 18 additions & 19 deletions demo/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,34 +4,33 @@
//
// During startup the server performs the following steps:
//
// * Add a handler for each prefix which provides a unique
// response for that instance and endpoint
// * Add a `/health` handler for the consul health check
// * Register the service in consul with the listen address,
// a health check under the given name and with one `urlprefix-`
// tag per prefix
// * Install a signal handler to deregister the service on exit
// - Add a handler for each prefix which provides a unique
// response for that instance and endpoint
// - Add a `/health` handler for the consul health check
// - Register the service in consul with the listen address,
// a health check under the given name and with one `urlprefix-`
// tag per prefix
// - Install a signal handler to deregister the service on exit
//
// If the protocol is set to "ws" the registered endpoints function
// as websocket echo servers.
//
// Example:
//
// # http server
// ./server -addr 127.0.0.1:5000 -name svc-a -prefix /foo -prefix /bar
// ./server -addr 127.0.0.1:5001 -name svc-b -prefix /baz -prefix /bar
// ./server -addr 127.0.0.1:5002 -name svc-c -prefix "/gogl redirect=301,https://www.google.de/"
// # http server
// ./server -addr 127.0.0.1:5000 -name svc-a -prefix /foo -prefix /bar
// ./server -addr 127.0.0.1:5001 -name svc-b -prefix /baz -prefix /bar
// ./server -addr 127.0.0.1:5002 -name svc-c -prefix "/gogl redirect=301,https://www.google.de/"
//
// # https server
// ./server -addr 127.0.0.1:5000 -name svc-a -proto https -certFile ... -keyFile ... -prefix /foo
// ./server -addr 127.0.0.1:5000 -name svc-a -proto https -certFile ... -keyFile ... -prefix "/foo tlsskipverify=true"
// # https server
// ./server -addr 127.0.0.1:5000 -name svc-a -proto https -certFile ... -keyFile ... -prefix /foo
// ./server -addr 127.0.0.1:5000 -name svc-a -proto https -certFile ... -keyFile ... -prefix "/foo tlsskipverify=true"
//
// # websocket server
// ./server -addr 127.0.0.1:6000 -name ws-a -proto ws -prefix /echo1 -prefix /echo2
//
// # tcp server
// ./server -addr 127.0.0.1:7000 -name tcp-a -proto tcp -prefix :1234
// # websocket server
// ./server -addr 127.0.0.1:6000 -name ws-a -proto ws -prefix /echo1 -prefix /echo2
//
// # tcp server
// ./server -addr 127.0.0.1:7000 -name tcp-a -proto tcp -prefix :1234
package main

import (
Expand Down
63 changes: 31 additions & 32 deletions logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,38 +5,37 @@
// takes place. Text between two fields is printed verbatim. See the common
// log file formats for an example.
//
// $header.<name> - request http header (name: [a-zA-Z0-9-]+)
// $remote_addr - host:port of remote client
// $remote_host - host of remote client
// $remote_port - port of remote client
// $request - request <method> <uri> <proto>
// $request_args - request query parameters
// $request_host - request host header (aka server name)
// $request_method - request method
// $request_scheme - request scheme
// $request_uri - request URI
// $request_url - request URL
// $request_proto - request protocol
// $response_body_size - response body size in bytes
// $response_status - response status code
// $response_time_ms - response time in S.sss format
// $response_time_us - response time in S.ssssss format
// $response_time_ns - response time in S.sssssssss format
// $time_rfc3339 - log timestamp in YYYY-MM-DDTHH:MM:SSZ format
// $time_rfc3339_ms - log timestamp in YYYY-MM-DDTHH:MM:SS.sssZ format
// $time_rfc3339_us - log timestamp in YYYY-MM-DDTHH:MM:SS.ssssssZ format
// $time_rfc3339_ns - log timestamp in YYYY-MM-DDTHH:MM:SS.sssssssssZ format
// $time_unix_ms - log timestamp in unix epoch ms
// $time_unix_us - log timestamp in unix epoch us
// $time_unix_ns - log timestamp in unix epoch ns
// $time_common - log timestamp in DD/MMM/YYYY:HH:MM:SS -ZZZZ
// $upstream_addr - host:port of upstream server
// $upstream_host - host of upstream server
// $upstream_port - port of upstream server
// $upstream_request_scheme - upstream request scheme
// $upstream_request_uri - upstream request URI
// $upstream_request_url - upstream request URL
//
// $header.<name> - request http header (name: [a-zA-Z0-9-]+)
// $remote_addr - host:port of remote client
// $remote_host - host of remote client
// $remote_port - port of remote client
// $request - request <method> <uri> <proto>
// $request_args - request query parameters
// $request_host - request host header (aka server name)
// $request_method - request method
// $request_scheme - request scheme
// $request_uri - request URI
// $request_url - request URL
// $request_proto - request protocol
// $response_body_size - response body size in bytes
// $response_status - response status code
// $response_time_ms - response time in S.sss format
// $response_time_us - response time in S.ssssss format
// $response_time_ns - response time in S.sssssssss format
// $time_rfc3339 - log timestamp in YYYY-MM-DDTHH:MM:SSZ format
// $time_rfc3339_ms - log timestamp in YYYY-MM-DDTHH:MM:SS.sssZ format
// $time_rfc3339_us - log timestamp in YYYY-MM-DDTHH:MM:SS.ssssssZ format
// $time_rfc3339_ns - log timestamp in YYYY-MM-DDTHH:MM:SS.sssssssssZ format
// $time_unix_ms - log timestamp in unix epoch ms
// $time_unix_us - log timestamp in unix epoch us
// $time_unix_ns - log timestamp in unix epoch ns
// $time_common - log timestamp in DD/MMM/YYYY:HH:MM:SS -ZZZZ
// $upstream_addr - host:port of upstream server
// $upstream_host - host of upstream server
// $upstream_port - port of upstream server
// $upstream_request_scheme - upstream request scheme
// $upstream_request_uri - upstream request URI
// $upstream_request_url - upstream request URL
package logger

import (
Expand Down
26 changes: 16 additions & 10 deletions proxy/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,13 +165,13 @@ func (g GrpcProxyInterceptor) lookup(ctx context.Context, fullMethodName string)
return route.GetTable().Lookup(req, req.Header.Get("trace"), pick, match, g.GlobCache, g.Config.GlobMatchingDisabled), nil
}

//grpc client can specify a destination host in metadata by key 'dsthost', e.g. dsthost=betatest
//the backend service(s) tags should be urlprefix-betatest/grpcpackage.servicename proto=grpc
//the 'betatest' will be parsed as 'host' and '/grpcpackage.servicename' is the 'path',
//a route record will be setup in route Table, t['betatest']
//the dstHost is extracted from context's metadata of grpc client, that will trigger t[dstHost] is used.
//if t[dstHost] not exists, fallback to t[""] is used
//dstHost will be "" as before if not specified by grpc client side.
// grpc client can specify a destination host in metadata by key 'dsthost', e.g. dsthost=betatest
// the backend service(s) tags should be urlprefix-betatest/grpcpackage.servicename proto=grpc
// the 'betatest' will be parsed as 'host' and '/grpcpackage.servicename' is the 'path',
// a route record will be setup in route Table, t['betatest']
// the dstHost is extracted from context's metadata of grpc client, that will trigger t[dstHost] is used.
// if t[dstHost] not exists, fallback to t[""] is used
// dstHost will be "" as before if not specified by grpc client side.
func (g GrpcProxyInterceptor) getDestinationHostFromMetadata(md metadata.MD) (dstHost string) {
dstHost = ""
hosts := md["dsthost"]
Expand Down Expand Up @@ -299,15 +299,21 @@ func (p *grpcConnectionPool) cleanup() {
p.lock.Lock()
table := route.GetTable()
for tKey, cs := range p.connections {
if cs.GetState() == connectivity.Shutdown {
state := cs.GetState()
if state == connectivity.Shutdown {
delete(p.connections, tKey)
continue
}

if !hasTarget(tKey, table) {
log.Println("[DEBUG] grpc: cleaning up connection to", tKey)
cs.Close()
delete(p.connections, tKey)
go func(cs *grpc.ClientConn) {
ctx, cancel := context.WithTimeout(context.Background(), p.cfg.Proxy.GRPCGShutdownTimeout)
defer cancel()
// wait for state to change, or timeout, before closing, in case it's still handling traffic.
cs.WaitForStateChange(ctx, state)
cs.Close()
}(cs)
}
}
p.lock.Unlock()
Expand Down
1 change: 0 additions & 1 deletion proxy/http_headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ func addResponseHeaders(w http.ResponseWriter, r *http.Request, cfg config.Proxy
// * add X-Real-Ip, if not present
// * ClientIPHeader != "": Set header with that name to <remote ip>
// * TLS connection: Set header with name from `cfg.TLSHeader` to `cfg.TLSHeaderValue`
//
func addHeaders(r *http.Request, cfg config.Proxy, stripPath string) error {
remoteIP, _, err := net.SplitHostPort(r.RemoteAddr)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion proxy/http_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ const (
globDisabled = true
)

//Global GlobCache for Testing
// Global GlobCache for Testing
var globCache = route.NewGlobCache(1000)

func TestProxyProducesCorrectXForwardedSomethingHeader(t *testing.T) {
Expand Down
5 changes: 2 additions & 3 deletions registry/consul/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,8 @@ const (
// consul. To wait for completion the caller should read the next value from
// the dereg channel.
//
// dereg <- true // trigger deregistration
// <-dereg // wait for completion
//
// dereg <- true // trigger deregistration
// <-dereg // wait for completion
func register(c *api.Client, service *api.AgentServiceRegistration) chan bool {
registered := func(serviceID string) bool {
if serviceID == "" {
Expand Down
2 changes: 0 additions & 2 deletions route/picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,3 @@ var randIntn = func(n int) int {
}
return rand.Intn(n)
}


2 changes: 1 addition & 1 deletion route/picker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ var oldRandInt = func(n int) int {
if n == 0 {
return 0
}
return int(time.Now().UnixNano()/int64(time.Microsecond) % int64(n))
return int(time.Now().UnixNano() / int64(time.Microsecond) % int64(n))
}

var result int // prevent compiler optimization
Expand Down
2 changes: 1 addition & 1 deletion route/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func SetTable(t Table) {
type Table map[string]Routes

// hostpath splits a 'host/path' prefix into 'host' and '/path' or it returns a
// ':port' prefix as ':port' and '' since there is no path component for TCP
// ':port' prefix as ':port' and since there is no path component for TCP
// connections.
func hostpath(prefix string) (host string, path string) {
if strings.HasPrefix(prefix, ":") {
Expand Down
2 changes: 1 addition & 1 deletion route/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const (
globDisabled = true
)

//Global GlobCache for Testing
// Global GlobCache for Testing
var globCache = NewGlobCache(1000)

func TestTableParse(t *testing.T) {
Expand Down

0 comments on commit daae223

Please sign in to comment.