Merge ring and ringDescriber #1849

Open
wants to merge 7 commits into base: trunk
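
Before the per-file diffs, a note on the shape of the change: the PR removes the standalone ringDescriber (the system.local/system.peers poller) and folds its role into the session's host cache, so every call site below goes through `session.hostSource` instead of `session.ring`. The sketch below is not the PR's implementation; the method names (`getHostsList`, `getHostsMap`, `addHostIfMissing`) come from the diff, while the field layout and locking are assumptions made purely for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// HostInfo is reduced to two fields for the illustration; gocql's real HostInfo
// carries tokens, datacenter, rack, version, state and more.
type HostInfo struct {
	hostID string
	ip     string
}

// hostSource sketches the merged type: the former ring's host cache plus the
// former ringDescriber's job of discovering hosts from the system tables.
// Field names and locking are assumptions, not the PR's code.
type hostSource struct {
	mu    sync.RWMutex
	hosts map[string]*HostInfo // keyed by host ID
}

func newHostSource() *hostSource {
	return &hostSource{hosts: make(map[string]*HostInfo)}
}

// getHostsList mirrors the accessor the tests switch to (session.hostSource.getHostsList()).
func (h *hostSource) getHostsList() []*HostInfo {
	h.mu.RLock()
	defer h.mu.RUnlock()
	out := make([]*HostInfo, 0, len(h.hosts))
	for _, host := range h.hosts {
		out = append(out, host)
	}
	return out
}

// getHostsMap returns a snapshot keyed by host ID, the shape refreshRing uses
// to work out which hosts have disappeared since the last refresh.
func (h *hostSource) getHostsMap() map[string]*HostInfo {
	h.mu.RLock()
	defer h.mu.RUnlock()
	out := make(map[string]*HostInfo, len(h.hosts))
	for id, host := range h.hosts {
		out[id] = host
	}
	return out
}

// addHostIfMissing inserts the host unless an entry with the same ID already
// exists; the second return value reports whether it already existed, matching
// the `if host, ok := ...addHostIfMissing(h); !ok` call shape in refreshRing.
func (h *hostSource) addHostIfMissing(host *HostInfo) (*HostInfo, bool) {
	h.mu.Lock()
	defer h.mu.Unlock()
	if existing, ok := h.hosts[host.hostID]; ok {
		return existing, true
	}
	h.hosts[host.hostID] = host
	return host, false
}

func main() {
	hs := newHostSource()
	if _, existed := hs.addHostIfMissing(&HostInfo{hostID: "a", ip: "10.0.0.1"}); !existed {
		fmt.Println("added host a")
	}
	fmt.Println(len(hs.getHostsList()), len(hs.getHostsMap())) // 1 1
}
```
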
9 changes: 5 additions & 4 deletions cassandra_test.go
@@ -32,7 +32,6 @@ import (
"context"
"errors"
"fmt"
"github.com/stretchr/testify/require"
"io"
"math"
"math/big"
@@ -45,6 +44,8 @@ import (
"time"
"unicode"

"github.com/stretchr/testify/require"

"gopkg.in/inf.v0"
)

@@ -808,7 +809,7 @@ func TestReconnection(t *testing.T) {
session := createSessionFromCluster(cluster, t)
defer session.Close()

- h := session.ring.allHosts()[0]
+ h := session.hostSource.getHostsList()[0]
session.handleNodeDown(h.ConnectAddress(), h.Port())

if h.State() != NodeDown {
@@ -1613,7 +1614,7 @@ func TestPrepare_PreparedCacheEviction(t *testing.T) {
}

// Walk through all the configured hosts and test cache retention and eviction
- for _, host := range session.ring.hosts {
+ for _, host := range session.hostSource.hosts {
_, ok := session.stmtsLRU.lru.Get(session.stmtsLRU.keyFor(host.HostID(), session.cfg.Keyspace, "SELECT id,mod FROM prepcachetest WHERE id = 0"))
if ok {
t.Errorf("expected first select to be purged but was in cache for host=%q", host)
@@ -2769,7 +2770,7 @@ func TestTokenAwareConnPool(t *testing.T) {
session := createSessionFromCluster(cluster, t)
defer session.Close()

- expectedPoolSize := cluster.NumConns * len(session.ring.allHosts())
+ expectedPoolSize := cluster.NumConns * len(session.hostSource.getHostsList())

// wait for pool to fill
for i := 0; i < 10; i++ {
2 changes: 1 addition & 1 deletion conn.go
@@ -1697,7 +1697,7 @@ func (c *Conn) awaitSchemaAgreement(ctx context.Context) (err error) {
}

for _, row := range rows {
- host, err := c.session.hostInfoFromMap(row, &HostInfo{connectAddress: c.host.ConnectAddress(), port: c.session.cfg.Port})
+ host, err := hostInfoFromMap(row, &HostInfo{connectAddress: c.host.ConnectAddress(), port: c.session.cfg.Port}, c.session.cfg.translateAddressPort)
if err != nil {
goto cont
}
6 changes: 3 additions & 3 deletions control.go
@@ -294,12 +294,12 @@ type connHost struct {
func (c *controlConn) setupConn(conn *Conn) error {
// we need up-to-date host info for the filterHost call below
iter := conn.querySystemLocal(context.TODO())
- host, err := c.session.hostInfoFromIter(iter, conn.host.connectAddress, conn.conn.RemoteAddr().(*net.TCPAddr).Port)
+ host, err := hostInfoFromIter(iter, conn.host.connectAddress, conn.conn.RemoteAddr().(*net.TCPAddr).Port, c.session.cfg.translateAddressPort)
if err != nil {
return err
}

- host = c.session.ring.addOrUpdate(host)
+ host = c.session.hostSource.addOrUpdate(host)

if c.session.cfg.filterHost(host) {
return fmt.Errorf("host was filtered: %v", host.ConnectAddress())
@@ -385,7 +385,7 @@ func (c *controlConn) reconnect() {
}

func (c *controlConn) attemptReconnect() (*Conn, error) {
- hosts := c.session.ring.allHosts()
+ hosts := c.session.hostSource.getHostsList()
hosts = shuffleHosts(hosts)

// keep the old behavior of connecting to the old host first by moving it to
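
In conn.go and control.go above, `hostInfoFromMap` and `hostInfoFromIter` stop being methods on `*Session`; the address translator they need is now injected as a plain function, and both call sites pass `c.session.cfg.translateAddressPort` explicitly. Below is a self-contained sketch of the same injection pattern, not gocql's code: the row handling is reduced to a single assumed `rpc_address` column and the types are simplified stand-ins.

```go
package main

import (
	"fmt"
	"net"
)

// translateFunc matches the shape of the injected parameter in the diff:
// func(addr net.IP, port int) (net.IP, int).
type translateFunc func(addr net.IP, port int) (net.IP, int)

// hostInfo is a simplified stand-in for *HostInfo; the real struct lives in gocql.
type hostInfo struct {
	connectAddress net.IP
	port           int
}

// hostFromRow sketches the new call shape: instead of reaching into a *Session
// for cfg.translateAddressPort, the translator arrives as an argument, so the
// function can be exercised in isolation.
func hostFromRow(row map[string]interface{}, host *hostInfo, translate translateFunc) (*hostInfo, error) {
	if addr, ok := row["rpc_address"].(string); ok {
		host.connectAddress = net.ParseIP(addr)
	}
	ip, port := translate(host.connectAddress, host.port)
	host.connectAddress = ip
	host.port = port
	return host, nil
}

func main() {
	// identity translator, analogous to running without an AddressTranslator configured
	identity := func(addr net.IP, port int) (net.IP, int) { return addr, port }

	row := map[string]interface{}{"rpc_address": "10.0.0.5"}
	h, _ := hostFromRow(row, &hostInfo{port: 9042}, identity)
	fmt.Println(h.connectAddress, h.port) // 10.0.0.5 9042
}
```

The practical payoff of this kind of injection is testability: a unit test can pass a fake translator without constructing a live Session.
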
4 changes: 2 additions & 2 deletions control_ccm_test.go
@@ -131,7 +131,7 @@ func TestControlConn_ReconnectRefreshesRing(t *testing.T) {
}()

assertNodeDown := func() error {
- hosts := session.ring.currentHosts()
+ hosts := session.hostSource.getHostsMap()
if len(hosts) != 1 {
return fmt.Errorf("expected 1 host in ring but there were %v", len(hosts))
}
@@ -169,7 +169,7 @@ func TestControlConn_ReconnectRefreshesRing(t *testing.T) {
}

assertNodeUp := func() error {
- hosts := session.ring.currentHosts()
+ hosts := session.hostSource.getHostsMap()
if len(hosts) != len(allCcmHosts) {
return fmt.Errorf("expected %v hosts in ring but there were %v", len(allCcmHosts), len(hosts))
}
4 changes: 2 additions & 2 deletions events.go
@@ -217,7 +217,7 @@ func (s *Session) handleNodeUp(eventIp net.IP, eventPort int) {
s.logger.Printf("gocql: Session.handleNodeUp: %s:%d\n", eventIp.String(), eventPort)
}

- host, ok := s.ring.getHostByIP(eventIp.String())
+ host, ok := s.hostSource.getHostByIP(eventIp.String())
if !ok {
s.debounceRingRefresh()
return
@@ -256,7 +256,7 @@ func (s *Session) handleNodeDown(ip net.IP, port int) {
s.logger.Printf("gocql: Session.handleNodeDown: %s:%d\n", ip.String(), port)
}

- host, ok := s.ring.getHostByIP(ip.String())
+ host, ok := s.hostSource.getHostByIP(ip.String())
if ok {
host.setState(NodeDown)
if s.cfg.filterHost(host) {
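
The event handlers above only receive an IP and port from the cluster event, so the merged host source still has to support lookup by IP (`getHostByIP`) in addition to lookup by host ID. A minimal sketch of a cache with such a secondary index follows; the IP-keyed map and all names other than `addOrUpdate`/`getHostByIP` are assumptions, not the structure gocql actually uses.

```go
package main

import (
	"fmt"
	"sync"
)

type hostInfo struct {
	hostID string
	ip     string
	state  string
}

// ipIndexedCache keeps hosts keyed by host ID but maintains a second map so
// event handlers, which only know the peer's IP, can find the entry quickly.
type ipIndexedCache struct {
	mu   sync.RWMutex
	byID map[string]*hostInfo
	byIP map[string]*hostInfo
}

func newCache() *ipIndexedCache {
	return &ipIndexedCache{byID: map[string]*hostInfo{}, byIP: map[string]*hostInfo{}}
}

func (c *ipIndexedCache) addOrUpdate(h *hostInfo) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.byID[h.hostID] = h
	c.byIP[h.ip] = h
}

// getHostByIP mirrors the (host, ok) shape used by handleNodeUp/handleNodeDown.
func (c *ipIndexedCache) getHostByIP(ip string) (*hostInfo, bool) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	h, ok := c.byIP[ip]
	return h, ok
}

func main() {
	cache := newCache()
	cache.addOrUpdate(&hostInfo{hostID: "a", ip: "10.0.0.7", state: "UP"})

	if h, ok := cache.getHostByIP("10.0.0.7"); ok {
		h.state = "DOWN" // what handleNodeDown does, in spirit
	}
	h, _ := cache.getHostByIP("10.0.0.7")
	fmt.Println(h.state) // DOWN
}
```
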
6 changes: 3 additions & 3 deletions events_ccm_test.go
@@ -104,7 +104,7 @@ func TestEventNodeDownControl(t *testing.T) {
}
session.pool.mu.RUnlock()

- host := session.ring.getHost(node.Addr)
+ host := session.hostSource.getHost(node.Addr)
if host == nil {
t.Fatal("node not in metadata ring")
} else if host.IsUp() {
@@ -146,7 +146,7 @@ func TestEventNodeDown(t *testing.T) {
t.Fatal("node not removed after remove event")
}

- host := session.ring.getHost(node.Addr)
+ host := session.hostSource.getHost(node.Addr)
if host == nil {
t.Fatal("node not in metadata ring")
} else if host.IsUp() {
@@ -203,7 +203,7 @@ func TestEventNodeUp(t *testing.T) {
t.Fatal("node not added after node added event")
}

- host := session.ring.getHost(node.Addr)
+ host := session.hostSource.getHost(node.Addr)
if host == nil {
t.Fatal("node not in metadata ring")
} else if !host.IsUp() {
134 changes: 15 additions & 119 deletions host_source.go
@@ -25,7 +25,6 @@
package gocql

import (
"context"
"errors"
"fmt"
"net"
@@ -445,14 +444,6 @@ func (h *HostInfo) String() string {
h.port, h.dataCenter, h.rack, h.hostId, h.version, h.state, len(h.tokens))
}

- // Polls system.peers at a specific interval to find new hosts
- type ringDescriber struct {
- session *Session
- mu sync.Mutex
- prevHosts []*HostInfo
- prevPartitioner string
- }
-
// Returns true if we are using system_schema.keyspaces instead of system.schema_keyspaces
func checkSystemSchema(control *controlConn) (bool, error) {
iter := control.query("SELECT * FROM system_schema.keyspaces")
@@ -471,7 +462,7 @@ func checkSystemSchema(control *controlConn) (bool, error) {

// Given a map that represents a row from either system.local or system.peers
// return as much information as we can in *HostInfo
- func (s *Session) hostInfoFromMap(row map[string]interface{}, host *HostInfo) (*HostInfo, error) {
+ func hostInfoFromMap(row map[string]interface{}, host *HostInfo, translateAddressPort func(addr net.IP, port int) (net.IP, int)) (*HostInfo, error) {
const assertErrorMsg = "Assertion failed for %s"
var ok bool

@@ -583,14 +574,14 @@ func (s *Session) hostInfoFromMap(row map[string]interface{}, host *HostInfo) (*
// Not sure what the port field will be called until the JIRA issue is complete
}

- ip, port := s.cfg.translateAddressPort(host.ConnectAddress(), host.port)
+ ip, port := translateAddressPort(host.ConnectAddress(), host.port)
host.connectAddress = ip
host.port = port

return host, nil
}

- func (s *Session) hostInfoFromIter(iter *Iter, connectAddress net.IP, defaultPort int) (*HostInfo, error) {
+ func hostInfoFromIter(iter *Iter, connectAddress net.IP, defaultPort int, translateAddressPort func(addr net.IP, port int) (net.IP, int)) (*HostInfo, error) {
rows, err := iter.SliceMap()
if err != nil {
// TODO(zariel): make typed error
@@ -601,106 +592,13 @@ func (s *Session) hostInfoFromIter(iter *Iter, connectAddress net.IP, defaultPor
return nil, errors.New("query returned 0 rows")
}

- host, err := s.hostInfoFromMap(rows[0], &HostInfo{connectAddress: connectAddress, port: defaultPort})
+ host, err := hostInfoFromMap(rows[0], &HostInfo{connectAddress: connectAddress, port: defaultPort}, translateAddressPort)
if err != nil {
return nil, err
}
return host, nil
}

- // Ask the control node for the local host info
- func (r *ringDescriber) getLocalHostInfo() (*HostInfo, error) {
- if r.session.control == nil {
- return nil, errNoControl
- }
-
- iter := r.session.control.withConnHost(func(ch *connHost) *Iter {
- return ch.conn.querySystemLocal(context.TODO())
- })
-
- if iter == nil {
- return nil, errNoControl
- }
-
- host, err := r.session.hostInfoFromIter(iter, nil, r.session.cfg.Port)
- if err != nil {
- return nil, fmt.Errorf("could not retrieve local host info: %w", err)
- }
- return host, nil
- }
-
- // Ask the control node for host info on all it's known peers
- func (r *ringDescriber) getClusterPeerInfo(localHost *HostInfo) ([]*HostInfo, error) {
- if r.session.control == nil {
- return nil, errNoControl
- }
-
- var peers []*HostInfo
- iter := r.session.control.withConnHost(func(ch *connHost) *Iter {
- return ch.conn.querySystemPeers(context.TODO(), localHost.version)
- })
-
- if iter == nil {
- return nil, errNoControl
- }
-
- rows, err := iter.SliceMap()
- if err != nil {
- // TODO(zariel): make typed error
- return nil, fmt.Errorf("unable to fetch peer host info: %s", err)
- }
-
- for _, row := range rows {
- // extract all available info about the peer
- host, err := r.session.hostInfoFromMap(row, &HostInfo{port: r.session.cfg.Port})
- if err != nil {
- return nil, err
- } else if !isValidPeer(host) {
- // If it's not a valid peer
- r.session.logger.Printf("Found invalid peer '%s' "+
- "Likely due to a gossip or snitch issue, this host will be ignored", host)
- continue
- }
-
- peers = append(peers, host)
- }
-
- return peers, nil
- }
-
- // Return true if the host is a valid peer
- func isValidPeer(host *HostInfo) bool {
- return !(len(host.RPCAddress()) == 0 ||
- host.hostId == "" ||
- host.dataCenter == "" ||
- host.rack == "" ||
- len(host.tokens) == 0)
- }
-
- // GetHosts returns a list of hosts found via queries to system.local and system.peers
- func (r *ringDescriber) GetHosts() ([]*HostInfo, string, error) {
- r.mu.Lock()
- defer r.mu.Unlock()
-
- localHost, err := r.getLocalHostInfo()
- if err != nil {
- return r.prevHosts, r.prevPartitioner, err
- }
-
- peerHosts, err := r.getClusterPeerInfo(localHost)
- if err != nil {
- return r.prevHosts, r.prevPartitioner, err
- }
-
- hosts := append([]*HostInfo{localHost}, peerHosts...)
- var partitioner string
- if len(hosts) > 0 {
- partitioner = hosts[0].Partitioner()
- }
-
- return hosts, partitioner, nil
- }

// debounceRingRefresh submits a ring refresh request to the ring refresh debouncer.
func (s *Session) debounceRingRefresh() {
s.ringRefresher.debounce()
@@ -716,21 +614,21 @@ func (s *Session) refreshRing() error {
return err
}

- func refreshRing(r *ringDescriber) error {
- hosts, partitioner, err := r.GetHosts()
+ func refreshRing(s *Session) error {
+ hosts, partitioner, err := s.hostSource.GetHostsFromSystem()
if err != nil {
return err
}

- prevHosts := r.session.ring.currentHosts()
+ prevHosts := s.hostSource.getHostsMap()

for _, h := range hosts {
- if r.session.cfg.filterHost(h) {
+ if s.cfg.filterHost(h) {
continue
}

- if host, ok := r.session.ring.addHostIfMissing(h); !ok {
- r.session.startPoolFill(h)
+ if host, ok := s.hostSource.addHostIfMissing(h); !ok {
+ s.startPoolFill(h)
} else {
// host (by hostID) already exists; determine if IP has changed
newHostID := h.HostID()
Expand All @@ -744,23 +642,21 @@ func refreshRing(r *ringDescriber) error {
} else {
// host IP has changed
// remove old HostInfo (w/old IP)
- r.session.removeHost(existing)
- if _, alreadyExists := r.session.ring.addHostIfMissing(h); alreadyExists {
+ s.removeHost(existing)
+ if _, alreadyExists := s.hostSource.addHostIfMissing(h); alreadyExists {
return fmt.Errorf("add new host=%s after removal: %w", h, ErrHostAlreadyExists)
}
// add new HostInfo (same hostID, new IP)
- r.session.startPoolFill(h)
+ s.startPoolFill(h)
}
}
delete(prevHosts, h.HostID())
}

for _, host := range prevHosts {
- r.session.removeHost(host)
+ s.removeHost(host)
}

- r.session.metadata.setPartitioner(partitioner)
- r.session.policy.SetPartitioner(partitioner)
+ s.policy.SetPartitioner(partitioner)
return nil
}

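
The refreshRing body keeps its reconciliation strategy, just addressed through the Session: snapshot the known hosts (`getHostsMap`), walk what `GetHostsFromSystem` reports, add or re-add hosts as needed, and remove anything that was not seen this round. The standalone sketch below condenses that loop; `filterHost`, pool filling and host state are omitted, and every name here is illustrative rather than gocql's.

```go
package main

import "fmt"

type host struct {
	id string
	ip string
}

// reconcile mirrors the shape of refreshRing after the change: "current" is
// the cache snapshot (getHostsMap), "discovered" is what GetHostsFromSystem
// returned. It reports which hosts should be added and which removed.
func reconcile(current map[string]*host, discovered []*host) (add, remove []*host) {
	prev := make(map[string]*host, len(current))
	for id, h := range current {
		prev[id] = h
	}

	for _, h := range discovered {
		existing, known := prev[h.id]
		switch {
		case !known:
			add = append(add, h) // new host: start filling a pool for it
		case existing.ip != h.ip:
			// same host ID, new IP: drop the stale entry and re-add
			remove = append(remove, existing)
			add = append(add, h)
		}
		delete(prev, h.id) // seen this refresh, so it is not removed below
	}

	// anything left in prev is no longer reported by the system tables
	for _, h := range prev {
		remove = append(remove, h)
	}
	return add, remove
}

func main() {
	current := map[string]*host{
		"a": {id: "a", ip: "10.0.0.1"},
		"b": {id: "b", ip: "10.0.0.2"},
	}
	discovered := []*host{
		{id: "a", ip: "10.0.0.9"}, // IP changed
		{id: "c", ip: "10.0.0.3"}, // newly joined
	}
	add, remove := reconcile(current, discovered)
	fmt.Println(len(add), len(remove)) // 2 2: re-add "a" and add "c"; remove old "a" and "b"
}
```
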
4 changes: 2 additions & 2 deletions integration_test.go
@@ -62,12 +62,12 @@ func TestAuthentication(t *testing.T) {
session.Close()
}

- func TestGetHosts(t *testing.T) {
+ func TestGetHostsFromSystem(t *testing.T) {
clusterHosts := getClusterHosts()
cluster := createCluster()
session := createSessionFromCluster(cluster, t)

- hosts, partitioner, err := session.hostSource.GetHosts()
+ hosts, partitioner, err := session.hostSource.GetHostsFromSystem()

assertTrue(t, "err == nil", err == nil)
assertEqual(t, "len(hosts)", len(clusterHosts), len(hosts))