Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dht mode toggling (modulo dynamic switching) #350

Merged
merged 10 commits into from
Jun 26, 2019
116 changes: 116 additions & 0 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"sync"
"time"

"github.com/libp2p/go-libp2p-core/event"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
Expand Down Expand Up @@ -41,6 +42,13 @@ var logger = logging.Logger("dht")
// collect members of the routing table.
const NumBootstrapQueries = 5

type DHTMode int

const (
ModeServer = DHTMode(1)
ModeClient = DHTMode(2)
)

// IpfsDHT is an implementation of Kademlia with S/Kademlia modifications.
// It is used to implement the base Routing module.
type IpfsDHT struct {
Expand All @@ -66,6 +74,9 @@ type IpfsDHT struct {
plk sync.Mutex

protocols []protocol.ID // DHT protocols

mode DHTMode
modeLk sync.Mutex
}

// Assert that IPFS assumptions about interfaces aren't broken. These aren't a
Expand All @@ -89,6 +100,8 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er
// register for network notifs.
dht.host.Network().Notify((*netNotifiee)(dht))

go dht.handleProtocolChanges(ctx)

dht.proc = goprocessctx.WithContextAndTeardown(ctx, func() error {
// remove ourselves from network notifs.
dht.host.Network().StopNotify((*netNotifiee)(dht))
Expand All @@ -97,8 +110,10 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er

dht.proc.AddChild(dht.providers.Process())
dht.Validator = cfg.Validator
dht.mode = ModeClient

if !cfg.Client {
dht.mode = ModeServer
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just call modeToServer?

for _, p := range cfg.Protocols {
h.SetStreamHandler(p, dht.handleNewStream)
}
Expand Down Expand Up @@ -365,6 +380,61 @@ func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, p peer.ID, count int) [
return filtered
}

func (dht *IpfsDHT) SetMode(m DHTMode) error {
dht.modeLk.Lock()
defer dht.modeLk.Unlock()

if m == dht.mode {
return nil
}

switch m {
case ModeServer:
return dht.moveToServerMode()
case ModeClient:
return dht.moveToClientMode()
default:
raulk marked this conversation as resolved.
Show resolved Hide resolved
return fmt.Errorf("unrecognized dht mode: %d", m)
}
}

func (dht *IpfsDHT) moveToServerMode() error {
dht.mode = ModeServer
for _, p := range dht.protocols {
dht.host.SetStreamHandler(p, dht.handleNewStream)
}
return nil
}

func (dht *IpfsDHT) moveToClientMode() error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be fine but I'd feel safer if we checked if we were in client mode in the stream handler, just in case.

dht.mode = ModeClient
for _, p := range dht.protocols {
dht.host.RemoveStreamHandler(p)
}

pset := make(map[protocol.ID]bool)
for _, p := range dht.protocols {
pset[p] = true
}

for _, c := range dht.host.Network().Conns() {
for _, s := range c.GetStreams() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Re: backwards-compatibility. This is where the tricky part comes in. This drops streams for peers in our routing table. It will not invite the peer to drop us from theirs. So they’ll keep querying us, and we’ll “na” all their negotiations. We’ll basically become an unhelpful peer taking up a slot in their table, unless we disconnect to trigger the removal.

On a related note, there seems to be a race between identify and the DHT notifee. Even if we disconnect and reconnect, if the DHT notifee runs before identify finishes, we might be deciding on stale protocols: https://github.com/libp2p/go-libp2p-kad-dht/blob/master/notif.go#L27

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah... the backwards compatibility bit kinda sucks. I don't know that a nice backwards compatible solution exists aside from hard disconnecting from those peers.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd much rather track streams manually.

if pset[s.Protocol()] {
if s.Stat().Direction == network.DirInbound {
s.Reset()
}
}
}
}
return nil
}

func (dht *IpfsDHT) getMode() DHTMode {
dht.modeLk.Lock()
defer dht.modeLk.Unlock()
return dht.mode
}

// Context return dht's context
func (dht *IpfsDHT) Context() context.Context {
return dht.ctx
Expand Down Expand Up @@ -437,3 +507,49 @@ func (dht *IpfsDHT) newContextWithLocalTags(ctx context.Context, extraTags ...ta
) // ignoring error as it is unrelated to the actual function of this code.
return ctx
}

func (dht *IpfsDHT) handleProtocolChanges(ctx context.Context) {
// register for event bus protocol ID changes
ch := make(chan event.EvtPeerProtocolsUpdated, 8)
cancel, err := dht.host.EventBus().Subscribe(ch)
if err != nil {
panic(err)
}
defer cancel()

pmap := make(map[protocol.ID]bool)
for _, p := range dht.protocols {
pmap[p] = true
}

for {
select {
case e, ok := <-ch:
if !ok {
return
}
var drop, add bool
for _, p := range e.Added {
if pmap[p] {
add = true
}
}
for _, p := range e.Removed {
if pmap[p] {
drop = true
}
}

if add && drop {
// TODO: discuss how to handle this case
logger.Warning("peer adding and dropping dht protocols? odd")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still need to figure out what to do here, @Kubuxu and @raulk any thoughts?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Proposal: "Peer is bad, we don't want them in our routing table"

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can either noop (since both operations cancel each other out), or discard the peer altogether.

} else if add {
dht.RoutingTable().Update(e.Peer)
} else if drop {
dht.RoutingTable().Remove(e.Peer)
}
case <-ctx.Done():
return
}
}
}
18 changes: 18 additions & 0 deletions dht_net.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/libp2p/go-libp2p-kad-dht/metrics"
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
msmux "github.com/multiformats/go-multistream"

ggio "github.com/gogo/protobuf/io"

Expand Down Expand Up @@ -79,6 +80,11 @@ func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool {
defer timer.Stop()

for {
if dht.getMode() != ModeServer {
logger.Errorf("ignoring incoming dht message while not in server mode")
return false
}

var req pb.Message
switch err := r.ReadMsg(&req); err {
case io.EOF:
Expand Down Expand Up @@ -153,6 +159,9 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message

ms, err := dht.messageSenderForPeer(ctx, p)
if err != nil {
if err == msmux.ErrNotSupported {
raulk marked this conversation as resolved.
Show resolved Hide resolved
dht.RoutingTable().Remove(p)
}
stats.Record(ctx, metrics.SentRequestErrors.M(1))
return nil, err
}
Expand All @@ -161,6 +170,9 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message

rpmes, err := ms.SendRequest(ctx, pmes)
if err != nil {
if err == msmux.ErrNotSupported {
dht.RoutingTable().Remove(p)
}
stats.Record(ctx, metrics.SentRequestErrors.M(1))
return nil, err
}
Expand All @@ -187,11 +199,17 @@ func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message

ms, err := dht.messageSenderForPeer(ctx, p)
if err != nil {
if err == msmux.ErrNotSupported {
dht.RoutingTable().Remove(p)
}
stats.Record(ctx, metrics.SentMessageErrors.M(1))
return err
}

if err := ms.SendMessage(ctx, pmes); err != nil {
if err == msmux.ErrNotSupported {
dht.RoutingTable().Remove(p)
}
stats.Record(ctx, metrics.SentMessageErrors.M(1))
return err
}
Expand Down
19 changes: 19 additions & 0 deletions dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1387,3 +1387,22 @@ func TestClientModeAtInit(t *testing.T) {
err := pinger.Ping(context.Background(), client.PeerID())
assert.True(t, xerrors.Is(err, multistream.ErrNotSupported))
}

func TestModeChange(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

clientOnly := setupDHT(ctx, t, true)
clientToServer := setupDHT(ctx, t, true)
clientOnly.Host().Peerstore().AddAddrs(clientToServer.PeerID(), clientToServer.Host().Addrs(), peerstore.AddressTTL)
err := clientOnly.Ping(ctx, clientToServer.PeerID())
assert.True(t, xerrors.Is(err, multistream.ErrNotSupported))
err = clientToServer.SetMode(ModeServer)
assert.Nil(t, err)
err = clientOnly.Ping(ctx, clientToServer.PeerID())
assert.Nil(t, err)
err = clientToServer.SetMode(ModeClient)
assert.Nil(t, err)
err = clientOnly.Ping(ctx, clientToServer.PeerID())
assert.NotNil(t, err)
}