Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

commands/filestore: use new cmds lib #5673

Merged
merged 5 commits into from
Nov 6, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
223 changes: 104 additions & 119 deletions core/commands/filestore.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
package commands

import (
"context"
"fmt"
"io"
"os"

oldCmds "github.com/ipfs/go-ipfs/commands"
lgc "github.com/ipfs/go-ipfs/commands/legacy"
"github.com/ipfs/go-ipfs/core"
core "github.com/ipfs/go-ipfs/core"
cmdenv "github.com/ipfs/go-ipfs/core/commands/cmdenv"
e "github.com/ipfs/go-ipfs/core/commands/e"
"github.com/ipfs/go-ipfs/filestore"
filestore "github.com/ipfs/go-ipfs/filestore"

cid "gx/ipfs/QmR8BauakNcBa3RbE4nbQu76PDiJgoQgz8AJdhJuiU4TAw/go-cid"
cmds "gx/ipfs/Qma6uuSyjkecGhMFFLfzyJDPyoDtNJSHJNweDccZhaWkgU/go-ipfs-cmds"
Expand All @@ -23,8 +21,8 @@ var FileStoreCmd = &cmds.Command{
},
Subcommands: map[string]*cmds.Command{
"ls": lsFileStore,
"verify": lgc.NewCommand(verifyFileStore),
"dups": lgc.NewCommand(dupsFileStore),
"verify": verifyFileStore,
"dups": dupsFileStore,
},
}

Expand Down Expand Up @@ -59,11 +57,7 @@ The output is:
}
args := req.Arguments
if len(args) > 0 {
out := perKeyActionToChan(req.Context, args, func(c cid.Cid) *filestore.ListRes {
return filestore.List(fs, c)
})

return res.Emit(out)
return listByArgs(res, fs, args)
}

fileOrder, _ := req.Options[fileOrderOptionName].(bool)
Expand All @@ -72,8 +66,17 @@ The output is:
return err
}

out := listResToChan(req.Context, next)
return res.Emit(out)
for {
r := next()
if r == nil {
break
}
if err := res.Emit(r); err != nil {
return err
}
}

