Skip to content

Commit

Permalink
feat(share/getter) Add support for Non-inclusion proofs (#2256)
Browse files Browse the repository at this point in the history
## Overview

This PR implements support for non-inclusion proof for getters and shrex
protocol.
Requires #2242 to be
merged first
  • Loading branch information
walldiss committed Jun 22, 2023
1 parent b222475 commit f32c903
Show file tree
Hide file tree
Showing 16 changed files with 197 additions and 116 deletions.
3 changes: 1 addition & 2 deletions blob/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,7 @@ func (s *Service) getByCommitment(

namespacedShares, err := s.shareGetter.GetSharesByNamespace(ctx, header.DAH, nID)
if err != nil {
if errors.Is(err, share.ErrNamespaceNotFound) ||
errors.Is(err, share.ErrNotFound) {
if errors.Is(err, share.ErrNotFound) {
err = ErrBlobNotFound
}
return nil, nil, err
Expand Down
5 changes: 3 additions & 2 deletions share/availability/light/availability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,9 @@ func TestService_GetSharesByNamespaceNotFound(t *testing.T) {
getter, root := GetterWithRandSquare(t, 1)
root.RowRoots = nil

_, err := getter.GetSharesByNamespace(context.Background(), root, namespace.RandomNamespace().Bytes())
assert.ErrorIs(t, err, share.ErrNamespaceNotFound)
emptyShares, err := getter.GetSharesByNamespace(context.Background(), root, namespace.RandomNamespace().Bytes())
require.NoError(t, err)
require.Empty(t, emptyShares.Flatten())
}

func BenchmarkService_GetSharesByNamespace(b *testing.B) {
Expand Down
2 changes: 1 addition & 1 deletion share/get_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func TestCollectLeavesByNamespace_AbsentNamespaceId(t *testing.T) {
t.Cleanup(cancel)
bServ := mdutils.Bserv()

shares := RandShares(t, 16)
shares := RandShares(t, 1024)

// set all shares to the same namespace id
nids, err := randomNids(5)
Expand Down
6 changes: 3 additions & 3 deletions share/getter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@ import (
var (
// ErrNotFound is used to indicate that requested data could not be found.
ErrNotFound = errors.New("share: data not found")
// ErrNamespaceNotFound is returned by GetSharesByNamespace when data for requested root does
// not include any shares from the given namespace
ErrNamespaceNotFound = errors.New("share: namespace not found in data")
)

// Getter interface provides a set of accessors for shares by the Root.
Expand All @@ -35,6 +32,9 @@ type Getter interface {

// GetSharesByNamespace gets all shares from an EDS within the given namespace.
// Shares are returned in a row-by-row order if the namespace spans multiple rows.
// Inclusion of returned data could be verified using Verify method on NamespacedShares.
// If no shares are found for target namespace non-inclusion could be also verified by calling
// Verify method.
GetSharesByNamespace(context.Context, *Root, namespace.ID) (NamespacedShares, error)
}

Expand Down
4 changes: 2 additions & 2 deletions share/getters/cascade.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,8 @@ func cascadeGetters[V any](
getCtx, cancel := ctxWithSplitTimeout(ctx, len(getters)-i, 0)
val, getErr := get(getCtx, getter)
cancel()
if getErr == nil || errors.Is(getErr, share.ErrNamespaceNotFound) {
return val, getErr
if getErr == nil {
return val, nil
}

if errors.Is(getErr, errOperationNotSupported) {
Expand Down
12 changes: 7 additions & 5 deletions share/getters/getter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,9 @@ func TestStoreGetter(t *testing.T) {

// nid not found
nID = make([]byte, namespace.NamespaceSize)
_, err = sg.GetSharesByNamespace(ctx, &dah, nID)
require.ErrorIs(t, err, share.ErrNamespaceNotFound)
emptyShares, err := sg.GetSharesByNamespace(ctx, &dah, nID)
require.NoError(t, err)
require.Empty(t, emptyShares.Flatten())

// root not found
root := share.Root{}
Expand Down Expand Up @@ -216,13 +217,14 @@ func TestIPLDGetter(t *testing.T) {
// nid not found
nID = make([]byte, namespace.NamespaceSize)
emptyShares, err := sg.GetSharesByNamespace(ctx, &dah, nID)
require.ErrorIs(t, err, share.ErrNamespaceNotFound)
require.NoError(t, err)
require.Nil(t, emptyShares)

// nid doesnt exist in root
root := share.Root{}
_, err = sg.GetSharesByNamespace(ctx, &root, nID)
require.ErrorIs(t, err, share.ErrNamespaceNotFound)
emptyShares, err = sg.GetSharesByNamespace(ctx, &root, nID)
require.NoError(t, err)
require.Empty(t, emptyShares.Flatten())
})
}

Expand Down
13 changes: 5 additions & 8 deletions share/getters/shrex.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func (sg *ShrexGetter) GetSharesByNamespace(
// verify that the namespace could exist inside the roots before starting network requests
roots := filterRootsByNamespace(root, id)
if len(roots) == 0 {
return nil, share.ErrNamespaceNotFound
return nil, nil
}

for {
Expand All @@ -218,18 +218,15 @@ func (sg *ShrexGetter) GetSharesByNamespace(
cancel()
switch {
case getErr == nil:
if getErr = nd.Verify(root, id); getErr != nil {
// both inclusion and non-inclusion cases needs verification
if verErr := nd.Verify(root, id); verErr != nil {
getErr = verErr
setStatus(peers.ResultBlacklistPeer)
break
}
setStatus(peers.ResultNoop)
sg.metrics.recordNDAttempt(ctx, attempt, true)
return nd, getErr
case errors.Is(getErr, share.ErrNamespaceNotFound):
// TODO: will be merged with first case once non-inclusion proofs are ready
setStatus(peers.ResultNoop)
sg.metrics.recordNDAttempt(ctx, attempt, true)
return nd, getErr
return nd, nil
case errors.Is(getErr, context.DeadlineExceeded),
errors.Is(getErr, context.Canceled):
setStatus(peers.ResultCooldownPeer)
Expand Down
50 changes: 42 additions & 8 deletions share/getters/shrex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,16 @@ import (
"github.com/stretchr/testify/require"

"github.com/celestiaorg/celestia-app/pkg/da"
"github.com/celestiaorg/celestia-app/pkg/namespace"
libhead "github.com/celestiaorg/go-header"
"github.com/celestiaorg/nmt"
nmtnamespace "github.com/celestiaorg/nmt/namespace"
"github.com/celestiaorg/rsmt2d"

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/header/headertest"
"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/eds"
"github.com/celestiaorg/celestia-node/share/ipld"
"github.com/celestiaorg/celestia-node/share/p2p/discovery"
"github.com/celestiaorg/celestia-node/share/p2p/peers"
"github.com/celestiaorg/celestia-node/share/p2p/shrexeds"
Expand Down Expand Up @@ -88,23 +89,56 @@ func TestShrexGetter(t *testing.T) {
require.ErrorIs(t, err, share.ErrNotFound)
})

t.Run("ND_namespace_not_found", func(t *testing.T) {
t.Run("ND_namespace_not_included", func(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, time.Second)
t.Cleanup(cancel)

// generate test data
eds, dah, nID := generateTestEDS(t)
eds, dah, maxNID := generateTestEDS(t)
require.NoError(t, edsStore.Put(ctx, dah.Hash(), eds))
peerManager.Validate(ctx, srvHost.ID(), shrexsub.Notification{
DataHash: dah.Hash(),
Height: 1,
})

// corrupt NID
nID[4]++
nID := make([]byte, ipld.NamespaceSize)
copy(nID, maxNID)
nID[ipld.NamespaceSize-1]-- // pray for last byte to not be 0x00
// check for namespace to be between max and min namespace in root
require.Len(t, filterRootsByNamespace(&dah, maxNID), 1)

_, err := getter.GetSharesByNamespace(ctx, &dah, nID)
require.ErrorIs(t, err, share.ErrNamespaceNotFound)
emptyShares, err := getter.GetSharesByNamespace(ctx, &dah, nID)
require.NoError(t, err)
// no shares should be returned
require.Empty(t, emptyShares.Flatten())
require.Nil(t, emptyShares.Verify(&dah, nID))
})

t.Run("ND_namespace_not_in_dah", func(t *testing.T) {
ctx, cancel := context.WithTimeout(ctx, time.Second)
t.Cleanup(cancel)

// generate test data
eds, dah, maxNID := generateTestEDS(t)
require.NoError(t, edsStore.Put(ctx, dah.Hash(), eds))
peerManager.Validate(ctx, srvHost.ID(), shrexsub.Notification{
DataHash: dah.Hash(),
Height: 1,
})

// corrupt NID
nID := make([]byte, ipld.NamespaceSize)
copy(nID, maxNID)
nID[ipld.NamespaceSize-1]++ // pray for last byte to not be 0xFF
// check for namespace to be not in root
require.Len(t, filterRootsByNamespace(&dah, nID), 0)

emptyShares, err := getter.GetSharesByNamespace(ctx, &dah, nID)
require.NoError(t, err)
// no shares should be returned
require.Empty(t, emptyShares.Flatten())
require.Nil(t, emptyShares.Verify(&dah, nID))
})

t.Run("EDS_Available", func(t *testing.T) {
Expand Down Expand Up @@ -166,8 +200,8 @@ func newStore(t *testing.T) (*eds.Store, error) {
func generateTestEDS(t *testing.T) (*rsmt2d.ExtendedDataSquare, da.DataAvailabilityHeader, nmtnamespace.ID) {
eds := share.RandEDS(t, 4)
dah := da.NewDataAvailabilityHeader(eds)
randNID := dah.RowRoots[(len(dah.RowRoots)-1)/2][:namespace.NamespaceSize]
return eds, dah, randNID
max := nmt.MaxNamespace(dah.RowRoots[(len(dah.RowRoots))/2-1], ipld.NamespaceSize)
return eds, dah, max
}

func testManager(
Expand Down
4 changes: 0 additions & 4 deletions share/getters/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,6 @@ func (sg *StoreGetter) GetSharesByNamespace(
// wrap the read-only CAR blockstore in a getter
blockGetter := eds.NewBlockGetter(bs)
shares, err = collectSharesByNamespace(ctx, blockGetter, root, nID)
if errors.Is(err, ipld.ErrNodeNotFound) {
// convert error to satisfy getter interface contract
err = share.ErrNotFound
}
if err != nil {
return nil, fmt.Errorf("getter/store: failed to retrieve shares by namespace: %w", err)
}
Expand Down
8 changes: 1 addition & 7 deletions share/getters/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func collectSharesByNamespace(

rootCIDs := filterRootsByNamespace(root, nID)
if len(rootCIDs) == 0 {
return nil, share.ErrNamespaceNotFound
return nil, nil
}

errGroup, ctx := errgroup.WithContext(ctx)
Expand All @@ -83,12 +83,6 @@ func collectSharesByNamespace(
if err := errGroup.Wait(); err != nil {
return nil, err
}

// return ErrNamespaceNotFound if no shares are found for the namespace.ID
if len(rootCIDs) == 1 && len(shares[0].Shares) == 0 {
return nil, share.ErrNamespaceNotFound
}

return shares, nil
}

Expand Down
2 changes: 1 addition & 1 deletion share/ipld/namespace_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (n *NamespaceData) addProof(d direction, cid cid.Cid, depth int) {
// Leaves returns retrieved leaves within the bounds in case `WithLeaves` option was passed,
// otherwise nil will be returned.
func (n *NamespaceData) Leaves() []ipld.Node {
if n.leaves == nil || n.noLeaves() {
if n.leaves == nil || n.noLeaves() || n.isAbsentNamespace.Load() {
return nil
}
return n.leaves[n.bounds.lowest : n.bounds.highest+1]
Expand Down
50 changes: 30 additions & 20 deletions share/p2p/shrexnd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (c *Client) RequestND(
) (share.NamespacedShares, error) {
shares, err := c.doRequest(ctx, root, nID, peer)
if err == nil {
return shares, err
return shares, nil
}
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
c.metrics.ObserveRequests(ctx, 1, p2p.StatusTimeout)
Expand All @@ -71,7 +71,7 @@ func (c *Client) RequestND(
return nil, context.DeadlineExceeded
}
}
if err != p2p.ErrNotFound && err != share.ErrNamespaceNotFound {
if err != p2p.ErrNotFound {
log.Warnw("client-nd: peer returned err", "err", err)
}
return nil, err
Expand Down Expand Up @@ -119,19 +119,11 @@ func (c *Client) doRequest(
return nil, fmt.Errorf("client-nd: reading response: %w", err)
}

if err = c.statusToErr(ctx, resp.Status); err != nil {
return nil, fmt.Errorf("client-nd: response code is not OK: %w", err)
}

shares, err := convertToNamespacedShares(resp.Rows)
if err != nil {
return nil, fmt.Errorf("client-nd: converting response to shares: %w", err)
}
return shares, nil
return c.convertResponse(ctx, resp)
}

// convertToNamespacedShares converts proto Rows to share.NamespacedShares
func convertToNamespacedShares(rows []*pb.Row) (share.NamespacedShares, error) {
func convertToNamespacedShares(rows []*pb.Row) share.NamespacedShares {
shares := make([]share.NamespacedRow, 0, len(rows))
for _, row := range rows {
var proof *nmt.Proof
Expand All @@ -150,7 +142,24 @@ func convertToNamespacedShares(rows []*pb.Row) (share.NamespacedShares, error) {
Proof: proof,
})
}
return shares, nil
return shares
}

func convertToNonInclusionProofs(rows []*pb.Row) share.NamespacedShares {
shares := make([]share.NamespacedRow, 0, len(rows))
for _, row := range rows {
proof := nmt.NewAbsenceProof(
int(row.Proof.Start),
int(row.Proof.End),
row.Proof.Nodes,
row.Proof.Hashleaf,
ipld.NMTIgnoreMaxNamespace,
)
shares = append(shares, share.NamespacedRow{
Proof: &proof,
})
}
return shares
}

func (c *Client) setStreamDeadlines(ctx context.Context, stream network.Stream) {
Expand Down Expand Up @@ -181,22 +190,23 @@ func (c *Client) setStreamDeadlines(ctx context.Context, stream network.Stream)
}
}

func (c *Client) statusToErr(ctx context.Context, code pb.StatusCode) error {
switch code {
func (c *Client) convertResponse(
ctx context.Context, resp pb.GetSharesByNamespaceResponse) (share.NamespacedShares, error) {
switch resp.Status {
case pb.StatusCode_OK:
c.metrics.ObserveRequests(ctx, 1, p2p.StatusSuccess)
return nil
return convertToNamespacedShares(resp.Rows), nil
case pb.StatusCode_NAMESPACE_NOT_FOUND:
return convertToNonInclusionProofs(resp.Rows), nil
case pb.StatusCode_NOT_FOUND:
c.metrics.ObserveRequests(ctx, 1, p2p.StatusNotFound)
return p2p.ErrNotFound
case pb.StatusCode_NAMESPACE_NOT_FOUND:
return share.ErrNamespaceNotFound
return nil, p2p.ErrNotFound
case pb.StatusCode_INVALID:
log.Debug("client-nd: invalid request")
fallthrough
case pb.StatusCode_INTERNAL:
fallthrough
default:
return p2p.ErrInvalidResponse
return nil, p2p.ErrInvalidResponse
}
}
7 changes: 4 additions & 3 deletions share/p2p/shrexnd/exchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ func TestExchange_RequestND_NotFound(t *testing.T) {
require.NoError(t, edsStore.Put(ctx, dah.Hash(), eds))

randNID := dah.RowRoots[(len(dah.RowRoots)-1)/2][:namespace.NamespaceSize]
_, err := client.RequestND(ctx, &dah, randNID, server.host.ID())
require.ErrorIs(t, err, share.ErrNamespaceNotFound)
emptyShares, err := client.RequestND(ctx, &dah, randNID, server.host.ID())
require.NoError(t, err)
require.Empty(t, emptyShares.Flatten())
})
}

Expand Down Expand Up @@ -119,7 +120,7 @@ func (m notFoundGetter) GetEDS(
func (m notFoundGetter) GetSharesByNamespace(
_ context.Context, _ *share.Root, _ nmtnamespace.ID,
) (share.NamespacedShares, error) {
return nil, share.ErrNamespaceNotFound
return nil, nil
}

func newStore(t *testing.T) *eds.Store {
Expand Down
Loading

0 comments on commit f32c903

Please sign in to comment.