Skip to content

Commit

Permalink
Add ACP to pubsub KMS
Browse files Browse the repository at this point in the history
  • Loading branch information
islamaliev committed Nov 4, 2024
1 parent 12669e4 commit 5998a2d
Show file tree
Hide file tree
Showing 17 changed files with 827 additions and 74 deletions.
9 changes: 9 additions & 0 deletions event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,15 @@ type Merge struct {
SchemaRoot string
}

// MergeComplete is a notification that a merge has been completed.
type MergeComplete struct {
// Merge is the merge that was completed.
Merge Merge

// Decrypted specifies if the merge payload was decrypted.
Decrypted bool
}

// Message contains event info.
type Message struct {
// Name is the name of the event this message was generated from.
Expand Down
72 changes: 72 additions & 0 deletions internal/db/collection_retriever.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright 2024 Democratized Data Foundation
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package db

import (
"context"

"github.com/sourcenetwork/defradb/client"
"github.com/sourcenetwork/defradb/internal/db/description"
)

// CollectionRetriever is a helper struct that retrieves a collection from a document ID.
type CollectionRetriever struct {
db *db
}

// NewCollectionRetriever creates a new CollectionRetriever.
func NewCollectionRetriever(database client.DB) *CollectionRetriever {
internalDB, ok := database.(*db)
if !ok {
return nil
}
return &CollectionRetriever{
db: internalDB,
}
}

// RetrieveCollectionFromDocID retrieves a collection from a document ID.
func (r *CollectionRetriever) RetrieveCollectionFromDocID(
ctx context.Context,
docID string,
) (client.Collection, error) {
ctx, txn, err := ensureContextTxn(ctx, r.db, false)
if err != nil {
return nil, err
}
defer txn.Discard(ctx)

headIterator, err := NewHeadBlocksIteratorFromTxn(ctx, txn, docID)
if err != nil {
return nil, err
}

hasValue, err := headIterator.Next()
if err != nil {
return nil, err
}

if !hasValue {
return nil, NewErrDocIDNotFound(docID)
}

schema, err := description.GetSchemaVersion(ctx, txn, headIterator.CurrentBlock().Delta.GetSchemaVersionID())
if err != nil {
return nil, err
}

col, err := getCollectionFromRootSchema(ctx, r.db, schema.Root)
if err != nil {
return nil, err
}

return col, nil
}
32 changes: 32 additions & 0 deletions internal/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,9 +261,41 @@ func (db *db) AddDocActorRelationship(
return client.AddDocActorRelationshipResult{}, err
}

err = db.publishDocUpdateEvent(ctx, docID, collection)
if err != nil {
return client.AddDocActorRelationshipResult{}, err
}

return client.AddDocActorRelationshipResult{ExistedAlready: exists}, nil
}

func (db *db) publishDocUpdateEvent(ctx context.Context, docID string, collection client.Collection) error {
headsIterator, err := NewHeadBlocksIterator(ctx, db.multistore.Headstore(), db.Blockstore(), docID)
if err != nil {
return err
}
defer headsIterator.Close()

Check failure on line 277 in internal/db/db.go

View workflow job for this annotation

GitHub Actions / Lint GoLang job

Error return value of `headsIterator.Close` is not checked (errcheck)

for {
hasValue, err := headsIterator.Next()
if err != nil {
return err
}
if !hasValue {
break
}

updateEvent := event.Update{
DocID: docID,
Cid: headsIterator.CurrentCid(),
SchemaRoot: collection.Schema().Root,
Block: headsIterator.CurrentRawBlock(),
}
db.events.Publish(event.NewMessage(event.UpdateName, updateEvent))
}
return nil
}

