Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CASSGO-1 CASSGO-30 Native Protocol 5 Support #1822

Open
wants to merge 26 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
fbdaa2b
Support for Native Protocol 5 release version
worryg0d Jul 18, 2024
93fb4ad
Added 'IF NOT EXISTS' to table creation queries in integration tests
worryg0d Oct 16, 2024
d926108
Clarified comment on startup completed
worryg0d Oct 16, 2024
a1baea2
Returned panics on setting keyspace and now_in_seconds field when pro…
worryg0d Oct 16, 2024
b77aa18
Added description for TestLargeSizeQuery and TestQueryCompressionNotW…
worryg0d Oct 17, 2024
465332b
Removed startupCompleted from Conn obj
worryg0d Oct 17, 2024
f613eaa
Creating new inflight instance when newMetadataID is received
worryg0d Oct 17, 2024
380d660
Moved panics on flags computing stage for now_in_seconds and keyspace…
worryg0d Oct 17, 2024
8b7e64b
Methods renames and comments corrections
worryg0d Oct 18, 2024
2eb3fbe
Improved reading/writing segments code
worryg0d Oct 21, 2024
7a210f2
Removed replace lz4 from go.mod
worryg0d Oct 21, 2024
d8ab2fa
Improved frame segmentation before writing it to conn
worryg0d Oct 21, 2024
34db5ed
Improved error messages
worryg0d Oct 21, 2024
df5adc0
Additional check in recvPartialFrames
worryg0d Oct 21, 2024
a79c985
removed net.Conn and *bufio.Reader from Conn
worryg0d Oct 23, 2024
86130c0
Compressor append-like API
worryg0d Oct 24, 2024
1343003
Fix reading <new_metadata_id> in parseResultMetadata
worryg0d Oct 25, 2024
98e37c1
Updating prepared stmt metadata result set and result_metadata_id whe…
worryg0d Oct 25, 2024
31d10f9
Avoiding deadlock after updating the lru cache entry
worryg0d Oct 25, 2024
2a5056d
Integration test for Metadata_changed mechanism
worryg0d Oct 28, 2024
bf0d7fa
1. Updating info to ensure the code looking at the updated prepared stmt
worryg0d Oct 30, 2024
0592a90
1. Updated TestPrepareExecuteMetadataChangedFlag to validate Metadata…
worryg0d Oct 30, 2024
0298a00
1. Updated the way how the driver constructs stmt cache keys. The cur…
worryg0d Oct 30, 2024
a1a613c
Added skip for TestStmtCacheUsesOverriddenKeyspace and TestRoutingKey…
worryg0d Oct 31, 2024
e779e73
Go bump to 1.19
worryg0d Nov 7, 2024
5ca5382
Changed CI to run integration tests over proto 5 + lz4 compressor
worryg0d Nov 12, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
package gocql

import (
"github.com/stretchr/testify/require"
"testing"
"time"
)
Expand Down Expand Up @@ -84,3 +85,84 @@ func TestBatch_WithTimestamp(t *testing.T) {
t.Errorf("got ts %d, expected %d", storedTs, micros)
}
}

func TestBatch_WithNowInSeconds(t *testing.T) {
session := createSession(t)
defer session.Close()

if session.cfg.ProtoVersion < protoVersion5 {
t.Skip("Batch now in seconds are only available on protocol >= 5")
}

if err := createTable(session, `CREATE TABLE IF NOT EXISTS batch_now_in_seconds (id int primary key, val text)`); err != nil {
t.Fatal(err)
}

b := session.NewBatch(LoggedBatch)
b.WithNowInSeconds(0)
b.Query("INSERT INTO batch_now_in_seconds (id, val) VALUES (?, ?) USING TTL 20", 1, "val")
if err := session.ExecuteBatch(b); err != nil {
t.Fatal(err)
}

var remainingTTL int
err := session.Query(`SELECT TTL(val) FROM batch_now_in_seconds WHERE id = ?`, 1).
WithNowInSeconds(10).
Scan(&remainingTTL)
if err != nil {
t.Fatal(err)
}

require.Equal(t, remainingTTL, 10)
}

