Skip to content

Commit

Permalink
cmds: implement ipfs dht provide command
Browse files Browse the repository at this point in the history
License: MIT
Signed-off-by: Jeromy <why@ipfs.io>
  • Loading branch information
whyrusleeping committed Aug 20, 2016
1 parent 7276fd8 commit eaa433f
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 0 deletions.
111 changes: 111 additions & 0 deletions core/commands/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ var DhtCmd = &cmds.Command{
"findpeer": findPeerDhtCmd,
"get": getValueDhtCmd,
"put": putValueDhtCmd,
"provide": provideRefDhtCmd,
},
}

Expand Down Expand Up @@ -227,6 +228,116 @@ var findProvidersDhtCmd = &cmds.Command{
Type: notif.QueryEvent{},
}

var provideRefDhtCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Announce to the network that you are providing given values.",
},

Arguments: []cmds.Argument{
cmds.StringArg("key", true, true, "The key to find providers for.").EnableStdin(),
},
Options: []cmds.Option{
cmds.BoolOption("verbose", "v", "Print extra information.").Default(false),
},
Run: func(req cmds.Request, res cmds.Response) {
n, err := req.InvocContext().GetNode()
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}

dht, ok := n.Routing.(*ipdht.IpfsDHT)
if !ok {
res.SetError(ErrNotDHT, cmds.ErrNormal)
return
}

var keys []key.Key
for _, arg := range req.Arguments() {
k := key.B58KeyDecode(arg)
if k == "" {
res.SetError(fmt.Errorf("incorrectly formatted key: ", arg), cmds.ErrNormal)
return
}

has, err := n.Blockstore.Has(k)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}

if !has {
res.SetError(fmt.Errorf("block %s not found locally, cannot provide", k), cmds.ErrNormal)
return
}

keys = append(keys, k)
}

outChan := make(chan interface{})
res.SetOutput((<-chan interface{})(outChan))

events := make(chan *notif.QueryEvent)
ctx := notif.RegisterForQueryEvents(req.Context(), events)

go func() {
defer close(outChan)
for e := range events {
outChan <- e
}
}()

go func() {
defer close(outChan)
for _, k := range keys {
err := dht.Provide(ctx, k)
if err != nil {
notif.PublishQueryEvent(ctx, &notif.QueryEvent{
Type: notif.QueryError,
Extra: err.Error(),
})
return
}
}
}()
},
Marshalers: cmds.MarshalerMap{
cmds.Text: func(res cmds.Response) (io.Reader, error) {
outChan, ok := res.Output().(<-chan interface{})
if !ok {
return nil, u.ErrCast()
}

verbose, _, _ := res.Request().Option("v").Bool()
pfm := pfuncMap{
notif.FinalPeer: func(obj *notif.QueryEvent, out io.Writer, verbose bool) {
if verbose {
fmt.Fprintf(out, "sending provider record to peer %s\n", obj.ID)
}
},
}

marshal := func(v interface{}) (io.Reader, error) {
obj, ok := v.(*notif.QueryEvent)
if !ok {
return nil, u.ErrCast()
}

buf := new(bytes.Buffer)
printEvent(obj, buf, verbose, pfm)
return buf, nil
}

return &cmds.ChannelMarshaler{
Channel: outChan,
Marshaler: marshal,
Res: res,
}, nil
},
},
Type: notif.QueryEvent{},
}

var findPeerDhtCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "Query the DHT for all of the multiaddresses associated with a Peer ID.",
Expand Down
5 changes: 5 additions & 0 deletions routing/dht/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,10 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key key.Key) error {
go func(p peer.ID) {
defer wg.Done()
log.Debugf("putProvider(%s, %s)", key, p)
notif.PublishQueryEvent(ctx, &notif.QueryEvent{
Type: notif.FinalPeer,
ID: p,
})
err := dht.sendMessage(ctx, p, mes)
if err != nil {
log.Debug(err)
Expand All @@ -272,6 +276,7 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key key.Key) error {
wg.Wait()
return nil
}

func (dht *IpfsDHT) makeProvRecord(skey key.Key) (*pb.Message, error) {
pi := pstore.PeerInfo{
ID: dht.self,
Expand Down

0 comments on commit eaa433f

Please sign in to comment.