Skip to content

Commit

Permalink
Support for Native Protocol 5 release version
Browse files Browse the repository at this point in the history
  • Loading branch information
worryg0d committed Oct 10, 2024
1 parent 953e0df commit fbdaa2b
Show file tree
Hide file tree
Showing 16 changed files with 1,417 additions and 68 deletions.
82 changes: 82 additions & 0 deletions batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
package gocql

import (
"github.com/stretchr/testify/require"
"testing"
"time"
)
Expand Down Expand Up @@ -84,3 +85,84 @@ func TestBatch_WithTimestamp(t *testing.T) {
t.Errorf("got ts %d, expected %d", storedTs, micros)
}
}

func TestBatch_WithNowInSeconds(t *testing.T) {
session := createSession(t)
defer session.Close()

if session.cfg.ProtoVersion < protoVersion5 {
t.Skip("Batch now in seconds are only available on protocol >= 5")
}

if err := createTable(session, `CREATE TABLE batch_now_in_seconds (id int primary key, val text)`); err != nil {
t.Fatal(err)
}

b := session.NewBatch(LoggedBatch)
b.WithNowInSeconds(0)
b.Query("INSERT INTO batch_now_in_seconds (id, val) VALUES (?, ?) USING TTL 20", 1, "val")
if err := session.ExecuteBatch(b); err != nil {
t.Fatal(err)
}

var remainingTTL int
err := session.Query(`SELECT TTL(val) FROM batch_now_in_seconds WHERE id = ?`, 1).
WithNowInSeconds(10).
Scan(&remainingTTL)
if err != nil {
t.Fatal(err)
}

require.Equal(t, remainingTTL, 10)
}

func TestBatch_SetKeyspace(t *testing.T) {
session := createSession(t)
defer session.Close()

if session.cfg.ProtoVersion < protoVersion5 {
t.Skip("keyspace for BATCH message is not supported in protocol < 5")
}

const keyspaceStmt = `
CREATE KEYSPACE IF NOT EXISTS gocql_keyspace_override_test
WITH replication = {
'class': 'SimpleStrategy',
'replication_factor': '1'
};
`

err := session.Query(keyspaceStmt).Exec()
if err != nil {
t.Fatal(err)
}

err = createTable(session, "CREATE TABLE IF NOT EXISTS gocql_keyspace_override_test.batch_keyspace(id int, value text, PRIMARY KEY (id))")
if err != nil {
t.Fatal(err)
}

ids := []int{1, 2}
texts := []string{"val1", "val2"}

b := session.NewBatch(LoggedBatch).SetKeyspace("gocql_keyspace_override_test")
b.Query("INSERT INTO batch_keyspace(id, value) VALUES (?, ?)", ids[0], texts[0])
b.Query("INSERT INTO batch_keyspace(id, value) VALUES (?, ?)", ids[1], texts[1])
err = session.ExecuteBatch(b)
if err != nil {
t.Fatal(err)
}

var (
id int
text string
)

iter := session.Query("SELECT * FROM gocql_keyspace_override_test.batch_keyspace").Iter()
defer iter.Close()

for i := 0; iter.Scan(&id, &text); i++ {
require.Equal(t, id, ids[i])
require.Equal(t, text, texts[i])
}
}
148 changes: 148 additions & 0 deletions cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"context"
"errors"
"fmt"
"github.com/stretchr/testify/require"
"io"
"math"
"math/big"
Expand Down Expand Up @@ -3288,3 +3289,150 @@ func TestQuery_NamedValues(t *testing.T) {
t.Fatal(err)
}
}

func TestQuery_WithNowInSeconds(t *testing.T) {
session := createSession(t)
defer session.Close()

if session.cfg.ProtoVersion < protoVersion5 {
t.Skip("Query now in seconds are only available on protocol >= 5")
}

if err := createTable(session, `CREATE TABLE query_now_in_seconds (id int primary key, val text)`); err != nil {
t.Fatal(err)
}

err := session.Query("INSERT INTO query_now_in_seconds (id, val) VALUES (?, ?) USING TTL 20", 1, "val").
WithNowInSeconds(int(0)).
Exec()
if err != nil {
t.Fatal(err)
}

var remainingTTL int
err = session.Query(`SELECT TTL(val) FROM query_now_in_seconds WHERE id = ?`, 1).
WithNowInSeconds(10).
Scan(&remainingTTL)
if err != nil {
t.Fatal(err)
}

require.Equal(t, remainingTTL, 10)
}