func (db *db) DeleteDocActorRelationship(
ctx context.Context,
collectionName string,
Expand Down
6 changes: 6 additions & 0 deletions internal/db/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ const (
errColNotMaterialized string = "non-materialized collections are not supported"
errMaterializedViewAndACPNotSupported string = "materialized views do not support ACP"
errInvalidDefaultFieldValue string = "default field value is invalid"
errDocIDNotFound string = "docID not found"
)

var (
Expand Down Expand Up @@ -152,6 +153,7 @@ var (
ErrContextDone = errors.New("context done")
ErrFailedToRetryDoc = errors.New("failed to retry doc")
ErrTimeoutDocRetry = errors.New("timeout while retrying doc")
ErrDocIDNotFound = errors.New(errDocIDNotFound)
)

// NewErrFailedToGetHeads returns a new error indicating that the heads of a document
Expand Down Expand Up @@ -690,3 +692,7 @@ func NewErrDefaultFieldValueInvalid(collection string, inner error) error {
errors.NewKV("Inner", inner),
)
}

func NewErrDocIDNotFound(docID string) error {
return errors.New(errDocIDNotFound, errors.NewKV("DocID", docID))
}
110 changes: 110 additions & 0 deletions internal/db/iterator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// Copyright 2024 Democratized Data Foundation
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package db

import (
"context"
"io"

"github.com/ipfs/go-cid"

"github.com/sourcenetwork/defradb/datastore"
"github.com/sourcenetwork/defradb/internal/core"
coreblock "github.com/sourcenetwork/defradb/internal/core/block"
"github.com/sourcenetwork/defradb/internal/merkle/clock"
)

// DocHeadBlocksIterator is an iterator that iterates over the head blocks of a document.
type DocHeadBlocksIterator struct {
ctx context.Context
blockstore datastore.Blockstore
cids []cid.Cid

currentCid cid.Cid
currentBlock *coreblock.Block
currentRawBlock []byte
}

var _ io.Closer = (*DocHeadBlocksIterator)(nil)

func (h *DocHeadBlocksIterator) Close() error {
return nil
}

// NewHeadBlocksIterator creates a new DocHeadBlocksIterator.
func NewHeadBlocksIterator(
ctx context.Context,
headstore datastore.DSReaderWriter,
blockstore datastore.Blockstore,
docID string,
) (*DocHeadBlocksIterator, error) {
headStoreKey := core.HeadStoreKey{
DocID: docID,
FieldID: core.COMPOSITE_NAMESPACE,
}
headset := clock.NewHeadSet(headstore, headStoreKey)
cids, _, err := headset.List(ctx)
if err != nil {
return nil, err
}
return &DocHeadBlocksIterator{
ctx: ctx,
blockstore: blockstore,
cids: cids,
}, nil
}

// NewHeadBlocksIteratorFromTxn creates a new DocHeadBlocksIterator from a transaction.
func NewHeadBlocksIteratorFromTxn(
ctx context.Context,
txn datastore.Txn,
docID string,
) (*DocHeadBlocksIterator, error) {
return NewHeadBlocksIterator(ctx, txn.Headstore(), txn.Blockstore(), docID)
}

// Next advances the iterator to the next block.
func (h *DocHeadBlocksIterator) Next() (bool, error) {
if len(h.cids) == 0 {
return false, nil
}
nextCid := h.cids[0]
h.cids = h.cids[1:]

rawBlock, err := h.blockstore.Get(h.ctx, nextCid)
if err != nil {
return false, err
}
blk, err := coreblock.GetFromBytes(rawBlock.RawData())
if err != nil {
return false, err
}

h.currentCid = nextCid
h.currentBlock = blk
h.currentRawBlock = rawBlock.RawData()
return true, nil
}

// CurrentCid returns the CID of the current block.
func (h *DocHeadBlocksIterator) CurrentCid() cid.Cid {
return h.currentCid
}

// CurrentBlock returns the current block.
func (h *DocHeadBlocksIterator) CurrentBlock() *coreblock.Block {
return h.currentBlock
}

// CurrentRawBlock returns the raw data of the current block.
func (h *DocHeadBlocksIterator) CurrentRawBlock() []byte {
return h.currentRawBlock
}
11 changes: 9 additions & 2 deletions internal/db/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,10 @@ func (db *db) executeMerge(ctx context.Context, dagMerge event.Merge) error {
}

// send a complete event so we can track merges in the integration tests
db.events.Publish(event.NewMessage(event.MergeCompleteName, dagMerge))
db.events.Publish(event.NewMessage(event.MergeCompleteName, event.MergeComplete{
Merge: dagMerge,
Decrypted: len(mp.missingEncryptionBlocks) == 0,
}))
return nil
}

