Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CASSGO-26 consistency serial was added #1837

Open
wants to merge 1 commit into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- Allow SERIAL and LOCAL_SERIAL on SELECT statements [CASSGO-26](https://issues.apache.org/jira/browse/CASSGO-26)

### Changed

- Don't restrict server authenticator unless PasswordAuthentictor.AllowedAuthenticators is provided (CASSGO-19)
Expand Down
88 changes: 88 additions & 0 deletions cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ import (
"unicode"

inf "gopkg.in/inf.v0"

"github.com/stretchr/testify/require"
)

func TestEmptyHosts(t *testing.T) {
Expand Down Expand Up @@ -504,6 +506,92 @@ func TestCAS(t *testing.T) {
}
}

func TestConsistencySerial(t *testing.T) {
session := createSession(t)
defer session.Close()

type testStruct struct {
name string
id int
consistency Consistency
expectedPanicValue string
}

testCases := []testStruct{
{
name: "Any",
consistency: Any,
expectedPanicValue: "serial consistency can only be SERIAL or LOCAL_SERIAL got ANY",
}, {
name: "One",
consistency: One,
expectedPanicValue: "serial consistency can only be SERIAL or LOCAL_SERIAL got ONE",
}, {
name: "Two",
consistency: Two,
expectedPanicValue: "serial consistency can only be SERIAL or LOCAL_SERIAL got TWO",
}, {
name: "Three",
consistency: Three,
expectedPanicValue: "serial consistency can only be SERIAL or LOCAL_SERIAL got THREE",
}, {
name: "Quorum",
consistency: Quorum,
expectedPanicValue: "serial consistency can only be SERIAL or LOCAL_SERIAL got QUORUM",
}, {
name: "LocalQuorum",
consistency: LocalQuorum,
expectedPanicValue: "serial consistency can only be SERIAL or LOCAL_SERIAL got LOCAL_QUORUM",
}, {
name: "EachQuorum",
consistency: EachQuorum,
expectedPanicValue: "serial consistency can only be SERIAL or LOCAL_SERIAL got EACH_QUORUM",
}, {
name: "Serial",
id: 8,
consistency: Serial,
expectedPanicValue: "",
}, {
name: "LocalSerial",
id: 9,
consistency: LocalSerial,
expectedPanicValue: "",
}, {
name: "LocalOne",
consistency: LocalOne,
expectedPanicValue: "serial consistency can only be SERIAL or LOCAL_SERIAL got LOCAL_ONE",
},
}

err := session.Query("CREATE TABLE IF NOT EXISTS gocql_test.consistency_serial (id int PRIMARY KEY)").Exec()
if err != nil {
t.Fatalf("can't create consistency_serial table:%v", err)
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
if tc.expectedPanicValue == "" {
err = session.Query("INSERT INTO gocql_test.consistency_serial (id) VALUES (?)", tc.id).SerialConsistency(tc.consistency).Exec()
if err != nil {
t.Fatal(err)
}

var receivedID int
err = session.Query("SELECT * FROM gocql_test.consistency_serial WHERE id=?", tc.id).Scan(&receivedID)
if err != nil {
t.Fatal(err)
}

require.Equal(t, tc.id, receivedID)
} else {
require.PanicsWithValue(t, tc.expectedPanicValue, func() {
session.Query("INSERT INTO gocql_test.consistency_serial (id) VALUES (?)", tc.id).SerialConsistency(tc.consistency)
})
}
})
}
}

