Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CASSGO-1 CASSGO-30 Native Protocol 5 Support #1822

Open
wants to merge 26 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
fbdaa2b
Support for Native Protocol 5 release version
worryg0d Jul 18, 2024
93fb4ad
Added 'IF NOT EXISTS' to table creation queries in integration tests
worryg0d Oct 16, 2024
d926108
Clarified comment on startup completed
worryg0d Oct 16, 2024
a1baea2
Returned panics on setting keyspace and now_in_seconds field when pro…
worryg0d Oct 16, 2024
b77aa18
Added description for TestLargeSizeQuery and TestQueryCompressionNotW…
worryg0d Oct 17, 2024
465332b
Removed startupCompleted from Conn obj
worryg0d Oct 17, 2024
f613eaa
Creating new inflight instance when newMetadataID is received
worryg0d Oct 17, 2024
380d660
Moved panics on flags computing stage for now_in_seconds and keyspace…
worryg0d Oct 17, 2024
8b7e64b
Methods renames and comments corrections
worryg0d Oct 18, 2024
2eb3fbe
Improved reading/writing segments code
worryg0d Oct 21, 2024
7a210f2
Removed replace lz4 from go.mod
worryg0d Oct 21, 2024
d8ab2fa
Improved frame segmentation before writing it to conn
worryg0d Oct 21, 2024
34db5ed
Improved error messages
worryg0d Oct 21, 2024
df5adc0
Additional check in recvPartialFrames
worryg0d Oct 21, 2024
a79c985
removed net.Conn and *bufio.Reader from Conn
worryg0d Oct 23, 2024
86130c0
Compressor append-like API
worryg0d Oct 24, 2024
1343003
Fix reading <new_metadata_id> in parseResultMetadata
worryg0d Oct 25, 2024
98e37c1
Updating prepared stmt metadata result set and result_metadata_id whe…
worryg0d Oct 25, 2024
31d10f9
Avoiding deadlock after updating the lru cache entry
worryg0d Oct 25, 2024
2a5056d
Integration test for Metadata_changed mechanism
worryg0d Oct 28, 2024
bf0d7fa
1. Updating info to ensure the code looking at the updated prepared stmt
worryg0d Oct 30, 2024
0592a90
1. Updated TestPrepareExecuteMetadataChangedFlag to validate Metadata…
worryg0d Oct 30, 2024
0298a00
1. Updated the way how the driver constructs stmt cache keys. The cur…
worryg0d Oct 30, 2024
a1a613c
Added skip for TestStmtCacheUsesOverriddenKeyspace and TestRoutingKey…
worryg0d Oct 31, 2024
e779e73
Go bump to 1.19
worryg0d Nov 7, 2024
5ca5382
Changed CI to run integration tests over proto 5 + lz4 compressor
worryg0d Nov 12, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
package gocql

import (
"github.com/stretchr/testify/require"
"testing"
"time"
)
Expand Down Expand Up @@ -84,3 +85,84 @@ func TestBatch_WithTimestamp(t *testing.T) {
t.Errorf("got ts %d, expected %d", storedTs, micros)
}
}

func TestBatch_WithNowInSeconds(t *testing.T) {
session := createSession(t)
defer session.Close()

if session.cfg.ProtoVersion < protoVersion5 {
t.Skip("Batch now in seconds are only available on protocol >= 5")
}

if err := createTable(session, `CREATE TABLE IF NOT EXISTS batch_now_in_seconds (id int primary key, val text)`); err != nil {
t.Fatal(err)
}

b := session.NewBatch(LoggedBatch)
b.WithNowInSeconds(0)
b.Query("INSERT INTO batch_now_in_seconds (id, val) VALUES (?, ?) USING TTL 20", 1, "val")
if err := session.ExecuteBatch(b); err != nil {
t.Fatal(err)
}

var remainingTTL int
err := session.Query(`SELECT TTL(val) FROM batch_now_in_seconds WHERE id = ?`, 1).
WithNowInSeconds(10).
Scan(&remainingTTL)
if err != nil {
t.Fatal(err)
}

require.Equal(t, remainingTTL, 10)
}

