Skip to content

Commit

Permalink
CASSGO-26 consistency serial was added
Browse files Browse the repository at this point in the history
The user should be able to set consistency to SERIAL or LOCAL_SERIAL
for Paxos reads, but the previous implementation doesn't support such a feature.

patch by Mykyta Oleksiienko; reviewed by João Reis for CASSGO-26
  • Loading branch information
OleksiienkoMykyta committed Nov 14, 2024
1 parent 7b7e6af commit 769a5c1
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 43 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- Allow SERIAL and LOCAL_SERIAL on SELECT statements [CASSGO-26](https://issues.apache.org/jira/browse/CASSGO-26)

### Changed

- Don't restrict server authenticator unless PasswordAuthentictor.AllowedAuthenticators is provided (CASSGO-19)
Expand Down
88 changes: 88 additions & 0 deletions cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ import (
"unicode"

inf "gopkg.in/inf.v0"

"github.com/stretchr/testify/require"
)

func TestEmptyHosts(t *testing.T) {
Expand Down Expand Up @@ -504,6 +506,92 @@ func TestCAS(t *testing.T) {
}
}

func TestConsistencySerial(t *testing.T) {
session := createSession(t)
defer session.Close()

type testStruct struct {
name string
id int
consistency Consistency
expected string
}

testCases := []testStruct{
{
name: "Any",
consistency: Any,
expected: "serial consistency can only be SERIAL or LOCAL_SERIAL got ANY",
}, {
name: "One",
consistency: One,
expected: "serial consistency can only be SERIAL or LOCAL_SERIAL got ONE",
}, {
name: "Two",
consistency: Two,
expected: "serial consistency can only be SERIAL or LOCAL_SERIAL got TWO",
}, {
name: "Three",
consistency: Three,
expected: "serial consistency can only be SERIAL or LOCAL_SERIAL got THREE",
}, {
name: "Quorum",
consistency: Quorum,
expected: "serial consistency can only be SERIAL or LOCAL_SERIAL got QUORUM",
}, {
name: "LocalQuorum",
consistency: LocalQuorum,
expected: "serial consistency can only be SERIAL or LOCAL_SERIAL got LOCAL_QUORUM",
}, {
name: "EachQuorum",
consistency: EachQuorum,
expected: "serial consistency can only be SERIAL or LOCAL_SERIAL got EACH_QUORUM",
}, {
name: "Serial",
id: 8,
consistency: Serial,
expected: "",
}, {
name: "LocalSerial",
id: 9,
consistency: LocalSerial,
expected: "",
}, {
name: "LocalOne",
consistency: LocalOne,
expected: "serial consistency can only be SERIAL or LOCAL_SERIAL got LOCAL_ONE",
},
}

err := session.Query("CREATE TABLE IF NOT EXISTS gocql_test.consistency_serial (id int PRIMARY KEY)").Exec()
if err != nil {
t.Fatalf("can't create consistency_serial table:%v", err)
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
if tc.expected == "" {
err = session.Query("INSERT INTO gocql_test.consistency_serial (id) VALUES (?)", tc.id).SerialConsistency(tc.consistency).Exec()
if err != nil {
t.Fatal(err)
}

var receivedID int
err = session.Query("SELECT * FROM gocql_test.consistency_serial WHERE id=?", tc.id).Scan(&receivedID)
if err != nil {
t.Fatal(err)
}

require.Equal(t, tc.id, receivedID)
} else {
require.PanicsWithValue(t, tc.expected, func() {
session.Query("INSERT INTO gocql_test.consistency_serial (id) VALUES (?)", tc.id).SerialConsistency(tc.consistency)
})
}
})
}
}