func TestDurationType(t *testing.T) {
session := createSession(t)
defer session.Close()
Expand Down
2 changes: 1 addition & 1 deletion cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ type ClusterConfig struct {

// Consistency for the serial part of queries, values can be either SERIAL or LOCAL_SERIAL.
// Default: unset
SerialConsistency SerialConsistency
SerialConsistency Consistency

// SslOpts configures TLS use when HostDialer is not set.
// SslOpts is ignored if HostDialer is set.
Expand Down
58 changes: 20 additions & 38 deletions frame.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,9 @@ const (

type Consistency uint16

// SerialConsistency is deprecated. Use Consistency instead.
type SerialConsistency = Consistency

const (
Any Consistency = 0x00
One Consistency = 0x01
Expand All @@ -201,6 +204,8 @@ const (
All Consistency = 0x05
LocalQuorum Consistency = 0x06
EachQuorum Consistency = 0x07
Serial Consistency = 0x08
LocalSerial Consistency = 0x09
LocalOne Consistency = 0x0A
)

Expand All @@ -224,6 +229,10 @@ func (c Consistency) String() string {
return "EACH_QUORUM"
case LocalOne:
return "LOCAL_ONE"
case Serial:
return "SERIAL"
case LocalSerial:
return "LOCAL_SERIAL"
default:
return fmt.Sprintf("UNKNOWN_CONS_0x%x", uint16(c))
}
Expand Down Expand Up @@ -253,13 +262,21 @@ func (c *Consistency) UnmarshalText(text []byte) error {
*c = EachQuorum
case "LOCAL_ONE":
*c = LocalOne
case "SERIAL":
*c = Serial
case "LOCAL_SERIAL":
*c = LocalSerial
default:
return fmt.Errorf("invalid consistency %q", string(text))
}

return nil
}

func (c Consistency) IsSerial() bool {
return c == Serial || c == LocalSerial

}
func ParseConsistency(s string) Consistency {
var c Consistency
if err := c.UnmarshalText([]byte(strings.ToUpper(s))); err != nil {
Expand All @@ -286,41 +303,6 @@ func MustParseConsistency(s string) (Consistency, error) {
return c, nil
}

type SerialConsistency uint16

const (
Serial SerialConsistency = 0x08
LocalSerial SerialConsistency = 0x09
)

func (s SerialConsistency) String() string {
switch s {
case Serial:
return "SERIAL"
case LocalSerial:
return "LOCAL_SERIAL"
default:
return fmt.Sprintf("UNKNOWN_SERIAL_CONS_0x%x", uint16(s))
}
}

func (s SerialConsistency) MarshalText() (text []byte, err error) {
return []byte(s.String()), nil
}

func (s *SerialConsistency) UnmarshalText(text []byte) error {
switch string(text) {
case "SERIAL":
*s = Serial
case "LOCAL_SERIAL":
*s = LocalSerial
default:
return fmt.Errorf("invalid consistency %q", string(text))
}

return nil
}

const (
apacheCassandraTypePrefix = "org.apache.cassandra.db.marshal."
)
Expand Down Expand Up @@ -1452,7 +1434,7 @@ type queryParams struct {
values []queryValues
pageSize int
pagingState []byte
serialConsistency SerialConsistency
serialConsistency Consistency
// v3+
defaultTimestamp bool
defaultTimestampValue int64
Expand Down Expand Up @@ -1541,7 +1523,7 @@ func (f *framer) writeQueryParams(opts *queryParams) {
}

if opts.serialConsistency > 0 {
f.writeConsistency(Consistency(opts.serialConsistency))
f.writeConsistency(opts.serialConsistency)
}

if f.proto > protoVersion2 && opts.defaultTimestamp {
Expand Down Expand Up @@ -1653,7 +1635,7 @@ type writeBatchFrame struct {
consistency Consistency

// v3+
serialConsistency SerialConsistency
serialConsistency Consistency
defaultTimestamp bool
defaultTimestampValue int64

Expand Down
18 changes: 14 additions & 4 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,10 @@ func NewSession(cfg ClusterConfig) (*Session, error) {
return nil, errors.New("Can't use both Authenticator and AuthProvider in cluster config.")
}

if cfg.SerialConsistency > 0 && !cfg.SerialConsistency.IsSerial() {
return nil, fmt.Errorf("the default SerialConsistency level is not allowed to be anything else but SERIAL or LOCAL_SERIAL. Recived value: %v", cfg.SerialConsistency)
}
Comment on lines +147 to +149

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about adding a test for this?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And overall, how about adding another test to verify that these two consistency levels are allowed for use?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have added some tests to cover each consistency level, please check it


// TODO: we should take a context in here at some point
ctx, cancel := context.WithCancel(context.TODO())

Expand Down Expand Up @@ -915,7 +919,7 @@ type Query struct {
rt RetryPolicy
spec SpeculativeExecutionPolicy
binding func(q *QueryInfo) ([]interface{}, error)
serialCons SerialConsistency
serialCons Consistency
defaultTimestamp bool
defaultTimestampValue int64
disableSkipMetadata bool
Expand Down Expand Up @@ -1264,7 +1268,10 @@ func (q *Query) Bind(v ...interface{}) *Query {
// either SERIAL or LOCAL_SERIAL and if not present, it defaults to
// SERIAL. This option will be ignored for anything else that a
// conditional update/insert.
func (q *Query) SerialConsistency(cons SerialConsistency) *Query {
func (q *Query) SerialConsistency(cons Consistency) *Query {
if !cons.IsSerial() {
panic("serial consistency can only be SERIAL or LOCAL_SERIAL got " + cons.String())
}
q.serialCons = cons
return q
}
Expand Down Expand Up @@ -1735,7 +1742,7 @@ type Batch struct {
trace Tracer
observer BatchObserver
session *Session
serialCons SerialConsistency
serialCons Consistency
defaultTimestamp bool
defaultTimestampValue int64
context context.Context
Expand Down Expand Up @@ -1914,7 +1921,10 @@ func (b *Batch) Size() int {
// conditional update/insert.
//
// Only available for protocol 3 and above
func (b *Batch) SerialConsistency(cons SerialConsistency) *Batch {
func (b *Batch) SerialConsistency(cons Consistency) *Batch {
if !cons.IsSerial() {
panic("serial consistency can only be SERIAL or LOCAL_SERIAL got " + cons.String())
}
b.serialCons = cons
return b
}
Expand Down