Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CASSGO-39 Add query attempt interceptor #1820

Open
wants to merge 7 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,10 @@ type ClusterConfig struct {
// See https://issues.apache.org/jira/browse/CASSANDRA-10786
DisableSkipMetadata bool

// QueryAttemptInterceptor will set the provided query interceptor on all queries created from this session.
// Use it to intercept and modify queries by providing an implementation of QueryAttemptInterceptor.
QueryAttemptInterceptor QueryAttemptInterceptor

// QueryObserver will set the provided query observer on all queries created from this session.
// Use it to collect metrics / stats from queries by providing an implementation of QueryObserver.
QueryObserver QueryObserver
Expand Down
12 changes: 12 additions & 0 deletions doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,18 @@
//
// See Example_userDefinedTypesMap, Example_userDefinedTypesStruct, ExampleUDTMarshaler, ExampleUDTUnmarshaler.
//
// # Interceptors
//
// A QueryAttemptInterceptor wraps query execution and can be used to inject logic that should apply to all query
// and batch execution attempts. For example, interceptors can be used for rate limiting, logging, attaching
// distributed tracing metadata to the context, modifying queries, and inspecting query results.
//
// A QueryAttemptInterceptor will be invoked once prior to each query execution attempt, including retry attempts
// and speculative execution attempts. Interceptors are responsible for calling the provided handler and returning
// a non-nil Iter.
//
// See Example_interceptor for a full example.
//
// # Metrics and tracing
//
// It is possible to provide observer implementations that could be used to gather metrics:
Expand Down
92 changes: 92 additions & 0 deletions example_interceptor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Content before git sha 34fdeebefcbf183ed7f916f931aa0586fdaa1b40
* Copyright (c) 2016, The Gocql authors,
* provided under the BSD-3-Clause License.
* See the NOTICE file distributed with this work for additional information.
*/

package gocql_test

import (
"context"
"fmt"
"log"
"time"

gocql "github.com/gocql/gocql"
)

// MyQueryAttemptInterceptor is the example interceptor used by Example_interceptor,
// where it is assigned to ClusterConfig.QueryAttemptInterceptor.
type MyQueryAttemptInterceptor struct {
// injectFault, when true, makes Intercept skip the handler on a query's first
// attempt (Attempts() == 0) and instead return an Iter carrying
// RequestErrWriteTimeout after a one-second delay.
injectFault bool
}

func (q MyQueryAttemptInterceptor) Intercept(
ctx context.Context,
query gocql.ExecutableQuery,
conn *gocql.Conn,
handler gocql.QueryAttemptHandler,
) *gocql.Iter {
switch q := query.(type) {
case *gocql.Query:
// Inspect or modify query
query = q
case *gocql.Batch:
// Inspect or modify batch
query = q
}

// Inspect or modify context
ctx = context.WithValue(ctx, "trace-id", "123")

// Optionally bypass the handler and return an error to prevent query execution.
// For example, to simulate query timeouts.
if q.injectFault && query.Attempts() == 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the attempt number be a parameter to Intercept? It seems that calling Attempts() could be racy, especially when speculative execution is enabled:

  1. Attempt 0 started
  2. Attempt 1 started
  3. Interceptor 0 called - gets attempt 0
  4. Interceptor 1 called - gets attempt 0
  5. attempts incremented
  6. attempts incremented

Perhaps the signature should be Intercept(ctx context.Context, attempt gocql.QueryAttempt, handler gocql.QueryAttemptHandler)? That would enable adding more information about the attempt in the future.

type QueryAttempt struct {
	// query to execute
	query gocql.ExecutableQuery
	// conn to use to execute the query
	conn *gocql.Conn
	// number of the attempt. 0 is the initial attempt, 1 is first retry, etc.
	number int
}
type QueryAttemptHandler = func(context.Context, QueryAttempt) *Iter

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@martin-sucha passing a QueryAttempt type seems like a great way to keep the interceptor API stable 👍

<-time.After(1 * time.Second)
return gocql.NewIterWithErr(gocql.RequestErrWriteTimeout{})
}

// The interceptor *must* invoke the handler to execute the query.
return handler(ctx, query, conn)
}

