From cf462b7781974ae21794d00870d73f6831437aad Mon Sep 17 00:00:00 2001 From: Lewis Marshall Date: Sun, 7 May 2017 00:53:21 +0100 Subject: [PATCH] p2p/simulations: Add p2psim command-line API client Signed-off-by: Lewis Marshall --- p2p/simulations/adapters/exec.go | 14 +- p2p/simulations/adapters/inproc.go | 23 +- p2p/simulations/adapters/rpc_mux.go | 218 +++++++++ p2p/simulations/adapters/types.go | 46 +- p2p/simulations/cmd/p2psim/main.go | 425 ++++++++++++++++++ p2p/simulations/examples/p2psim.sh | 43 ++ p2p/simulations/http.go | 311 ++++++++++++- p2p/simulations/http_test.go | 347 ++++++++++++++ p2p/simulations/network.go | 77 ++-- p2p/testing/protocoltester.go | 4 +- .../simulations/discovery/discovery_test.go | 14 +- swarm/network/simulations/overlay.go | 4 +- 12 files changed, 1441 insertions(+), 85 deletions(-) create mode 100644 p2p/simulations/adapters/rpc_mux.go create mode 100644 p2p/simulations/cmd/p2psim/main.go create mode 100755 p2p/simulations/examples/p2psim.sh create mode 100644 p2p/simulations/http_test.go diff --git a/p2p/simulations/adapters/exec.go b/p2p/simulations/adapters/exec.go index fd31e5ef70a3..bb34a14cf780 100644 --- a/p2p/simulations/adapters/exec.go +++ b/p2p/simulations/adapters/exec.go @@ -97,6 +97,7 @@ type ExecNode struct { Info *p2p.NodeInfo client *rpc.Client + rpcMux *rpcMux newCmd func() *exec.Cmd key *ecdsa.PrivateKey } @@ -157,7 +158,8 @@ func (n *ExecNode) Start() (err error) { n.Cmd = cmd // create the RPC client and load the node info - n.client = rpc.NewClientWithConn(pipe2) + n.rpcMux = newRPCMux(pipe2) + n.client = n.rpcMux.Client() ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() var info p2p.NodeInfo @@ -222,6 +224,16 @@ func (n *ExecNode) NodeInfo() *p2p.NodeInfo { return info } +// ServeRPC serves RPC requests over the given connection using the node's +// RPC multiplexer +func (n *ExecNode) ServeRPC(conn net.Conn) error { + if n.rpcMux == nil { + return errors.New("RPC not started") + } + n.rpcMux.Serve(conn) + return nil +} + func init() { // register a reexec function to start a devp2p node when the current // binary is executed as "p2p-node" diff --git a/p2p/simulations/adapters/inproc.go b/p2p/simulations/adapters/inproc.go index fbabe9c5f7f4..481fe187689d 100644 --- a/p2p/simulations/adapters/inproc.go +++ b/p2p/simulations/adapters/inproc.go @@ -20,6 +20,7 @@ import ( "context" "errors" "fmt" + "net" "sync" "github.com/ethereum/go-ethereum/event" @@ -119,6 +120,7 @@ type SimNode struct { peers map[discover.NodeID]MsgReadWriteCloser peerFeed event.Feed client *rpc.Client + rpcMux *rpcMux // dropPeers is used to force peer disconnects when // the node is stopped @@ -146,6 +148,19 @@ func (self *SimNode) Client() (*rpc.Client, error) { return self.client, nil } +// ServeRPC serves RPC requests over the given connection using the node's +// RPC multiplexer +func (self *SimNode) ServeRPC(conn net.Conn) error { + self.lock.Lock() + mux := self.rpcMux + self.lock.Unlock() + if mux == nil { + return errors.New("RPC not started") + } + mux.Serve(conn) + return nil +} + // Start starts the RPC handler and the underlying service func (self *SimNode) Start() error { self.dropPeers = make(chan struct{}) @@ -203,8 +218,13 @@ func (self *SimNode) startRPC() error { } } + // create an in-process RPC multiplexer + pipe1, pipe2 := net.Pipe() + go handler.ServeCodec(rpc.NewJSONCodec(pipe1), rpc.OptionMethodInvocation|rpc.OptionSubscriptions) + self.rpcMux = newRPCMux(pipe2) + // create an in-process RPC client - self.client = rpc.DialInProc(handler) + self.client = self.rpcMux.Client() return nil } @@ -216,6 +236,7 @@ func (self *SimNode) stopRPC() { if self.client != nil { self.client.Close() self.client = nil + self.rpcMux = nil } } diff --git a/p2p/simulations/adapters/rpc_mux.go b/p2p/simulations/adapters/rpc_mux.go new file mode 100644 index 000000000000..4cae58c223f7 --- /dev/null +++ b/p2p/simulations/adapters/rpc_mux.go @@ -0,0 +1,218 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package adapters + +import ( + "encoding/json" + "net" + "strconv" + "strings" + "sync" + + "github.com/ethereum/go-ethereum/rpc" +) + +// rpcMux is an RPC multiplexer which allows many clients to make RPC requests +// over a single connection by changing each request's ID to a unique value. +// +// This is used by node adapters so that simulations can create many RPC +// clients all sending requests over the underlying node's stdin / stdout. +type rpcMux struct { + conn net.Conn + + mtx sync.Mutex + idCounter uint64 + msgMap map[uint64]*rpcMsg + subMap map[string]*rpcReply + send chan *rpcMsg +} + +type rpcMsg struct { + Method string `json:"method,omitempty"` + Version string `json:"jsonrpc,omitempty"` + Id json.RawMessage `json:"id,omitempty"` + Payload json.RawMessage `json:"params,omitempty"` + Result json.RawMessage `json:"result,omitempty"` + Error json.RawMessage `json:"error,omitempty"` + + id uint64 + reply *rpcReply +} + +// rpcSub is the payload or result of a subscription RPC message +type rpcSub struct { + Subscription string `json:"subscription"` + Result json.RawMessage `json:"result,omitempty"` +} + +// rpcReply receives replies to RPC messages for a particular client +type rpcReply struct { + ch chan *rpcMsg + closeOnce sync.Once +} + +func (r *rpcReply) close() { + r.closeOnce.Do(func() { close(r.ch) }) +} + +func newRPCMux(conn net.Conn) *rpcMux { + mux := &rpcMux{ + msgMap: make(map[uint64]*rpcMsg), + subMap: make(map[string]*rpcReply), + send: make(chan *rpcMsg), + } + go mux.sendLoop(conn) + go mux.recvLoop(conn) + return mux +} + +// Client creates a new RPC client which sends messages through the multiplexer +func (mux *rpcMux) Client() *rpc.Client { + pipe1, pipe2 := net.Pipe() + go mux.Serve(pipe1) + return rpc.NewClientWithConn(pipe2) +} + +// Serve reads RPC messages from the given connection, forwards them to the +// multiplexed connnection and writes replies back to the given connection +func (mux *rpcMux) Serve(conn net.Conn) { + // reply will receive replies to any messages we send + reply := &rpcReply{ch: make(chan *rpcMsg)} + defer func() { + // drain the channel to prevent blocking the recvLoop + for range reply.ch { + } + }() + + // start a goroutine to read RPC messages from the connection and + // forward them to the sendLoop + done := make(chan struct{}) + go func() { + defer close(done) + dec := json.NewDecoder(conn) + for { + msg := &rpcMsg{} + if err := dec.Decode(msg); err != nil { + return + } + msg.reply = reply + mux.send <- msg + } + }() + + // write message replies to the connection + enc := json.NewEncoder(conn) + for { + select { + case msg, ok := <-reply.ch: + if !ok { + return + } + if err := enc.Encode(msg); err != nil { + return + } + case <-done: + return + } + } +} + +// sendLoop receives messages from the send channel, changes their ID and +// writes them to the given connection +func (mux *rpcMux) sendLoop(conn net.Conn) { + enc := json.NewEncoder(conn) + for msg := range mux.send { + if err := enc.Encode(mux.newMsg(msg)); err != nil { + return + } + } +} + +// recvLoop reads messages from the given connection, changes their ID back +// to the oringal value and sends them to the message's reply channel +func (mux *rpcMux) recvLoop(conn net.Conn) { + // close all reply channels if we get an error + defer func() { + mux.mtx.Lock() + defer mux.mtx.Unlock() + for _, msg := range mux.msgMap { + msg.reply.close() + } + }() + + dec := json.NewDecoder(conn) + for { + msg := &rpcMsg{} + if err := dec.Decode(msg); err != nil { + return + } + if reply := mux.lookup(msg); reply != nil { + reply.ch <- msg + } + } +} + +// newMsg copies the given message and changes it's ID to a unique value +func (mux *rpcMux) newMsg(msg *rpcMsg) *rpcMsg { + mux.mtx.Lock() + defer mux.mtx.Unlock() + id := mux.idCounter + mux.idCounter++ + mux.msgMap[id] = msg + newMsg := *msg + newMsg.Id = json.RawMessage(strconv.FormatUint(id, 10)) + return &newMsg +} + +// lookup looks up the original message for which the given message is a reply +func (mux *rpcMux) lookup(msg *rpcMsg) *rpcReply { + mux.mtx.Lock() + defer mux.mtx.Unlock() + + // if the message has no ID, it is a subscription notification so + // lookup the original subscribe message + if msg.Id == nil { + sub := &rpcSub{} + if err := json.Unmarshal(msg.Payload, sub); err != nil { + return nil + } + return mux.subMap[sub.Subscription] + } + + // lookup the original message and restore the ID + id, err := strconv.ParseUint(string(msg.Id), 10, 64) + if err != nil { + return nil + } + origMsg, ok := mux.msgMap[id] + if !ok { + return nil + } + delete(mux.msgMap, id) + msg.Id = origMsg.Id + + // if the original message was a subscription, store the subscription + // ID so we can detect notifications + if strings.HasSuffix(string(origMsg.Method), "_subscribe") { + var result string + if err := json.Unmarshal(msg.Result, &result); err == nil { + mux.subMap[result] = origMsg.reply + } + } + + return origMsg.reply +} diff --git a/p2p/simulations/adapters/types.go b/p2p/simulations/adapters/types.go index 5e9e34ce8d53..4bfdded042b5 100644 --- a/p2p/simulations/adapters/types.go +++ b/p2p/simulations/adapters/types.go @@ -21,6 +21,7 @@ import ( "encoding/hex" "encoding/json" "fmt" + "net" "os" "github.com/docker/docker/pkg/reexec" @@ -46,6 +47,10 @@ type Node interface { // up and running Client() (*rpc.Client, error) + // ServeRPC serves RPC requests over the given connection using the + // node's RPC multiplexer + ServeRPC(net.Conn) error + // Start starts the node Start() error @@ -116,6 +121,9 @@ type NodeConfig struct { Id *NodeId PrivateKey *ecdsa.PrivateKey + // Name is a human friendly name for the node like "node01" + Name string + // Service is the name of the service which should be run when starting // the node (for SimNodes it should be the name of a service contained // in SimAdapter.services, for other nodes it should be a service @@ -128,15 +136,22 @@ type NodeConfig struct { type nodeConfigJSON struct { Id string `json:"id"` PrivateKey string `json:"private_key"` + Name string `json:"name"` Service string `json:"service"` } func (n *NodeConfig) MarshalJSON() ([]byte, error) { - return json.Marshal(nodeConfigJSON{ - n.Id.String(), - hex.EncodeToString(crypto.FromECDSA(n.PrivateKey)), - n.Service, - }) + confJSON := nodeConfigJSON{ + Name: n.Name, + Service: n.Service, + } + if n.Id != nil { + confJSON.Id = n.Id.String() + } + if n.PrivateKey != nil { + confJSON.PrivateKey = hex.EncodeToString(crypto.FromECDSA(n.PrivateKey)) + } + return json.Marshal(confJSON) } func (n *NodeConfig) UnmarshalJSON(data []byte) error { @@ -145,18 +160,23 @@ func (n *NodeConfig) UnmarshalJSON(data []byte) error { return err } - nodeID, err := discover.HexID(confJSON.Id) - if err != nil { - return err + if confJSON.Id != "" { + nodeID, err := discover.HexID(confJSON.Id) + if err != nil { + return err + } + n.Id = &NodeId{NodeID: nodeID} } - n.Id = &NodeId{NodeID: nodeID} - key, err := hex.DecodeString(confJSON.PrivateKey) - if err != nil { - return err + if confJSON.PrivateKey != "" { + key, err := hex.DecodeString(confJSON.PrivateKey) + if err != nil { + return err + } + n.PrivateKey = crypto.ToECDSA(key) } - n.PrivateKey = crypto.ToECDSA(key) + n.Name = confJSON.Name n.Service = confJSON.Service return nil diff --git a/p2p/simulations/cmd/p2psim/main.go b/p2p/simulations/cmd/p2psim/main.go new file mode 100644 index 000000000000..b0b53a501633 --- /dev/null +++ b/p2p/simulations/cmd/p2psim/main.go @@ -0,0 +1,425 @@ +// p2psim provides a command-line client for a simulation API. +// +// Here is an example of creating a 2 node network with the first node +// connected to the second: +// +// $ p2psim network create +// Created network net1 +// +// $ p2psim node create net1 +// Created node01 +// +// $ p2psim node start net1 node01 +// Started node01 +// +// $ p2psim node create net1 +// Created node02 +// +// $ p2psim node start net1 node02 +// Started node02 +// +// $ p2psim node connect net1 node01 node02 +// Connected node01 to node02 +// +package main + +import ( + "context" + "encoding/json" + "fmt" + "io" + "os" + "strings" + "text/tabwriter" + + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/simulations" + "github.com/ethereum/go-ethereum/p2p/simulations/adapters" + "github.com/ethereum/go-ethereum/rpc" + "gopkg.in/urfave/cli.v1" +) + +var client *simulations.Client + +func main() { + app := cli.NewApp() + app.Usage = "devp2p simulation command-line client" + app.Flags = []cli.Flag{ + cli.StringFlag{ + Name: "api", + Value: "http://localhost:8888", + Usage: "simulation API URL", + EnvVar: "P2PSIM_API_URL", + }, + } + app.Before = func(ctx *cli.Context) error { + client = simulations.NewClient(ctx.GlobalString("api")) + return nil + } + app.Commands = []cli.Command{ + { + Name: "network", + Usage: "manage simulation networks", + Action: listNetworks, + Subcommands: []cli.Command{ + { + Name: "list", + Usage: "list networks", + Action: listNetworks, + }, + { + Name: "create", + Usage: "create a network", + Action: createNetwork, + Flags: []cli.Flag{ + cli.StringFlag{ + Name: "config", + Value: "{}", + Usage: "JSON encoded network config", + }, + }, + }, + { + Name: "show", + ArgsUsage: "", + Usage: "show network information", + Action: showNetwork, + }, + { + Name: "events", + ArgsUsage: "", + Usage: "stream network events", + Action: streamNetwork, + }, + }, + }, + { + Name: "node", + Usage: "manage simulation nodes", + Action: listNodes, + Subcommands: []cli.Command{ + { + Name: "list", + ArgsUsage: "", + Usage: "list nodes", + Action: listNodes, + }, + { + Name: "create", + ArgsUsage: "", + Usage: "create a node", + Action: createNode, + Flags: []cli.Flag{ + cli.StringFlag{ + Name: "config", + Value: "{}", + Usage: "JSON encoded node config", + }, + }, + }, + { + Name: "show", + ArgsUsage: " ", + Usage: "show node information", + Action: showNode, + }, + { + Name: "start", + ArgsUsage: " ", + Usage: "start a node", + Action: startNode, + }, + { + Name: "stop", + ArgsUsage: " ", + Usage: "stop a node", + Action: stopNode, + }, + { + Name: "connect", + ArgsUsage: " ", + Usage: "connect a node to a peer node", + Action: connectNode, + }, + { + Name: "disconnect", + ArgsUsage: " ", + Usage: "disconnect a node from a peer node", + Action: disconnectNode, + }, + { + Name: "rpc", + ArgsUsage: " []", + Usage: "call a node RPC method", + Action: rpcNode, + Flags: []cli.Flag{ + cli.BoolFlag{ + Name: "subscribe", + Usage: "method is a subscription", + }, + }, + }, + }, + }, + } + app.Run(os.Args) +} + +func listNetworks(ctx *cli.Context) error { + if len(ctx.Args()) != 0 { + return cli.ShowCommandHelp(ctx, ctx.Command.Name) + } + networks, err := client.GetNetworks() + if err != nil { + return err + } + w := tabwriter.NewWriter(ctx.App.Writer, 1, 2, 2, ' ', 0) + defer w.Flush() + fmt.Fprintf(w, "ID\tNODES\tCONNS\n") + for _, network := range networks { + fmt.Fprintf(w, "%s\t%d\t%d\n", network.Id, len(network.Nodes), len(network.Conns)) + } + return nil +} + +func createNetwork(ctx *cli.Context) error { + if len(ctx.Args()) != 0 { + return cli.ShowCommandHelp(ctx, ctx.Command.Name) + } + config := &simulations.NetworkConfig{} + if err := json.Unmarshal([]byte(ctx.String("config")), config); err != nil { + return err + } + network, err := client.CreateNetwork(config) + if err != nil { + return err + } + fmt.Fprintln(ctx.App.Writer, "Created network", network.Id) + return nil +} + +func showNetwork(ctx *cli.Context) error { + args := ctx.Args() + if len(args) != 1 { + return cli.ShowCommandHelp(ctx, ctx.Command.Name) + } + networkID := args[0] + network, err := client.GetNetwork(networkID) + if err != nil { + return err + } + w := tabwriter.NewWriter(ctx.App.Writer, 1, 2, 2, ' ', 0) + defer w.Flush() + fmt.Fprintf(w, "ID\t%s\n", network.Id) + fmt.Fprintf(w, "NODES\t%d\n", len(network.Nodes)) + fmt.Fprintf(w, "CONNS\t%d\n", len(network.Conns)) + return nil +} + +func streamNetwork(ctx *cli.Context) error { + args := ctx.Args() + if len(args) != 1 { + return cli.ShowCommandHelp(ctx, ctx.Command.Name) + } + networkID := args[0] + events := make(chan *simulations.Event) + sub, err := client.SubscribeNetwork(networkID, events) + if err != nil { + return err + } + defer sub.Unsubscribe() + enc := json.NewEncoder(ctx.App.Writer) + for { + select { + case event := <-events: + if err := enc.Encode(event); err != nil { + return err + } + case err := <-sub.Err(): + return err + } + } +} + +func listNodes(ctx *cli.Context) error { + args := ctx.Args() + if len(args) != 1 { + return cli.ShowCommandHelp(ctx, ctx.Command.Name) + } + networkID := args[0] + nodes, err := client.GetNodes(networkID) + if err != nil { + return err + } + w := tabwriter.NewWriter(ctx.App.Writer, 1, 2, 2, ' ', 0) + defer w.Flush() + fmt.Fprintf(w, "NAME\tPROTOCOLS\tID\n") + for _, node := range nodes { + fmt.Fprintf(w, "%s\t%s\t%s\n", node.Name, strings.Join(protocolList(node), ","), node.ID) + } + return nil +} + +func protocolList(node *p2p.NodeInfo) []string { + protos := make([]string, 0, len(node.Protocols)) + for name := range node.Protocols { + protos = append(protos, name) + } + return protos +} + +func createNode(ctx *cli.Context) error { + args := ctx.Args() + if len(args) != 1 { + return cli.ShowCommandHelp(ctx, ctx.Command.Name) + } + networkID := args[0] + config := &adapters.NodeConfig{} + if err := json.Unmarshal([]byte(ctx.String("config")), config); err != nil { + return err + } + node, err := client.CreateNode(networkID, config) + if err != nil { + return err + } + fmt.Fprintln(ctx.App.Writer, "Created", node.Name) + return nil +} + +func showNode(ctx *cli.Context) error { + args := ctx.Args() + if len(args) != 2 { + return cli.ShowCommandHelp(ctx, ctx.Command.Name) + } + networkID := args[0] + nodeName := args[1] + node, err := client.GetNode(networkID, nodeName) + if err != nil { + return err + } + w := tabwriter.NewWriter(ctx.App.Writer, 1, 2, 2, ' ', 0) + defer w.Flush() + fmt.Fprintf(w, "NAME\t%s\n", node.Name) + fmt.Fprintf(w, "PROTOCOLS\t%s\n", strings.Join(protocolList(node), ",")) + fmt.Fprintf(w, "ID\t%s\n", node.ID) + fmt.Fprintf(w, "ENODE\t%s\n", node.Enode) + for name, proto := range node.Protocols { + fmt.Fprintln(w) + fmt.Fprintf(w, "--- PROTOCOL INFO: %s\n", name) + fmt.Fprintf(w, "%v\n", proto) + fmt.Fprintf(w, "---\n") + } + return nil +} + +func startNode(ctx *cli.Context) error { + args := ctx.Args() + if len(args) != 2 { + return cli.ShowCommandHelp(ctx, ctx.Command.Name) + } + networkID := args[0] + nodeName := args[1] + if err := client.StartNode(networkID, nodeName); err != nil { + return err + } + fmt.Fprintln(ctx.App.Writer, "Started", nodeName) + return nil +} + +func stopNode(ctx *cli.Context) error { + args := ctx.Args() + if len(args) != 2 { + return cli.ShowCommandHelp(ctx, ctx.Command.Name) + } + networkID := args[0] + nodeName := args[1] + if err := client.StopNode(networkID, nodeName); err != nil { + return err + } + fmt.Fprintln(ctx.App.Writer, "Stopped", nodeName) + return nil +} + +func connectNode(ctx *cli.Context) error { + args := ctx.Args() + if len(args) != 3 { + return cli.ShowCommandHelp(ctx, ctx.Command.Name) + } + networkID := args[0] + nodeName := args[1] + peerName := args[2] + if err := client.ConnectNode(networkID, nodeName, peerName); err != nil { + return err + } + fmt.Fprintln(ctx.App.Writer, "Connected", nodeName, "to", peerName) + return nil +} + +func disconnectNode(ctx *cli.Context) error { + args := ctx.Args() + if len(args) != 3 { + return cli.ShowCommandHelp(ctx, ctx.Command.Name) + } + networkID := args[0] + nodeName := args[1] + peerName := args[2] + if err := client.DisconnectNode(networkID, nodeName, peerName); err != nil { + return err + } + fmt.Fprintln(ctx.App.Writer, "Disconnected", nodeName, "from", peerName) + return nil +} + +func rpcNode(ctx *cli.Context) error { + args := ctx.Args() + if len(args) < 3 { + return cli.ShowCommandHelp(ctx, ctx.Command.Name) + } + networkID := args[0] + nodeName := args[1] + method := args[2] + rpcClient, err := client.RPCClient(context.Background(), networkID, nodeName) + if err != nil { + return err + } + if ctx.Bool("subscribe") { + return rpcSubscribe(rpcClient, ctx.App.Writer, method, args[3:]...) + } + var result interface{} + params := make([]interface{}, len(args[3:])) + for i, v := range args[3:] { + params[i] = v + } + if err := rpcClient.Call(&result, method, params...); err != nil { + return err + } + return json.NewEncoder(ctx.App.Writer).Encode(result) +} + +func rpcSubscribe(client *rpc.Client, out io.Writer, method string, args ...string) error { + parts := strings.SplitN(method, "_", 2) + namespace := parts[0] + method = parts[1] + ch := make(chan interface{}) + subArgs := make([]interface{}, len(args)+1) + subArgs[0] = method + for i, v := range args { + subArgs[i+1] = v + } + sub, err := client.Subscribe(context.Background(), namespace, ch, subArgs...) + if err != nil { + return err + } + defer sub.Unsubscribe() + enc := json.NewEncoder(out) + for { + select { + case v := <-ch: + if err := enc.Encode(v); err != nil { + return err + } + case err := <-sub.Err(): + return err + } + } +} diff --git a/p2p/simulations/examples/p2psim.sh b/p2p/simulations/examples/p2psim.sh new file mode 100755 index 000000000000..7c83285cc78b --- /dev/null +++ b/p2p/simulations/examples/p2psim.sh @@ -0,0 +1,43 @@ +#!/bin/bash +# +# Start a network simulation using the API started by connectivity.go + +set -e + +main() { + if ! which p2psim &>/dev/null; then + fail "missing p2psim binary (you need to build p2p/simulations/cmd/p2psim)" + fi + + info "creating the example network" + p2psim network create --config '{"id": "example", "default_service": "ping-pong"}' + + info "creating 10 nodes" + for i in $(seq 1 10); do + p2psim node create "example" + p2psim node start "example" "$(node_name $i)" + done + + info "connecting node01 to all other nodes" + for i in $(seq 2 10); do + p2psim node connect "example" "node01" "$(node_name $i)" + done + + info "done" +} + +node_name() { + local num=$1 + echo "node$(printf '%02d' $num)" +} + +info() { + echo -e "\033[1;32m---> $(date +%H:%M:%S) ${@}\033[0m" +} + +fail() { + echo -e "\033[1;31mERROR: ${@}\033[0m" >&2 + exit 1 +} + +main "$@" diff --git a/p2p/simulations/http.go b/p2p/simulations/http.go index 7809830d4072..9ae935e08d9f 100644 --- a/p2p/simulations/http.go +++ b/p2p/simulations/http.go @@ -1,23 +1,241 @@ package simulations import ( + "bufio" + "bytes" "context" "encoding/json" "fmt" + "io" + "io/ioutil" "net/http" + "strings" "sync" + "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/p2p/simulations/adapters" + "github.com/ethereum/go-ethereum/rpc" "github.com/julienschmidt/httprouter" - "github.com/pborman/uuid" + "golang.org/x/net/websocket" ) +// DefaultClient is the default simulation API client which expects the API +// to be running at http://localhost:8888 +var DefaultClient = NewClient("http://localhost:8888") + +// Client is a client for the simulation HTTP API which supports creating +// and managing simulation networks +type Client struct { + URL string + + client *http.Client +} + +// NewClient returns a new simulation API client +func NewClient(url string) *Client { + return &Client{ + URL: url, + client: http.DefaultClient, + } +} + +// GetNetworks returns a list of simulations networks +func (c *Client) GetNetworks() ([]*Network, error) { + var networks []*Network + return networks, c.Get("/networks", &networks) +} + +// CreateNetwork creates a new simulation network +func (c *Client) CreateNetwork(config *NetworkConfig) (*Network, error) { + network := &Network{} + return network, c.Post("/networks", config, network) +} + +// GetNetwork returns details of a network +func (c *Client) GetNetwork(networkID string) (*Network, error) { + network := &Network{} + return network, c.Get(fmt.Sprintf("/networks/%s", networkID), network) +} + +// SubscribeNetwork subscribes to network events which are sent from the server +// as a server-sent-events stream +func (c *Client) SubscribeNetwork(networkID string, events chan *Event) (event.Subscription, error) { + req, err := http.NewRequest("GET", fmt.Sprintf("%s/networks/%s/events", c.URL, networkID), nil) + if err != nil { + return nil, err + } + req.Header.Set("Accept", "text/event-stream") + res, err := c.client.Do(req) + if err != nil { + return nil, err + } + if res.StatusCode != http.StatusOK { + response, _ := ioutil.ReadAll(res.Body) + res.Body.Close() + return nil, fmt.Errorf("unexpected HTTP status: %s: %s", res.Status, response) + } + + // define a producer function to pass to event.Subscription + // which reads server-sent events from res.Body and sends + // them to the events channel + producer := func(stop <-chan struct{}) error { + defer res.Body.Close() + + // read lines from res.Body in a goroutine so that we are + // always reading from the stop channel + lines := make(chan string) + errC := make(chan error, 1) + go func() { + s := bufio.NewScanner(res.Body) + for s.Scan() { + select { + case lines <- s.Text(): + case <-stop: + return + } + } + errC <- s.Err() + }() + + // detect any lines which start with "data:", decode the data + // into an event and send it to the events channel + for { + select { + case line := <-lines: + if !strings.HasPrefix(line, "data:") { + continue + } + data := strings.TrimSpace(strings.TrimPrefix(line, "data:")) + event := &Event{} + if err := json.Unmarshal([]byte(data), event); err != nil { + return fmt.Errorf("error decoding SSE event: %s", err) + } + select { + case events <- event: + case <-stop: + return nil + } + case err := <-errC: + return err + case <-stop: + return nil + } + } + } + + return event.NewSubscription(producer), nil +} + +// GetNodes returns all nodes which exist in a network +func (c *Client) GetNodes(networkID string) ([]*p2p.NodeInfo, error) { + var nodes []*p2p.NodeInfo + return nodes, c.Get(fmt.Sprintf("/networks/%s/nodes", networkID), &nodes) +} + +// CreateNode creates a node in a network using the given configuration +func (c *Client) CreateNode(networkID string, config *adapters.NodeConfig) (*p2p.NodeInfo, error) { + node := &p2p.NodeInfo{} + return node, c.Post(fmt.Sprintf("/networks/%s/nodes", networkID), config, node) +} + +// GetNode returns details of a node +func (c *Client) GetNode(networkID, nodeID string) (*p2p.NodeInfo, error) { + node := &p2p.NodeInfo{} + return node, c.Get(fmt.Sprintf("/networks/%s/nodes/%s", networkID, nodeID), node) +} + +// StartNode starts a node +func (c *Client) StartNode(networkID, nodeID string) error { + return c.Post(fmt.Sprintf("/networks/%s/nodes/%s/start", networkID, nodeID), nil, nil) +} + +// StopNode stops a node +func (c *Client) StopNode(networkID, nodeID string) error { + return c.Post(fmt.Sprintf("/networks/%s/nodes/%s/stop", networkID, nodeID), nil, nil) +} + +// ConnectNode connects a node to a peer node +func (c *Client) ConnectNode(networkID, nodeID, peerID string) error { + return c.Post(fmt.Sprintf("/networks/%s/nodes/%s/conn/%s", networkID, nodeID, peerID), nil, nil) +} + +// DisconnectNode disconnects a node from a peer node +func (c *Client) DisconnectNode(networkID, nodeID, peerID string) error { + return c.Delete(fmt.Sprintf("/networks/%s/nodes/%s/conn/%s", networkID, nodeID, peerID)) +} + +// RPCClient returns an RPC client connected to a node +func (c *Client) RPCClient(ctx context.Context, networkID, nodeID string) (*rpc.Client, error) { + baseURL := strings.Replace(c.URL, "http", "ws", 1) + return rpc.DialWebsocket(ctx, fmt.Sprintf("%s/networks/%s/nodes/%s/rpc", baseURL, networkID, nodeID), "") +} + +// Get performs a HTTP GET request decoding the resulting JSON response +// into "out" +func (c *Client) Get(path string, out interface{}) error { + return c.Send("GET", path, nil, out) +} + +// Post performs a HTTP POST request sending "in" as the JSON body and +// decoding the resulting JSON response into "out" +func (c *Client) Post(path string, in, out interface{}) error { + return c.Send("POST", path, in, out) +} + +// Delete performs a HTTP DELETE request +func (c *Client) Delete(path string) error { + return c.Send("DELETE", path, nil, nil) +} + +// Send performs a HTTP request, sending "in" as the JSON request body and +// decoding the JSON response into "out" +func (c *Client) Send(method, path string, in, out interface{}) error { + var body []byte + if in != nil { + var err error + body, err = json.Marshal(in) + if err != nil { + return err + } + } + req, err := http.NewRequest(method, c.URL+path, bytes.NewReader(body)) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Accept", "application/json") + res, err := c.client.Do(req) + if err != nil { + return err + } + defer res.Body.Close() + if res.StatusCode != http.StatusOK && res.StatusCode != http.StatusCreated { + response, _ := ioutil.ReadAll(res.Body) + return fmt.Errorf("unexpected HTTP status: %s: %s", res.Status, response) + } + if out != nil { + if err := json.NewDecoder(res.Body).Decode(out); err != nil { + return err + } + } + return nil +} + +// ServerConfig is the configuration used to start an API server type ServerConfig struct { + // Adapter is the NodeAdapter to use when creating new networks Adapter adapters.NodeAdapter - Mocker func(*Network) + + // Mocker is the function which will be called when a client sends a + // POST request to /networks//mock and is expected to + // generate some mock events in the network + Mocker func(*Network) } +// Server is an HTTP server providing an API to create and manage simulation +// networks type Server struct { ServerConfig @@ -26,6 +244,7 @@ type Server struct { mtx sync.Mutex } +// NewServer returns a new simulation API server func NewServer(config *ServerConfig) *Server { if config.Adapter == nil { panic("Adapter not set") @@ -40,6 +259,7 @@ func NewServer(config *ServerConfig) *Server { s.POST("/networks", s.CreateNetwork) s.GET("/networks", s.GetNetworks) s.GET("/networks/:netid", s.GetNetwork) + s.GET("/networks/:netid/events", s.StreamNetworkEvents) s.POST("/networks/:netid/mock", s.StartMocker) s.POST("/networks/:netid/nodes", s.CreateNode) s.GET("/networks/:netid/nodes", s.GetNodes) @@ -48,10 +268,12 @@ func NewServer(config *ServerConfig) *Server { s.POST("/networks/:netid/nodes/:nodeid/stop", s.StopNode) s.POST("/networks/:netid/nodes/:nodeid/conn/:peerid", s.ConnectNode) s.DELETE("/networks/:netid/nodes/:nodeid/conn/:peerid", s.DisconnectNode) + s.GET("/networks/:netid/nodes/:nodeid/rpc", s.NodeRPC) return s } +// CreateNetwork creates a new simulation network func (s *Server) CreateNetwork(w http.ResponseWriter, req *http.Request) { config := &NetworkConfig{} if err := json.NewDecoder(req.Body).Decode(config); err != nil { @@ -59,49 +281,44 @@ func (s *Server) CreateNetwork(w http.ResponseWriter, req *http.Request) { return } - if config.Id == "" { - config.Id = uuid.NewRandom().String() - } - - err := func() error { + network, err := func() (*Network, error) { s.mtx.Lock() defer s.mtx.Unlock() + if config.Id == "" { + config.Id = fmt.Sprintf("net%d", len(s.networks)+1) + } if _, exists := s.networks[config.Id]; exists { - return fmt.Errorf("network exists: %s", config.Id) + return nil, fmt.Errorf("network exists: %s", config.Id) } network := NewNetwork(s.Adapter, config) s.networks[config.Id] = network - return nil + return network, nil }() if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } - s.JSON(w, http.StatusCreated, config) + s.JSON(w, http.StatusCreated, network) } +// GetNetworks returns a list of simulations networks func (s *Server) GetNetworks(w http.ResponseWriter, req *http.Request) { s.mtx.Lock() - networks := make([]NetworkConfig, 0, len(s.networks)) + networks := make([]*Network, 0, len(s.networks)) for _, network := range s.networks { - config := network.Config() - networks = append(networks, *config) + networks = append(networks, network) } s.mtx.Unlock() s.JSON(w, http.StatusOK, networks) } +// GetNetwork returns details of a network func (s *Server) GetNetwork(w http.ResponseWriter, req *http.Request) { network := req.Context().Value("network").(*Network) - if req.Header.Get("Accept") == "text/event-stream" { - s.streamNetworkEvents(network, w) - return - } - - s.JSON(w, http.StatusOK, network.Config()) + s.JSON(w, http.StatusOK, network) } func (s *Server) StartMocker(w http.ResponseWriter, req *http.Request) { @@ -117,7 +334,10 @@ func (s *Server) StartMocker(w http.ResponseWriter, req *http.Request) { w.WriteHeader(http.StatusOK) } -func (s *Server) streamNetworkEvents(network *Network, w http.ResponseWriter) { +// StreamNetworkEvents streams network events as a server-sent-events stream +func (s *Server) StreamNetworkEvents(w http.ResponseWriter, req *http.Request) { + network := req.Context().Value("network").(*Network) + events := make(chan *Event) sub := network.events.Subscribe(events) defer sub.Unsubscribe() @@ -161,18 +381,27 @@ func (s *Server) streamNetworkEvents(network *Network, w http.ResponseWriter) { } } +// CreateNode creates a node in a network using the given configuration func (s *Server) CreateNode(w http.ResponseWriter, req *http.Request) { network := req.Context().Value("network").(*Network) - config, err := network.NewNode() + config := adapters.RandomNodeConfig() + err := json.NewDecoder(req.Body).Decode(config) + if err != nil && err != io.EOF { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + node, err := network.NewNodeWithConfig(config) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } - s.JSON(w, http.StatusCreated, network.GetNode(config.Id)) + s.JSON(w, http.StatusCreated, node.NodeInfo()) } +// GetNodes returns all nodes which exist in a network func (s *Server) GetNodes(w http.ResponseWriter, req *http.Request) { network := req.Context().Value("network").(*Network) @@ -186,12 +415,14 @@ func (s *Server) GetNodes(w http.ResponseWriter, req *http.Request) { s.JSON(w, http.StatusOK, infos) } +// GetNode returns details of a node func (s *Server) GetNode(w http.ResponseWriter, req *http.Request) { node := req.Context().Value("node").(*Node) s.JSON(w, http.StatusOK, node.NodeInfo()) } +// StartNode starts a node func (s *Server) StartNode(w http.ResponseWriter, req *http.Request) { network := req.Context().Value("network").(*Network) node := req.Context().Value("node").(*Node) @@ -204,6 +435,7 @@ func (s *Server) StartNode(w http.ResponseWriter, req *http.Request) { s.JSON(w, http.StatusOK, node.NodeInfo()) } +// StopNode stops a node func (s *Server) StopNode(w http.ResponseWriter, req *http.Request) { network := req.Context().Value("network").(*Network) node := req.Context().Value("node").(*Node) @@ -216,6 +448,7 @@ func (s *Server) StopNode(w http.ResponseWriter, req *http.Request) { s.JSON(w, http.StatusOK, node.NodeInfo()) } +// ConnectNode connects a node to a peer node func (s *Server) ConnectNode(w http.ResponseWriter, req *http.Request) { network := req.Context().Value("network").(*Network) node := req.Context().Value("node").(*Node) @@ -229,6 +462,7 @@ func (s *Server) ConnectNode(w http.ResponseWriter, req *http.Request) { s.JSON(w, http.StatusOK, node.NodeInfo()) } +// DisconnectNode disconnects a node from a peer node func (s *Server) DisconnectNode(w http.ResponseWriter, req *http.Request) { network := req.Context().Value("network").(*Network) node := req.Context().Value("node").(*Node) @@ -242,28 +476,47 @@ func (s *Server) DisconnectNode(w http.ResponseWriter, req *http.Request) { s.JSON(w, http.StatusOK, node.NodeInfo()) } +// NodeRPC proxies node RPC requests via a WebSocket connection +func (s *Server) NodeRPC(w http.ResponseWriter, req *http.Request) { + node := req.Context().Value("node").(*Node) + + handler := func(conn *websocket.Conn) { + node.ServeRPC(conn) + } + + websocket.Server{Handler: handler}.ServeHTTP(w, req) +} + +// ServeHTTP implements the http.Handler interface by delegating to the +// underlying httprouter.Router func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) { s.router.ServeHTTP(w, req) } +// GET registers a handler for GET requests to a particular path func (s *Server) GET(path string, handle http.HandlerFunc) { s.router.GET(path, s.wrapHandler(handle)) } +// POST registers a handler for POST requests to a particular path func (s *Server) POST(path string, handle http.HandlerFunc) { s.router.POST(path, s.wrapHandler(handle)) } +// DELETE registers a handler for DELETE requests to a particular path func (s *Server) DELETE(path string, handle http.HandlerFunc) { s.router.DELETE(path, s.wrapHandler(handle)) } +// JSON sends "data" as a JSON HTTP response func (s *Server) JSON(w http.ResponseWriter, status int, data interface{}) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(status) json.NewEncoder(w).Encode(data) } +// wrapHandler returns a httprouter.Handle which wraps a http.HandlerFunc by +// populating request.Context with any objects from the URL params func (s *Server) wrapHandler(handler http.HandlerFunc) httprouter.Handle { return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { w.Header().Set("Access-Control-Allow-Origin", "*") @@ -289,7 +542,12 @@ func (s *Server) wrapHandler(handler http.HandlerFunc) httprouter.Handle { http.NotFound(w, req) return } - node := network.GetNode(adapters.NewNodeIdFromHex(id)) + var node *Node + if nodeID, err := discover.HexID(id); err == nil { + node = network.GetNode(&adapters.NodeId{NodeID: nodeID}) + } else { + node = network.GetNodeByName(id) + } if node == nil { http.NotFound(w, req) return @@ -302,7 +560,12 @@ func (s *Server) wrapHandler(handler http.HandlerFunc) httprouter.Handle { http.NotFound(w, req) return } - peer := network.GetNode(adapters.NewNodeIdFromHex(id)) + var peer *Node + if peerID, err := discover.HexID(id); err == nil { + peer = network.GetNode(&adapters.NodeId{NodeID: peerID}) + } else { + peer = network.GetNodeByName(id) + } if peer == nil { http.NotFound(w, req) return diff --git a/p2p/simulations/http_test.go b/p2p/simulations/http_test.go new file mode 100644 index 000000000000..0f1b603add7f --- /dev/null +++ b/p2p/simulations/http_test.go @@ -0,0 +1,347 @@ +package simulations + +import ( + "context" + "net/http/httptest" + "sync/atomic" + "testing" + "time" + + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/simulations/adapters" + "github.com/ethereum/go-ethereum/rpc" +) + +type testService struct { + id *adapters.NodeId +} + +func newTestService(id *adapters.NodeId) node.Service { + return &testService{id} +} + +func (t *testService) Protocols() []p2p.Protocol { + return []p2p.Protocol{{ + Name: "test", + Version: 1, + Length: 1, + Run: t.Run, + }} +} + +func (t *testService) APIs() []rpc.API { + return []rpc.API{{ + Namespace: "test", + Version: "1.0", + Service: &TestAPI{}, + }} +} + +func (t *testService) Start(server p2p.Server) error { + return nil +} + +func (t *testService) Stop() error { + return nil +} + +func (t *testService) Run(_ *p2p.Peer, rw p2p.MsgReadWriter) error { + for { + _, err := rw.ReadMsg() + if err != nil { + return err + } + } +} + +// TestAPI provides a simple API to get and increment a counter and to +// subscribe to increment events +type TestAPI struct { + counter int64 + feed event.Feed +} + +func (t *TestAPI) Get() int64 { + return atomic.LoadInt64(&t.counter) +} + +func (t *TestAPI) Add(delta int64) { + atomic.AddInt64(&t.counter, delta) + t.feed.Send(delta) +} + +func (t *TestAPI) Events(ctx context.Context) (*rpc.Subscription, error) { + notifier, supported := rpc.NotifierFromContext(ctx) + if !supported { + return nil, rpc.ErrNotificationsUnsupported + } + + rpcSub := notifier.CreateSubscription() + + go func() { + events := make(chan int64) + sub := t.feed.Subscribe(events) + defer sub.Unsubscribe() + + for { + select { + case event := <-events: + notifier.Notify(rpcSub.ID, event) + case <-sub.Err(): + return + case <-rpcSub.Err(): + return + case <-notifier.Closed(): + return + } + } + }() + + return rpcSub, nil +} + +var testServices = adapters.Services{ + "test": newTestService, +} + +// TestHTTPNetwork tests creating and interacting with a simulation +// network using the HTTP API +func TestHTTPNetwork(t *testing.T) { + // start the server + srv := NewServer(&ServerConfig{ + Adapter: adapters.NewSimAdapter(testServices), + }) + s := httptest.NewServer(srv) + defer s.Close() + + // create a network + client := NewClient(s.URL) + config := &NetworkConfig{ + DefaultService: "test", + } + network, err := client.CreateNetwork(config) + if err != nil { + t.Fatalf("error creating network: %s", err) + } + + // subscribe to events so we can check them later + events := make(chan *Event, 100) + sub, err := client.SubscribeNetwork(network.Id, events) + if err != nil { + t.Fatalf("error subscribing to network events: %s", err) + } + defer sub.Unsubscribe() + + // check the network has an ID + if network.Id == "" { + t.Fatal("expected network.Id to be set") + } + + // check the network exists + networks, err := client.GetNetworks() + if err != nil { + t.Fatalf("error getting networks: %s", err) + } + if len(networks) != 1 { + t.Fatalf("expected 1 network, got %d", len(networks)) + } + if networks[0].Id != network.Id { + t.Fatalf("expected network to have ID %q, got %q", network.Id, networks[0].Id) + } + gotNetwork, err := client.GetNetwork(network.Id) + if err != nil { + t.Fatalf("error getting network: %s", err) + } + if gotNetwork.Id != network.Id { + t.Fatalf("expected network to have ID %q, got %q", network.Id, gotNetwork.Id) + } + + // create 2 nodes + nodeIDs := make([]string, 2) + for i := 0; i < 2; i++ { + config := &adapters.NodeConfig{} + node, err := client.CreateNode(network.Id, config) + if err != nil { + t.Fatalf("error creating node: %s", err) + } + nodeIDs[i] = node.ID + } + + // check both nodes exist + nodes, err := client.GetNodes(network.Id) + if err != nil { + t.Fatalf("error getting nodes: %s", err) + } + if len(nodes) != 2 { + t.Fatalf("expected 2 nodes, got %d", len(nodes)) + } + for i, nodeID := range nodeIDs { + if nodes[i].ID != nodeID { + t.Fatalf("expected node %d to have ID %q, got %q", i, nodeID, nodes[i].ID) + } + node, err := client.GetNode(network.Id, nodeID) + if err != nil { + t.Fatalf("error getting node %d: %s", i, err) + } + if node.ID != nodeID { + t.Fatalf("expected node %d to have ID %q, got %q", i, nodeID, node.ID) + } + } + + // start both nodes + for _, nodeID := range nodeIDs { + if err := client.StartNode(network.Id, nodeID); err != nil { + t.Fatalf("error starting node %q: %s", nodeID, err) + } + } + + // connect the nodes + if err := client.ConnectNode(network.Id, nodeIDs[0], nodeIDs[1]); err != nil { + t.Fatalf("error connecting nodes: %s", err) + } + + // check we got all the events + nodeEvent := func(id string, up bool) *Event { + return &Event{ + Type: EventTypeNode, + Node: &Node{ + Config: &adapters.NodeConfig{ + Id: adapters.NewNodeIdFromHex(id), + }, + Up: up, + }, + } + } + connEvent := func(one, other string, up bool) *Event { + return &Event{ + Type: EventTypeConn, + Conn: &Conn{ + One: adapters.NewNodeIdFromHex(one), + Other: adapters.NewNodeIdFromHex(other), + Up: up, + }, + } + } + expectedEvents := []*Event{ + nodeEvent(nodeIDs[0], false), + nodeEvent(nodeIDs[1], false), + nodeEvent(nodeIDs[0], true), + nodeEvent(nodeIDs[1], true), + connEvent(nodeIDs[0], nodeIDs[1], false), + connEvent(nodeIDs[0], nodeIDs[1], true), + } + timeout := time.After(10 * time.Second) + for i := 0; i < len(expectedEvents); i++ { + select { + case event := <-events: + t.Logf("received %s event: %s", event.Type, event) + + expected := expectedEvents[i] + if event.Type != expected.Type { + t.Fatalf("expected event %d to have type %q, got %q", i, expected.Type, event.Type) + } + + switch expected.Type { + + case EventTypeNode: + if event.Node == nil { + t.Fatal("expected event.Node to be set") + } + if event.Node.ID().NodeID != expected.Node.ID().NodeID { + t.Fatalf("expected node event %d to have id %q, got %q", i, expected.Node.ID().Label(), event.Node.ID().Label()) + } + if event.Node.Up != expected.Node.Up { + t.Fatalf("expected node event %d to have up=%t, got up=%t", i, expected.Node.Up, event.Node.Up) + } + + case EventTypeConn: + if event.Conn == nil { + t.Fatal("expected event.Conn to be set") + } + if event.Conn.One.NodeID != expected.Conn.One.NodeID { + t.Fatalf("expected conn event %d to have one=%q, got one=%q", i, expected.Conn.One.Label(), event.Conn.One.Label()) + } + if event.Conn.Other.NodeID != expected.Conn.Other.NodeID { + t.Fatalf("expected conn event %d to have other=%q, got other=%q", i, expected.Conn.Other.Label(), event.Conn.Other.Label()) + } + if event.Conn.Up != expected.Conn.Up { + t.Fatalf("expected conn event %d to have up=%t, got up=%t", i, expected.Conn.Up, event.Conn.Up) + } + } + + case err := <-sub.Err(): + t.Fatalf("network stream closed unexpectedly: %s", err) + + case <-timeout: + t.Fatal("timed out waiting for expected events") + } + } +} + +// TestHTTPNodeRPC tests calling RPC methods on nodes via the HTTP API +func TestHTTPNodeRPC(t *testing.T) { + // start the server + srv := NewServer(&ServerConfig{ + Adapter: adapters.NewSimAdapter(testServices), + }) + s := httptest.NewServer(srv) + defer s.Close() + + // start a node in a network + client := NewClient(s.URL) + network, err := client.CreateNetwork(&NetworkConfig{DefaultService: "test"}) + if err != nil { + t.Fatalf("error creating network: %s", err) + } + node, err := client.CreateNode(network.Id, &adapters.NodeConfig{}) + if err != nil { + t.Fatalf("error creating node: %s", err) + } + if err := client.StartNode(network.Id, node.ID); err != nil { + t.Fatalf("error starting node: %s", err) + } + + // create two RPC clients + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + rpcClient1, err := client.RPCClient(ctx, network.Id, node.ID) + if err != nil { + t.Fatalf("error getting node RPC client: %s", err) + } + rpcClient2, err := client.RPCClient(ctx, network.Id, node.ID) + if err != nil { + t.Fatalf("error getting node RPC client: %s", err) + } + + // subscribe to events using client 1 + events := make(chan int64, 1) + sub, err := rpcClient1.Subscribe(ctx, "test", events, "events") + if err != nil { + t.Fatalf("error subscribing to events: %s", err) + } + defer sub.Unsubscribe() + + // call some RPC methods using client 2 + if err := rpcClient2.CallContext(ctx, nil, "test_add", 10); err != nil { + t.Fatalf("error calling RPC method: %s", err) + } + var result int64 + if err := rpcClient2.CallContext(ctx, &result, "test_get"); err != nil { + t.Fatalf("error calling RPC method: %s", err) + } + if result != 10 { + t.Fatalf("expected result to be 10, got %d", result) + } + + // check we got an event from client 1 + select { + case event := <-events: + if event != 10 { + t.Fatalf("expected event to be 10, got %d", event) + } + case <-ctx.Done(): + t.Fatal(ctx.Err()) + } +} diff --git a/p2p/simulations/network.go b/p2p/simulations/network.go index b90f367b890d..bed5ae5cb0c6 100644 --- a/p2p/simulations/network.go +++ b/p2p/simulations/network.go @@ -40,26 +40,19 @@ import ( ) type NetworkConfig struct { - // Type NetworkType - // Config json.RawMessage // type-specific configs - // type - // Events []string - Id string - DefaultMockerConfig *MockerConfig - Backend bool - DefaultService string -} - -type NetworkControl interface { - Events() *event.TypeMux - Config() *NetworkConfig - Subscribe(*event.TypeMux, ...interface{}) + Id string `json:"id"` + DefaultService string `json:"default_service,omitempty"` } // Network models a p2p network // the actual logic of bringing nodes and connections up and down and // messaging is implemented in the particular NodeAdapter interface type Network struct { + NetworkConfig + + Nodes []*Node `json:"nodes"` + Conns []*Conn `json:"conns"` + nodeAdapter adapters.NodeAdapter // input trigger events and other events @@ -67,19 +60,16 @@ type Network struct { lock sync.RWMutex nodeMap map[discover.NodeID]int connMap map[string]int - Nodes []*Node `json:"nodes"` - Conns []*Conn `json:"conns"` quitc chan bool - conf *NetworkConfig } func NewNetwork(nodeAdapter adapters.NodeAdapter, conf *NetworkConfig) *Network { return &Network{ - nodeAdapter: nodeAdapter, - conf: conf, - nodeMap: make(map[discover.NodeID]int), - connMap: make(map[string]int), - quitc: make(chan bool), + NetworkConfig: *conf, + nodeAdapter: nodeAdapter, + nodeMap: make(map[discover.NodeID]int), + connMap: make(map[string]int), + quitc: make(chan bool), } } @@ -122,7 +112,7 @@ func (self *Network) executeNodeEvent(e *Event) error { return self.Stop(e.Node.ID()) } - if err := self.NewNodeWithConfig(e.Node.Config); err != nil { + if _, err := self.NewNodeWithConfig(e.Node.Config); err != nil { return err } return self.Start(e.Node.ID()) @@ -158,6 +148,12 @@ func (self *Node) String() string { return fmt.Sprintf("Node %v", self.ID().Label()) } +func (self *Node) NodeInfo() *p2p.NodeInfo { + info := self.Node.NodeInfo() + info.Name = self.Config.Name + return info +} + // active connections are represented by the Node entry object so that // you journal updates could filter if passive knowledge about peers is // irrelevant @@ -191,34 +187,34 @@ func (self *Msg) String() string { } // NewNode adds a new node to the network with a random ID -func (self *Network) NewNode() (*adapters.NodeConfig, error) { +func (self *Network) NewNode() (*Node, error) { conf := adapters.RandomNodeConfig() - conf.Service = self.conf.DefaultService - if err := self.NewNodeWithConfig(conf); err != nil { - return nil, err - } - return conf, nil + conf.Service = self.DefaultService + return self.NewNodeWithConfig(conf) } // NewNodeWithConfig adds a new node to the network with the given config // errors if a node by the same id already exist -func (self *Network) NewNodeWithConfig(conf *adapters.NodeConfig) error { +func (self *Network) NewNodeWithConfig(conf *adapters.NodeConfig) (*Node, error) { self.lock.Lock() defer self.lock.Unlock() id := conf.Id + if conf.Name == "" { + conf.Name = fmt.Sprintf("node%02d", len(self.Nodes)+1) + } if conf.Service == "" { - conf.Service = self.conf.DefaultService + conf.Service = self.DefaultService } _, found := self.nodeMap[id.NodeID] if found { - return fmt.Errorf("node %v already added", id) + return nil, fmt.Errorf("node %v already added", id) } self.nodeMap[id.NodeID] = len(self.Nodes) adapterNode, err := self.nodeAdapter.NewNode(conf) if err != nil { - return err + return nil, err } node := &Node{ Node: adapterNode, @@ -227,11 +223,11 @@ func (self *Network) NewNodeWithConfig(conf *adapters.NodeConfig) error { self.Nodes = append(self.Nodes, node) log.Trace(fmt.Sprintf("node %v created", id)) self.events.Send(ControlEvent(node)) - return nil + return node, nil } func (self *Network) Config() *NetworkConfig { - return self.conf + return &self.NetworkConfig } // newConn adds a new connection to the network @@ -495,6 +491,17 @@ func (self *Network) GetNode(id *adapters.NodeId) *Node { return self.getNode(id) } +func (self *Network) GetNodeByName(name string) *Node { + self.lock.Lock() + defer self.lock.Unlock() + for _, node := range self.Nodes { + if node.Config.Name == name { + return node + } + } + return nil +} + func (self *Network) GetNodes() []*Node { self.lock.Lock() defer self.lock.Unlock() diff --git a/p2p/testing/protocoltester.go b/p2p/testing/protocoltester.go index d0ea503e4d6a..16a23f120fa6 100644 --- a/p2p/testing/protocoltester.go +++ b/p2p/testing/protocoltester.go @@ -29,7 +29,7 @@ func NewProtocolTester(t *testing.T, id *adapters.NodeId, n int, run func(*p2p.P } adapter := adapters.NewSimAdapter(services) net := simulations.NewNetwork(adapter, &simulations.NetworkConfig{}) - if err := net.NewNodeWithConfig(&adapters.NodeConfig{Id: id, Service: "test"}); err != nil { + if _, err := net.NewNodeWithConfig(&adapters.NodeConfig{Id: id, Service: "test"}); err != nil { panic(err.Error()) } if err := net.Start(id); err != nil { @@ -65,7 +65,7 @@ func NewProtocolTester(t *testing.T, id *adapters.NodeId, n int, run func(*p2p.P func (self *ProtocolTester) Connect(selfId *adapters.NodeId, peers ...*adapters.NodeConfig) { for _, peer := range peers { log.Trace(fmt.Sprintf("start node %v", peer.Id)) - if err := self.network.NewNodeWithConfig(peer); err != nil { + if _, err := self.network.NewNodeWithConfig(peer); err != nil { panic(fmt.Sprintf("error starting peer %v: %v", peer.Id, err)) } if err := self.network.Start(peer.Id); err != nil { diff --git a/swarm/network/simulations/discovery/discovery_test.go b/swarm/network/simulations/discovery/discovery_test.go index 5137adf91f1a..51364dfb1538 100644 --- a/swarm/network/simulations/discovery/discovery_test.go +++ b/swarm/network/simulations/discovery/discovery_test.go @@ -68,17 +68,17 @@ func testDiscoverySimulation(t *testing.T, adapter adapters.NodeAdapter) { trigger := make(chan *adapters.NodeId) ids := make([]*adapters.NodeId, nodeCount) for i := 0; i < nodeCount; i++ { - conf, err := net.NewNode() + node, err := net.NewNode() if err != nil { - t.Fatalf("error starting node %s: %s", conf.Id.Label(), err) + t.Fatalf("error starting node %s: %s", node.ID().Label(), err) } - if err := net.Start(conf.Id); err != nil { - t.Fatalf("error starting node %s: %s", conf.Id.Label(), err) + if err := net.Start(node.ID()); err != nil { + t.Fatalf("error starting node %s: %s", node.ID().Label(), err) } - if err := triggerChecks(trigger, net, conf.Id); err != nil { - t.Fatal("error triggering checks for node %s: %s", conf.Id.Label(), err) + if err := triggerChecks(trigger, net, node.ID()); err != nil { + t.Fatal("error triggering checks for node %s: %s", node.ID().Label(), err) } - ids[i] = conf.Id + ids[i] = node.ID() } // run a simulation which connects the 10 nodes in a ring and waits diff --git a/swarm/network/simulations/overlay.go b/swarm/network/simulations/overlay.go index 0e1e81f32bb0..69b42cbd928e 100644 --- a/swarm/network/simulations/overlay.go +++ b/swarm/network/simulations/overlay.go @@ -96,11 +96,11 @@ func mocker(net *simulations.Network) { ids := make([]*adapters.NodeId, 10) for i := 0; i < 10; i++ { - conf, err := net.NewNode() + node, err := net.NewNode() if err != nil { panic(err.Error()) } - ids[i] = conf.Id + ids[i] = node.ID() } for _, id := range ids {