Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dht mode toggling (modulo dynamic switching) #350

Merged
merged 10 commits into from
Jun 26, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 122 additions & 2 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"sync"
"time"

"github.com/libp2p/go-libp2p-core/event"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
Expand Down Expand Up @@ -44,6 +45,13 @@ var logger = logging.Logger("dht")
// collect members of the routing table.
const NumBootstrapQueries = 5

type DHTMode int

const (
ModeServer = DHTMode(1)
ModeClient = DHTMode(2)
)

// IpfsDHT is an implementation of Kademlia with S/Kademlia modifications.
// It is used to implement the base Routing module.
type IpfsDHT struct {
Expand All @@ -70,6 +78,9 @@ type IpfsDHT struct {

protocols []protocol.ID // DHT protocols

mode DHTMode
modeLk sync.Mutex

bucketSize int
}

Expand All @@ -95,6 +106,8 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er
// register for network notifs.
dht.host.Network().Notify((*netNotifiee)(dht))

go dht.handleProtocolChanges(ctx)

dht.proc = goprocessctx.WithContextAndTeardown(ctx, func() error {
// remove ourselves from network notifs.
dht.host.Network().StopNotify((*netNotifiee)(dht))
Expand All @@ -103,10 +116,11 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er

dht.proc.AddChild(dht.providers.Process())
dht.Validator = cfg.Validator
dht.mode = ModeClient

if !cfg.Client {
for _, p := range cfg.Protocols {
h.SetStreamHandler(p, dht.handleNewStream)
if err := dht.moveToServerMode(); err != nil {
return nil, err
}
}
return dht, nil
Expand Down Expand Up @@ -446,6 +460,61 @@ func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, p peer.ID, count int) [
return filtered
}

func (dht *IpfsDHT) SetMode(m DHTMode) error {
dht.modeLk.Lock()
defer dht.modeLk.Unlock()

if m == dht.mode {
return nil
}

switch m {
case ModeServer:
return dht.moveToServerMode()
case ModeClient:
return dht.moveToClientMode()
default:
raulk marked this conversation as resolved.
Show resolved Hide resolved
return fmt.Errorf("unrecognized dht mode: %d", m)
}
}

func (dht *IpfsDHT) moveToServerMode() error {
dht.mode = ModeServer
for _, p := range dht.protocols {
dht.host.SetStreamHandler(p, dht.handleNewStream)
}
return nil
}

func (dht *IpfsDHT) moveToClientMode() error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be fine but I'd feel safer if we checked if we were in client mode in the stream handler, just in case.

dht.mode = ModeClient
for _, p := range dht.protocols {
dht.host.RemoveStreamHandler(p)
}

pset := make(map[protocol.ID]bool)
for _, p := range dht.protocols {
pset[p] = true
}

for _, c := range dht.host.Network().Conns() {
for _, s := range c.GetStreams() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Re: backwards-compatibility. This is where the tricky part comes in. This drops streams for peers in our routing table. It will not invite the peer to drop us from theirs. So they’ll keep querying us, and we’ll “na” all their negotiations. We’ll basically become an unhelpful peer taking up a slot in their table, unless we disconnect to trigger the removal.

On a related note, there seems to be a race between identify and the DHT notifee. Even if we disconnect and reconnect, if the DHT notifee runs before identify finishes, we might be deciding on stale protocols: https://github.com/libp2p/go-libp2p-kad-dht/blob/master/notif.go#L27

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah... the backwards compatibility bit kinda sucks. I don't know that a nice backwards compatible solution exists aside from hard disconnecting from those peers.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd much rather track streams manually.

if pset[s.Protocol()] {
if s.Stat().Direction == network.DirInbound {
s.Reset()
}
}
}
}
return nil
}

func (dht *IpfsDHT) getMode() DHTMode {
dht.modeLk.Lock()
defer dht.modeLk.Unlock()
return dht.mode
}

// Context return dht's context
func (dht *IpfsDHT) Context() context.Context {
return dht.ctx
Expand Down Expand Up @@ -518,3 +587,54 @@ func (dht *IpfsDHT) newContextWithLocalTags(ctx context.Context, extraTags ...ta
) // ignoring error as it is unrelated to the actual function of this code.
return ctx
}

func (dht *IpfsDHT) handleProtocolChanges(ctx context.Context) {
// register for event bus protocol ID changes
sub, err := dht.host.EventBus().Subscribe(new(event.EvtPeerProtocolsUpdated))
if err != nil {
panic(err)
}
defer sub.Close()

pmap := make(map[protocol.ID]bool)
for _, p := range dht.protocols {
pmap[p] = true
}

for {
select {
case ie, ok := <-sub.Out():
e, ok := ie.(event.EvtPeerProtocolsUpdated)
if !ok {
logger.Errorf("got wrong type from subscription: %T", ie)
return
}

if !ok {
return
}
var drop, add bool
for _, p := range e.Added {
if pmap[p] {
add = true
}
}
for _, p := range e.Removed {
if pmap[p] {
drop = true
}
}

if add && drop {
// TODO: discuss how to handle this case
logger.Warning("peer adding and dropping dht protocols? odd")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still need to figure out what to do here, @Kubuxu and @raulk any thoughts?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Proposal: "Peer is bad, we don't want them in our routing table"

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can either noop (since both operations cancel each other out), or discard the peer altogether.

} else if add {
dht.RoutingTable().Update(e.Peer)
} else if drop {
dht.RoutingTable().Remove(e.Peer)
}
case <-ctx.Done():
return
}
}
}
18 changes: 18 additions & 0 deletions dht_net.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/libp2p/go-libp2p-kad-dht/metrics"
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
msmux "github.com/multiformats/go-multistream"

ggio "github.com/gogo/protobuf/io"

Expand Down Expand Up @@ -80,6 +81,11 @@ func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool {
defer timer.Stop()

for {
if dht.getMode() != ModeServer {
logger.Errorf("ignoring incoming dht message while not in server mode")
return false
}

var req pb.Message
msgbytes, err := r.ReadMsg()
if err != nil {
Expand Down Expand Up @@ -166,6 +172,9 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message

ms, err := dht.messageSenderForPeer(ctx, p)
if err != nil {
if err == msmux.ErrNotSupported {
raulk marked this conversation as resolved.
Show resolved Hide resolved
dht.RoutingTable().Remove(p)
}
stats.Record(ctx, metrics.SentRequestErrors.M(1))
return nil, err
}
Expand All @@ -174,6 +183,9 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message

rpmes, err := ms.SendRequest(ctx, pmes)
if err != nil {
if err == msmux.ErrNotSupported {
dht.RoutingTable().Remove(p)
}
stats.Record(ctx, metrics.SentRequestErrors.M(1))
return nil, err
}
Expand All @@ -200,11 +212,17 @@ func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message

ms, err := dht.messageSenderForPeer(ctx, p)
if err != nil {
if err == msmux.ErrNotSupported {
dht.RoutingTable().Remove(p)
}
stats.Record(ctx, metrics.SentMessageErrors.M(1))
return err
}

if err := ms.SendMessage(ctx, pmes); err != nil {
if err == msmux.ErrNotSupported {
dht.RoutingTable().Remove(p)
}
stats.Record(ctx, metrics.SentMessageErrors.M(1))
return err
}
Expand Down
19 changes: 19 additions & 0 deletions dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1387,3 +1387,22 @@ func TestClientModeAtInit(t *testing.T) {
err := pinger.Ping(context.Background(), client.PeerID())
assert.True(t, xerrors.Is(err, multistream.ErrNotSupported))
}

func TestModeChange(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

clientOnly := setupDHT(ctx, t, true)
clientToServer := setupDHT(ctx, t, true)
clientOnly.Host().Peerstore().AddAddrs(clientToServer.PeerID(), clientToServer.Host().Addrs(), peerstore.AddressTTL)
err := clientOnly.Ping(ctx, clientToServer.PeerID())
assert.True(t, xerrors.Is(err, multistream.ErrNotSupported))
err = clientToServer.SetMode(ModeServer)
assert.Nil(t, err)
err = clientOnly.Ping(ctx, clientToServer.PeerID())
assert.Nil(t, err)
err = clientToServer.SetMode(ModeClient)
assert.Nil(t, err)
err = clientOnly.Ping(ctx, clientToServer.PeerID())
assert.NotNil(t, err)
}
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ require (
github.com/ipfs/go-log v0.0.1
github.com/ipfs/go-todocounter v0.0.1
github.com/jbenet/goprocess v0.1.3
github.com/libp2p/go-libp2p v0.1.0
github.com/libp2p/go-libp2p v0.2.0
github.com/libp2p/go-libp2p-circuit v0.1.0
github.com/libp2p/go-libp2p-core v0.0.1
github.com/libp2p/go-libp2p-core v0.0.6
github.com/libp2p/go-libp2p-kbucket v0.2.0
github.com/libp2p/go-libp2p-peerstore v0.1.0
github.com/libp2p/go-libp2p-peerstore v0.1.1
github.com/libp2p/go-libp2p-record v0.1.0
github.com/libp2p/go-libp2p-routing v0.1.0
github.com/libp2p/go-libp2p-swarm v0.1.0
github.com/libp2p/go-libp2p-testing v0.0.3
github.com/libp2p/go-libp2p-testing v0.0.4
github.com/libp2p/go-msgio v0.0.4
github.com/mr-tron/base58 v1.1.2
github.com/multiformats/go-multiaddr v0.0.4
Expand Down
Loading