Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(share/getter) Add support for Non-inclusion proofs #2256

Merged
merged 17 commits into from
Jun 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions blob/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,7 @@ func (s *Service) getByCommitment(

namespacedShares, err := s.shareGetter.GetSharesByNamespace(ctx, header.DAH, nID)
if err != nil {
if errors.Is(err, share.ErrNamespaceNotFound) ||
errors.Is(err, share.ErrNotFound) {
if errors.Is(err, share.ErrNotFound) {
err = ErrBlobNotFound
Wondertan marked this conversation as resolved.
Show resolved Hide resolved
}
return nil, nil, err
Expand Down
5 changes: 3 additions & 2 deletions share/availability/light/availability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,9 @@ func TestService_GetSharesByNamespaceNotFound(t *testing.T) {
getter, root := GetterWithRandSquare(t, 1)
root.RowRoots = nil

_, err := getter.GetSharesByNamespace(context.Background(), root, namespace.RandomNamespace().Bytes())
assert.ErrorIs(t, err, share.ErrNamespaceNotFound)
emptyShares, err := getter.GetSharesByNamespace(context.Background(), root, namespace.RandomNamespace().Bytes())
require.NoError(t, err)
require.Empty(t, emptyShares.Flatten())
}

func BenchmarkService_GetSharesByNamespace(b *testing.B) {
Expand Down
2 changes: 1 addition & 1 deletion share/get_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func TestCollectLeavesByNamespace_AbsentNamespaceId(t *testing.T) {
t.Cleanup(cancel)
bServ := mdutils.Bserv()

shares := RandShares(t, 16)
shares := RandShares(t, 1024)
Wondertan marked this conversation as resolved.
Show resolved Hide resolved

// set all shares to the same namespace id
nids, err := randomNids(5)
Expand Down
6 changes: 3 additions & 3 deletions share/getter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@ import (
var (
// ErrNotFound is used to indicate that requested data could not be found.
ErrNotFound = errors.New("share: data not found")
// ErrNamespaceNotFound is returned by GetSharesByNamespace when data for requested root does
// not include any shares from the given namespace
ErrNamespaceNotFound = errors.New("share: namespace not found in data")
)

// Getter interface provides a set of accessors for shares by the Root.
Expand All @@ -35,6 +32,9 @@ type Getter interface {

// GetSharesByNamespace gets all shares from an EDS within the given namespace.
// Shares are returned in a row-by-row order if the namespace spans multiple rows.
// Inclusion of returned data could be verified using Verify method on NamespacedShares.
// If no shares are found for target namespace non-inclusion could be also verified by calling
// Verify method.
GetSharesByNamespace(context.Context, *Root, namespace.ID) (NamespacedShares, error)
}

Expand Down
4 changes: 2 additions & 2 deletions share/getters/cascade.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,8 @@ func cascadeGetters[V any](
getCtx, cancel := ctxWithSplitTimeout(ctx, len(getters)-i, 0)
val, getErr := get(getCtx, getter)
cancel()
if getErr == nil || errors.Is(getErr, share.ErrNamespaceNotFound) {
return val, getErr
if getErr == nil {
return val, nil
}

if errors.Is(getErr, errOperationNotSupported) {
Expand Down
12 changes: 7 additions & 5 deletions share/getters/getter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,9 @@ func TestStoreGetter(t *testing.T) {

// nid not found
nID = make([]byte, namespace.NamespaceSize)
_, err = sg.GetSharesByNamespace(ctx, &dah, nID)
require.ErrorIs(t, err, share.ErrNamespaceNotFound)
emptyShares, err := sg.GetSharesByNamespace(ctx, &dah, nID)
require.NoError(t, err)
require.Empty(t, emptyShares.Flatten())

// root not found
root := share.Root{}
Expand Down Expand Up @@ -216,13 +217,14 @@ func TestIPLDGetter(t *testing.T) {
// nid not found
nID = make([]byte, namespace.NamespaceSize)
emptyShares, err := sg.GetSharesByNamespace(ctx, &dah, nID)
require.ErrorIs(t, err, share.ErrNamespaceNotFound)
require.NoError(t, err)
require.Nil(t, emptyShares)

// nid doesnt exist in root
root := share.Root{}
_, err = sg.GetSharesByNamespace(ctx, &root, nID)
require.ErrorIs(t, err, share.ErrNamespaceNotFound)
emptyShares, err = sg.GetSharesByNamespace(ctx, &root, nID)
require.NoError(t, err)
require.Empty(t, emptyShares.Flatten())
})
}

Expand Down
13 changes: 5 additions & 8 deletions share/getters/shrex.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func (sg *ShrexGetter) GetSharesByNamespace(
// verify that the namespace could exist inside the roots before starting network requests
roots := filterRootsByNamespace(root, id)
if len(roots) == 0 {
return nil, share.ErrNamespaceNotFound
return nil, nil
}

for {
Expand All @@ -218,18 +218,15 @@ func (sg *ShrexGetter) GetSharesByNamespace(
cancel()
switch {
case getErr == nil:
if getErr = nd.Verify(root, id); getErr != nil {
// both inclusion and non-inclusion cases needs verification
if verErr := nd.Verify(root, id); verErr != nil {
getErr = verErr
setStatus(peers.ResultBlacklistPeer)
break
}
setStatus(peers.ResultNoop)
sg.metrics.recordNDAttempt(ctx, attempt, true)
return nd, getErr
case errors.Is(getErr, share.ErrNamespaceNotFound):
// TODO: will be merged with first case once non-inclusion proofs are ready
setStatus(peers.ResultNoop)
sg.metrics.recordNDAttempt(ctx, attempt, true)
return nd, getErr
return nd, nil
case errors.Is(getErr, context.DeadlineExceeded),
errors.Is(getErr, context.Canceled):
setStatus(peers.ResultCooldownPeer)
Expand Down
50 changes: 42 additions & 8 deletions share/getters/shrex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,16 @@ import (
"github.com/stretchr/testify/require"

"github.com/celestiaorg/celestia-app/pkg/da"
"github.com/celestiaorg/celestia-app/pkg/namespace"
libhead "github.com/celestiaorg/go-header"
"github.com/celestiaorg/nmt"
nmtnamespace "github.com/celestiaorg/nmt/namespace"
"github.com/celestiaorg/rsmt2d"

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/header/headertest"
"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/eds"
"github.com/celestiaorg/celestia-node/share/ipld"
"github.com/celestiaorg/celestia-node/share/p2p/discovery"
"github.com/celestiaorg/celestia-node/share/p2p/peers"
"github.com/celestiaorg/celestia-node/share/p2p/shrexeds"
Expand Down Expand Up @@ -88,23 +89,56 @@ func TestShrexGetter(t *testing.T) {
require.ErrorIs(t, err, share.ErrNotFound)
})

t.Run("ND_namespace_not_found", func(t *testing.T) {
t.Run("ND_namespace_not_included", func(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, time.Second)
t.Cleanup(cancel)

// generate test data
eds, dah, nID := generateTestEDS(t)
eds, dah, maxNID := generateTestEDS(t)
require.NoError(t, edsStore.Put(ctx, dah.Hash(), eds))
peerManager.Validate(ctx, srvHost.ID(), shrexsub.Notification{
DataHash: dah.Hash(),
Height: 1,
})

// corrupt NID
nID[4]++
nID := make([]byte, ipld.NamespaceSize)
copy(nID, maxNID)
nID[ipld.NamespaceSize-1]-- // pray for last byte to not be 0x00
Wondertan marked this conversation as resolved.
Show resolved Hide resolved
// check for namespace to be between max and min namespace in root
require.Len(t, filterRootsByNamespace(&dah, maxNID), 1)

_, err := getter.GetSharesByNamespace(ctx, &dah, nID)
require.ErrorIs(t, err, share.ErrNamespaceNotFound)
emptyShares, err := getter.GetSharesByNamespace(ctx, &dah, nID)
require.NoError(t, err)
// no shares should be returned
require.Empty(t, emptyShares.Flatten())
require.Nil(t, emptyShares.Verify(&dah, nID))
})

t.Run("ND_namespace_not_in_dah", func(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, time.Second)
t.Cleanup(cancel)

// generate test data
eds, dah, maxNID := generateTestEDS(t)
require.NoError(t, edsStore.Put(ctx, dah.Hash(), eds))
peerManager.Validate(ctx, srvHost.ID(), shrexsub.Notification{
DataHash: dah.Hash(),
Height: 1,
})

// corrupt NID
nID := make([]byte, ipld.NamespaceSize)
copy(nID, maxNID)
nID[ipld.NamespaceSize-1]++ // pray for last byte to not be 0xFF
// check for namespace to be not in root
require.Len(t, filterRootsByNamespace(&dah, nID), 0)

emptyShares, err := getter.GetSharesByNamespace(ctx, &dah, nID)
require.NoError(t, err)
// no shares should be returned
require.Empty(t, emptyShares.Flatten())
require.Nil(t, emptyShares.Verify(&dah, nID))
})

t.Run("EDS_Available", func(t *testing.T) {
Expand Down Expand Up @@ -166,8 +200,8 @@ func newStore(t *testing.T) (*eds.Store, error) {
func generateTestEDS(t *testing.T) (*rsmt2d.ExtendedDataSquare, da.DataAvailabilityHeader, nmtnamespace.ID) {
eds := share.RandEDS(t, 4)
dah := da.NewDataAvailabilityHeader(eds)
randNID := dah.RowRoots[(len(dah.RowRoots)-1)/2][:namespace.NamespaceSize]
return eds, dah, randNID
max := nmt.MaxNamespace(dah.RowRoots[(len(dah.RowRoots))/2-1], ipld.NamespaceSize)
renaynay marked this conversation as resolved.
Show resolved Hide resolved
return eds, dah, max
}

func testManager(
Expand Down
4 changes: 0 additions & 4 deletions share/getters/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,6 @@ func (sg *StoreGetter) GetSharesByNamespace(
// wrap the read-only CAR blockstore in a getter
blockGetter := eds.NewBlockGetter(bs)
shares, err = collectSharesByNamespace(ctx, blockGetter, root, nID)
if errors.Is(err, ipld.ErrNodeNotFound) {
// convert error to satisfy getter interface contract
err = share.ErrNotFound
}
if err != nil {
return nil, fmt.Errorf("getter/store: failed to retrieve shares by namespace: %w", err)
}
Expand Down
8 changes: 1 addition & 7 deletions share/getters/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func collectSharesByNamespace(

rootCIDs := filterRootsByNamespace(root, nID)
if len(rootCIDs) == 0 {
return nil, share.ErrNamespaceNotFound
return nil, nil
}

errGroup, ctx := errgroup.WithContext(ctx)
Expand All @@ -83,12 +83,6 @@ func collectSharesByNamespace(
if err := errGroup.Wait(); err != nil {
return nil, err
}

// return ErrNamespaceNotFound if no shares are found for the namespace.ID
if len(rootCIDs) == 1 && len(shares[0].Shares) == 0 {
return nil, share.ErrNamespaceNotFound
}

return shares, nil
}

Expand Down
2 changes: 1 addition & 1 deletion share/ipld/namespace_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (n *NamespaceData) addProof(d direction, cid cid.Cid, depth int) {
// Leaves returns retrieved leaves within the bounds in case `WithLeaves` option was passed,
// otherwise nil will be returned.
func (n *NamespaceData) Leaves() []ipld.Node {
if n.leaves == nil || n.noLeaves() {
if n.leaves == nil || n.noLeaves() || n.isAbsentNamespace.Load() {
return nil
}
return n.leaves[n.bounds.lowest : n.bounds.highest+1]
Expand Down
50 changes: 30 additions & 20 deletions share/p2p/shrexnd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (c *Client) RequestND(
) (share.NamespacedShares, error) {
shares, err := c.doRequest(ctx, root, nID, peer)
if err == nil {
return shares, err
return shares, nil
}
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
c.metrics.ObserveRequests(ctx, 1, p2p.StatusTimeout)
Expand All @@ -71,7 +71,7 @@ func (c *Client) RequestND(
return nil, context.DeadlineExceeded
}
}
if err != p2p.ErrNotFound && err != share.ErrNamespaceNotFound {
if err != p2p.ErrNotFound {
log.Warnw("client-nd: peer returned err", "err", err)
}
return nil, err
Expand Down Expand Up @@ -119,19 +119,11 @@ func (c *Client) doRequest(
return nil, fmt.Errorf("client-nd: reading response: %w", err)
}

if err = c.statusToErr(ctx, resp.Status); err != nil {
return nil, fmt.Errorf("client-nd: response code is not OK: %w", err)
}

shares, err := convertToNamespacedShares(resp.Rows)
if err != nil {
return nil, fmt.Errorf("client-nd: converting response to shares: %w", err)
}
return shares, nil
return c.convertResponse(ctx, resp)
}

// convertToNamespacedShares converts proto Rows to share.NamespacedShares
func convertToNamespacedShares(rows []*pb.Row) (share.NamespacedShares, error) {
func convertToNamespacedShares(rows []*pb.Row) share.NamespacedShares {
shares := make([]share.NamespacedRow, 0, len(rows))
for _, row := range rows {
var proof *nmt.Proof
Expand All @@ -150,7 +142,24 @@ func convertToNamespacedShares(rows []*pb.Row) (share.NamespacedShares, error) {
Proof: proof,
})
}
return shares, nil
return shares
}

func convertToNonInclusionProofs(rows []*pb.Row) share.NamespacedShares {
shares := make([]share.NamespacedRow, 0, len(rows))
for _, row := range rows {
proof := nmt.NewAbsenceProof(
int(row.Proof.Start),
int(row.Proof.End),
row.Proof.Nodes,
row.Proof.Hashleaf,
ipld.NMTIgnoreMaxNamespace,
)
shares = append(shares, share.NamespacedRow{
Proof: &proof,
})
}
return shares
}

func (c *Client) setStreamDeadlines(ctx context.Context, stream network.Stream) {
Expand Down Expand Up @@ -181,22 +190,23 @@ func (c *Client) setStreamDeadlines(ctx context.Context, stream network.Stream)
}
}

func (c *Client) statusToErr(ctx context.Context, code pb.StatusCode) error {
switch code {
func (c *Client) convertResponse(
ctx context.Context, resp pb.GetSharesByNamespaceResponse) (share.NamespacedShares, error) {
switch resp.Status {
case pb.StatusCode_OK:
c.metrics.ObserveRequests(ctx, 1, p2p.StatusSuccess)
return nil
return convertToNamespacedShares(resp.Rows), nil
case pb.StatusCode_NAMESPACE_NOT_FOUND:
return convertToNonInclusionProofs(resp.Rows), nil
case pb.StatusCode_NOT_FOUND:
c.metrics.ObserveRequests(ctx, 1, p2p.StatusNotFound)
return p2p.ErrNotFound
case pb.StatusCode_NAMESPACE_NOT_FOUND:
return share.ErrNamespaceNotFound
return nil, p2p.ErrNotFound
case pb.StatusCode_INVALID:
log.Debug("client-nd: invalid request")
fallthrough
case pb.StatusCode_INTERNAL:
fallthrough
default:
return p2p.ErrInvalidResponse
return nil, p2p.ErrInvalidResponse
}
}
7 changes: 4 additions & 3 deletions share/p2p/shrexnd/exchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ func TestExchange_RequestND_NotFound(t *testing.T) {
require.NoError(t, edsStore.Put(ctx, dah.Hash(), eds))

randNID := dah.RowRoots[(len(dah.RowRoots)-1)/2][:namespace.NamespaceSize]
_, err := client.RequestND(ctx, &dah, randNID, server.host.ID())
require.ErrorIs(t, err, share.ErrNamespaceNotFound)
emptyShares, err := client.RequestND(ctx, &dah, randNID, server.host.ID())
require.NoError(t, err)
require.Empty(t, emptyShares.Flatten())
})
}

Expand Down Expand Up @@ -119,7 +120,7 @@ func (m notFoundGetter) GetEDS(
func (m notFoundGetter) GetSharesByNamespace(
_ context.Context, _ *share.Root, _ nmtnamespace.ID,
) (share.NamespacedShares, error) {
return nil, share.ErrNamespaceNotFound
return nil, nil
}

func newStore(t *testing.T) *eds.Store {
Expand Down
Loading
Loading