func TestDurationType(t *testing.T) {
session := createSession(t)
defer session.Close()
Expand Down
2 changes: 1 addition & 1 deletion cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ type ClusterConfig struct {

// Consistency for the serial part of queries, values can be either SERIAL or LOCAL_SERIAL.
// Default: unset
SerialConsistency SerialConsistency
SerialConsistency Consistency

// SslOpts configures TLS use when HostDialer is not set.
// SslOpts is ignored if HostDialer is set.
Expand Down
58 changes: 20 additions & 38 deletions frame.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,9 @@ const (

type Consistency uint16

// SerialConsistency is deprecated. Use Consistency instead.
type SerialConsistency = Consistency

const (
Any Consistency = 0x00
One Consistency = 0x01
Expand All @@ -201,6 +204,8 @@ const (
All Consistency = 0x05
LocalQuorum Consistency = 0x06
EachQuorum Consistency = 0x07
Serial Consistency = 0x08
LocalSerial Consistency = 0x09
LocalOne Consistency = 0x0A
)

Expand All @@ -224,6 +229,10 @@ func (c Consistency) String() string {
return "EACH_QUORUM"
case LocalOne:
return "LOCAL_ONE"
case Serial:
return "SERIAL"
case LocalSerial:
return "LOCAL_SERIAL"
default:
return fmt.Sprintf("UNKNOWN_CONS_0x%x", uint16(c))
}
Expand Down Expand Up @@ -253,13 +262,21 @@ func (c *Consistency) UnmarshalText(text []byte) error {
*c = EachQuorum
case "LOCAL_ONE":
*c = LocalOne
case "SERIAL":
*c = Serial
case "LOCAL_SERIAL":
*c = LocalSerial
default:
return fmt.Errorf("invalid consistency %q", string(text))
}

return nil
}

func (c Consistency) IsSerial() bool {
return c == Serial || c == LocalSerial

}
func ParseConsistency(s string) Consistency {
var c Consistency
if err := c.UnmarshalText([]byte(strings.ToUpper(s))); err != nil {
Expand All @@ -286,41 +303,6 @@ func MustParseConsistency(s string) (Consistency, error) {
return c, nil
}

type SerialConsistency uint16

const (
Serial SerialConsistency = 0x08
LocalSerial SerialConsistency = 0x09
)

func (s SerialConsistency) String() string {
switch s {
case Serial:
return "SERIAL"
case LocalSerial:
return "LOCAL_SERIAL"
default:
return fmt.Sprintf("UNKNOWN_SERIAL_CONS_0x%x", uint16(s))
}
}

func (s SerialConsistency) MarshalText() (text []byte, err error) {
return []byte(s.String()), nil
}

func (s *SerialConsistency) UnmarshalText(text []byte) error {
switch string(text) {
case "SERIAL":
*s = Serial
case "LOCAL_SERIAL":
*s = LocalSerial
default:
return fmt.Errorf("invalid consistency %q", string(text))
}

return nil
}

const (
apacheCassandraTypePrefix = "org.apache.cassandra.db.marshal."
)
Expand Down Expand Up @@ -1452,7 +1434,7 @@ type queryParams struct {
values []queryValues
pageSize int
pagingState []byte
serialConsistency SerialConsistency
serialConsistency Consistency
// v3+
defaultTimestamp bool
defaultTimestampValue int64
Expand Down Expand Up @@ -1541,7 +1523,7 @@ func (f *framer) writeQueryParams(opts *queryParams) {
}

if opts.serialConsistency > 0 {
f.writeConsistency(Consistency(opts.serialConsistency))
f.writeConsistency(opts.serialConsistency)
}

if f.proto > protoVersion2 && opts.defaultTimestamp {
Expand Down Expand Up @@ -1653,7 +1635,7 @@ type writeBatchFrame struct {
consistency Consistency

// v3+
serialConsistency SerialConsistency
serialConsistency Consistency
defaultTimestamp bool
defaultTimestampValue int64

Expand Down
18 changes: 14 additions & 4 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,10 @@ func NewSession(cfg ClusterConfig) (*Session, error) {
return nil, errors.New("Can't use both Authenticator and AuthProvider in cluster config.")
}

if cfg.SerialConsistency > 0 && !cfg.SerialConsistency.IsSerial() {
return nil, fmt.Errorf("the default SerialConsistency level is not allowed to be anything else but SERIAL or LOCAL_SERIAL. Recived value: %v", cfg.SerialConsistency)
}

// TODO: we should take a context in here at some point
ctx, cancel := context.WithCancel(context.TODO())

Expand Down Expand Up @@ -915,7 +919,7 @@ type Query struct {
rt RetryPolicy
spec SpeculativeExecutionPolicy
binding func(q *QueryInfo) ([]interface{}, error)
serialCons SerialConsistency
serialCons Consistency
defaultTimestamp bool
defaultTimestampValue int64
disableSkipMetadata bool
Expand Down Expand Up @@ -1264,7 +1268,10 @@ func (q *Query) Bind(v ...interface{}) *Query {
// either SERIAL or LOCAL_SERIAL and if not present, it defaults to
// SERIAL. This option will be ignored for anything else that a
// conditional update/insert.
func (q *Query) SerialConsistency(cons SerialConsistency) *Query {
func (q *Query) SerialConsistency(cons Consistency) *Query {
if !cons.IsSerial() {
panic("serial consistency can only be SERIAL or LOCAL_SERIAL got " + cons.String())
}
q.serialCons = cons
return q
}
Expand Down Expand Up @@ -1735,7 +1742,7 @@ type Batch struct {
trace Tracer
observer BatchObserver
session *Session
serialCons SerialConsistency
serialCons Consistency
defaultTimestamp bool
defaultTimestampValue int64
context context.Context
Expand Down Expand Up @@ -1914,7 +1921,10 @@ func (b *Batch) Size() int {
// conditional update/insert.
//
// Only available for protocol 3 and above
func (b *Batch) SerialConsistency(cons SerialConsistency) *Batch {
func (b *Batch) SerialConsistency(cons Consistency) *Batch {
if !cons.IsSerial() {
panic("serial consistency can only be SERIAL or LOCAL_SERIAL got " + cons.String())
}
b.serialCons = cons
return b
}
Expand Down

0 comments on commit 769a5c1

Please sign in to comment.