Skip to content

Commit

Permalink
Integration test for Metadata_changed mechanism
Browse files Browse the repository at this point in the history
  • Loading branch information
worryg0d committed Oct 28, 2024
1 parent 31d10f9 commit 2a5056d
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 11 deletions.
102 changes: 98 additions & 4 deletions cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3397,8 +3397,6 @@ func TestLargeSizeQuery(t *testing.T) {
t.Fatal(err)
}

defer session.Close()

longString := strings.Repeat("a", 500_000)

err := session.Query("INSERT INTO gocql_test.large_size_query (id, text_col) VALUES (?, ?)", "1", longString).Exec()
Expand Down Expand Up @@ -3430,8 +3428,6 @@ func TestQueryCompressionNotWorthIt(t *testing.T) {
t.Fatal(err)
}

defer session.Close()

str := "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890!@#$%^&*()_+"
err := session.Query("INSERT INTO gocql_test.large_size_query (id, text_col) VALUES (?, ?)", "1", str).Exec()
if err != nil {
Expand All @@ -3446,3 +3442,101 @@ func TestQueryCompressionNotWorthIt(t *testing.T) {

require.Equal(t, str, result)
}

func TestPrepareExecuteMetadataChangedFlag(t *testing.T) {
// This test ensures that the whole Metadata_changed flow
// is handled properly.
//
// To trigger C* to return Metadata_changed we should do:
// 1. Create a table
// 2. Prepare stmt which uses the created table
// 3. Change the table schema in order to affect prepared stmt (e.g. add a column)
// 4. Execute prepared stmt. As a result C* should return RESULT/ROWS response with
// Metadata_changed flag, new metadata id and updated metadata resultset.
//
// The driver should handle this by updating its prepared statement inside the cache
// when it receives RESULT/ROWS with Metadata_changed flag
session := createSession(t)
defer session.Close()

if session.cfg.ProtoVersion < protoVersion5 {
t.Skip("Metadata_changed mechanism is only available in proto > 4")
}

if err := createTable(session, "CREATE TABLE IF NOT EXISTS gocql_test.metadata_changed(id int, PRIMARY KEY (id))"); err != nil {
t.Fatal(err)
}

err := session.Query("INSERT INTO gocql_test.metadata_changed (id) VALUES (?)", 1).Exec()
if err != nil {
t.Fatal(err)
}

// We have to specify conn for all queries to ensure that
// all queries are running on the same node
conn := session.getConn()

const selectStmt = "SELECT * FROM gocql_test.metadata_changed"
queryBeforeTableAltering := session.Query(selectStmt)
queryBeforeTableAltering.conn = conn
row := make(map[string]interface{})
err = queryBeforeTableAltering.MapScan(row)
if err != nil {
t.Fatal(err)
}

require.Len(t, row, 1, "Expected to retrieve a single column")
stmtCacheKey := session.stmtsLRU.keyFor(conn.host.HostID(), conn.currentKeyspace, queryBeforeTableAltering.stmt)
inflight, _ := session.stmtsLRU.get(stmtCacheKey)
preparedStatementBeforeTableAltering := inflight.preparedStatment

// Changing table schema in order to cause C* to return RESULT/ROWS Metadata_changed
alteringTableQuery := session.Query("ALTER TABLE gocql_test.metadata_changed ADD new_col int")
alteringTableQuery.conn = conn
err = alteringTableQuery.Exec()
if err != nil {
t.Fatal(err)
}

// Expecting C* will return RESULT/ROWS Metadata_changed
// and it will be properly handled
queryAfterTableAltering := session.Query(selectStmt)
queryAfterTableAltering.conn = conn
row = make(map[string]interface{})
err = queryAfterTableAltering.MapScan(row)
if err != nil {
t.Fatal(err)
}

// Ensuring if cache contains updated prepared statement
require.Len(t, row, 2, "Expected to retrieve both columns")
inflight, _ = session.stmtsLRU.get(stmtCacheKey)
preparedStatementAfterTableAltering := inflight.preparedStatment
require.NotEqual(t, preparedStatementBeforeTableAltering.resultMetadataID, preparedStatementAfterTableAltering.resultMetadataID)
require.NotEqual(t, preparedStatementBeforeTableAltering.response, preparedStatementAfterTableAltering.response)

// Executing prepared stmt and expecting that C* won't return
// Metadata_changed because the table is not being changed.
// Running query with timeout to ensure there is no deadlocks.
// However, it doesn't 100% proves that there is a deadlock...
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30000)
defer cancel()

queryAfterTableAltering2 := session.Query(selectStmt).WithContext(ctx)
queryAfterTableAltering2.conn = conn
row = make(map[string]interface{})
err = queryAfterTableAltering2.MapScan(row)
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
t.Fatal("It is likely failed due deadlock")
}
t.Fatal(err)
}