func TestBatch_SetKeyspace(t *testing.T) {
session := createSession(t)
defer session.Close()

if session.cfg.ProtoVersion < protoVersion5 {
t.Skip("keyspace for BATCH message is not supported in protocol < 5")
}

const keyspaceStmt = `
CREATE KEYSPACE IF NOT EXISTS gocql_keyspace_override_test
WITH replication = {
'class': 'SimpleStrategy',
'replication_factor': '1'
};
`

err := session.Query(keyspaceStmt).Exec()
if err != nil {
t.Fatal(err)
}

err = createTable(session, "CREATE TABLE IF NOT EXISTS gocql_keyspace_override_test.batch_keyspace(id int, value text, PRIMARY KEY (id))")
if err != nil {
t.Fatal(err)
}

ids := []int{1, 2}
texts := []string{"val1", "val2"}

b := session.NewBatch(LoggedBatch).SetKeyspace("gocql_keyspace_override_test")
b.Query("INSERT INTO batch_keyspace(id, value) VALUES (?, ?)", ids[0], texts[0])
b.Query("INSERT INTO batch_keyspace(id, value) VALUES (?, ?)", ids[1], texts[1])
err = session.ExecuteBatch(b)
if err != nil {
t.Fatal(err)
}

var (
id int
text string
)

iter := session.Query("SELECT * FROM gocql_keyspace_override_test.batch_keyspace").Iter()
defer iter.Close()

for i := 0; iter.Scan(&id, &text); i++ {
require.Equal(t, id, ids[i])
require.Equal(t, text, texts[i])
}
}
158 changes: 158 additions & 0 deletions cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"context"
"errors"
"fmt"
"github.com/stretchr/testify/require"
"io"
"math"
"math/big"
Expand Down Expand Up @@ -3288,3 +3289,160 @@ func TestQuery_NamedValues(t *testing.T) {
t.Fatal(err)
}
}

func TestQuery_WithNowInSeconds(t *testing.T) {
session := createSession(t)
defer session.Close()

if session.cfg.ProtoVersion < protoVersion5 {
t.Skip("Query now in seconds are only available on protocol >= 5")
}

if err := createTable(session, `CREATE TABLE IF NOT EXISTS query_now_in_seconds (id int primary key, val text)`); err != nil {
t.Fatal(err)
}

err := session.Query("INSERT INTO query_now_in_seconds (id, val) VALUES (?, ?) USING TTL 20", 1, "val").
WithNowInSeconds(int(0)).
Exec()
if err != nil {
t.Fatal(err)
}

var remainingTTL int
err = session.Query(`SELECT TTL(val) FROM query_now_in_seconds WHERE id = ?`, 1).
WithNowInSeconds(10).
Scan(&remainingTTL)
if err != nil {
t.Fatal(err)
}

require.Equal(t, remainingTTL, 10)
}

func TestQuery_SetKeyspace(t *testing.T) {
session := createSession(t)
defer session.Close()

if session.cfg.ProtoVersion < protoVersion5 {
t.Skip("keyspace for QUERY message is not supported in protocol < 5")
}

const keyspaceStmt = `
CREATE KEYSPACE IF NOT EXISTS gocql_query_keyspace_override_test
WITH replication = {
'class': 'SimpleStrategy',
'replication_factor': '1'
};
`

err := session.Query(keyspaceStmt).Exec()
if err != nil {
t.Fatal(err)
}

err = createTable(session, "CREATE TABLE IF NOT EXISTS gocql_query_keyspace_override_test.query_keyspace(id int, value text, PRIMARY KEY (id))")
if err != nil {
t.Fatal(err)
}

expectedID := 1
expectedText := "text"

// Testing PREPARE message
err = session.Query("INSERT INTO gocql_query_keyspace_override_test.query_keyspace (id, value) VALUES (?, ?)", expectedID, expectedText).Exec()
if err != nil {
t.Fatal(err)
}

var (
id int
text string
)

q := session.Query("SELECT * FROM gocql_query_keyspace_override_test.query_keyspace").
SetKeyspace("gocql_query_keyspace_override_test")
err = q.Scan(&id, &text)
if err != nil {
t.Fatal(err)
}

require.Equal(t, expectedID, id)
require.Equal(t, expectedText, text)

// Testing QUERY message
id = 0
text = ""

q = session.Query("SELECT * FROM gocql_query_keyspace_override_test.query_keyspace").
SetKeyspace("gocql_query_keyspace_override_test")
q.skipPrepare = true
err = q.Scan(&id, &text)
if err != nil {
t.Fatal(err)
}

require.Equal(t, expectedID, id)
require.Equal(t, expectedText, text)
}