func TestBatch_SetKeyspace(t *testing.T) {
session := createSession(t)
defer session.Close()

if session.cfg.ProtoVersion < protoVersion5 {
t.Skip("keyspace for BATCH message is not supported in protocol < 5")
}

const keyspaceStmt = `
CREATE KEYSPACE IF NOT EXISTS gocql_keyspace_override_test
WITH replication = {
'class': 'SimpleStrategy',
'replication_factor': '1'
};
`

err := session.Query(keyspaceStmt).Exec()
if err != nil {
t.Fatal(err)
}

err = createTable(session, "CREATE TABLE IF NOT EXISTS gocql_keyspace_override_test.batch_keyspace(id int, value text, PRIMARY KEY (id))")
if err != nil {
t.Fatal(err)
}

ids := []int{1, 2}
texts := []string{"val1", "val2"}

b := session.NewBatch(LoggedBatch).SetKeyspace("gocql_keyspace_override_test")
b.Query("INSERT INTO batch_keyspace(id, value) VALUES (?, ?)", ids[0], texts[0])
b.Query("INSERT INTO batch_keyspace(id, value) VALUES (?, ?)", ids[1], texts[1])
err = session.ExecuteBatch(b)
if err != nil {
t.Fatal(err)
}

var (
id int
text string
)

iter := session.Query("SELECT * FROM gocql_keyspace_override_test.batch_keyspace").Iter()
defer iter.Close()

for i := 0; iter.Scan(&id, &text); i++ {
require.Equal(t, id, ids[i])
require.Equal(t, text, texts[i])
}
}
158 changes: 158 additions & 0 deletions cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"context"
"errors"
"fmt"
"github.com/stretchr/testify/require"
"io"
"math"
"math/big"
Expand Down Expand Up @@ -3288,3 +3289,160 @@ func TestQuery_NamedValues(t *testing.T) {
t.Fatal(err)
}
}

func TestQuery_WithNowInSeconds(t *testing.T) {
session := createSession(t)
defer session.Close()

if session.cfg.ProtoVersion < protoVersion5 {
t.Skip("Query now in seconds are only available on protocol >= 5")
}

if err := createTable(session, `CREATE TABLE IF NOT EXISTS query_now_in_seconds (id int primary key, val text)`); err != nil {
t.Fatal(err)
}

err := session.Query("INSERT INTO query_now_in_seconds (id, val) VALUES (?, ?) USING TTL 20", 1, "val").
WithNowInSeconds(int(0)).
Exec()
if err != nil {
t.Fatal(err)
}

var remainingTTL int
err = session.Query(`SELECT TTL(val) FROM query_now_in_seconds WHERE id = ?`, 1).
WithNowInSeconds(10).
Scan(&remainingTTL)
if err != nil {
t.Fatal(err)
}

require.Equal(t, remainingTTL, 10)
}

func TestQuery_SetKeyspace(t *testing.T) {
session := createSession(t)
defer session.Close()

if session.cfg.ProtoVersion < protoVersion5 {
t.Skip("keyspace for QUERY message is not supported in protocol < 5")
}

const keyspaceStmt = `
CREATE KEYSPACE IF NOT EXISTS gocql_query_keyspace_override_test
WITH replication = {
'class': 'SimpleStrategy',
'replication_factor': '1'
};
`

err := session.Query(keyspaceStmt).Exec()
if err != nil {
t.Fatal(err)
}

err = createTable(session, "CREATE TABLE IF NOT EXISTS gocql_query_keyspace_override_test.query_keyspace(id int, value text, PRIMARY KEY (id))")
if err != nil {
t.Fatal(err)
}

expectedID := 1
expectedText := "text"

// Testing PREPARE message
err = session.Query("INSERT INTO gocql_query_keyspace_override_test.query_keyspace (id, value) VALUES (?, ?)", expectedID, expectedText).Exec()
if err != nil {
t.Fatal(err)
}

var (
id int
text string
)

q := session.Query("SELECT * FROM gocql_query_keyspace_override_test.query_keyspace").
SetKeyspace("gocql_query_keyspace_override_test")
err = q.Scan(&id, &text)
if err != nil {
t.Fatal(err)
}

require.Equal(t, expectedID, id)
require.Equal(t, expectedText, text)

// Testing QUERY message
id = 0
text = ""

q = session.Query("SELECT * FROM gocql_query_keyspace_override_test.query_keyspace").
SetKeyspace("gocql_query_keyspace_override_test")
q.skipPrepare = true
err = q.Scan(&id, &text)
if err != nil {
t.Fatal(err)
}

require.Equal(t, expectedID, id)
require.Equal(t, expectedText, text)
}

func TestLargeSizeQuery(t *testing.T) {
// TestLargeSizeQuery runs a query bigger than the max allowed size of the payload of a frame,
// so it should be sent as 2 different frames where each contains a self-contained bit set to zero.

session := createSession(t)
defer session.Close()

if err := createTable(session, "CREATE TABLE IF NOT EXISTS gocql_test.large_size_query(id int, text_col text, PRIMARY KEY (id))"); err != nil {
t.Fatal(err)
}

defer session.Close()

longString := strings.Repeat("a", 500_000)

err := session.Query("INSERT INTO gocql_test.large_size_query (id, text_col) VALUES (?, ?)", "1", longString).Exec()
if err != nil {
t.Fatal(err)
}

var result string
err = session.Query("SELECT text_col FROM gocql_test.large_size_query").Scan(&result)
if err != nil {
t.Fatal(err)
}

require.Equal(t, longString, result)
}