// Ensuring metadata of prepared stmt is not changed
require.Len(t, row, 2, "Expected to retrieve both columns")
inflight, _ = session.stmtsLRU.get(stmtCacheKey)
preparedStatementAfterTableAltering2 := inflight.preparedStatment
require.Equal(t, preparedStatementAfterTableAltering.resultMetadataID, preparedStatementAfterTableAltering2.resultMetadataID)
require.Equal(t, preparedStatementAfterTableAltering.response, preparedStatementAfterTableAltering2.response)
}
8 changes: 3 additions & 5 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1613,8 +1613,6 @@ func (c *Conn) executeQuery(ctx context.Context, qry *Query) *Iter {
return &Iter{framer: framer}
case *resultRowsFrame:
if x.meta.newMetadataID != nil {
// Updating the result metadata id in prepared stmt
//
// If a RESULT/Rows message reports
// changed resultset metadata with the Metadata_changed flag, the reported new
// resultset metadata must be used in subsequent executions
Expand All @@ -1631,8 +1629,8 @@ func (c *Conn) executeQuery(ctx context.Context, qry *Query) *Iter {
},
}
c.session.stmtsLRU.add(stmtCacheKey, newInflight)
// Closing done here because the stmtsLRU already contains a new inflight
// with updated metadata and result metadata id
// The driver should close this done to avoid deadlocks of
// other subsequent requests
close(newInflight.done)
}
}
Expand All @@ -1643,7 +1641,7 @@ func (c *Conn) executeQuery(ctx context.Context, qry *Query) *Iter {
numRows: x.numRows,
}

if params.skipMeta {
if params.skipMeta && x.meta.noMetaData() {
if info != nil {
iter.meta = info.response
iter.meta.pagingState = copyBytes(x.meta.pagingState)
Expand Down
8 changes: 6 additions & 2 deletions frame.go
Original file line number Diff line number Diff line change
Expand Up @@ -1031,8 +1031,12 @@ func (r *resultMetadata) morePages() bool {
return r.flags&flagHasMorePages == flagHasMorePages
}

func (r *resultMetadata) noMetaData() bool {
return r.flags&flagNoMetaData == flagNoMetaData
}

func (r resultMetadata) String() string {
return fmt.Sprintf("[metadata flags=0x%x paging_state=% X columns=%v]", r.flags, r.pagingState, r.columns)
return fmt.Sprintf("[metadata flags=0x%x paging_state=% X columns=%v new_metadata_id=% X]", r.flags, r.pagingState, r.columns, r.newMetadataID)
}

func (f *framer) readCol(col *ColumnInfo, meta *resultMetadata, globalSpec bool, keyspace, table string) {
Expand Down Expand Up @@ -1072,7 +1076,7 @@ func (f *framer) parseResultMetadata() resultMetadata {
meta.newMetadataID = copyBytes(f.readShortBytes())
}

if meta.flags&flagNoMetaData == flagNoMetaData {
if meta.noMetaData() {
return meta
}

Expand Down

0 comments on commit 2a5056d

Please sign in to comment.