return nil
},
PostRun: cmds.PostRunMap{
cmds.CLI: streamResult(func(v interface{}, out io.Writer) nonFatalError {
Expand All @@ -88,7 +91,7 @@ The output is:
Type: filestore.ListRes{},
}

var verifyFileStore = &oldCmds.Command{
var verifyFileStore = &cmds.Command{
Helptext: cmdkit.HelpText{
Tagline: "Verify objects in filestore.",
LongDescription: `
Expand Down Expand Up @@ -118,96 +121,103 @@ For ERROR entries the error will also be printed to stderr.
Options: []cmdkit.Option{
cmdkit.BoolOption(fileOrderOptionName, "verify the objects based on the order of the backing file"),
},
Run: func(req oldCmds.Request, res oldCmds.Response) {
_, fs, err := getFilestore(req.InvocContext())
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
_, fs, err := getFilestore(env)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
return err
}
args := req.Arguments()
args := req.Arguments
if len(args) > 0 {
out := perKeyActionToChan(req.Context(), args, func(c cid.Cid) *filestore.ListRes {
return filestore.Verify(fs, c)
})
res.SetOutput(out)
} else {
fileOrder, _, _ := req.Option(fileOrderOptionName).Bool()
next, err := filestore.VerifyAll(fs, fileOrder)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
return listByArgs(res, fs, args)
}

fileOrder, _ := req.Options[fileOrderOptionName].(bool)
next, err := filestore.VerifyAll(fs, fileOrder)
if err != nil {
return err
}

for {
r := next()
if r == nil {
break
}
if err := res.Emit(r); err != nil {
return err
}
out := listResToChan(req.Context(), next)
res.SetOutput(out)
}

return nil
},
Marshalers: oldCmds.MarshalerMap{
oldCmds.Text: func(res oldCmds.Response) (io.Reader, error) {
v, err := unwrapOutput(res.Output())
if err != nil {
return nil, err
}
PostRun: cmds.PostRunMap{
cmds.CLI: func(res cmds.Response, re cmds.ResponseEmitter) error {
for {
v, err := res.Next()
if err != nil {
if err == io.EOF {
return nil
}
return err
}

r, ok := v.(*filestore.ListRes)
if !ok {
return nil, e.TypeErr(r, v)
}
list, ok := v.(*filestore.ListRes)
if !ok {
return e.TypeErr(list, v)
}

if r.Status == filestore.StatusOtherError {
fmt.Fprintf(res.Stderr(), "%s\n", r.ErrorMsg)
if list.Status == filestore.StatusOtherError {
fmt.Fprintf(os.Stderr, "%s\n", list.ErrorMsg)
}
fmt.Fprintf(os.Stdout, "%s %s\n", list.Status.Format(), list.FormatLong())
}
fmt.Fprintf(res.Stdout(), "%s %s\n", r.Status.Format(), r.FormatLong())
return nil, nil
},
},
Type: filestore.ListRes{},
}

var dupsFileStore = &oldCmds.Command{
var dupsFileStore = &cmds.Command{
Helptext: cmdkit.HelpText{
Tagline: "List blocks that are both in the filestore and standard block storage.",
},
Run: func(req oldCmds.Request, res oldCmds.Response) {
_, fs, err := getFilestore(req.InvocContext())
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
_, fs, err := getFilestore(env)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
return err
}
ch, err := fs.FileManager().AllKeysChan(req.Context())
ch, err := fs.FileManager().AllKeysChan(req.Context)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
return err
}

out := make(chan interface{}, 128)
res.SetOutput((<-chan interface{})(out))

go func() {
defer close(out)
for cid := range ch {
have, err := fs.MainBlockstore().Has(cid)
if err != nil {
select {
case out <- &RefWrapper{Err: err.Error()}:
case <-req.Context().Done():
}
return
}
if have {
select {
case out <- &RefWrapper{Ref: cid.String()}:
case <-req.Context().Done():
return
}
for cid := range ch {
have, err := fs.MainBlockstore().Has(cid)
if err != nil {
return res.Emit(&RefWrapper{Err: err.Error()})
}
if have {
if err := res.Emit(&RefWrapper{Ref: cid.String()}); err != nil {
return err
}
}
}()
}

return nil
},
Marshalers: refsMarshallerMap,
Type: RefWrapper{},
Encoders: cmds.EncoderMap{
cmds.Text: cmds.MakeTypedEncoder(func(req *cmds.Request, w io.Writer, out *RefWrapper) error {
if out.Err != "" {
return fmt.Errorf(out.Err)
}

fmt.Fprintln(w, out.Ref)

return nil
}),
},
Type: RefWrapper{},
}

func getFilestore(env interface{}) (*core.IpfsNode, *filestore.Filestore, error) {
func getFilestore(env cmds.Environment) (*core.IpfsNode, *filestore.Filestore, error) {
n, err := cmdenv.GetNode(env)
if err != nil {
return nil, nil, err
Expand All @@ -219,49 +229,24 @@ func getFilestore(env interface{}) (*core.IpfsNode, *filestore.Filestore, error)
return n, fs, err
}

func listResToChan(ctx context.Context, next func() *filestore.ListRes) <-chan interface{} {
out := make(chan interface{}, 128)
go func() {
defer close(out)
for {
r := next()
if r == nil {
return
func listByArgs(res cmds.ResponseEmitter, fs *filestore.Filestore, args []string) error {
for _, arg := range args {
c, err := cid.Decode(arg)
if err != nil {
ret := &filestore.ListRes{
Status: filestore.StatusOtherError,
ErrorMsg: fmt.Sprintf("%s: %v", arg, err),
}
select {
case out <- r:
case <-ctx.Done():
return
if err := res.Emit(ret); err != nil {
return err
}
continue
}
}()
return out
}

func perKeyActionToChan(ctx context.Context, args []string, action func(cid.Cid) *filestore.ListRes) <-chan interface{} {
out := make(chan interface{}, 128)
go func() {
defer close(out)
for _, arg := range args {
c, err := cid.Decode(arg)
if err != nil {
select {
case out <- &filestore.ListRes{
Status: filestore.StatusOtherError,
ErrorMsg: fmt.Sprintf("%s: %v", arg, err),
}:
case <-ctx.Done():
}

continue
}
r := action(c)
select {
case out <- r:
case <-ctx.Done():
return
}
r := filestore.Verify(fs, c)
if err := res.Emit(r); err != nil {
return err
}
}()
return out
}

return nil
}