func TestQueryCompressionNotWorthIt(t *testing.T) {
// TestQueryCompressionNotWorthIt runs a query that is not likely to be compressed efficiently
// (uncompressed payload size > compressed payload size).
// So, it should send a Compressed Frame where:
// 1. Compressed length is set to the length of the uncompressed payload;
// 2. Uncompressed length is set to zero;
// 3. Payload is the uncompressed payload.

session := createSession(t)
defer session.Close()

if err := createTable(session, "CREATE TABLE IF NOT EXISTS gocql_test.compression_now_worth_it(id int, text_col text, PRIMARY KEY (id))"); err != nil {
t.Fatal(err)
}

defer session.Close()

str := "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890!@#$%^&*()_+"
err := session.Query("INSERT INTO gocql_test.large_size_query (id, text_col) VALUES (?, ?)", "1", str).Exec()
if err != nil {
t.Fatal(err)
}

var result string
err = session.Query("SELECT text_col FROM gocql_test.large_size_query").Scan(&result)
if err != nil {
t.Fatal(err)
}

require.Equal(t, str, result)
joao-r-reis marked this conversation as resolved.
Show resolved Hide resolved
}
40 changes: 31 additions & 9 deletions compressor.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,28 @@

package gocql

import (
"github.com/golang/snappy"
)
import "github.com/golang/snappy"

type Compressor interface {
Name() string
Encode(data []byte) ([]byte, error)
Decode(data []byte) ([]byte, error)

// AppendCompressedWithLength compresses src bytes, appends the length of the compressed bytes to dst
// and then appends the compressed bytes to dst.
// It returns a new byte slice that is the result of the append operation.
AppendCompressedWithLength(dst, src []byte) ([]byte, error)

// AppendDecompressedWithLength reads the length of the decompressed bytes from src,
// decompressed bytes from src and appends the decompressed bytes to dst.
// It returns a new byte slice that is the result of the append operation.
AppendDecompressedWithLength(dst, src []byte) ([]byte, error)

// AppendCompressed compresses src bytes and appends the compressed bytes to dst.
// It returns a new byte slice that is the result of the append operation.
AppendCompressed(dst, src []byte) ([]byte, error)

// AppendDecompressed decompresses bytes from src and appends the decompressed bytes to dst.
// It returns a new byte slice that is the result of the append operation.
AppendDecompressed(dst, src []byte, decompressedLength uint32) ([]byte, error)
}

// SnappyCompressor implements the Compressor interface and can be used to
Expand All @@ -43,10 +57,18 @@ func (s SnappyCompressor) Name() string {
return "snappy"
}

func (s SnappyCompressor) Encode(data []byte) ([]byte, error) {
return snappy.Encode(nil, data), nil
func (s SnappyCompressor) AppendCompressedWithLength(dst, src []byte) ([]byte, error) {
return snappy.Encode(dst, src), nil
}

func (s SnappyCompressor) AppendDecompressedWithLength(dst, src []byte) ([]byte, error) {
return snappy.Decode(dst, src)
}

func (s SnappyCompressor) AppendCompressed(dst, src []byte) ([]byte, error) {
panic("SnappyCompressor.AppendCompressed is not supported")
}

func (s SnappyCompressor) Decode(data []byte) ([]byte, error) {
return snappy.Decode(nil, data)
func (s SnappyCompressor) AppendDecompressed(dst, src []byte, decompressedLength uint32) ([]byte, error) {
panic("SnappyCompressor.AppendDecompressed is not supported")
}
6 changes: 3 additions & 3 deletions compressor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,21 +40,21 @@ func TestSnappyCompressor(t *testing.T) {
str := "My Test String"
//Test Encoding
expected := snappy.Encode(nil, []byte(str))
if res, err := c.Encode([]byte(str)); err != nil {
if res, err := c.AppendCompressedWithLength(nil, []byte(str)); err != nil {
t.Fatalf("failed to encode '%v' with error %v", str, err)
} else if bytes.Compare(expected, res) != 0 {
t.Fatal("failed to match the expected encoded value with the result encoded value.")
}

val, err := c.Encode([]byte(str))
val, err := c.AppendCompressedWithLength(nil, []byte(str))
if err != nil {
t.Fatalf("failed to encode '%v' with error '%v'", str, err)
}

//Test Decoding
if expected, err := snappy.Decode(nil, val); err != nil {
t.Fatalf("failed to decode '%v' with error %v", val, err)
} else if res, err := c.Decode(val); err != nil {
} else if res, err := c.AppendDecompressedWithLength(nil, val); err != nil {
t.Fatalf("failed to decode '%v' with error %v", val, err)
} else if bytes.Compare(expected, res) != 0 {
t.Fatal("failed to match the expected decoded value with the result decoded value.")
Expand Down
Loading