func TestLargeSizeQuery(t *testing.T) {
// TestLargeSizeQuery runs a query bigger than the max allowed size of the payload of a frame,
// so it should be sent as 2 different frames where each contains a self-contained bit set to zero.

session := createSession(t)
defer session.Close()

if err := createTable(session, "CREATE TABLE IF NOT EXISTS gocql_test.large_size_query(id int, text_col text, PRIMARY KEY (id))"); err != nil {
t.Fatal(err)
}

defer session.Close()

longString := strings.Repeat("a", 500_000)

err := session.Query("INSERT INTO gocql_test.large_size_query (id, text_col) VALUES (?, ?)", "1", longString).Exec()
if err != nil {
t.Fatal(err)
}

var result string
err = session.Query("SELECT text_col FROM gocql_test.large_size_query").Scan(&result)
if err != nil {
t.Fatal(err)
}

require.Equal(t, longString, result)
}

func TestQueryCompressionNotWorthIt(t *testing.T) {
// TestQueryCompressionNotWorthIt runs a query that is not likely to be compressed efficiently
// (uncompressed payload size > compressed payload size).
// So, it should send a Compressed Frame where:
// 1. Compressed length is set to the length of the uncompressed payload;
// 2. Uncompressed length is set to zero;
// 3. Payload is the uncompressed payload.

session := createSession(t)
defer session.Close()

if err := createTable(session, "CREATE TABLE IF NOT EXISTS gocql_test.compression_now_worth_it(id int, text_col text, PRIMARY KEY (id))"); err != nil {
t.Fatal(err)
}

defer session.Close()

str := "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890!@#$%^&*()_+"
err := session.Query("INSERT INTO gocql_test.large_size_query (id, text_col) VALUES (?, ?)", "1", str).Exec()
if err != nil {
t.Fatal(err)
}

var result string
err = session.Query("SELECT text_col FROM gocql_test.large_size_query").Scan(&result)
if err != nil {
t.Fatal(err)
}

require.Equal(t, str, result)
joao-r-reis marked this conversation as resolved.
Show resolved Hide resolved
}
3 changes: 3 additions & 0 deletions common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ package gocql
import (
"flag"
"fmt"
"github.com/gocql/gocql/lz4"
"log"
"net"
"reflect"
Expand Down Expand Up @@ -111,6 +112,8 @@ func createCluster(opts ...func(*ClusterConfig)) *ClusterConfig {
switch *flagCompressTest {
case "snappy":
cluster.Compressor = &SnappyCompressor{}
case "lz4":
cluster.Compressor = lz4.LZ4Compressor{}
case "":
default:
panic("invalid compressor: " + *flagCompressTest)
Expand Down
6 changes: 6 additions & 0 deletions compressor.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type Compressor interface {
Name() string
Encode(data []byte) ([]byte, error)
Decode(data []byte) ([]byte, error)
DecodeSized(data []byte, size uint32) ([]byte, error)
joao-r-reis marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please add doc comment describing the semantics of the method?


We could also change this to be append-like, as follows:

// DecodeSized decodes the encoded bytes and appends them to dst.
// decodeSize is the size of data after decompression.
DecodeSized(dst, encoded []byte, decodedSize int32) ([]byte, error)

It seems that would be more flexible, for example it would allow the framer to reuse buffers for decompression in a sync.Pool if needed. And since this will be in 2.0, we could adjust the other methods as well.

Copy link
Contributor

@joao-r-reis joao-r-reis Oct 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with this, it's a great suggestion 👍 We don't have to work on changing the framer to reuse buffers now but making the API change now on the 2.0 release would make the most sense.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then we should agree on the API.

We could adjust the current API to this:

type Compressor interface {
	Name() string
	Encode(dst, data []byte) ([]byte, error)
	Decode(dst, data []byte) ([]byte, error)

	// DecodeSized decodes the encoded bytes and appends them to dst.
	// decodeSize is the size of data after decompression.
	DecodeSized(dst, encoded []byte, decodedSize uint32) ([]byte, error)
}

As another solution, we could change the API to this:

type Compressor interface {
	Name() string
	
	// EncodeWithSize encodes data bytes, appends the size of the data
	// to dst and appends encoded bytes.
	// It returns encoded data length.
	EncodeWithSize(dst, data []byte) (n int, err error)
	
	// DecodeWithSize reads size of uncompressed data from the encoded data,
	// decodes the encoded bytes and appends them to dst.
	// It returns decoded data length.
	DecodeWithSize(dst, encoded []byte) (n int, err error)

	// Encode encodes data bytes and appends them to dst.
	// It returns encoded data length.
	Encode(dst, data []byte) (n int, err error)

	// Decode decodes the encoded bytes and appends them to dst.
	// It returns decoded data length.
	Decode(dst, encoded []byte) (n int, err error)
}

The second provides distinguishing methods for:

  • frame body encoding/decoding EncodeWithSize and DecodeWithSize
  • segment payload processing Encode and Decode.

Copy link
Contributor

@joao-r-reis joao-r-reis Oct 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To truly make it 100% "append-like" (examples: golang/go#53693):

type Compressor interface {
	Name() string
	
	// AppendCompressedWithLength compresses src bytes, appends the length of the compressed bytes to dst and then appends the compressed bytes to dst.
        // It returns a new byte slice that is the result of the append operation.
	AppendCompressedWithLength(dst, src []byte) ([]byte, error)
	
	// AppendDecompressedWithLength reads the length of the decompressed bytes from src, decompresses bytes from src and appends the decompressed bytes to dst.
        // It returns a new byte slice that is the result of the append operation.
	AppendDecompressedWithLength(dst, src []byte) ([]byte, error)

	// AppendCompressed compresses src bytes and appends the compressed bytes to dst.
        // It returns a new byte slice that is the result of the append operation.
	AppendCompressed(dst, src []byte) ([]byte, error)

	// AppendDecompressed decompresses bytes from src and appends the decompressed bytes to dst.
        // It returns a new byte slice that is the result of the append operation.
	AppendDecompressed(dst, src []byte, decompressedLength uint32) ([]byte, error)
}

The lz4 library does not support append like semantics so we're going to have to expand dst slice before passing it to the lz4 library functions. If cap(dst)-len(dst) is not large enough then we'll have to allocate a new byte slice and append it to dst instead. AppendDecompressed has the decompressedLength parameter so we can check if we can reuse dst or if we have to allocate a new byte slice.

I used the name Compress and Decompress instead of Encode / Decode because it felt more natural to me but I'm fine with keeping Encode and Decode. Same for WithLength/WithSize

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please look at this? I tried to implement this append-like API, but I'm unsure if I should implement the AppendCompressed and AppendDecompressed methods for our snappy wrapper...

I used the name Compress and Decompress instead of Encode / Decode because it felt more natural to me but I'm fine with keeping Encode and Decode. Same for WithLength/WithSize

I like your variants. If nobody has anything against then I'll use them.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a few comments but that's the spirit of what I was suggesting 👍 @martin-sucha can you also take a look to see if it is in line with what you were suggesting?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm unsure if I should implement the AppendCompressed and AppendDecompressed methods for our snappy wrapper...

I think it's fine to make them panic since protocol v5 doesn't support snappy anyway.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added the suggested by @joao-r-reis solution. @martin-sucha Could you please take a look at this?

}

// SnappyCompressor implements the Compressor interface and can be used to
Expand All @@ -50,3 +51,8 @@ func (s SnappyCompressor) Encode(data []byte) ([]byte, error) {
func (s SnappyCompressor) Decode(data []byte) ([]byte, error) {
return snappy.Decode(nil, data)
}

func (s SnappyCompressor) DecodeSized(data []byte, size uint32) ([]byte, error) {
buf := make([]byte, size)
return snappy.Decode(buf, data)
}
Loading