// Example_interceptor demonstrates how to implement a QueryAttemptInterceptor.
func Example_interceptor() {
cluster := gocql.NewCluster("localhost:9042")
cluster.QueryAttemptInterceptor = MyQueryAttemptInterceptor{injectFault: true}

session, err := cluster.CreateSession()
if err != nil {
log.Fatal(err)
}
defer session.Close()

ctx := context.Background()

var stringValue string
err = session.Query("select now() from system.local").
WithContext(ctx).
RetryPolicy(&gocql.SimpleRetryPolicy{NumRetries: 2}).
Scan(&stringValue)
if err != nil {
Comment on lines +81 to +85
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about adding an option to apply the interceptor selectively to specific queries, like .WithInterceptor() method to the Query struct?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not strongly opposed, but I'd favor using the query context to selectively enable an interceptor because it avoids introducing potential conflicts between a session-level interceptor config and query-level interceptor config.

log.Fatalf("query failed %T", err)
}
fmt.Println(stringValue)
// Output: MOOOO!
}
40 changes: 34 additions & 6 deletions query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type ExecutableQuery interface {
borrowForExecution() // Used to ensure that the query stays alive for lifetime of a particular execution goroutine.
releaseAfterExecution() // Used when a goroutine finishes its execution attempts, either with ok result or an error.
execute(ctx context.Context, conn *Conn) *Iter
attempt(keyspace string, end, start time.Time, iter *Iter, host *HostInfo)
attempt(ctx context.Context, keyspace string, end, start time.Time, iter *Iter, host *HostInfo)
retryPolicy() RetryPolicy
speculativeExecutionPolicy() SpeculativeExecutionPolicy
GetRoutingKey() ([]byte, error)
Expand All @@ -48,16 +48,44 @@ type ExecutableQuery interface {
}

// queryExecutor runs query attempts against connections and, when configured,
// routes each attempt through a QueryAttemptInterceptor.
type queryExecutor struct {
	// pool is the connection pool; its keyspace is reported with every attempt.
	pool *policyConnPool
	// policy is the host selection policy taken from the cluster's pool config.
	policy HostSelectionPolicy
	// interceptor, if non-nil, wraps every query execution attempt.
	interceptor QueryAttemptInterceptor
}

// QueryAttemptHandler is a function that attempts query execution.
//
// It is the handler passed to QueryAttemptInterceptor.Intercept. It receives the
// context, the query and the connection for the attempt, and returns the
// resulting Iter.
type QueryAttemptHandler = func(context.Context, ExecutableQuery, *Conn) *Iter

// QueryAttemptInterceptor is the interface implemented by query interceptors / middleware.
//
// Interceptors are well-suited to logic that is not specific to a single query or batch.
type QueryAttemptInterceptor interface {
// Intercept is invoked once immediately before a query execution attempt, including retry attempts and
// speculative execution attempts.
//
// The interceptor is responsible for calling the `handler` function and returning the handler result. Intercept
// must always return a non-nil Iter. To halt an attempt without executing it, skip the handler and return an
// Iter that carries an error (see NewIterWithErr); whether the query is retried afterwards is decided by the
// query's RetryPolicy.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: The function does not have an error return value. Should we add an error return value or should we reference NewIterWithErr instead?

question: How does returning an error prevent a retry?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add an error return value or should we reference NewIterWithErr instead?

Updated Intercept to optionally return an error and removed NewIterWithErr.

How does returning an error prevent a retry?

Returning a non-retriable error like context.Canceled would prevent retries, but admittedly that's not clear from the function docs. I removed the reference to preventing retries and will assume that users can probably figure out how to configure retry policies with whatever error handling and retry prevention makes sense for them.

Intercept(ctx context.Context, query ExecutableQuery, conn *Conn, handler QueryAttemptHandler) *Iter
}

// attemptQuery performs a single execution attempt of qry and reports it via
// qry.attempt together with the pool keyspace, the start/end timestamps, the
// resulting Iter and the host of the connection used.
//
// Without an interceptor the query is executed directly on conn. With an
// interceptor, the attempt is delegated to Intercept, which receives a handler
// that runs the actual execution. The measured interval includes any time
// spent inside the interceptor.
func (q *queryExecutor) attemptQuery(ctx context.Context, qry ExecutableQuery, conn *Conn) *Iter {
	start := time.Now()

	var iter *Iter
	if q.interceptor == nil {
		iter = qry.execute(ctx, conn)
	} else {
		handler := func(hctx context.Context, hqry ExecutableQuery, hconn *Conn) *Iter {
			// Propagate the interceptor's context and connection choices so the
			// attempt below is reported with the context and host that were
			// actually used. Previously the handler's *Conn argument was
			// ignored and the captured conn was used instead.
			ctx, conn = hctx, hconn
			return hqry.execute(hctx, hconn)
		}
		iter = q.interceptor.Intercept(ctx, qry, conn, handler)
	}

	end := time.Now()
	qry.attempt(ctx, q.pool.keyspace, end, start, iter, conn.host)

	return iter
}
Expand Down
18 changes: 12 additions & 6 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,9 @@ func NewSession(cfg ClusterConfig) (*Session, error) {
s.policy.Init(s)

s.executor = &queryExecutor{
pool: s.pool,
policy: cfg.PoolConfig.HostSelectionPolicy,
pool: s.pool,
policy: cfg.PoolConfig.HostSelectionPolicy,
interceptor: cfg.QueryAttemptInterceptor,
}

s.queryObserver = cfg.QueryObserver
Expand Down Expand Up @@ -1111,12 +1112,12 @@ func (q *Query) execute(ctx context.Context, conn *Conn) *Iter {
return conn.executeQuery(ctx, q)
}

func (q *Query) attempt(keyspace string, end, start time.Time, iter *Iter, host *HostInfo) {
func (q *Query) attempt(ctx context.Context, keyspace string, end, start time.Time, iter *Iter, host *HostInfo) {
latency := end.Sub(start)
attempt, metricsForHost := q.metrics.attempt(1, latency, host, q.observer != nil)

if q.observer != nil {
q.observer.ObserveQuery(q.Context(), ObservedQuery{
q.observer.ObserveQuery(ctx, ObservedQuery{
Keyspace: keyspace,
Statement: q.stmt,
Values: q.values,
Expand Down Expand Up @@ -1448,6 +1449,11 @@ func (iter *Iter) Columns() []ColumnInfo {
return iter.meta.columns
}

// NewIterWithErr returns a new *Iter whose error is set to err.
func NewIterWithErr(err error) *Iter {
	iter := new(Iter)
	iter.err = err
	return iter
}

type Scanner interface {
// Next advances the row pointer to point at the next row, the row is valid until
// the next call of Next. It returns true if there is a row which is available to be
Expand Down Expand Up @@ -1942,7 +1948,7 @@ func (b *Batch) WithTimestamp(timestamp int64) *Batch {
return b
}

func (b *Batch) attempt(keyspace string, end, start time.Time, iter *Iter, host *HostInfo) {
func (b *Batch) attempt(ctx context.Context, keyspace string, end, start time.Time, iter *Iter, host *HostInfo) {
latency := end.Sub(start)
attempt, metricsForHost := b.metrics.attempt(1, latency, host, b.observer != nil)

Expand All @@ -1958,7 +1964,7 @@ func (b *Batch) attempt(keyspace string, end, start time.Time, iter *Iter, host
values[i] = entry.Args
}

b.observer.ObserveBatch(b.Context(), ObservedBatch{
b.observer.ObserveBatch(ctx, ObservedBatch{
Keyspace: keyspace,
Statements: statements,
Values: values,
Expand Down