func TestQuery_SetKeyspace(t *testing.T) {
session := createSession(t)
defer session.Close()

if session.cfg.ProtoVersion < protoVersion5 {
t.Skip("keyspace for QUERY message is not supported in protocol < 5")
}

const keyspaceStmt = `
CREATE KEYSPACE IF NOT EXISTS gocql_query_keyspace_override_test
WITH replication = {
'class': 'SimpleStrategy',
'replication_factor': '1'
};
`

err := session.Query(keyspaceStmt).Exec()
if err != nil {
t.Fatal(err)
}

err = createTable(session, "CREATE TABLE IF NOT EXISTS gocql_query_keyspace_override_test.query_keyspace(id int, value text, PRIMARY KEY (id))")
if err != nil {
t.Fatal(err)
}

expectedID := 1
expectedText := "text"

// Testing PREPARE message
err = session.Query("INSERT INTO gocql_query_keyspace_override_test.query_keyspace (id, value) VALUES (?, ?)", expectedID, expectedText).Exec()
if err != nil {
t.Fatal(err)
}

var (
id int
text string
)

q := session.Query("SELECT * FROM gocql_query_keyspace_override_test.query_keyspace").
SetKeyspace("gocql_query_keyspace_override_test")
err = q.Scan(&id, &text)
if err != nil {
t.Fatal(err)
}

require.Equal(t, expectedID, id)
require.Equal(t, expectedText, text)

// Testing QUERY message
id = 0
text = ""

q = session.Query("SELECT * FROM gocql_query_keyspace_override_test.query_keyspace").
SetKeyspace("gocql_query_keyspace_override_test")
q.skipPrepare = true
err = q.Scan(&id, &text)
if err != nil {
t.Fatal(err)
}

require.Equal(t, expectedID, id)
require.Equal(t, expectedText, text)
}

func TestLargeSizeQuery(t *testing.T) {
session := createSession(t)
defer session.Close()

if err := createTable(session, "CREATE TABLE gocql_test.large_size_query(id int, text_col text, PRIMARY KEY (id))"); err != nil {
t.Fatal(err)
}

defer session.Close()

longString := strings.Repeat("a", 500_000)

err := session.Query("INSERT INTO gocql_test.large_size_query (id, text_col) VALUES (?, ?)", "1", longString).Exec()
if err != nil {
t.Fatal(err)
}

var result string
err = session.Query("SELECT text_col FROM gocql_test.large_size_query").Scan(&result)
if err != nil {
t.Fatal(err)
}

require.Equal(t, longString, result)
}

func TestQueryCompressionNotWorthIt(t *testing.T) {
session := createSession(t)
defer session.Close()

if err := createTable(session, "CREATE TABLE gocql_test.compression_now_worth_it(id int, text_col text, PRIMARY KEY (id))"); err != nil {
t.Fatal(err)
}

defer session.Close()

str := "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890!@#$%^&*()_+"
err := session.Query("INSERT INTO gocql_test.large_size_query (id, text_col) VALUES (?, ?)", "1", str).Exec()
if err != nil {
t.Fatal(err)
}

var result string
err = session.Query("SELECT text_col FROM gocql_test.large_size_query").Scan(&result)
if err != nil {
t.Fatal(err)
}

require.Equal(t, str, result)
}
3 changes: 3 additions & 0 deletions common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ package gocql
import (
"flag"
"fmt"
"github.com/gocql/gocql/lz4"
"log"
"net"
"reflect"
Expand Down Expand Up @@ -111,6 +112,8 @@ func createCluster(opts ...func(*ClusterConfig)) *ClusterConfig {
switch *flagCompressTest {
case "snappy":
cluster.Compressor = &SnappyCompressor{}
case "lz4":
cluster.Compressor = lz4.LZ4Compressor{}
case "":
default:
panic("invalid compressor: " + *flagCompressTest)
Expand Down
6 changes: 6 additions & 0 deletions compressor.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type Compressor interface {
Name() string
Encode(data []byte) ([]byte, error)
Decode(data []byte) ([]byte, error)
DecodeSized(data []byte, size uint32) ([]byte, error)
}

// SnappyCompressor implements the Compressor interface and can be used to
Expand All @@ -50,3 +51,8 @@ func (s SnappyCompressor) Encode(data []byte) ([]byte, error) {
func (s SnappyCompressor) Decode(data []byte) ([]byte, error) {
return snappy.Decode(nil, data)
}

func (s SnappyCompressor) DecodeSized(data []byte, size uint32) ([]byte, error) {
buf := make([]byte, size)
return snappy.Decode(buf, data)
}
Loading

0 comments on commit fbdaa2b

Please sign in to comment.