Skip to content

Commit

Permalink
remove share.ErrNamespaceNotFound
Browse files Browse the repository at this point in the history
  • Loading branch information
walldiss committed Jun 21, 2023
1 parent 2046250 commit 11c71db
Show file tree
Hide file tree
Showing 12 changed files with 56 additions and 83 deletions.
3 changes: 1 addition & 2 deletions blob/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,7 @@ func (s *Service) getByCommitment(

namespacedShares, err := s.shareGetter.GetSharesByNamespace(ctx, header.DAH, nID)
if err != nil {
if errors.Is(err, share.ErrNamespaceNotFound) ||
errors.Is(err, share.ErrNotFound) {
if errors.Is(err, share.ErrNotFound) {
err = ErrBlobNotFound
}
return nil, nil, err
Expand Down
5 changes: 3 additions & 2 deletions share/availability/light/availability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,9 @@ func TestService_GetSharesByNamespaceNotFound(t *testing.T) {
getter, root := GetterWithRandSquare(t, 1)
root.RowRoots = nil

_, err := getter.GetSharesByNamespace(context.Background(), root, namespace.RandomNamespace().Bytes())
assert.ErrorIs(t, err, share.ErrNamespaceNotFound)
emptyShares, err := getter.GetSharesByNamespace(context.Background(), root, namespace.RandomNamespace().Bytes())
require.NoError(t, err)
require.Empty(t, emptyShares.Flatten())
}

func BenchmarkService_GetSharesByNamespace(b *testing.B) {
Expand Down
12 changes: 3 additions & 9 deletions share/getter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,6 @@ import (
var (
// ErrNotFound is used to indicate that requested data could not be found.
ErrNotFound = errors.New("share: data not found")
// ErrNamespaceNotFound is returned by GetSharesByNamespace when data for the requested root does
// not include any shares from the given namespace. If the target namespace belongs to the
// namespace range of any rows of the requested root, the error will be returned along with
// collected non-inclusion proofs for those rows. It is the obligation of the requester to verify
// non-inclusion proofs for all rows that could possibly contain the target namespace by calling
// the Verify method.
ErrNamespaceNotFound = errors.New("share: namespace not found in data")
)

// Getter interface provides a set of accessors for shares by the Root.
Expand All @@ -39,8 +32,9 @@ type Getter interface {

// GetSharesByNamespace gets all shares from an EDS within the given namespace.
// Shares are returned in a row-by-row order if the namespace spans multiple rows.
// If data for the requested root does not include any shares from the given namespace
// ErrNamespaceNotFound will be returned along with non-inclusion proofs if there are any.
// Inclusion of returned data could be verified using Verify method on NamespacedShares.
// If no shares are found for target namespace non-inclusion could be also verified by calling
// Verify method.
GetSharesByNamespace(context.Context, *Root, namespace.ID) (NamespacedShares, error)
}

Expand Down
2 changes: 1 addition & 1 deletion share/getters/cascade.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func cascadeGetters[V any](
getCtx, cancel := ctxWithSplitTimeout(ctx, len(getters)-i, 0)
val, getErr := get(getCtx, getter)
cancel()
if getErr == nil || errors.Is(getErr, share.ErrNamespaceNotFound) {
if getErr == nil {
return val, getErr
}

Expand Down
12 changes: 7 additions & 5 deletions share/getters/getter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,9 @@ func TestStoreGetter(t *testing.T) {

// nid not found
nID = make([]byte, namespace.NamespaceSize)
_, err = sg.GetSharesByNamespace(ctx, &dah, nID)
require.ErrorIs(t, err, share.ErrNamespaceNotFound)
emptyShares, err := sg.GetSharesByNamespace(ctx, &dah, nID)
require.NoError(t, err)
require.Empty(t, emptyShares.Flatten())

// root not found
root := share.Root{}
Expand Down Expand Up @@ -216,13 +217,14 @@ func TestIPLDGetter(t *testing.T) {
// nid not found
nID = make([]byte, namespace.NamespaceSize)
emptyShares, err := sg.GetSharesByNamespace(ctx, &dah, nID)
require.ErrorIs(t, err, share.ErrNamespaceNotFound)
require.NoError(t, err)
require.Nil(t, emptyShares)

// nid doesnt exist in root
root := share.Root{}
_, err = sg.GetSharesByNamespace(ctx, &root, nID)
require.ErrorIs(t, err, share.ErrNamespaceNotFound)
emptyShares, err = sg.GetSharesByNamespace(ctx, &root, nID)
require.NoError(t, err)
require.Empty(t, emptyShares.Flatten())
})
}

Expand Down
7 changes: 4 additions & 3 deletions share/getters/shrex.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func (sg *ShrexGetter) GetSharesByNamespace(
// verify that the namespace could exist inside the roots before starting network requests
roots := filterRootsByNamespace(root, id)
if len(roots) == 0 {
return nil, share.ErrNamespaceNotFound
return nil, nil
}

for {
Expand All @@ -217,15 +217,16 @@ func (sg *ShrexGetter) GetSharesByNamespace(
nd, getErr := sg.ndClient.RequestND(reqCtx, root, id, peer)
cancel()
switch {
case getErr == nil, errors.Is(getErr, share.ErrNamespaceNotFound):
case getErr == nil:
// both inclusion and non-inclusion cases needs verification
if verErr := nd.Verify(root, id); verErr != nil {
getErr = verErr
setStatus(peers.ResultBlacklistPeer)
break
}
setStatus(peers.ResultNoop)
sg.metrics.recordNDAttempt(ctx, attempt, true)
return nd, getErr
return nd, nil
case errors.Is(getErr, context.DeadlineExceeded),
errors.Is(getErr, context.Canceled):
setStatus(peers.ResultCooldownPeer)
Expand Down
16 changes: 8 additions & 8 deletions share/getters/shrex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,11 @@ func TestShrexGetter(t *testing.T) {
// check for namespace to be between max and min namespace in root
require.Len(t, filterRootsByNamespace(&dah, maxNID), 1)

sh, err := getter.GetSharesByNamespace(ctx, &dah, nID)
require.ErrorIs(t, err, share.ErrNamespaceNotFound)
emptyShares, err := getter.GetSharesByNamespace(ctx, &dah, nID)
require.NoError(t, err)
// no shares should be returned
require.Len(t, sh.Flatten(), 0)
require.Nil(t, sh.Verify(&dah, nID))
require.Empty(t, emptyShares.Flatten())
require.Nil(t, emptyShares.Verify(&dah, nID))
})

t.Run("ND_namespace_not_in_root", func(t *testing.T) {
Expand All @@ -134,11 +134,11 @@ func TestShrexGetter(t *testing.T) {
// check for namespace to be not in root
require.Len(t, filterRootsByNamespace(&dah, nID), 0)

sh, err := getter.GetSharesByNamespace(ctx, &dah, nID)
require.ErrorIs(t, err, share.ErrNamespaceNotFound)
emptyShares, err := getter.GetSharesByNamespace(ctx, &dah, nID)
require.NoError(t, err)
// no shares should be returned
require.Len(t, sh.Flatten(), 0)
require.Nil(t, sh.Verify(&dah, nID))
require.Empty(t, emptyShares.Flatten())
require.Nil(t, emptyShares.Verify(&dah, nID))
})

t.Run("EDS_Available", func(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions share/getters/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,9 @@ func (sg *StoreGetter) GetSharesByNamespace(
// wrap the read-only CAR blockstore in a getter
blockGetter := eds.NewBlockGetter(bs)
shares, err = collectSharesByNamespace(ctx, blockGetter, root, nID)
if err != nil && !errors.Is(err, share.ErrNamespaceNotFound) {
if err != nil {
return nil, fmt.Errorf("getter/store: failed to retrieve shares by namespace: %w", err)
}

return shares, err
return shares, nil
}
9 changes: 1 addition & 8 deletions share/getters/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func collectSharesByNamespace(

rootCIDs := filterRootsByNamespace(root, nID)
if len(rootCIDs) == 0 {
return nil, share.ErrNamespaceNotFound
return nil, nil
}

errGroup, ctx := errgroup.WithContext(ctx)
Expand All @@ -83,13 +83,6 @@ func collectSharesByNamespace(
if err := errGroup.Wait(); err != nil {
return nil, err
}

// return ErrNamespaceNotFound along with collected proofs if no shares are found for the
// namespace.ID
if len(rootCIDs) == 1 && len(shares[0].Shares) == 0 {
return shares, share.ErrNamespaceNotFound
}

return shares, nil
}

Expand Down
38 changes: 13 additions & 25 deletions share/p2p/shrexnd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ func (c *Client) RequestND(
peer peer.ID,
) (share.NamespacedShares, error) {
shares, err := c.doRequest(ctx, root, nID, peer)
if err == nil || errors.Is(err, share.ErrNamespaceNotFound) {
return shares, err
if err == nil {
return shares, nil
}
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
c.metrics.ObserveRequests(ctx, 1, p2p.StatusTimeout)
Expand Down Expand Up @@ -119,24 +119,11 @@ func (c *Client) doRequest(
return nil, fmt.Errorf("client-nd: reading response: %w", err)
}

err = c.statusToErr(ctx, resp.Status)
if errors.Is(err, share.ErrNamespaceNotFound) {
shares := convertToNonInclusionProofs(resp.Rows)
return shares, err
}
if err != nil {
return nil, fmt.Errorf("client-nd: response code is not OK: %w", err)
}

shares, err := convertToNamespacedShares(resp.Rows)
if err != nil {
return nil, fmt.Errorf("client-nd: converting response to shares: %w", err)
}
return shares, nil
return c.convertResponse(ctx, resp)
}

// convertToNamespacedShares converts proto Rows to share.NamespacedShares
func convertToNamespacedShares(rows []*pb.Row) (share.NamespacedShares, error) {
func convertToNamespacedShares(rows []*pb.Row) share.NamespacedShares {
shares := make([]share.NamespacedRow, 0, len(rows))
for _, row := range rows {
var proof *nmt.Proof
Expand All @@ -155,7 +142,7 @@ func convertToNamespacedShares(rows []*pb.Row) (share.NamespacedShares, error) {
Proof: proof,
})
}
return shares, nil
return shares
}

func convertToNonInclusionProofs(rows []*pb.Row) share.NamespacedShares {
Expand Down Expand Up @@ -203,22 +190,23 @@ func (c *Client) setStreamDeadlines(ctx context.Context, stream network.Stream)
}
}

func (c *Client) statusToErr(ctx context.Context, code pb.StatusCode) error {
switch code {
func (c *Client) convertResponse(
ctx context.Context, resp pb.GetSharesByNamespaceResponse) (share.NamespacedShares, error) {
switch resp.Status {
case pb.StatusCode_OK:
c.metrics.ObserveRequests(ctx, 1, p2p.StatusSuccess)
return nil
return convertToNamespacedShares(resp.Rows), nil
case pb.StatusCode_NAMESPACE_NOT_FOUND:
return convertToNonInclusionProofs(resp.Rows), nil
case pb.StatusCode_NOT_FOUND:
c.metrics.ObserveRequests(ctx, 1, p2p.StatusNotFound)
return p2p.ErrNotFound
case pb.StatusCode_NAMESPACE_NOT_FOUND:
return share.ErrNamespaceNotFound
return nil, p2p.ErrNotFound
case pb.StatusCode_INVALID:
log.Debug("client-nd: invalid request")
fallthrough
case pb.StatusCode_INTERNAL:
fallthrough
default:
return p2p.ErrInvalidResponse
return nil, p2p.ErrInvalidResponse
}
}
7 changes: 4 additions & 3 deletions share/p2p/shrexnd/exchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ func TestExchange_RequestND_NotFound(t *testing.T) {
require.NoError(t, edsStore.Put(ctx, dah.Hash(), eds))

randNID := dah.RowRoots[(len(dah.RowRoots)-1)/2][:namespace.NamespaceSize]
_, err := client.RequestND(ctx, &dah, randNID, server.host.ID())
require.ErrorIs(t, err, share.ErrNamespaceNotFound)
emptyShares, err := client.RequestND(ctx, &dah, randNID, server.host.ID())
require.NoError(t, err)
require.Empty(t, emptyShares.Flatten())
})
}

Expand Down Expand Up @@ -119,7 +120,7 @@ func (m notFoundGetter) GetEDS(
func (m notFoundGetter) GetSharesByNamespace(
_ context.Context, _ *share.Root, _ nmtnamespace.ID,
) (share.NamespacedShares, error) {
return nil, share.ErrNamespaceNotFound
return nil, nil
}

func newStore(t *testing.T) *eds.Store {
Expand Down
24 changes: 9 additions & 15 deletions share/p2p/shrexnd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,17 +136,13 @@ func (srv *Server) handleNamespacedData(ctx context.Context, stream network.Stre
logger.Warn("server: nd not found")
srv.respondNotFoundError(ctx, logger, stream)
return
case errors.Is(err, share.ErrNamespaceNotFound):
resp := namespacedSharesToResponse(shares, false)
srv.respond(ctx, logger, stream, resp)
return
case err != nil:
logger.Errorw("server: retrieving shares", "err", err)
srv.respondInternalError(ctx, logger, stream)
return
}

resp := namespacedSharesToResponse(shares, true)
resp := namespacedSharesToResponse(shares)
srv.respond(ctx, logger, stream, resp)
}

Expand Down Expand Up @@ -181,26 +177,24 @@ func (srv *Server) respondInternalError(ctx context.Context,
}

// namespacedSharesToResponse encodes shares into proto and sends it to client with OK status code
func namespacedSharesToResponse(shares share.NamespacedShares, found bool) *pb.GetSharesByNamespaceResponse {
func namespacedSharesToResponse(shares share.NamespacedShares) *pb.GetSharesByNamespaceResponse {
rows := make([]*pb.Row, 0, len(shares))
for _, row := range shares {
proof := &pb.Proof{
Start: int64(row.Proof.Start()),
End: int64(row.Proof.End()),
Nodes: row.Proof.Nodes(),
Hashleaf: row.Proof.LeafHash(),
}

row := &pb.Row{
Shares: row.Shares,
Proof: proof,
Proof: &pb.Proof{
Start: int64(row.Proof.Start()),
End: int64(row.Proof.End()),
Nodes: row.Proof.Nodes(),
Hashleaf: row.Proof.LeafHash(),
},
}

rows = append(rows, row)
}

status := pb.StatusCode_OK
if !found {
if len(shares) == 0 || (len(shares) == 1 && len(shares[0].Shares) == 0) {
status = pb.StatusCode_NAMESPACE_NOT_FOUND
}

Expand Down

0 comments on commit 11c71db

Please sign in to comment.