From daae2234804102bcfedefa39e831440561249966 Mon Sep 17 00:00:00 2001 From: Nathan Johnson Date: Thu, 17 Nov 2022 19:15:54 -0600 Subject: [PATCH] Updating grpc handler to attempt to wait for state change before closing GRPC connections. This should address part 1 of #807 and #912 --- config/config.go | 1 + config/default.go | 25 +++++++------- config/kvslice.go | 4 +-- config/load.go | 1 + demo/server/server.go | 37 ++++++++++---------- logger/logger.go | 63 +++++++++++++++++----------------- proxy/grpc_handler.go | 26 ++++++++------ proxy/http_headers.go | 1 - proxy/http_integration_test.go | 2 +- registry/consul/register.go | 5 ++- route/picker.go | 2 -- route/picker_test.go | 2 +- route/table.go | 2 +- route/table_test.go | 2 +- 14 files changed, 88 insertions(+), 85 deletions(-) diff --git a/config/config.go b/config/config.go index 484af7293..90637f6e4 100644 --- a/config/config.go +++ b/config/config.go @@ -93,6 +93,7 @@ type Proxy struct { AuthSchemes map[string]AuthScheme GRPCMaxRxMsgSize int GRPCMaxTxMsgSize int + GRPCGShutdownTimeout time.Duration } type STSHeader struct { diff --git a/config/default.go b/config/default.go index de9f00594..011e9e315 100644 --- a/config/default.go +++ b/config/default.go @@ -42,18 +42,19 @@ var defaultConfig = &Config{ }, }, Proxy: Proxy{ - MaxConn: 10000, - Strategy: "rnd", - Matcher: "prefix", - NoRouteStatus: 404, - DialTimeout: 30 * time.Second, - FlushInterval: time.Second, - GlobalFlushInterval: 0, - LocalIP: LocalIPString(), - AuthSchemes: map[string]AuthScheme{}, - IdleConnTimeout: 15 * time.Second, - GRPCMaxRxMsgSize: 4 * 1024 * 1024, // 4M - GRPCMaxTxMsgSize: 4 * 1024 * 1024, // 4M + MaxConn: 10000, + Strategy: "rnd", + Matcher: "prefix", + NoRouteStatus: 404, + DialTimeout: 30 * time.Second, + FlushInterval: time.Second, + GlobalFlushInterval: 0, + LocalIP: LocalIPString(), + AuthSchemes: map[string]AuthScheme{}, + IdleConnTimeout: 15 * time.Second, + GRPCMaxRxMsgSize: 4 * 1024 * 1024, // 4M + GRPCMaxTxMsgSize: 4 * 1024 * 1024, // 4M + GRPCGShutdownTimeout: time.Second * 2, }, Registry: Registry{ Backend: "consul", diff --git a/config/kvslice.go b/config/kvslice.go index c9843e648..8ae2cbc79 100644 --- a/config/kvslice.go +++ b/config/kvslice.go @@ -8,7 +8,7 @@ import ( // parseKVSlice parses a configuration string in the form // -// key=val;key=val,key=val;key=val +// key=val;key=val,key=val;key=val // // into a list of string maps. maps are separated by comma and key/value // pairs within a map are separated by semicolons. The first key/value @@ -16,7 +16,7 @@ import ( // empty key. This allows support of legacy configuration formats which // are // -// val;opt1=val1;opt2=val2;... +// val;opt1=val1;opt2=val2;... func parseKVSlice(in string) ([]map[string]string, error) { var keyOrFirstVal string maps := []map[string]string{} diff --git a/config/load.go b/config/load.go index f0e38397a..ce26057b0 100644 --- a/config/load.go +++ b/config/load.go @@ -153,6 +153,7 @@ func load(cmdline, environ, envprefix []string, props *properties.Properties) (c f.BoolVar(&cfg.Proxy.STSHeader.Preload, "proxy.header.sts.preload", defaultConfig.Proxy.STSHeader.Preload, "direct HSTS to pass the preload directive") f.IntVar(&cfg.Proxy.GRPCMaxRxMsgSize, "proxy.grpcmaxrxmsgsize", defaultConfig.Proxy.GRPCMaxRxMsgSize, "max grpc receive message size (in bytes)") f.IntVar(&cfg.Proxy.GRPCMaxTxMsgSize, "proxy.grpcmaxtxmsgsize", defaultConfig.Proxy.GRPCMaxTxMsgSize, "max grpc transmit message size (in bytes)") + f.DurationVar(&cfg.Proxy.GRPCGShutdownTimeout, "proxy.grpcshutdowntimeout", defaultConfig.Proxy.GRPCGShutdownTimeout, "amount of time to wait for graceful shutdown of grpc backend") f.StringVar(&gzipContentTypesValue, "proxy.gzip.contenttype", defaultValues.GZIPContentTypesValue, "regexp of content types to compress") f.StringVar(&listenerValue, "proxy.addr", defaultValues.ListenerValue, "listener config") f.StringVar(&certSourcesValue, "proxy.cs", defaultValues.CertSourcesValue, "certificate sources") diff --git a/demo/server/server.go b/demo/server/server.go index d2ba0308d..6daa065f8 100644 --- a/demo/server/server.go +++ b/demo/server/server.go @@ -4,34 +4,33 @@ // // During startup the server performs the following steps: // -// * Add a handler for each prefix which provides a unique -// response for that instance and endpoint -// * Add a `/health` handler for the consul health check -// * Register the service in consul with the listen address, -// a health check under the given name and with one `urlprefix-` -// tag per prefix -// * Install a signal handler to deregister the service on exit +// - Add a handler for each prefix which provides a unique +// response for that instance and endpoint +// - Add a `/health` handler for the consul health check +// - Register the service in consul with the listen address, +// a health check under the given name and with one `urlprefix-` +// tag per prefix +// - Install a signal handler to deregister the service on exit // // If the protocol is set to "ws" the registered endpoints function // as websocket echo servers. // // Example: // -// # http server -// ./server -addr 127.0.0.1:5000 -name svc-a -prefix /foo -prefix /bar -// ./server -addr 127.0.0.1:5001 -name svc-b -prefix /baz -prefix /bar -// ./server -addr 127.0.0.1:5002 -name svc-c -prefix "/gogl redirect=301,https://www.google.de/" +// # http server +// ./server -addr 127.0.0.1:5000 -name svc-a -prefix /foo -prefix /bar +// ./server -addr 127.0.0.1:5001 -name svc-b -prefix /baz -prefix /bar +// ./server -addr 127.0.0.1:5002 -name svc-c -prefix "/gogl redirect=301,https://www.google.de/" // -// # https server -// ./server -addr 127.0.0.1:5000 -name svc-a -proto https -certFile ... -keyFile ... -prefix /foo -// ./server -addr 127.0.0.1:5000 -name svc-a -proto https -certFile ... -keyFile ... -prefix "/foo tlsskipverify=true" +// # https server +// ./server -addr 127.0.0.1:5000 -name svc-a -proto https -certFile ... -keyFile ... -prefix /foo +// ./server -addr 127.0.0.1:5000 -name svc-a -proto https -certFile ... -keyFile ... -prefix "/foo tlsskipverify=true" // -// # websocket server -// ./server -addr 127.0.0.1:6000 -name ws-a -proto ws -prefix /echo1 -prefix /echo2 -// -// # tcp server -// ./server -addr 127.0.0.1:7000 -name tcp-a -proto tcp -prefix :1234 +// # websocket server +// ./server -addr 127.0.0.1:6000 -name ws-a -proto ws -prefix /echo1 -prefix /echo2 // +// # tcp server +// ./server -addr 127.0.0.1:7000 -name tcp-a -proto tcp -prefix :1234 package main import ( diff --git a/logger/logger.go b/logger/logger.go index b9077320e..01361d2f1 100644 --- a/logger/logger.go +++ b/logger/logger.go @@ -5,38 +5,37 @@ // takes place. Text between two fields is printed verbatim. See the common // log file formats for an example. // -// $header. - request http header (name: [a-zA-Z0-9-]+) -// $remote_addr - host:port of remote client -// $remote_host - host of remote client -// $remote_port - port of remote client -// $request - request -// $request_args - request query parameters -// $request_host - request host header (aka server name) -// $request_method - request method -// $request_scheme - request scheme -// $request_uri - request URI -// $request_url - request URL -// $request_proto - request protocol -// $response_body_size - response body size in bytes -// $response_status - response status code -// $response_time_ms - response time in S.sss format -// $response_time_us - response time in S.ssssss format -// $response_time_ns - response time in S.sssssssss format -// $time_rfc3339 - log timestamp in YYYY-MM-DDTHH:MM:SSZ format -// $time_rfc3339_ms - log timestamp in YYYY-MM-DDTHH:MM:SS.sssZ format -// $time_rfc3339_us - log timestamp in YYYY-MM-DDTHH:MM:SS.ssssssZ format -// $time_rfc3339_ns - log timestamp in YYYY-MM-DDTHH:MM:SS.sssssssssZ format -// $time_unix_ms - log timestamp in unix epoch ms -// $time_unix_us - log timestamp in unix epoch us -// $time_unix_ns - log timestamp in unix epoch ns -// $time_common - log timestamp in DD/MMM/YYYY:HH:MM:SS -ZZZZ -// $upstream_addr - host:port of upstream server -// $upstream_host - host of upstream server -// $upstream_port - port of upstream server -// $upstream_request_scheme - upstream request scheme -// $upstream_request_uri - upstream request URI -// $upstream_request_url - upstream request URL -// +// $header. - request http header (name: [a-zA-Z0-9-]+) +// $remote_addr - host:port of remote client +// $remote_host - host of remote client +// $remote_port - port of remote client +// $request - request +// $request_args - request query parameters +// $request_host - request host header (aka server name) +// $request_method - request method +// $request_scheme - request scheme +// $request_uri - request URI +// $request_url - request URL +// $request_proto - request protocol +// $response_body_size - response body size in bytes +// $response_status - response status code +// $response_time_ms - response time in S.sss format +// $response_time_us - response time in S.ssssss format +// $response_time_ns - response time in S.sssssssss format +// $time_rfc3339 - log timestamp in YYYY-MM-DDTHH:MM:SSZ format +// $time_rfc3339_ms - log timestamp in YYYY-MM-DDTHH:MM:SS.sssZ format +// $time_rfc3339_us - log timestamp in YYYY-MM-DDTHH:MM:SS.ssssssZ format +// $time_rfc3339_ns - log timestamp in YYYY-MM-DDTHH:MM:SS.sssssssssZ format +// $time_unix_ms - log timestamp in unix epoch ms +// $time_unix_us - log timestamp in unix epoch us +// $time_unix_ns - log timestamp in unix epoch ns +// $time_common - log timestamp in DD/MMM/YYYY:HH:MM:SS -ZZZZ +// $upstream_addr - host:port of upstream server +// $upstream_host - host of upstream server +// $upstream_port - port of upstream server +// $upstream_request_scheme - upstream request scheme +// $upstream_request_uri - upstream request URI +// $upstream_request_url - upstream request URL package logger import ( diff --git a/proxy/grpc_handler.go b/proxy/grpc_handler.go index 1c4be8b09..418cbe525 100644 --- a/proxy/grpc_handler.go +++ b/proxy/grpc_handler.go @@ -165,13 +165,13 @@ func (g GrpcProxyInterceptor) lookup(ctx context.Context, fullMethodName string) return route.GetTable().Lookup(req, req.Header.Get("trace"), pick, match, g.GlobCache, g.Config.GlobMatchingDisabled), nil } -//grpc client can specify a destination host in metadata by key 'dsthost', e.g. dsthost=betatest -//the backend service(s) tags should be urlprefix-betatest/grpcpackage.servicename proto=grpc -//the 'betatest' will be parsed as 'host' and '/grpcpackage.servicename' is the 'path', -//a route record will be setup in route Table, t['betatest'] -//the dstHost is extracted from context's metadata of grpc client, that will trigger t[dstHost] is used. -//if t[dstHost] not exists, fallback to t[""] is used -//dstHost will be "" as before if not specified by grpc client side. +// grpc client can specify a destination host in metadata by key 'dsthost', e.g. dsthost=betatest +// the backend service(s) tags should be urlprefix-betatest/grpcpackage.servicename proto=grpc +// the 'betatest' will be parsed as 'host' and '/grpcpackage.servicename' is the 'path', +// a route record will be setup in route Table, t['betatest'] +// the dstHost is extracted from context's metadata of grpc client, that will trigger t[dstHost] is used. +// if t[dstHost] not exists, fallback to t[""] is used +// dstHost will be "" as before if not specified by grpc client side. func (g GrpcProxyInterceptor) getDestinationHostFromMetadata(md metadata.MD) (dstHost string) { dstHost = "" hosts := md["dsthost"] @@ -299,15 +299,21 @@ func (p *grpcConnectionPool) cleanup() { p.lock.Lock() table := route.GetTable() for tKey, cs := range p.connections { - if cs.GetState() == connectivity.Shutdown { + state := cs.GetState() + if state == connectivity.Shutdown { delete(p.connections, tKey) continue } if !hasTarget(tKey, table) { log.Println("[DEBUG] grpc: cleaning up connection to", tKey) - cs.Close() - delete(p.connections, tKey) + go func(cs *grpc.ClientConn) { + ctx, cancel := context.WithTimeout(context.Background(), p.cfg.Proxy.GRPCGShutdownTimeout) + defer cancel() + // wait for state to change, or timeout, before closing, in case it's still handling traffic. + cs.WaitForStateChange(ctx, state) + cs.Close() + }(cs) } } p.lock.Unlock() diff --git a/proxy/http_headers.go b/proxy/http_headers.go index db5aa05b5..7f610c99c 100644 --- a/proxy/http_headers.go +++ b/proxy/http_headers.go @@ -33,7 +33,6 @@ func addResponseHeaders(w http.ResponseWriter, r *http.Request, cfg config.Proxy // * add X-Real-Ip, if not present // * ClientIPHeader != "": Set header with that name to // * TLS connection: Set header with name from `cfg.TLSHeader` to `cfg.TLSHeaderValue` -// func addHeaders(r *http.Request, cfg config.Proxy, stripPath string) error { remoteIP, _, err := net.SplitHostPort(r.RemoteAddr) if err != nil { diff --git a/proxy/http_integration_test.go b/proxy/http_integration_test.go index fa081bd3c..45c88e42e 100644 --- a/proxy/http_integration_test.go +++ b/proxy/http_integration_test.go @@ -33,7 +33,7 @@ const ( globDisabled = true ) -//Global GlobCache for Testing +// Global GlobCache for Testing var globCache = route.NewGlobCache(1000) func TestProxyProducesCorrectXForwardedSomethingHeader(t *testing.T) { diff --git a/registry/consul/register.go b/registry/consul/register.go index 89fd65778..7596431ae 100644 --- a/registry/consul/register.go +++ b/registry/consul/register.go @@ -26,9 +26,8 @@ const ( // consul. To wait for completion the caller should read the next value from // the dereg channel. // -// dereg <- true // trigger deregistration -// <-dereg // wait for completion -// +// dereg <- true // trigger deregistration +// <-dereg // wait for completion func register(c *api.Client, service *api.AgentServiceRegistration) chan bool { registered := func(serviceID string) bool { if serviceID == "" { diff --git a/route/picker.go b/route/picker.go index 7828d7ad1..888b46270 100644 --- a/route/picker.go +++ b/route/picker.go @@ -41,5 +41,3 @@ var randIntn = func(n int) int { } return rand.Intn(n) } - - diff --git a/route/picker_test.go b/route/picker_test.go index 42ebd3d01..fe4434797 100644 --- a/route/picker_test.go +++ b/route/picker_test.go @@ -65,7 +65,7 @@ var oldRandInt = func(n int) int { if n == 0 { return 0 } - return int(time.Now().UnixNano()/int64(time.Microsecond) % int64(n)) + return int(time.Now().UnixNano() / int64(time.Microsecond) % int64(n)) } var result int // prevent compiler optimization diff --git a/route/table.go b/route/table.go index 239dd2ea8..1e6c0369f 100644 --- a/route/table.go +++ b/route/table.go @@ -72,7 +72,7 @@ func SetTable(t Table) { type Table map[string]Routes // hostpath splits a 'host/path' prefix into 'host' and '/path' or it returns a -// ':port' prefix as ':port' and '' since there is no path component for TCP +// ':port' prefix as ':port' and ” since there is no path component for TCP // connections. func hostpath(prefix string) (host string, path string) { if strings.HasPrefix(prefix, ":") { diff --git a/route/table_test.go b/route/table_test.go index 934581460..72080b012 100644 --- a/route/table_test.go +++ b/route/table_test.go @@ -19,7 +19,7 @@ const ( globDisabled = true ) -//Global GlobCache for Testing +// Global GlobCache for Testing var globCache = NewGlobCache(1000) func TestTableParse(t *testing.T) {