From 198454b2bd22820b0aebdc0f6ff49659a3a40934 Mon Sep 17 00:00:00 2001 From: Islam Aliev Date: Thu, 14 Nov 2024 21:35:59 +0100 Subject: [PATCH] refactor: Consolidate node-related fields into a struct (#3232) ## Relevant issue(s) Resolves #3208 ## Description All node-related fields are moved into a separate `nodeState` struct so now we don't need to maintain all slices and node indexes and it's also easier to reason about a node's state. --- tests/integration/acp.go | 4 +- tests/integration/db.go | 42 +++++++-- tests/integration/events.go | 76 ++++++++-------- tests/integration/lens.go | 2 +- tests/integration/p2p.go | 14 +-- tests/integration/state.go | 66 +++++++------- tests/integration/utils.go | 168 +++++++++++------------------------- 7 files changed, 167 insertions(+), 205 deletions(-) diff --git a/tests/integration/acp.go b/tests/integration/acp.go index b98be7a059..78a5a50997 100644 --- a/tests/integration/acp.go +++ b/tests/integration/acp.go @@ -304,7 +304,7 @@ func getCollectionAndDocInfo(s *state, collectionID, docInd, nodeID int) (string collectionName := "" docID := "" if collectionID != -1 { - collection := s.collections[nodeID][collectionID] + collection := s.nodes[nodeID].collections[collectionID] if !collection.Description().Name.HasValue() { require.Fail(s.t, "Expected non-empty collection name, but it was empty.", s.testCase.Description) } @@ -617,7 +617,7 @@ func getNodeAudience(s *state, nodeIndex int) immutable.Option[string] { if nodeIndex >= len(s.nodes) { return immutable.None[string]() } - switch client := s.nodes[nodeIndex].(type) { + switch client := s.nodes[nodeIndex].Client.(type) { case *http.Wrapper: return immutable.Some(strings.TrimPrefix(client.Host(), "http://")) case *cli.Wrapper: diff --git a/tests/integration/db.go b/tests/integration/db.go index 784ff6952f..8a099e0a94 100644 --- a/tests/integration/db.go +++ b/tests/integration/db.go @@ -22,6 +22,7 @@ import ( "github.com/sourcenetwork/defradb/client" "github.com/sourcenetwork/defradb/crypto" "github.com/sourcenetwork/defradb/internal/kms" + "github.com/sourcenetwork/defradb/net" "github.com/sourcenetwork/defradb/node" changeDetector "github.com/sourcenetwork/defradb/tests/change_detector" ) @@ -140,7 +141,7 @@ func getDefaultNodeOpts() []node.Option { // setupNode returns the database implementation for the current // testing state. The database type on the test state is used to // select the datastore implementation to use. -func setupNode(s *state, opts ...node.Option) (*node.Node, string, error) { +func setupNode(s *state, opts ...node.Option) (*nodeState, error) { opts = append(getDefaultNodeOpts(), opts...) switch acpType { @@ -189,20 +190,51 @@ func setupNode(s *state, opts ...node.Option) (*node.Node, string, error) { opts = append(opts, node.WithStoreType(node.MemoryStore)) default: - return nil, "", fmt.Errorf("invalid database type: %v", s.dbt) + return nil, fmt.Errorf("invalid database type: %v", s.dbt) } if s.kms == PubSubKMSType { opts = append(opts, node.WithKMS(kms.PubSubServiceType)) } + netOpts := make([]net.NodeOpt, 0) + for _, opt := range opts { + if opt, ok := opt.(net.NodeOpt); ok { + netOpts = append(netOpts, opt) + } + } + + if s.isNetworkEnabled { + opts = append(opts, node.WithDisableP2P(false)) + } + node, err := node.New(s.ctx, opts...) if err != nil { - return nil, "", err + return nil, err } + err = node.Start(s.ctx) if err != nil { - return nil, "", err + return nil, err } - return node, path, nil + + c, err := setupClient(s, node) + require.Nil(s.t, err) + + eventState, err := newEventState(c.Events()) + require.NoError(s.t, err) + + st := &nodeState{ + Client: c, + event: eventState, + p2p: newP2PState(), + dbPath: path, + netOpts: netOpts, + } + + if node.Peer != nil { + st.peerInfo = node.Peer.PeerInfo() + } + + return st, nil } diff --git a/tests/integration/events.go b/tests/integration/events.go index bbe19ce391..0e28f3e3df 100644 --- a/tests/integration/events.go +++ b/tests/integration/events.go @@ -39,13 +39,13 @@ func waitForNetworkSetupEvents(s *state, nodeID int) { for p2pTopicEvent && replicatorEvents > 0 { select { - case _, ok := <-s.nodeEvents[nodeID].replicator.Message(): + case _, ok := <-s.nodes[nodeID].event.replicator.Message(): if !ok { require.Fail(s.t, "subscription closed waiting for network setup events") } replicatorEvents-- - case _, ok := <-s.nodeEvents[nodeID].p2pTopic.Message(): + case _, ok := <-s.nodes[nodeID].event.p2pTopic.Message(): if !ok { require.Fail(s.t, "subscription closed waiting for network setup events") } @@ -63,7 +63,7 @@ func waitForNetworkSetupEvents(s *state, nodeID int) { // Expected document heads will be updated for the targeted node. func waitForReplicatorConfigureEvent(s *state, cfg ConfigureReplicator) { select { - case _, ok := <-s.nodeEvents[cfg.SourceNodeID].replicator.Message(): + case _, ok := <-s.nodes[cfg.SourceNodeID].event.replicator.Message(): if !ok { require.Fail(s.t, "subscription closed waiting for replicator event") } @@ -73,21 +73,21 @@ func waitForReplicatorConfigureEvent(s *state, cfg ConfigureReplicator) { } // all previous documents should be merged on the subscriber node - for key, val := range s.nodeP2P[cfg.SourceNodeID].actualDocHeads { - s.nodeP2P[cfg.TargetNodeID].expectedDocHeads[key] = val.cid + for key, val := range s.nodes[cfg.SourceNodeID].p2p.actualDocHeads { + s.nodes[cfg.TargetNodeID].p2p.expectedDocHeads[key] = val.cid } // update node connections and replicators - s.nodeP2P[cfg.TargetNodeID].connections[cfg.SourceNodeID] = struct{}{} - s.nodeP2P[cfg.SourceNodeID].connections[cfg.TargetNodeID] = struct{}{} - s.nodeP2P[cfg.SourceNodeID].replicators[cfg.TargetNodeID] = struct{}{} + s.nodes[cfg.TargetNodeID].p2p.connections[cfg.SourceNodeID] = struct{}{} + s.nodes[cfg.SourceNodeID].p2p.connections[cfg.TargetNodeID] = struct{}{} + s.nodes[cfg.SourceNodeID].p2p.replicators[cfg.TargetNodeID] = struct{}{} } // waitForReplicatorConfigureEvent waits for a node to publish a // replicator completed event on the local event bus. func waitForReplicatorDeleteEvent(s *state, cfg DeleteReplicator) { select { - case _, ok := <-s.nodeEvents[cfg.SourceNodeID].replicator.Message(): + case _, ok := <-s.nodes[cfg.SourceNodeID].event.replicator.Message(): if !ok { require.Fail(s.t, "subscription closed waiting for replicator event") } @@ -96,9 +96,9 @@ func waitForReplicatorDeleteEvent(s *state, cfg DeleteReplicator) { require.Fail(s.t, "timeout waiting for replicator event") } - delete(s.nodeP2P[cfg.TargetNodeID].connections, cfg.SourceNodeID) - delete(s.nodeP2P[cfg.SourceNodeID].connections, cfg.TargetNodeID) - delete(s.nodeP2P[cfg.SourceNodeID].replicators, cfg.TargetNodeID) + delete(s.nodes[cfg.TargetNodeID].p2p.connections, cfg.SourceNodeID) + delete(s.nodes[cfg.SourceNodeID].p2p.connections, cfg.TargetNodeID) + delete(s.nodes[cfg.SourceNodeID].p2p.replicators, cfg.TargetNodeID) } // waitForSubscribeToCollectionEvent waits for a node to publish a @@ -107,7 +107,7 @@ func waitForReplicatorDeleteEvent(s *state, cfg DeleteReplicator) { // Expected document heads will be updated for the subscriber node. func waitForSubscribeToCollectionEvent(s *state, action SubscribeToCollection) { select { - case _, ok := <-s.nodeEvents[action.NodeID].p2pTopic.Message(): + case _, ok := <-s.nodes[action.NodeID].event.p2pTopic.Message(): if !ok { require.Fail(s.t, "subscription closed waiting for p2p topic event") } @@ -121,7 +121,7 @@ func waitForSubscribeToCollectionEvent(s *state, action SubscribeToCollection) { if collectionIndex == NonExistentCollectionID { continue // don't track non existent collections } - s.nodeP2P[action.NodeID].peerCollections[collectionIndex] = struct{}{} + s.nodes[action.NodeID].p2p.peerCollections[collectionIndex] = struct{}{} } } @@ -129,7 +129,7 @@ func waitForSubscribeToCollectionEvent(s *state, action SubscribeToCollection) { // p2p topic completed event on the local event bus. func waitForUnsubscribeToCollectionEvent(s *state, action UnsubscribeToCollection) { select { - case _, ok := <-s.nodeEvents[action.NodeID].p2pTopic.Message(): + case _, ok := <-s.nodes[action.NodeID].event.p2pTopic.Message(): if !ok { require.Fail(s.t, "subscription closed waiting for p2p topic event") } @@ -142,7 +142,7 @@ func waitForUnsubscribeToCollectionEvent(s *state, action UnsubscribeToCollectio if collectionIndex == NonExistentCollectionID { continue // don't track non existent collections } - delete(s.nodeP2P[action.NodeID].peerCollections, collectionIndex) + delete(s.nodes[action.NodeID].p2p.peerCollections, collectionIndex) } } @@ -160,7 +160,8 @@ func waitForUpdateEvents( continue // node is not selected } - if _, ok := s.closedNodes[i]; ok { + node := s.nodes[i] + if node.closed { continue // node is closed } @@ -172,7 +173,7 @@ func waitForUpdateEvents( for len(expect) > 0 { var evt event.Update select { - case msg, ok := <-s.nodeEvents[i].update.Message(): + case msg, ok := <-node.event.update.Message(): if !ok { require.Fail(s.t, "subscription closed waiting for update event", "Node %d", i) } @@ -195,7 +196,7 @@ func waitForUpdateEvents( // we only need to update the network state if the nodes // are configured for networking - if i < len(s.nodeConfigs) { + if s.isNetworkEnabled { updateNetworkState(s, i, evt) } } @@ -208,15 +209,16 @@ func waitForUpdateEvents( // from running forever. func waitForMergeEvents(s *state, action WaitForSync) { for nodeID := 0; nodeID < len(s.nodes); nodeID++ { - if _, ok := s.closedNodes[nodeID]; ok { + node := s.nodes[nodeID] + if node.closed { continue // node is closed } - expect := s.nodeP2P[nodeID].expectedDocHeads + expect := node.p2p.expectedDocHeads // remove any docs that are already merged // up to the expected document head - for key, val := range s.nodeP2P[nodeID].actualDocHeads { + for key, val := range node.p2p.actualDocHeads { if head, ok := expect[key]; ok && head.String() == val.cid.String() { delete(expect, key) } @@ -228,7 +230,7 @@ func waitForMergeEvents(s *state, action WaitForSync) { require.Fail(s.t, "doc index %d out of range", docIndex) } docID := s.docIDs[0][docIndex].String() - actual, hasActual := s.nodeP2P[nodeID].actualDocHeads[docID] + actual, hasActual := node.p2p.actualDocHeads[docID] if !hasActual || !actual.decrypted { expectDecrypted[docID] = struct{}{} } @@ -243,7 +245,7 @@ func waitForMergeEvents(s *state, action WaitForSync) { for len(expect) > 0 || len(expectDecrypted) > 0 { var evt event.MergeComplete select { - case msg, ok := <-s.nodeEvents[nodeID].merge.Message(): + case msg, ok := <-node.event.merge.Message(): if !ok { require.Fail(s.t, "subscription closed waiting for merge complete event") } @@ -262,7 +264,7 @@ func waitForMergeEvents(s *state, action WaitForSync) { if ok && head.String() == evt.Merge.Cid.String() { delete(expect, evt.Merge.DocID) } - s.nodeP2P[nodeID].actualDocHeads[evt.Merge.DocID] = docHeadState{cid: evt.Merge.Cid, decrypted: evt.Decrypted} + node.p2p.actualDocHeads[evt.Merge.DocID] = docHeadState{cid: evt.Merge.Cid, decrypted: evt.Decrypted} } } } @@ -272,31 +274,33 @@ func waitForMergeEvents(s *state, action WaitForSync) { func updateNetworkState(s *state, nodeID int, evt event.Update) { // find the correct collection index for this update collectionID := -1 - for i, c := range s.collections[nodeID] { + for i, c := range s.nodes[nodeID].collections { if c.SchemaRoot() == evt.SchemaRoot { collectionID = i } } + node := s.nodes[nodeID] + // update the actual document head on the node that updated it // as the node created the document, it is already decrypted - s.nodeP2P[nodeID].actualDocHeads[evt.DocID] = docHeadState{cid: evt.Cid, decrypted: true} + node.p2p.actualDocHeads[evt.DocID] = docHeadState{cid: evt.Cid, decrypted: true} // update the expected document heads of replicator targets - for id := range s.nodeP2P[nodeID].replicators { + for id := range node.p2p.replicators { // replicator target nodes push updates to source nodes - s.nodeP2P[id].expectedDocHeads[evt.DocID] = evt.Cid + s.nodes[id].p2p.expectedDocHeads[evt.DocID] = evt.Cid } // update the expected document heads of connected nodes - for id := range s.nodeP2P[nodeID].connections { + for id := range node.p2p.connections { // connected nodes share updates of documents they have in common - if _, ok := s.nodeP2P[id].actualDocHeads[evt.DocID]; ok { - s.nodeP2P[id].expectedDocHeads[evt.DocID] = evt.Cid + if _, ok := s.nodes[id].p2p.actualDocHeads[evt.DocID]; ok { + s.nodes[id].p2p.expectedDocHeads[evt.DocID] = evt.Cid } // peer collection subscribers receive updates from any other subscriber node - if _, ok := s.nodeP2P[id].peerCollections[collectionID]; ok { - s.nodeP2P[id].expectedDocHeads[evt.DocID] = evt.Cid + if _, ok := s.nodes[id].p2p.peerCollections[collectionID]; ok { + s.nodes[id].p2p.expectedDocHeads[evt.DocID] = evt.Cid } } @@ -325,9 +329,9 @@ func getEventsForUpdateDoc(s *state, action UpdateDoc) map[string]struct{} { func getEventsForCreateDoc(s *state, action CreateDoc) map[string]struct{} { var collection client.Collection if action.NodeID.HasValue() { - collection = s.collections[action.NodeID.Value()][action.CollectionID] + collection = s.nodes[action.NodeID.Value()].collections[action.CollectionID] } else { - collection = s.collections[0][action.CollectionID] + collection = s.nodes[0].collections[action.CollectionID] } docs, err := parseCreateDocs(action, collection) diff --git a/tests/integration/lens.go b/tests/integration/lens.go index c361c55342..fb002f076f 100644 --- a/tests/integration/lens.go +++ b/tests/integration/lens.go @@ -59,7 +59,7 @@ func configureMigration( ) { _, nodes := getNodesWithIDs(action.NodeID, s.nodes) for _, node := range nodes { - txn := getTransaction(s, node, action.TransactionID, action.ExpectedError) + txn := getTransaction(s, node.Client, action.TransactionID, action.ExpectedError) ctx := db.SetContextTxn(s.ctx, txn) err := node.SetMigration(ctx, action.LensConfig) diff --git a/tests/integration/p2p.go b/tests/integration/p2p.go index 87e224dce4..0123fc1787 100644 --- a/tests/integration/p2p.go +++ b/tests/integration/p2p.go @@ -156,8 +156,8 @@ func connectPeers( err := sourceNode.Connect(s.ctx, targetNode.PeerInfo()) require.NoError(s.t, err) - s.nodeP2P[cfg.SourceNodeID].connections[cfg.TargetNodeID] = struct{}{} - s.nodeP2P[cfg.TargetNodeID].connections[cfg.SourceNodeID] = struct{}{} + s.nodes[cfg.SourceNodeID].p2p.connections[cfg.TargetNodeID] = struct{}{} + s.nodes[cfg.TargetNodeID].p2p.connections[cfg.SourceNodeID] = struct{}{} // Bootstrap triggers a bunch of async stuff for which we have no good way of waiting on. It must be // allowed to complete before documentation begins or it will not even try and sync it. So for now, we @@ -219,7 +219,7 @@ func subscribeToCollection( continue } - col := s.collections[action.NodeID][collectionIndex] + col := s.nodes[action.NodeID].collections[collectionIndex] schemaRoots = append(schemaRoots, col.SchemaRoot()) } @@ -253,7 +253,7 @@ func unsubscribeToCollection( continue } - col := s.collections[action.NodeID][collectionIndex] + col := s.nodes[action.NodeID].collections[collectionIndex] schemaRoots = append(schemaRoots, col.SchemaRoot()) } @@ -281,7 +281,7 @@ func getAllP2PCollections( ) { expectedCollections := []string{} for _, collectionIndex := range action.ExpectedCollectionIDs { - col := s.collections[action.NodeID][collectionIndex] + col := s.nodes[action.NodeID].collections[collectionIndex] expectedCollections = append(expectedCollections, col.SchemaRoot()) } @@ -294,8 +294,8 @@ func getAllP2PCollections( // reconnectPeers makes sure that all peers are connected after a node restart action. func reconnectPeers(s *state) { - for i, n := range s.nodeP2P { - for j := range n.connections { + for i, n := range s.nodes { + for j := range n.p2p.connections { sourceNode := s.nodes[i] targetNode := s.nodes[j] diff --git a/tests/integration/state.go b/tests/integration/state.go index e7130f2ebd..c163a2d9d3 100644 --- a/tests/integration/state.go +++ b/tests/integration/state.go @@ -114,6 +114,30 @@ func newEventState(bus *event.Bus) (*eventState, error) { }, nil } +// nodeState contains all testing state for a node. +type nodeState struct { + // The node's client active in this test. + clients.Client + // event contains all event node subscriptions. + event *eventState + // p2p contains p2p states for the node. + p2p *p2pState + // The network configurations for the nodes + netOpts []net.NodeOpt + // The path to any file-based databases active in this test. + dbPath string + // Collections by index present in the test. + // Indexes matches that of collectionNames. + collections []client.Collection + // Indexes, by index, by collection index. + indexes [][]client.IndexDescription + // indicates if the node is closed. + closed bool + // peerInfo contains the peer information for the node. + peerInfo peer.AddrInfo +} + +// state contains all testing state. type state struct { // The test context. ctx context.Context @@ -124,6 +148,7 @@ type state struct { // The TestCase currently being executed. testCase TestCase + // The type of KMS currently being tested. kms KMSType // The type of database currently being tested. @@ -153,30 +178,11 @@ type state struct { // These channels will receive a function which asserts results of any subscription requests. subscriptionResultsChans []chan func() - // nodeEvents contains all event node subscriptions. - nodeEvents []*eventState - - // The addresses of any nodes configured. - nodeAddresses []peer.AddrInfo - - // The configurations for any nodes - nodeConfigs [][]net.NodeOpt - // The nodes active in this test. - nodes []clients.Client - - // closedNodes contains the indexes of nodes that have been closed. - closedNodes map[int]struct{} - - // nodeP2P contains p2p states for all nodes - nodeP2P []*p2pState + nodes []*nodeState - // The paths to any file-based databases active in this test. - dbPaths []string - - // Collections by index, by nodeID present in the test. - // Indexes matches that of collectionNames. - collections [][]client.Collection + // The ACP options to share between each node. + acpOptions []node.ACPOpt // The names of the collections active in this test. // Indexes matches that of initial collections. @@ -196,17 +202,14 @@ type state struct { // Valid Cid string values by [UniqueCid] ID. cids map[any]string - // Indexes, by index, by collection index, by node index. - indexes [][][]client.IndexDescription - // isBench indicates wether the test is currently being benchmarked. isBench bool // The SourceHub address used to pay for SourceHub transactions. sourcehubAddress string - // The ACP options to share between each node. - acpOptions []node.ACPOpt + // isNetworkEnabled indicates whether the network is enabled. + isNetworkEnabled bool } // newState returns a new fresh state for the given testCase. @@ -230,19 +233,10 @@ func newState( allActionsDone: make(chan struct{}), identities: map[identityRef]*identityHolder{}, subscriptionResultsChans: []chan func(){}, - nodeEvents: []*eventState{}, - nodeAddresses: []peer.AddrInfo{}, - nodeConfigs: [][]net.NodeOpt{}, - nodeP2P: []*p2pState{}, - nodes: []clients.Client{}, - closedNodes: map[int]struct{}{}, - dbPaths: []string{}, - collections: [][]client.Collection{}, collectionNames: collectionNames, collectionIndexesByRoot: map[uint32]int{}, docIDs: [][]client.DocID{}, cids: map[any]string{}, - indexes: [][][]client.IndexDescription{}, isBench: false, } } diff --git a/tests/integration/utils.go b/tests/integration/utils.go index f827ac0130..39c9ea9624 100644 --- a/tests/integration/utils.go +++ b/tests/integration/utils.go @@ -41,7 +41,6 @@ import ( "github.com/sourcenetwork/defradb/net" "github.com/sourcenetwork/defradb/node" changeDetector "github.com/sourcenetwork/defradb/tests/change_detector" - "github.com/sourcenetwork/defradb/tests/clients" "github.com/sourcenetwork/defradb/tests/gen" "github.com/sourcenetwork/defradb/tests/predefined" ) @@ -442,7 +441,7 @@ func createGenerateDocs(s *state, docs []gen.GeneratedDoc, nodeID immutable.Opti func generateDocs(s *state, action GenerateDocs) { nodeIDs, _ := getNodesWithIDs(action.NodeID, s.nodes) firstNodesID := nodeIDs[0] - collections := s.collections[firstNodesID] + collections := s.nodes[firstNodesID].collections defs := make([]client.CollectionDefinition, 0, len(collections)) for _, collection := range collections { if len(action.ForCollections) == 0 || slices.Contains(action.ForCollections, collection.Name().Value()) { @@ -459,7 +458,7 @@ func generateDocs(s *state, action GenerateDocs) { func generatePredefinedDocs(s *state, action CreatePredefinedDocs) { nodeIDs, _ := getNodesWithIDs(action.NodeID, s.nodes) firstNodesID := nodeIDs[0] - collections := s.collections[firstNodesID] + collections := s.nodes[firstNodesID].collections defs := make([]client.CollectionDefinition, 0, len(collections)) for _, col := range collections { defs = append(defs, col.Definition()) @@ -577,10 +576,10 @@ func closeNodes( s *state, action Close, ) { - nodeIDs, nodes := getNodesWithIDs(action.NodeID, s.nodes) - for i, node := range nodes { + _, nodes := getNodesWithIDs(action.NodeID, s.nodes) + for _, node := range nodes { node.Close() - s.closedNodes[nodeIDs[i]] = struct{}{} + node.closed = true } } @@ -594,7 +593,7 @@ func closeNodes( // greater than 0. For example if requesting a node with nodeID=2 then the resulting output will contain only // one element (at index 0) caller might accidentally assume that this node belongs to node 0. Therefore, the // caller should always use the returned IDs, instead of guessing the IDs based on node indexes. -func getNodesWithIDs(nodeID immutable.Option[int], nodes []clients.Client) ([]int, []clients.Client) { +func getNodesWithIDs(nodeID immutable.Option[int], nodes []*nodeState) ([]int, []*nodeState) { if !nodeID.HasValue() { indexes := make([]int, len(nodes)) for i := range nodes { @@ -603,7 +602,7 @@ func getNodesWithIDs(nodeID immutable.Option[int], nodes []clients.Client) ([]in return indexes, nodes } - return []int{nodeID.Value()}, []clients.Client{nodes[nodeID.Value()]} + return []int{nodeID.Value()}, []*nodeState{nodes[nodeID.Value()]} } func calculateLenForFlattenedActions(testCase *TestCase) int { @@ -711,83 +710,44 @@ ActionLoop: func setStartingNodes( s *state, ) { - hasExplicitNode := false for _, action := range s.testCase.Actions { switch action.(type) { case ConfigureNode: - hasExplicitNode = true + s.isNetworkEnabled = true } } // If nodes have not been explicitly configured via actions, setup a default one. - if !hasExplicitNode { - node, path, err := setupNode(s) - require.Nil(s.t, err) - - c, err := setupClient(s, node) + if !s.isNetworkEnabled { + st, err := setupNode(s) require.Nil(s.t, err) - - eventState, err := newEventState(c.Events()) - require.NoError(s.t, err) - - s.nodes = append(s.nodes, c) - s.nodeEvents = append(s.nodeEvents, eventState) - s.nodeP2P = append(s.nodeP2P, newP2PState()) - s.dbPaths = append(s.dbPaths, path) + s.nodes = append(s.nodes, st) } } func startNodes(s *state, action Start) { - _, nodes := getNodesWithIDs(action.NodeID, s.nodes) + nodeIDs, nodes := getNodesWithIDs(action.NodeID, s.nodes) // We need to restart the nodes in reverse order, to avoid dial backoff issues. for i := len(nodes) - 1; i >= 0; i-- { - nodeIndex := i - if action.NodeID.HasValue() { - nodeIndex = action.NodeID.Value() - } + nodeIndex := nodeIDs[i] originalPath := databaseDir - databaseDir = s.dbPaths[nodeIndex] - node, _, err := setupNode(s, db.WithNodeIdentity(getIdentity(s, NodeIdentity(nodeIndex)))) - require.NoError(s.t, err) - databaseDir = originalPath - - if len(s.nodeConfigs) == 0 { - // If there are no explicit node configuration actions the node will be - // basic (i.e. no P2P stuff) and can be yielded now. - c, err := setupClient(s, node) - require.NoError(s.t, err) - s.nodes[nodeIndex] = c - - eventState, err := newEventState(c.Events()) - require.NoError(s.t, err) - s.nodeEvents[nodeIndex] = eventState - continue + databaseDir = s.nodes[nodeIndex].dbPath + opts := []node.Option{db.WithNodeIdentity(getIdentity(s, NodeIdentity(nodeIndex)))} + for _, opt := range s.nodes[nodeIndex].netOpts { + opts = append(opts, opt) } - - // We need to make sure the node is configured with its old address, otherwise - // a new one may be selected and reconnection to it will fail. var addresses []string - for _, addr := range s.nodeAddresses[nodeIndex].Addrs { + for _, addr := range s.nodes[nodeIndex].peerInfo.Addrs { addresses = append(addresses, addr.String()) } - - nodeOpts := s.nodeConfigs[nodeIndex] - nodeOpts = append(nodeOpts, net.WithListenAddresses(addresses...)) - - node.Peer, err = net.NewPeer(s.ctx, node.DB.Blockstore(), node.DB.Encstore(), node.DB.Events(), nodeOpts...) + opts = append(opts, net.WithListenAddresses(addresses...)) + node, err := setupNode(s, opts...) require.NoError(s.t, err) + databaseDir = originalPath + node.p2p = s.nodes[nodeIndex].p2p + s.nodes[nodeIndex] = node - c, err := setupClient(s, node) - require.NoError(s.t, err) - s.nodes[nodeIndex] = c - - eventState, err := newEventState(c.Events()) - require.NoError(s.t, err) - s.nodeEvents[nodeIndex] = eventState - - delete(s.closedNodes, nodeIndex) - - waitForNetworkSetupEvents(s, i) + waitForNetworkSetupEvents(s, nodeIndex) } // If the db was restarted we need to refresh the collection definitions as the old instances @@ -814,10 +774,8 @@ func restartNodes( func refreshCollections( s *state, ) { - s.collections = make([][]client.Collection, len(s.nodes)) - - for nodeID, node := range s.nodes { - s.collections[nodeID] = make([]client.Collection, len(s.collectionNames)) + for _, node := range s.nodes { + node.collections = make([]client.Collection, len(s.collectionNames)) allCollections, err := node.GetCollections(s.ctx, client.CollectionFetchOptions{}) require.Nil(s.t, err) @@ -838,7 +796,7 @@ func refreshCollections( for _, collection := range allCollections { if index, ok := s.collectionIndexesByRoot[collection.Description().RootID]; ok { - s.collections[nodeID][index] = collection + node.collections[index] = collection } } } @@ -864,35 +822,23 @@ func configureNode( netNodeOpts := action() netNodeOpts = append(netNodeOpts, net.WithPrivateKey(privateKey)) - nodeOpts := []node.Option{node.WithDisableP2P(false), db.WithRetryInterval([]time.Duration{time.Millisecond * 1})} + nodeOpts := []node.Option{db.WithRetryInterval([]time.Duration{time.Millisecond * 1})} for _, opt := range netNodeOpts { nodeOpts = append(nodeOpts, opt) } nodeOpts = append(nodeOpts, db.WithNodeIdentity(getIdentity(s, NodeIdentity(len(s.nodes))))) - node, path, err := setupNode(s, nodeOpts...) //disable change detector, or allow it? + node, err := setupNode(s, nodeOpts...) //disable change detector, or allow it? require.NoError(s.t, err) - s.nodeAddresses = append(s.nodeAddresses, node.Peer.PeerInfo()) - s.nodeConfigs = append(s.nodeConfigs, netNodeOpts) - - c, err := setupClient(s, node) - require.NoError(s.t, err) - - eventState, err := newEventState(c.Events()) - require.NoError(s.t, err) - - s.nodes = append(s.nodes, c) - s.nodeEvents = append(s.nodeEvents, eventState) - s.nodeP2P = append(s.nodeP2P, newP2PState()) - s.dbPaths = append(s.dbPaths, path) + s.nodes = append(s.nodes, node) } func refreshDocuments( s *state, startActionIndex int, ) { - if len(s.collections) == 0 { + if len(s.nodes) == 0 { // This should only be possible at the moment for P2P testing, for which the // change detector is currently disabled. We'll likely need some fancier logic // here if/when we wish to enable it. @@ -902,9 +848,9 @@ func refreshDocuments( // For now just do the initial setup using the collections on the first node, // this may need to become more involved at a later date depending on testing // requirements. - s.docIDs = make([][]client.DocID, len(s.collections[0])) + s.docIDs = make([][]client.DocID, len(s.nodes[0].collections)) - for i := range s.collections[0] { + for i := range s.nodes[0].collections { s.docIDs[i] = []client.DocID{} } @@ -917,7 +863,7 @@ func refreshDocuments( // Just use the collection from the first relevant node, as all will be the same for this // purpose. firstNodesID := nodeIDs[0] - collection := s.collections[firstNodesID][action.CollectionID] + collection := s.nodes[firstNodesID].collections[action.CollectionID] if action.DocMap != nil { substituteRelations(s, action) @@ -939,16 +885,10 @@ func refreshDocuments( func refreshIndexes( s *state, ) { - if len(s.collections) == 0 { - return - } - - s.indexes = make([][][]client.IndexDescription, len(s.collections)) - - for i, nodeCols := range s.collections { - s.indexes[i] = make([][]client.IndexDescription, len(nodeCols)) + for _, node := range s.nodes { + node.indexes = make([][]client.IndexDescription, len(node.collections)) - for j, col := range nodeCols { + for i, col := range node.collections { if col == nil { continue } @@ -957,7 +897,7 @@ func refreshIndexes( continue } - s.indexes[i][j] = colIndexes + node.indexes[i] = colIndexes } } } @@ -966,7 +906,7 @@ func getIndexes( s *state, action GetIndexes, ) { - if len(s.collections) == 0 { + if len(s.nodes) == 0 { return } @@ -974,7 +914,7 @@ func getIndexes( nodeIDs, _ := getNodesWithIDs(action.NodeID, s.nodes) for _, nodeID := range nodeIDs { - collections := s.collections[nodeID] + collections := s.nodes[nodeID].collections err := withRetryOnNode( s.nodes[nodeID], func() error { @@ -1259,7 +1199,7 @@ func createDoc( nodeIDs, nodes := getNodesWithIDs(action.NodeID, s.nodes) for index, node := range nodes { nodeID := nodeIDs[index] - collection := s.collections[nodeID][action.CollectionID] + collection := s.nodes[nodeID].collections[action.CollectionID] err := withRetryOnNode( node, func() error { @@ -1449,7 +1389,7 @@ func deleteDoc( nodeIDs, nodes := getNodesWithIDs(action.NodeID, s.nodes) for index, node := range nodes { nodeID := nodeIDs[index] - collection := s.collections[nodeID][action.CollectionID] + collection := s.nodes[nodeID].collections[action.CollectionID] ctx := getContextWithIdentity(s.ctx, s, action.Identity, nodeID) err := withRetryOnNode( node, @@ -1493,7 +1433,7 @@ func updateDoc( nodeIDs, nodes := getNodesWithIDs(action.NodeID, s.nodes) for index, node := range nodes { nodeID := nodeIDs[index] - collection := s.collections[nodeID][action.CollectionID] + collection := s.nodes[nodeID].collections[action.CollectionID] err := withRetryOnNode( node, func() error { @@ -1596,7 +1536,7 @@ func updateWithFilter(s *state, action UpdateWithFilter) { nodeIDs, nodes := getNodesWithIDs(action.NodeID, s.nodes) for index, node := range nodes { nodeID := nodeIDs[index] - collection := s.collections[nodeID][action.CollectionID] + collection := s.nodes[nodeID].collections[action.CollectionID] ctx := getContextWithIdentity(s.ctx, s, action.Identity, nodeID) err := withRetryOnNode( node, @@ -1621,18 +1561,10 @@ func createIndex( s *state, action CreateIndex, ) { - if action.CollectionID >= len(s.indexes) { - // Expand the slice if required, so that the index can be accessed by collection index - s.indexes = append( - s.indexes, - make([][][]client.IndexDescription, action.CollectionID-len(s.indexes)+1)..., - ) - } - nodeIDs, nodes := getNodesWithIDs(action.NodeID, s.nodes) for index, node := range nodes { nodeID := nodeIDs[index] - collection := s.collections[nodeID][action.CollectionID] + collection := s.nodes[nodeID].collections[action.CollectionID] indexDesc := client.IndexDescription{ Name: action.IndexName, } @@ -1659,8 +1591,8 @@ func createIndex( if err != nil { return err } - s.indexes[nodeID][action.CollectionID] = append( - s.indexes[nodeID][action.CollectionID], + s.nodes[nodeID].indexes[action.CollectionID] = append( + s.nodes[nodeID].indexes[action.CollectionID], desc, ) return nil @@ -1684,10 +1616,10 @@ func dropIndex( nodeIDs, nodes := getNodesWithIDs(action.NodeID, s.nodes) for index, node := range nodes { nodeID := nodeIDs[index] - collection := s.collections[nodeID][action.CollectionID] + collection := s.nodes[nodeID].collections[action.CollectionID] indexName := action.IndexName if indexName == "" { - indexName = s.indexes[nodeID][action.CollectionID][action.IndexID].Name + indexName = s.nodes[nodeID].indexes[action.CollectionID][action.IndexID].Name } err := withRetryOnNode( @@ -1764,7 +1696,7 @@ func backupImport( // about this in our tests so we just retry a few times until it works (or the // retry limit is breached - important incase this is a different error) func withRetryOnNode( - node clients.Client, + node client.DB, action func() error, ) error { for i := 0; i < node.MaxTxnRetries(); i++ {