Expand Down Expand Up @@ -263,7 +266,9 @@ func (mp *mergeProcessor) tryFetchMissingBlocksAndMerge(ctx context.Context) err
return res.Error
}

clear(mp.missingEncryptionBlocks)
if len(res.Items) == 0 {
return nil
}

for i := range res.Items {
_, link, err := cid.CidFromBytes(res.Items[i].Link)
Expand All @@ -279,6 +284,8 @@ func (mp *mergeProcessor) tryFetchMissingBlocksAndMerge(ctx context.Context) err
mp.availableEncryptionBlocks[cidlink.Link{Cid: link}] = &encBlock
}

clear(mp.missingEncryptionBlocks)

err := mp.mergeComposites(ctx)
if err != nil {
return err
Expand Down
27 changes: 12 additions & 15 deletions internal/db/p2p_replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/sourcenetwork/defradb/errors"
"github.com/sourcenetwork/defradb/event"
"github.com/sourcenetwork/defradb/internal/core"
coreblock "github.com/sourcenetwork/defradb/internal/core/block"
"github.com/sourcenetwork/defradb/internal/merkle/clock"
)

Expand Down Expand Up @@ -645,41 +644,39 @@ func (db *db) retryDoc(ctx context.Context, docID string) error {
return err
}
defer txn.Discard(ctx)
headStoreKey := core.HeadStoreKey{
DocID: docID,
FieldID: core.COMPOSITE_NAMESPACE,
}
headset := clock.NewHeadSet(txn.Headstore(), headStoreKey)
cids, _, err := headset.List(ctx)

headsIterator, err := NewHeadBlocksIteratorFromTxn(ctx, txn, docID)
if err != nil {
return err
}
defer headsIterator.Close()

Check failure on line 652 in internal/db/p2p_replicator.go

View workflow job for this annotation

GitHub Actions / Lint GoLang job

Error return value of `headsIterator.Close` is not checked (errcheck)

for _, c := range cids {
for {
select {
case <-ctx.Done():
return ErrContextDone
default:
}
rawblk, err := txn.Blockstore().Get(ctx, c)

hasValue, err := headsIterator.Next()
if err != nil {
return err
}
blk, err := coreblock.GetFromBytes(rawblk.RawData())
if err != nil {
return err
if !hasValue {
break
}
schema, err := db.getSchemaByVersionID(ctx, blk.Delta.GetSchemaVersionID())

schema, err := db.getSchemaByVersionID(ctx, headsIterator.CurrentBlock().Delta.GetSchemaVersionID())
if err != nil {
return err
}
successChan := make(chan bool)
defer close(successChan)
updateEvent := event.Update{
DocID: docID,
Cid: c,
Cid: headsIterator.CurrentCid(),
SchemaRoot: schema.Root,
Block: rawblk.RawData(),
Block: headsIterator.CurrentRawBlock(),
IsRetry: true,
// Because the retry is done in a separate goroutine but the retry handling process should be synchronous,
// we use a channel to block while waiting for the success status of the retry.
Expand Down
4 changes: 2 additions & 2 deletions internal/db/permission/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func CheckAccessOfDocOnCollectionWithACP(

// Now that we know acp is available and the collection is permissioned, before checking access with
// acp directly we need to make sure that the document is not public, as public documents will not
// be regestered with acp. We give unrestricted access to public documents, so it does not matter
// be registered with acp. We give unrestricted access to public documents, so it does not matter
// whether the request has a signature identity or not at this stage of the check.
isRegistered, err := acpSystem.IsDocRegistered(
ctx,
Expand All @@ -69,7 +69,7 @@ func CheckAccessOfDocOnCollectionWithACP(

// At this point if the request is not signatured, then it has no access, because:
// the collection has a policy on it, and the acp is enabled/available,
// and the document is not public (is regestered with acp).
// and the document is not public (is registered with acp).
if !identity.HasValue() {
return false, nil
}
Expand Down
Loading

0 comments on commit 5998a2d

Please sign in to comment.