From 584f8b8cf4d979dd91a348c804d6d9e67505d248 Mon Sep 17 00:00:00 2001 From: Lewis Marshall Date: Sun, 7 May 2017 12:40:24 +0100 Subject: [PATCH 1/2] p2p/simulations: Refactor events Signed-off-by: Lewis Marshall --- p2p/simulations/adapters/types.go | 17 ++ p2p/simulations/events.go | 61 +++++ p2p/simulations/http.go | 30 +- p2p/simulations/journal.go | 50 ++-- p2p/simulations/journal_test.go | 34 +-- p2p/simulations/mocker.go | 33 +-- p2p/simulations/network.go | 259 ++++-------------- p2p/simulations/sim_events.go | 137 --------- p2p/simulations/simulation.go | 6 +- .../simulations/discovery/discovery_test.go | 1 - 10 files changed, 209 insertions(+), 419 deletions(-) create mode 100644 p2p/simulations/events.go delete mode 100644 p2p/simulations/sim_events.go diff --git a/p2p/simulations/adapters/types.go b/p2p/simulations/adapters/types.go index aa08301e86ff..5e9e34ce8d53 100644 --- a/p2p/simulations/adapters/types.go +++ b/p2p/simulations/adapters/types.go @@ -93,6 +93,23 @@ func (self *NodeId) Label() string { return self.String()[:4] } +func (self *NodeId) MarshalJSON() ([]byte, error) { + return json.Marshal(hex.EncodeToString(self.NodeID[:])) +} + +func (self *NodeId) UnmarshalJSON(data []byte) error { + var s string + if err := json.Unmarshal(data, &s); err != nil { + return err + } + id, err := discover.HexID(s) + if err != nil { + return err + } + self.NodeID = id + return nil +} + // NodeConfig is the configuration used to start a node in a simulation // network type NodeConfig struct { diff --git a/p2p/simulations/events.go b/p2p/simulations/events.go new file mode 100644 index 000000000000..8e8168c34b85 --- /dev/null +++ b/p2p/simulations/events.go @@ -0,0 +1,61 @@ +package simulations + +import ( + "fmt" + "time" +) + +type EventType string + +const ( + EventTypeNode EventType = "node" + EventTypeConn EventType = "conn" + EventTypeMsg EventType = "msg" +) + +type Event struct { + Type EventType `json:"type"` + Time time.Time `json:"time"` + Control bool `json:"control"` + + Node *Node `json:"node,omitempty"` + Conn *Conn `json:"conn,omitempty"` + Msg *Msg `json:"msg,omitempty"` +} + +func NewEvent(v interface{}) *Event { + event := &Event{Time: time.Now()} + switch v := v.(type) { + case *Node: + event.Type = EventTypeNode + event.Node = v + case *Conn: + event.Type = EventTypeConn + event.Conn = v + case *Msg: + event.Type = EventTypeMsg + event.Msg = v + default: + panic(fmt.Sprintf("invalid event type: %T", v)) + } + return event +} + +func ControlEvent(v interface{}) *Event { + event := NewEvent(v) + event.Control = true + return event +} + +func (e *Event) String() string { + switch e.Type { + case EventTypeNode: + return fmt.Sprintf(" id: %s up: %t", e.Node.ID().Label(), e.Node.Up) + case EventTypeConn: + return fmt.Sprintf(" nodes: %s->%s up: %t", e.Conn.One.Label(), e.Conn.Other.Label(), e.Conn.Up) + case EventTypeMsg: + return fmt.Sprintf(" nodes: %s->%s code: %d, received: %t", e.Msg.One.Label(), e.Msg.Other.Label(), e.Msg.Code, e.Msg.Received) + default: + return "" + } +} diff --git a/p2p/simulations/http.go b/p2p/simulations/http.go index 2d3acd9b0686..7809830d4072 100644 --- a/p2p/simulations/http.go +++ b/p2p/simulations/http.go @@ -118,7 +118,8 @@ func (s *Server) StartMocker(w http.ResponseWriter, req *http.Request) { } func (s *Server) streamNetworkEvents(network *Network, w http.ResponseWriter) { - sub := network.events.Subscribe(ConnectivityAllEvents...) + events := make(chan *Event) + sub := network.events.Subscribe(events) defer sub.Unsubscribe() // stop the stream if the client goes away @@ -140,23 +141,20 @@ func (s *Server) streamNetworkEvents(network *Network, w http.ResponseWriter) { } } - w.Header().Set("Content-Type", "text/event-stream") - ch := sub.Chan() + w.Header().Set("Content-Type", "text/event-stream; charset=utf-8") + w.WriteHeader(http.StatusOK) + if fw, ok := w.(http.Flusher); ok { + fw.Flush() + } for { select { - case event := <-ch: - // convert the event to a SimUpdate - update, err := NewSimUpdate(event) - if err != nil { - write("error", err.Error()) - return - } - data, err := json.Marshal(update) + case event := <-events: + data, err := json.Marshal(event) if err != nil { write("error", err.Error()) return } - write("simupdate", string(data)) + write("network", string(data)) case <-clientGone: return } @@ -198,7 +196,7 @@ func (s *Server) StartNode(w http.ResponseWriter, req *http.Request) { network := req.Context().Value("network").(*Network) node := req.Context().Value("node").(*Node) - if err := network.Start(node.Id); err != nil { + if err := network.Start(node.ID()); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } @@ -210,7 +208,7 @@ func (s *Server) StopNode(w http.ResponseWriter, req *http.Request) { network := req.Context().Value("network").(*Network) node := req.Context().Value("node").(*Node) - if err := network.Stop(node.Id); err != nil { + if err := network.Stop(node.ID()); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } @@ -223,7 +221,7 @@ func (s *Server) ConnectNode(w http.ResponseWriter, req *http.Request) { node := req.Context().Value("node").(*Node) peer := req.Context().Value("peer").(*Node) - if err := network.Connect(node.Id, peer.Id); err != nil { + if err := network.Connect(node.ID(), peer.ID()); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } @@ -236,7 +234,7 @@ func (s *Server) DisconnectNode(w http.ResponseWriter, req *http.Request) { node := req.Context().Value("node").(*Node) peer := req.Context().Value("peer").(*Node) - if err := network.Disconnect(node.Id, peer.Id); err != nil { + if err := network.Disconnect(node.ID(), peer.ID()); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } diff --git a/p2p/simulations/journal.go b/p2p/simulations/journal.go index 4d232e3d29cf..a827d3957e66 100644 --- a/p2p/simulations/journal.go +++ b/p2p/simulations/journal.go @@ -16,12 +16,13 @@ import ( // (using event.TypeMux). Network components POST events to the TypeMux, which then is // read by the journal. Each journal belongs to a subscription. type Journal struct { - Id string + Id string `json:"id"` + Events []*Event `json:"events"` + lock sync.Mutex counter int cursor int quitc chan bool - Events []*event.TypeMuxEvent } // NewJournal constructor @@ -34,19 +35,19 @@ func NewJournal() *Journal { return &Journal{quitc: make(chan bool)} } -// Subscribe takes an event.TypeMux and subscibes to types -// and launches a gorourine that appends any new event to the event log -// used for journalling history of a network +// Subscribe subscribes to an event.Feed and launches a gorourine that appends +// any new event to the event log used for journalling history of a network // the goroutine terminates when the journal is closed -func (self *Journal) Subscribe(eventer *event.TypeMux, types ...interface{}) { +func (self *Journal) Subscribe(feed *event.Feed) { log.Info("subscribe") - sub := eventer.Subscribe(types...) + events := make(chan *Event) + sub := feed.Subscribe(events) go func() { defer sub.Unsubscribe() for { select { - case ev := <-sub.Chan(): - self.append(ev) + case event := <-events: + self.append(event) case <-self.quitc: return } @@ -75,11 +76,12 @@ func NewJournalFromJSON(b []byte) (*Journal, error) { // params: // * acc: using acceleration factor acc // * journal: journal to use -// * eventer: where to post the replayed events -func Replay(acc float64, j *Journal, eventer *event.TypeMux) { - f := func(d interface{}) bool { +// * feed: where to post the replayed events +func Replay(acc float64, j *Journal, feed *event.Feed) { + f := func(event *Event) bool { // reposts the data with the eventer (the data receives a new timestamp) - eventer.Post(d) + event.Time = time.Now() + feed.Send(event) return true } j.TimedRead(acc, f) @@ -97,10 +99,10 @@ func (self *Journal) Close() { close(self.quitc) } -func (self *Journal) append(evs ...*event.TypeMuxEvent) { +func (self *Journal) append(events ...*Event) { self.lock.Lock() defer self.lock.Unlock() - self.Events = append(self.Events, evs...) + self.Events = append(self.Events, events...) self.counter++ } @@ -116,7 +118,7 @@ func (self *Journal) WaitEntries(n int) { } } -func (self *Journal) Read(f func(*event.TypeMuxEvent) bool) (read int) { +func (self *Journal) Read(f func(*Event) bool) (read int) { self.lock.Lock() defer self.lock.Unlock() ok := true @@ -139,20 +141,20 @@ func (self *Journal) Read(f func(*event.TypeMuxEvent) bool) (read int) { // NOTE: the events' timestamps are supposed to be strictly ordered otherwise // the call panics. // acc is an acceleration factor -func (self *Journal) TimedRead(acc float64, f func(interface{}) bool) (read int) { +func (self *Journal) TimedRead(acc float64, f func(*Event) bool) (read int) { var lastEvent time.Time timer := time.NewTimer(0) - var data interface{} - h := func(ev *event.TypeMuxEvent) bool { + var event *Event + h := func(e *Event) bool { // wait for the interval time passes event time - if ev.Time.Before(lastEvent) { + if e.Time.Before(lastEvent) { panic("events not ordered") } - interval := ev.Time.Sub(lastEvent) + interval := e.Time.Sub(lastEvent) log.Trace(fmt.Sprintf("reset timer to interval %v", interval)) timer.Reset(time.Duration(acc) * interval) - lastEvent = ev.Time - data = ev.Data + lastEvent = e.Time + event = e return false } var n int @@ -168,7 +170,7 @@ func (self *Journal) TimedRead(acc float64, f func(interface{}) bool) (read int) } } read += n - if n == 0 || !f(data) { + if n == 0 || !f(event) { log.Trace(fmt.Sprintf("timed read ends (read %v entries)", read)) break } diff --git a/p2p/simulations/journal_test.go b/p2p/simulations/journal_test.go index 8a6890a667f6..1c25ea79002c 100644 --- a/p2p/simulations/journal_test.go +++ b/p2p/simulations/journal_test.go @@ -10,16 +10,14 @@ import ( "github.com/ethereum/go-ethereum/p2p/simulations/adapters" ) -func testEvents(intervals ...int) (events []*event.TypeMuxEvent) { +func testEvents(intervals ...int) (events []*Event) { t := time.Now() for _, interval := range intervals { t = t.Add(time.Duration(interval) * time.Millisecond) - events = append(events, &event.TypeMuxEvent{ + events = append(events, &Event{ + Type: EventTypeNode, Time: t, - Data: interface{}(&NodeEvent{ - Type: "node", - Action: "down", - }), + Node: &Node{Up: false}, }) } return events @@ -33,8 +31,7 @@ func TestTimedRead(t *testing.T) { var i int acc := 0.5 length := 4 - f := func(data interface{}) bool { - _ = data.(*NodeEvent) + f := func(event *Event) bool { newTimes = append(newTimes, time.Now()) i++ return i <= length @@ -68,10 +65,15 @@ func testIDs() (ids []*adapters.NodeId) { } func testJournal(ids []*adapters.NodeId) *Journal { - eventer := &event.TypeMux{} + eventer := &event.Feed{} journal := NewJournal() - journal.Subscribe(eventer, ConnectivityEvents...) - mockNewNodes(eventer, ids) + journal.Subscribe(eventer) + for _, id := range ids { + eventer.Send(&Event{ + Type: EventTypeNode, + Node: &Node{Config: &adapters.NodeConfig{Id: id}, Up: true}, + }) + } journal.WaitEntries(len(ids)) return journal } @@ -80,7 +82,7 @@ func TestSubscribe(t *testing.T) { ids := testIDs() journal := testJournal(ids) for i, ev := range journal.Events { - id := ev.Data.(*NodeEvent).node.Id + id := ev.Node.ID() if id != ids[i] { t.Fatalf("incorrect id: expected %v, got %v", id, ids[i]) } @@ -115,15 +117,15 @@ func TestLoadSave(t *testing.T) { func TestReplay(t *testing.T) { _, jo := loadTestJournal(t) - eventer := &event.TypeMux{} + eventer := &event.Feed{} journal := NewJournal() - journal.Subscribe(eventer, ConnectivityEvents...) + journal.Subscribe(eventer) Replay(0, jo, eventer) for i, ev := range jo.Events { - exp := ev.Data.(*NodeEvent).String() - got := journal.Events[i].Data.(*NodeEvent).String() + exp := ev.String() + got := journal.Events[i].String() if exp != got { t.Fatalf("incorrent replayed journal entry at pos %v: expected %v, got %v", i, exp, got) } diff --git a/p2p/simulations/mocker.go b/p2p/simulations/mocker.go index 05d5a2e0170f..8f03c801d62a 100644 --- a/p2p/simulations/mocker.go +++ b/p2p/simulations/mocker.go @@ -57,7 +57,7 @@ func DefaultMockerConfig() *MockerConfig { // to the eventer // The journal using the eventer can then be read to visualise or // drive connections -func MockEvents(eventer *event.TypeMux, ids []*adapters.NodeId, conf *MockerConfig) { +func MockEvents(eventer *event.Feed, ids []*adapters.NodeId, conf *MockerConfig) { var onNodes []*Node offNodes := ids @@ -105,22 +105,15 @@ func MockEvents(eventer *event.TypeMux, ids []*adapters.NodeId, conf *MockerConf for i := 0; len(onNodes) > 0 && i < nodesDown; i++ { c := rand.Intn(len(onNodes)) sn := onNodes[c] - err := eventer.Post(sn.EmitEvent(ControlEvent)) - if err != nil { - panic(err.Error()) - } + eventer.Send(ControlEvent(sn)) onNodes = append(onNodes[0:c], onNodes[c+1:]...) - offNodes = append(offNodes, sn.Id) + offNodes = append(offNodes, sn.ID()) } var mustconnect []int for i := 0; len(offNodes) > 0 && i < nodesUp; i++ { c := rand.Intn(len(offNodes)) - sn := &Node{} - sn.Id = offNodes[c] - err := eventer.Post(sn.EmitEvent(ControlEvent)) - if err != nil { - panic(err.Error()) - } + sn := &Node{Config: &adapters.NodeConfig{Id: offNodes[c]}} + eventer.Send(ControlEvent(sn)) mustconnect = append(mustconnect, len(onNodes)) onNodes = append(onNodes, sn) offNodes = append(offNodes[0:c], offNodes[c+1:]...) @@ -145,7 +138,7 @@ func MockEvents(eventer *event.TypeMux, ids []*adapters.NodeId, conf *MockerConf m := n + rand.Intn(len(onNodes)-n) // m := n + 1 + rand.Intn(len(onNodes)-n-1) for k := m; k < len(onNodes); k++ { - lab := ConnLabel(onNodes[n].Id, onNodes[k].Id) + lab := ConnLabel(onNodes[n].ID(), onNodes[k].ID()) var j int j, found = onConnsMap[lab] if found { @@ -157,8 +150,8 @@ func MockEvents(eventer *event.TypeMux, ids []*adapters.NodeId, conf *MockerConf break } connected[k] = true - caller := onNodes[n].Id - callee := onNodes[k].Id + caller := onNodes[n].ID() + callee := onNodes[k].ID() sc := &Conn{ One: caller, @@ -176,10 +169,7 @@ func MockEvents(eventer *event.TypeMux, ids []*adapters.NodeId, conf *MockerConf lab := ConnLabel(sc.One, sc.Other) onConnsMap[lab] = len(onConns) onConns = append(onConns, sc) - err := eventer.Post(sc.EmitEvent(ControlEvent)) - if err != nil { - panic(err.Error()) - } + eventer.Send(ControlEvent(sc)) } for i := 0; len(onConns) > 0 && i < connsDown; i++ { @@ -188,10 +178,7 @@ func MockEvents(eventer *event.TypeMux, ids []*adapters.NodeId, conf *MockerConf onConns = append(onConns[0:c], onConns[c+1:]...) lab := ConnLabel(conn.One, conn.Other) delete(onConnsMap, lab) - err := eventer.Post(conn.EmitEvent(ControlEvent)) - if err != nil { - panic(err.Error()) - } + eventer.Send(ControlEvent(conn)) } rounds++ } diff --git a/p2p/simulations/network.go b/p2p/simulations/network.go index e24c9b5cef41..b90f367b890d 100644 --- a/p2p/simulations/network.go +++ b/p2p/simulations/network.go @@ -56,12 +56,6 @@ type NetworkControl interface { Subscribe(*event.TypeMux, ...interface{}) } -// event types related to connectivity, i.e., nodes coming on dropping off -// and connections established and dropped -var ConnectivityControlEvents = []interface{}{&NodeControlEvent{}, &ConnControlEvent{}, &MsgControlEvent{}} -var ConnectivityLiveEvents = []interface{}{&NodeEvent{}, &ConnEvent{}, &MsgEvent{}} -var ConnectivityAllEvents = append(ConnectivityControlEvents, ConnectivityLiveEvents...) - // Network models a p2p network // the actual logic of bringing nodes and connections up and down and // messaging is implemented in the particular NodeAdapter interface @@ -69,7 +63,7 @@ type Network struct { nodeAdapter adapters.NodeAdapter // input trigger events and other events - events *event.TypeMux // generated events a journal can subsribe to + events event.Feed // generated events a journal can subsribe to lock sync.RWMutex nodeMap map[discover.NodeID]int connMap map[string]int @@ -83,125 +77,85 @@ func NewNetwork(nodeAdapter adapters.NodeAdapter, conf *NetworkConfig) *Network return &Network{ nodeAdapter: nodeAdapter, conf: conf, - events: &event.TypeMux{}, nodeMap: make(map[discover.NodeID]int), connMap: make(map[string]int), quitc: make(chan bool), } } -// Subscribe takes an event.TypeMux and subscibes to types -// and launches a goroutine that reads control events from an eventer Subsription channel -// and executes the events -func (self *Network) Subscribe(eventer *event.TypeMux, types ...interface{}) { +// Subscribe reads control events from a channel and executes them +func (self *Network) Subscribe(events chan *Event) { log.Info("subscribe") - sub := eventer.Subscribe(types...) - go func() { - defer sub.Unsubscribe() - for { - select { - case ev := <-sub.Chan(): - self.execute(ev) - case <-self.quitc: + for { + select { + case event, ok := <-events: + if !ok { return } + if event.Control { + self.executeControlEvent(event) + } + case <-self.quitc: + return } - }() + } } -func (self *Network) executeNodeEvent(ne *NodeControlEvent) { - if ne.Up { - err := self.NewNodeWithConfig(&ne.Node.NodeConfig) - if err != nil { - log.Trace(fmt.Sprintf("error execute event %v: %v", ne, err)) - } - err = self.Start(ne.Node.Id) - if err != nil { - log.Trace(fmt.Sprintf("error execute event %v: %v", ne, err)) +func (self *Network) executeControlEvent(event *Event) { + log.Trace("execute control event", "type", event.Type, "event", event) + switch event.Type { + case EventTypeNode: + if err := self.executeNodeEvent(event); err != nil { + log.Error("error executing node event", "event", event, "err", err) } - } else { - err := self.Stop(ne.Node.Id) - if err != nil { - log.Trace(fmt.Sprintf("error execute event %v: %v", ne, err)) + case EventTypeConn: + if err := self.executeConnEvent(event); err != nil { + log.Error("error executing conn event", "event", event, "err", err) } + case EventTypeMsg: + log.Warn("ignoring control msg event") } - ne.Node.controlFired = ne.Up } -func (self *Network) executeConnEvent(ce *ConnControlEvent) { - if ce.Up { - err := self.Connect(ce.Connection.One, ce.Connection.Other) - if err != nil { - log.Trace(fmt.Sprintf("error execute event %v: %v", ce, err)) - } - } else { - err := self.Disconnect(ce.Connection.One, ce.Connection.Other) - if err != nil { - log.Trace(fmt.Sprintf("error execute event %v: %v", ce, err)) - } +func (self *Network) executeNodeEvent(e *Event) error { + if !e.Node.Up { + return self.Stop(e.Node.ID()) } - ce.Connection.controlFired = ce.Up -} -func (self *Network) execute(in *event.TypeMuxEvent) { - log.Trace(fmt.Sprintf("execute event %v", in)) - ev := in.Data - if ne, ok := ev.(*NodeEvent); ok { - if ne.Up && ne.Node.controlFired || (!ne.Up && !ne.Node.controlFired) { - log.Trace(fmt.Sprintf("Got NodeEvent %v, but Control Event has already been applied for : %v", ne, ne.Node)) - //ignore this real event; control event already took care of this - } else { - self.executeNodeEvent(ne.ToControlEvent()) - } - } else if ce, ok := ev.(*ConnEvent); ok { - if ce.Up && ce.Connection.controlFired || (!ce.Up && !ce.Connection.controlFired) { - log.Trace(fmt.Sprintf("Got ConnEvent %v, but Control Event has already been applied for : %v", ce, ce.Connection)) - //ignore this real event; control event already took care of this - } else { - self.executeConnEvent(ce.ToControlEvent()) - } + if err := self.NewNodeWithConfig(e.Node.Config); err != nil { + return err } - if ne, ok := ev.(*NodeControlEvent); ok { - self.executeNodeEvent(ne) - } else if ce, ok := ev.(*ConnControlEvent); ok { - self.executeConnEvent(ce) + return self.Start(e.Node.ID()) +} + +func (self *Network) executeConnEvent(e *Event) error { + if e.Conn.Up { + return self.Connect(e.Conn.One, e.Conn.Other) } else { - log.Trace(fmt.Sprintf("event: %#v", ev)) - panic("unhandled event") + return self.Disconnect(e.Conn.One, e.Conn.Other) } } // Events returns the output eventer of the Network. -func (self *Network) Events() *event.TypeMux { - return self.events -} - -type EventType int - -const ( - ControlEvent EventType = iota - LiveEvent -) - -type EventEmitter interface { - EmitEvent() -} - -type LiveEventer interface { - ToControlEvent() +func (self *Network) Events() *event.Feed { + return &self.events } type Node struct { - adapters.Node - adapters.NodeConfig + adapters.Node `json:"-"` - Up bool + Config *adapters.NodeConfig `json:"config"` + Up bool `json:"up"` controlFired bool } +func (self *Node) ID() *adapters.NodeId { + return self.Config.Id +} + func (self *Node) String() string { - return fmt.Sprintf("Node %v", self.Id.Label()) + return fmt.Sprintf("Node %v", self.ID().Label()) } // active connections are represented by the Node entry object so that @@ -236,100 +190,6 @@ func (self *Msg) String() string { return fmt.Sprintf("Msg(%d) %v->%v", self.Code, self.One.Label(), self.Other.Label()) } -type NodeEvent struct { - Node *Node - Up bool -} - -type ConnEvent struct { - Connection *Conn - Up bool - Reverse bool -} - -type MsgEvent struct { - Message *Msg -} - -type NodeControlEvent struct { - *NodeEvent -} - -type ConnControlEvent struct { - *ConnEvent -} - -type MsgControlEvent struct { - *MsgEvent -} - -func (self *NodeEvent) String() string { - return fmt.Sprintf("\n", self.Up, self.Node) -} - -func (self *ConnEvent) String() string { - return fmt.Sprintf("\n", self.Up, self.Reverse, self.Connection) -} - -func (self *MsgEvent) String() string { - return fmt.Sprintf("\n", self.Message) -} - -func (self *Node) EmitEvent(eventType EventType) interface{} { - evt := &NodeEvent{ - Node: self, - Up: self.Up, - } - if eventType == ControlEvent { - return &NodeControlEvent{ - evt, - } - } else { - return evt - } -} - -func (self *Conn) EmitEvent(eventType EventType) interface{} { - evt := &ConnEvent{ - Connection: self, - Up: self.Up, - Reverse: self.Reverse, - } - - if eventType == ControlEvent { - return &ConnControlEvent{ - evt, - } - } else { - return evt - } -} - -func (self *Msg) EmitEvent(eventType EventType) interface{} { - evt := &MsgEvent{ - Message: self, - } - if eventType == ControlEvent { - return &MsgControlEvent{ - evt, - } - } else { - return evt - } -} - -func (self *MsgEvent) ToControlEvent() *MsgControlEvent { - return &MsgControlEvent{self} -} - -func (self *ConnEvent) ToControlEvent() *ConnControlEvent { - return &ConnControlEvent{self} -} - -func (self *NodeEvent) ToControlEvent() *NodeControlEvent { - return &NodeControlEvent{self} -} - // NewNode adds a new node to the network with a random ID func (self *Network) NewNode() (*adapters.NodeConfig, error) { conf := adapters.RandomNodeConfig() @@ -361,11 +221,12 @@ func (self *Network) NewNodeWithConfig(conf *adapters.NodeConfig) error { return err } node := &Node{ - Node: adapterNode, - NodeConfig: *conf, + Node: adapterNode, + Config: conf, } self.Nodes = append(self.Nodes, node) log.Trace(fmt.Sprintf("node %v created", id)) + self.events.Send(ControlEvent(node)) return nil } @@ -418,7 +279,7 @@ func (self *Network) Start(id *adapters.NodeId) error { node.Up = true log.Info(fmt.Sprintf("started node %v: %v", id, node.Up)) - self.events.Post(node.EmitEvent(ControlEvent)) + self.events.Send(NewEvent(node)) // subscribe to peer events client, err := node.Client() @@ -485,7 +346,7 @@ func (self *Network) Stop(id *adapters.NodeId) error { node.Up = false log.Info(fmt.Sprintf("stop node %v: %v", id, node.Up)) - self.events.Post(node.EmitEvent(ControlEvent)) + self.events.Send(ControlEvent(node)) return nil } @@ -526,7 +387,7 @@ func (self *Network) Connect(oneId, otherId *adapters.NodeId) error { if err != nil { return err } - self.events.Post(conn.EmitEvent(ControlEvent)) + self.events.Send(ControlEvent(conn)) return client.Call(nil, "admin_addPeer", string(addr)) } @@ -560,7 +421,7 @@ func (self *Network) Disconnect(oneId, otherId *adapters.NodeId) error { if err != nil { return err } - self.events.Post(conn.EmitEvent(ControlEvent)) + self.events.Send(ControlEvent(conn)) return client.Call(nil, "admin_removePeer", string(addr)) } @@ -575,7 +436,7 @@ func (self *Network) DidConnect(one, other *adapters.NodeId) error { conn.Reverse = conn.One.NodeID != one.NodeID conn.Up = true // connection event posted - self.events.Post(conn.EmitEvent(LiveEvent)) + self.events.Send(NewEvent(conn)) return nil } @@ -589,7 +450,7 @@ func (self *Network) DidDisconnect(one, other *adapters.NodeId) error { } conn.Reverse = conn.One.NodeID != one.NodeID conn.Up = false - self.events.Post(conn.EmitEvent(LiveEvent)) + self.events.Send(NewEvent(conn)) return nil } @@ -601,7 +462,7 @@ func (self *Network) Send(senderid, receiverid *adapters.NodeId, msgcode uint64, Code: msgcode, } //self.GetNode(senderid).na.(*adapters.SimNode).GetPeer(receiverid).SendMsg(msgcode, protomsg) // phew! - self.events.Post(msg.EmitEvent(ControlEvent)) + self.events.Send(ControlEvent(msg)) } func (self *Network) DidSend(sender, receiver *adapters.NodeId, msgcode uint64) error { @@ -611,7 +472,7 @@ func (self *Network) DidSend(sender, receiver *adapters.NodeId, msgcode uint64) Code: msgcode, Received: false, } - self.events.Post(msg.EmitEvent(LiveEvent)) + self.events.Send(NewEvent(msg)) return nil } @@ -622,7 +483,7 @@ func (self *Network) DidReceive(sender, receiver *adapters.NodeId, msgcode uint6 Code: msgcode, Received: true, } - self.events.Post(msg.EmitEvent(LiveEvent)) + self.events.Send(NewEvent(msg)) return nil } @@ -697,9 +558,9 @@ func (self *Network) Shutdown() { // stop all nodes for _, node := range self.Nodes { - log.Debug(fmt.Sprintf("stopping node %s", node.Id.Label())) + log.Debug(fmt.Sprintf("stopping node %s", node.ID().Label())) if err := node.Stop(); err != nil { - log.Warn(fmt.Sprintf("error stopping node %s", node.Id.Label()), "err", err) + log.Warn(fmt.Sprintf("error stopping node %s", node.ID().Label()), "err", err) } } } diff --git a/p2p/simulations/sim_events.go b/p2p/simulations/sim_events.go deleted file mode 100644 index e17091a22fb5..000000000000 --- a/p2p/simulations/sim_events.go +++ /dev/null @@ -1,137 +0,0 @@ -package simulations - -import ( - "fmt" - - "github.com/ethereum/go-ethereum/event" -) - -// TODO: to implement simulation global behav -type SimConfig struct { -} - -type SimData struct { - Id string `json:"id"` - Source string `json:"source,omitempty"` - Target string `json:"target,omitempty"` - Up bool `json:"up"` -} - -type SimElement struct { - Data *SimData `json:"data"` - Classes string `json:"classes,omitempty"` - Group string `json:"group"` - Control bool `json:"control"` - // selected: false, // whether the element is selected (default false) - // selectable: true, // whether the selection state is mutable (default true) - // locked: false, // when locked a node's position is immutable (default false) - // grabbable: true, // whether the node can be grabbed and moved by the user -} - -type SimUpdate struct { - Add []*SimElement `json:"add"` - Remove []*SimElement `json:"remove"` - Message []*SimElement `json:"message"` -} - -func NewSimUpdate(e *event.TypeMuxEvent) (*SimUpdate, error) { - var update SimUpdate - var el *SimElement - entry := e.Data - - switch entry.(type) { - case *NodeControlEvent, *NodeEvent: - var data *SimData - var control bool - nce, ok := entry.(*NodeControlEvent) - if ok { - data = &SimData{Id: nce.Node.Id.String()} - data.Up = nce.Up - control = true - } else { - ne := entry.(*NodeEvent) - data = &SimData{Id: ne.Node.Id.String()} - data.Up = ne.Up - control = false - } - el = &SimElement{Group: "nodes", Data: data} - el.Control = control - if el.Data.Up { - update.Add = append(update.Add, el) - } else { - update.Remove = append(update.Remove, el) - } - case *MsgControlEvent, *MsgEvent: - var control bool - var msg *Msg - mce, ok := entry.(*MsgControlEvent) - if ok { - msg = mce.Message - control = true - } else { - me := entry.(*MsgEvent) - msg = me.Message - control = false - } - id := ConnLabel(msg.One, msg.Other) - var source, target string - source = msg.One.String() - target = msg.Other.String() - el = &SimElement{Group: "msgs", Data: &SimData{Id: id, Source: source, Target: target}} - el.Data.Up = true - el.Control = control - update.Message = append(update.Message, el) - case *ConnControlEvent, *ConnEvent: - var control bool - var conn *Conn - var up bool - cce, ok := entry.(*ConnControlEvent) - if ok { - conn = cce.Connection - up = cce.Up - control = true - } else { - ce := entry.(*ConnEvent) - conn = ce.Connection - up = ce.Up - control = false - } - // mutually exclusive directed edge (caller -> callee) - id := ConnLabel(conn.One, conn.Other) - var source, target string - if conn.Reverse { - source = conn.Other.String() - target = conn.One.String() - } else { - source = conn.One.String() - target = conn.Other.String() - } - el = &SimElement{Group: "edges", Data: &SimData{Id: id, Source: source, Target: target}} - el.Control = control - el.Data.Up = up - if up { - update.Add = append(update.Add, el) - } else { - update.Remove = append(update.Remove, el) - } - default: - return nil, fmt.Errorf("unknown event type: %T", entry) - } - - return &update, nil -} - -func UpdateSim(conf *SimConfig, j *Journal) (*SimUpdate, error) { - var update SimUpdate - j.Read(func(e *event.TypeMuxEvent) bool { - u, err := NewSimUpdate(e) - if err != nil { - panic(err.Error()) - } - update.Add = append(update.Add, u.Add...) - update.Remove = append(update.Remove, u.Remove...) - update.Message = append(update.Message, u.Message...) - return true - }) - return &update, nil -} diff --git a/p2p/simulations/simulation.go b/p2p/simulations/simulation.go index f38d1701f4fc..008905fe9ec1 100644 --- a/p2p/simulations/simulation.go +++ b/p2p/simulations/simulation.go @@ -68,11 +68,11 @@ func (s *Simulation) Run(ctx context.Context, step *Step) (result *StepResult) { func (s *Simulation) watchNetwork(result *StepResult) func() { stop := make(chan struct{}) done := make(chan struct{}) - sub := s.network.Events().Subscribe(ConnectivityAllEvents...) + events := make(chan *Event) + sub := s.network.Events().Subscribe(events) go func() { defer close(done) defer sub.Unsubscribe() - events := sub.Chan() for { select { case event := <-events: @@ -128,5 +128,5 @@ type StepResult struct { Passes map[*adapters.NodeId]time.Time // NetworkEvents are the network events which occurred during the step - NetworkEvents []interface{} + NetworkEvents []*Event } diff --git a/swarm/network/simulations/discovery/discovery_test.go b/swarm/network/simulations/discovery/discovery_test.go index 5a5ec53f37d1..5137adf91f1a 100644 --- a/swarm/network/simulations/discovery/discovery_test.go +++ b/swarm/network/simulations/discovery/discovery_test.go @@ -62,7 +62,6 @@ func testDiscoverySimulation(t *testing.T, adapter adapters.NodeAdapter) { nodeCount := 10 net := simulations.NewNetwork(adapter, &simulations.NetworkConfig{ Id: "0", - Backend: true, DefaultService: serviceName, }) defer net.Shutdown() From cf462b7781974ae21794d00870d73f6831437aad Mon Sep 17 00:00:00 2001 From: Lewis Marshall Date: Sun, 7 May 2017 00:53:21 +0100 Subject: [PATCH 2/2] p2p/simulations: Add p2psim command-line API client Signed-off-by: Lewis Marshall --- p2p/simulations/adapters/exec.go | 14 +- p2p/simulations/adapters/inproc.go | 23 +- p2p/simulations/adapters/rpc_mux.go | 218 +++++++++ p2p/simulations/adapters/types.go | 46 +- p2p/simulations/cmd/p2psim/main.go | 425 ++++++++++++++++++ p2p/simulations/examples/p2psim.sh | 43 ++ p2p/simulations/http.go | 311 ++++++++++++- p2p/simulations/http_test.go | 347 ++++++++++++++ p2p/simulations/network.go | 77 ++-- p2p/testing/protocoltester.go | 4 +- .../simulations/discovery/discovery_test.go | 14 +- swarm/network/simulations/overlay.go | 4 +- 12 files changed, 1441 insertions(+), 85 deletions(-) create mode 100644 p2p/simulations/adapters/rpc_mux.go create mode 100644 p2p/simulations/cmd/p2psim/main.go create mode 100755 p2p/simulations/examples/p2psim.sh create mode 100644 p2p/simulations/http_test.go diff --git a/p2p/simulations/adapters/exec.go b/p2p/simulations/adapters/exec.go index fd31e5ef70a3..bb34a14cf780 100644 --- a/p2p/simulations/adapters/exec.go +++ b/p2p/simulations/adapters/exec.go @@ -97,6 +97,7 @@ type ExecNode struct { Info *p2p.NodeInfo client *rpc.Client + rpcMux *rpcMux newCmd func() *exec.Cmd key *ecdsa.PrivateKey } @@ -157,7 +158,8 @@ func (n *ExecNode) Start() (err error) { n.Cmd = cmd // create the RPC client and load the node info - n.client = rpc.NewClientWithConn(pipe2) + n.rpcMux = newRPCMux(pipe2) + n.client = n.rpcMux.Client() ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() var info p2p.NodeInfo @@ -222,6 +224,16 @@ func (n *ExecNode) NodeInfo() *p2p.NodeInfo { return info } +// ServeRPC serves RPC requests over the given connection using the node's +// RPC multiplexer +func (n *ExecNode) ServeRPC(conn net.Conn) error { + if n.rpcMux == nil { + return errors.New("RPC not started") + } + n.rpcMux.Serve(conn) + return nil +} + func init() { // register a reexec function to start a devp2p node when the current // binary is executed as "p2p-node" diff --git a/p2p/simulations/adapters/inproc.go b/p2p/simulations/adapters/inproc.go index fbabe9c5f7f4..481fe187689d 100644 --- a/p2p/simulations/adapters/inproc.go +++ b/p2p/simulations/adapters/inproc.go @@ -20,6 +20,7 @@ import ( "context" "errors" "fmt" + "net" "sync" "github.com/ethereum/go-ethereum/event" @@ -119,6 +120,7 @@ type SimNode struct { peers map[discover.NodeID]MsgReadWriteCloser peerFeed event.Feed client *rpc.Client + rpcMux *rpcMux // dropPeers is used to force peer disconnects when // the node is stopped @@ -146,6 +148,19 @@ func (self *SimNode) Client() (*rpc.Client, error) { return self.client, nil } +// ServeRPC serves RPC requests over the given connection using the node's +// RPC multiplexer +func (self *SimNode) ServeRPC(conn net.Conn) error { + self.lock.Lock() + mux := self.rpcMux + self.lock.Unlock() + if mux == nil { + return errors.New("RPC not started") + } + mux.Serve(conn) + return nil +} + // Start starts the RPC handler and the underlying service func (self *SimNode) Start() error { self.dropPeers = make(chan struct{}) @@ -203,8 +218,13 @@ func (self *SimNode) startRPC() error { } } + // create an in-process RPC multiplexer + pipe1, pipe2 := net.Pipe() + go handler.ServeCodec(rpc.NewJSONCodec(pipe1), rpc.OptionMethodInvocation|rpc.OptionSubscriptions) + self.rpcMux = newRPCMux(pipe2) + // create an in-process RPC client - self.client = rpc.DialInProc(handler) + self.client = self.rpcMux.Client() return nil } @@ -216,6 +236,7 @@ func (self *SimNode) stopRPC() { if self.client != nil { self.client.Close() self.client = nil + self.rpcMux = nil } } diff --git a/p2p/simulations/adapters/rpc_mux.go b/p2p/simulations/adapters/rpc_mux.go new file mode 100644 index 000000000000..4cae58c223f7 --- /dev/null +++ b/p2p/simulations/adapters/rpc_mux.go @@ -0,0 +1,218 @@ +// Copyright 2016 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package adapters + +import ( + "encoding/json" + "net" + "strconv" + "strings" + "sync" + + "github.com/ethereum/go-ethereum/rpc" +) + +// rpcMux is an RPC multiplexer which allows many clients to make RPC requests +// over a single connection by changing each request's ID to a unique value. +// +// This is used by node adapters so that simulations can create many RPC +// clients all sending requests over the underlying node's stdin / stdout. +type rpcMux struct { + conn net.Conn + + mtx sync.Mutex + idCounter uint64 + msgMap map[uint64]*rpcMsg + subMap map[string]*rpcReply + send chan *rpcMsg +} + +type rpcMsg struct { + Method string `json:"method,omitempty"` + Version string `json:"jsonrpc,omitempty"` + Id json.RawMessage `json:"id,omitempty"` + Payload json.RawMessage `json:"params,omitempty"` + Result json.RawMessage `json:"result,omitempty"` + Error json.RawMessage `json:"error,omitempty"` + + id uint64 + reply *rpcReply +} + +// rpcSub is the payload or result of a subscription RPC message +type rpcSub struct { + Subscription string `json:"subscription"` + Result json.RawMessage `json:"result,omitempty"` +} + +// rpcReply receives replies to RPC messages for a particular client +type rpcReply struct { + ch chan *rpcMsg + closeOnce sync.Once +} + +func (r *rpcReply) close() { + r.closeOnce.Do(func() { close(r.ch) }) +} + +func newRPCMux(conn net.Conn) *rpcMux { + mux := &rpcMux{ + msgMap: make(map[uint64]*rpcMsg), + subMap: make(map[string]*rpcReply), + send: make(chan *rpcMsg), + } + go mux.sendLoop(conn) + go mux.recvLoop(conn) + return mux +} + +// Client creates a new RPC client which sends messages through the multiplexer +func (mux *rpcMux) Client() *rpc.Client { + pipe1, pipe2 := net.Pipe() + go mux.Serve(pipe1) + return rpc.NewClientWithConn(pipe2) +} + +// Serve reads RPC messages from the given connection, forwards them to the +// multiplexed connnection and writes replies back to the given connection +func (mux *rpcMux) Serve(conn net.Conn) { + // reply will receive replies to any messages we send + reply := &rpcReply{ch: make(chan *rpcMsg)} + defer func() { + // drain the channel to prevent blocking the recvLoop + for range reply.ch { + } + }() + + // start a goroutine to read RPC messages from the connection and + // forward them to the sendLoop + done := make(chan struct{}) + go func() { + defer close(done) + dec := json.NewDecoder(conn) + for { + msg := &rpcMsg{} + if err := dec.Decode(msg); err != nil { + return + } + msg.reply = reply + mux.send <- msg + } + }() + + // write message replies to the connection + enc := json.NewEncoder(conn) + for { + select { + case msg, ok := <-reply.ch: + if !ok { + return + } + if err := enc.Encode(msg); err != nil { + return + } + case <-done: + return + } + } +} + +// sendLoop receives messages from the send channel, changes their ID and +// writes them to the given connection +func (mux *rpcMux) sendLoop(conn net.Conn) { + enc := json.NewEncoder(conn) + for msg := range mux.send { + if err := enc.Encode(mux.newMsg(msg)); err != nil { + return + } + } +} + +// recvLoop reads messages from the given connection, changes their ID back +// to the oringal value and sends them to the message's reply channel +func (mux *rpcMux) recvLoop(conn net.Conn) { + // close all reply channels if we get an error + defer func() { + mux.mtx.Lock() + defer mux.mtx.Unlock() + for _, msg := range mux.msgMap { + msg.reply.close() + } + }() + + dec := json.NewDecoder(conn) + for { + msg := &rpcMsg{} + if err := dec.Decode(msg); err != nil { + return + } + if reply := mux.lookup(msg); reply != nil { + reply.ch <- msg + } + } +} + +// newMsg copies the given message and changes it's ID to a unique value +func (mux *rpcMux) newMsg(msg *rpcMsg) *rpcMsg { + mux.mtx.Lock() + defer mux.mtx.Unlock() + id := mux.idCounter + mux.idCounter++ + mux.msgMap[id] = msg + newMsg := *msg + newMsg.Id = json.RawMessage(strconv.FormatUint(id, 10)) + return &newMsg +} + +// lookup looks up the original message for which the given message is a reply +func (mux *rpcMux) lookup(msg *rpcMsg) *rpcReply { + mux.mtx.Lock() + defer mux.mtx.Unlock() + + // if the message has no ID, it is a subscription notification so + // lookup the original subscribe message + if msg.Id == nil { + sub := &rpcSub{} + if err := json.Unmarshal(msg.Payload, sub); err != nil { + return nil + } + return mux.subMap[sub.Subscription] + } + + // lookup the original message and restore the ID + id, err := strconv.ParseUint(string(msg.Id), 10, 64) + if err != nil { + return nil + } + origMsg, ok := mux.msgMap[id] + if !ok { + return nil + } + delete(mux.msgMap, id) + msg.Id = origMsg.Id + + // if the original message was a subscription, store the subscription + // ID so we can detect notifications + if strings.HasSuffix(string(origMsg.Method), "_subscribe") { + var result string + if err := json.Unmarshal(msg.Result, &result); err == nil { + mux.subMap[result] = origMsg.reply + } + } + + return origMsg.reply +} diff --git a/p2p/simulations/adapters/types.go b/p2p/simulations/adapters/types.go index 5e9e34ce8d53..4bfdded042b5 100644 --- a/p2p/simulations/adapters/types.go +++ b/p2p/simulations/adapters/types.go @@ -21,6 +21,7 @@ import ( "encoding/hex" "encoding/json" "fmt" + "net" "os" "github.com/docker/docker/pkg/reexec" @@ -46,6 +47,10 @@ type Node interface { // up and running Client() (*rpc.Client, error) + // ServeRPC serves RPC requests over the given connection using the + // node's RPC multiplexer + ServeRPC(net.Conn) error + // Start starts the node Start() error @@ -116,6 +121,9 @@ type NodeConfig struct { Id *NodeId PrivateKey *ecdsa.PrivateKey + // Name is a human friendly name for the node like "node01" + Name string + // Service is the name of the service which should be run when starting // the node (for SimNodes it should be the name of a service contained // in SimAdapter.services, for other nodes it should be a service @@ -128,15 +136,22 @@ type NodeConfig struct { type nodeConfigJSON struct { Id string `json:"id"` PrivateKey string `json:"private_key"` + Name string `json:"name"` Service string `json:"service"` } func (n *NodeConfig) MarshalJSON() ([]byte, error) { - return json.Marshal(nodeConfigJSON{ - n.Id.String(), - hex.EncodeToString(crypto.FromECDSA(n.PrivateKey)), - n.Service, - }) + confJSON := nodeConfigJSON{ + Name: n.Name, + Service: n.Service, + } + if n.Id != nil { + confJSON.Id = n.Id.String() + } + if n.PrivateKey != nil { + confJSON.PrivateKey = hex.EncodeToString(crypto.FromECDSA(n.PrivateKey)) + } + return json.Marshal(confJSON) } func (n *NodeConfig) UnmarshalJSON(data []byte) error { @@ -145,18 +160,23 @@ func (n *NodeConfig) UnmarshalJSON(data []byte) error { return err } - nodeID, err := discover.HexID(confJSON.Id) - if err != nil { - return err + if confJSON.Id != "" { + nodeID, err := discover.HexID(confJSON.Id) + if err != nil { + return err + } + n.Id = &NodeId{NodeID: nodeID} } - n.Id = &NodeId{NodeID: nodeID} - key, err := hex.DecodeString(confJSON.PrivateKey) - if err != nil { - return err + if confJSON.PrivateKey != "" { + key, err := hex.DecodeString(confJSON.PrivateKey) + if err != nil { + return err + } + n.PrivateKey = crypto.ToECDSA(key) } - n.PrivateKey = crypto.ToECDSA(key) + n.Name = confJSON.Name n.Service = confJSON.Service return nil diff --git a/p2p/simulations/cmd/p2psim/main.go b/p2p/simulations/cmd/p2psim/main.go new file mode 100644 index 000000000000..b0b53a501633 --- /dev/null +++ b/p2p/simulations/cmd/p2psim/main.go @@ -0,0 +1,425 @@ +// p2psim provides a command-line client for a simulation API. +// +// Here is an example of creating a 2 node network with the first node +// connected to the second: +// +// $ p2psim network create +// Created network net1 +// +// $ p2psim node create net1 +// Created node01 +// +// $ p2psim node start net1 node01 +// Started node01 +// +// $ p2psim node create net1 +// Created node02 +// +// $ p2psim node start net1 node02 +// Started node02 +// +// $ p2psim node connect net1 node01 node02 +// Connected node01 to node02 +// +package main + +import ( + "context" + "encoding/json" + "fmt" + "io" + "os" + "strings" + "text/tabwriter" + + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/simulations" + "github.com/ethereum/go-ethereum/p2p/simulations/adapters" + "github.com/ethereum/go-ethereum/rpc" + "gopkg.in/urfave/cli.v1" +) + +var client *simulations.Client + +func main() { + app := cli.NewApp() + app.Usage = "devp2p simulation command-line client" + app.Flags = []cli.Flag{ + cli.StringFlag{ + Name: "api", + Value: "http://localhost:8888", + Usage: "simulation API URL", + EnvVar: "P2PSIM_API_URL", + }, + } + app.Before = func(ctx *cli.Context) error { + client = simulations.NewClient(ctx.GlobalString("api")) + return nil + } + app.Commands = []cli.Command{ + { + Name: "network", + Usage: "manage simulation networks", + Action: listNetworks, + Subcommands: []cli.Command{ + { + Name: "list", + Usage: "list networks", + Action: listNetworks, + }, + { + Name: "create", + Usage: "create a network", + Action: createNetwork, + Flags: []cli.Flag{ + cli.StringFlag{ + Name: "config", + Value: "{}", + Usage: "JSON encoded network config", + }, + }, + }, + { + Name: "show", + ArgsUsage: "", + Usage: "show network information", + Action: showNetwork, + }, + { + Name: "events", + ArgsUsage: "", + Usage: "stream network events", + Action: streamNetwork, + }, + }, + }, + { + Name: "node", + Usage: "manage simulation nodes", + Action: listNodes, + Subcommands: []cli.Command{ + { + Name: "list", + ArgsUsage: "", + Usage: "list nodes", + Action: listNodes, + }, + { + Name: "create", + ArgsUsage: "", + Usage: "create a node", + Action: createNode, + Flags: []cli.Flag{ + cli.StringFlag{ + Name: "config", + Value: "{}", + Usage: "JSON encoded node config", + }, + }, + }, + { + Name: "show", + ArgsUsage: " ", + Usage: "show node information", + Action: showNode, + }, + { + Name: "start", + ArgsUsage: " ", + Usage: "start a node", + Action: startNode, + }, + { + Name: "stop", + ArgsUsage: " ", + Usage: "stop a node", + Action: stopNode, + }, + { + Name: "connect", + ArgsUsage: " ", + Usage: "connect a node to a peer node", + Action: connectNode, + }, + { + Name: "disconnect", + ArgsUsage: " ", + Usage: "disconnect a node from a peer node", + Action: disconnectNode, + }, + { + Name: "rpc", + ArgsUsage: " []", + Usage: "call a node RPC method", + Action: rpcNode, + Flags: []cli.Flag{ + cli.BoolFlag{ + Name: "subscribe", + Usage: "method is a subscription", + }, + }, + }, + }, + }, + } + app.Run(os.Args) +} + +func listNetworks(ctx *cli.Context) error { + if len(ctx.Args()) != 0 { + return cli.ShowCommandHelp(ctx, ctx.Command.Name) + } + networks, err := client.GetNetworks() + if err != nil { + return err + } + w := tabwriter.NewWriter(ctx.App.Writer, 1, 2, 2, ' ', 0) + defer w.Flush() + fmt.Fprintf(w, "ID\tNODES\tCONNS\n") + for _, network := range networks { + fmt.Fprintf(w, "%s\t%d\t%d\n", network.Id, len(network.Nodes), len(network.Conns)) + } + return nil +} + +func createNetwork(ctx *cli.Context) error { + if len(ctx.Args()) != 0 { + return cli.ShowCommandHelp(ctx, ctx.Command.Name) + } + config := &simulations.NetworkConfig{} + if err := json.Unmarshal([]byte(ctx.String("config")), config); err != nil { + return err + } + network, err := client.CreateNetwork(config) + if err != nil { + return err + } + fmt.Fprintln(ctx.App.Writer, "Created network", network.Id) + return nil +} + +func showNetwork(ctx *cli.Context) error { + args := ctx.Args() + if len(args) != 1 { + return cli.ShowCommandHelp(ctx, ctx.Command.Name) + } + networkID := args[0] + network, err := client.GetNetwork(networkID) + if err != nil { + return err + } + w := tabwriter.NewWriter(ctx.App.Writer, 1, 2, 2, ' ', 0) + defer w.Flush() + fmt.Fprintf(w, "ID\t%s\n", network.Id) + fmt.Fprintf(w, "NODES\t%d\n", len(network.Nodes)) + fmt.Fprintf(w, "CONNS\t%d\n", len(network.Conns)) + return nil +} + +func streamNetwork(ctx *cli.Context) error { + args := ctx.Args() + if len(args) != 1 { + return cli.ShowCommandHelp(ctx, ctx.Command.Name) + } + networkID := args[0] + events := make(chan *simulations.Event) + sub, err := client.SubscribeNetwork(networkID, events) + if err != nil { + return err + } + defer sub.Unsubscribe() + enc := json.NewEncoder(ctx.App.Writer) + for { + select { + case event := <-events: + if err := enc.Encode(event); err != nil { + return err + } + case err := <-sub.Err(): + return err + } + } +} + +func listNodes(ctx *cli.Context) error { + args := ctx.Args() + if len(args) != 1 { + return cli.ShowCommandHelp(ctx, ctx.Command.Name) + } + networkID := args[0] + nodes, err := client.GetNodes(networkID) + if err != nil { + return err + } + w := tabwriter.NewWriter(ctx.App.Writer, 1, 2, 2, ' ', 0) + defer w.Flush() + fmt.Fprintf(w, "NAME\tPROTOCOLS\tID\n") + for _, node := range nodes { + fmt.Fprintf(w, "%s\t%s\t%s\n", node.Name, strings.Join(protocolList(node), ","), node.ID) + } + return nil +} + +func protocolList(node *p2p.NodeInfo) []string { + protos := make([]string, 0, len(node.Protocols)) + for name := range node.Protocols { + protos = append(protos, name) + } + return protos +} + +func createNode(ctx *cli.Context) error { + args := ctx.Args() + if len(args) != 1 { + return cli.ShowCommandHelp(ctx, ctx.Command.Name) + } + networkID := args[0] + config := &adapters.NodeConfig{} + if err := json.Unmarshal([]byte(ctx.String("config")), config); err != nil { + return err + } + node, err := client.CreateNode(networkID, config) + if err != nil { + return err + } + fmt.Fprintln(ctx.App.Writer, "Created", node.Name) + return nil +} + +func showNode(ctx *cli.Context) error { + args := ctx.Args() + if len(args) != 2 { + return cli.ShowCommandHelp(ctx, ctx.Command.Name) + } + networkID := args[0] + nodeName := args[1] + node, err := client.GetNode(networkID, nodeName) + if err != nil { + return err + } + w := tabwriter.NewWriter(ctx.App.Writer, 1, 2, 2, ' ', 0) + defer w.Flush() + fmt.Fprintf(w, "NAME\t%s\n", node.Name) + fmt.Fprintf(w, "PROTOCOLS\t%s\n", strings.Join(protocolList(node), ",")) + fmt.Fprintf(w, "ID\t%s\n", node.ID) + fmt.Fprintf(w, "ENODE\t%s\n", node.Enode) + for name, proto := range node.Protocols { + fmt.Fprintln(w) + fmt.Fprintf(w, "--- PROTOCOL INFO: %s\n", name) + fmt.Fprintf(w, "%v\n", proto) + fmt.Fprintf(w, "---\n") + } + return nil +} + +func startNode(ctx *cli.Context) error { + args := ctx.Args() + if len(args) != 2 { + return cli.ShowCommandHelp(ctx, ctx.Command.Name) + } + networkID := args[0] + nodeName := args[1] + if err := client.StartNode(networkID, nodeName); err != nil { + return err + } + fmt.Fprintln(ctx.App.Writer, "Started", nodeName) + return nil +} + +func stopNode(ctx *cli.Context) error { + args := ctx.Args() + if len(args) != 2 { + return cli.ShowCommandHelp(ctx, ctx.Command.Name) + } + networkID := args[0] + nodeName := args[1] + if err := client.StopNode(networkID, nodeName); err != nil { + return err + } + fmt.Fprintln(ctx.App.Writer, "Stopped", nodeName) + return nil +} + +func connectNode(ctx *cli.Context) error { + args := ctx.Args() + if len(args) != 3 { + return cli.ShowCommandHelp(ctx, ctx.Command.Name) + } + networkID := args[0] + nodeName := args[1] + peerName := args[2] + if err := client.ConnectNode(networkID, nodeName, peerName); err != nil { + return err + } + fmt.Fprintln(ctx.App.Writer, "Connected", nodeName, "to", peerName) + return nil +} + +func disconnectNode(ctx *cli.Context) error { + args := ctx.Args() + if len(args) != 3 { + return cli.ShowCommandHelp(ctx, ctx.Command.Name) + } + networkID := args[0] + nodeName := args[1] + peerName := args[2] + if err := client.DisconnectNode(networkID, nodeName, peerName); err != nil { + return err + } + fmt.Fprintln(ctx.App.Writer, "Disconnected", nodeName, "from", peerName) + return nil +} + +func rpcNode(ctx *cli.Context) error { + args := ctx.Args() + if len(args) < 3 { + return cli.ShowCommandHelp(ctx, ctx.Command.Name) + } + networkID := args[0] + nodeName := args[1] + method := args[2] + rpcClient, err := client.RPCClient(context.Background(), networkID, nodeName) + if err != nil { + return err + } + if ctx.Bool("subscribe") { + return rpcSubscribe(rpcClient, ctx.App.Writer, method, args[3:]...) + } + var result interface{} + params := make([]interface{}, len(args[3:])) + for i, v := range args[3:] { + params[i] = v + } + if err := rpcClient.Call(&result, method, params...); err != nil { + return err + } + return json.NewEncoder(ctx.App.Writer).Encode(result) +} + +func rpcSubscribe(client *rpc.Client, out io.Writer, method string, args ...string) error { + parts := strings.SplitN(method, "_", 2) + namespace := parts[0] + method = parts[1] + ch := make(chan interface{}) + subArgs := make([]interface{}, len(args)+1) + subArgs[0] = method + for i, v := range args { + subArgs[i+1] = v + } + sub, err := client.Subscribe(context.Background(), namespace, ch, subArgs...) + if err != nil { + return err + } + defer sub.Unsubscribe() + enc := json.NewEncoder(out) + for { + select { + case v := <-ch: + if err := enc.Encode(v); err != nil { + return err + } + case err := <-sub.Err(): + return err + } + } +} diff --git a/p2p/simulations/examples/p2psim.sh b/p2p/simulations/examples/p2psim.sh new file mode 100755 index 000000000000..7c83285cc78b --- /dev/null +++ b/p2p/simulations/examples/p2psim.sh @@ -0,0 +1,43 @@ +#!/bin/bash +# +# Start a network simulation using the API started by connectivity.go + +set -e + +main() { + if ! which p2psim &>/dev/null; then + fail "missing p2psim binary (you need to build p2p/simulations/cmd/p2psim)" + fi + + info "creating the example network" + p2psim network create --config '{"id": "example", "default_service": "ping-pong"}' + + info "creating 10 nodes" + for i in $(seq 1 10); do + p2psim node create "example" + p2psim node start "example" "$(node_name $i)" + done + + info "connecting node01 to all other nodes" + for i in $(seq 2 10); do + p2psim node connect "example" "node01" "$(node_name $i)" + done + + info "done" +} + +node_name() { + local num=$1 + echo "node$(printf '%02d' $num)" +} + +info() { + echo -e "\033[1;32m---> $(date +%H:%M:%S) ${@}\033[0m" +} + +fail() { + echo -e "\033[1;31mERROR: ${@}\033[0m" >&2 + exit 1 +} + +main "$@" diff --git a/p2p/simulations/http.go b/p2p/simulations/http.go index 7809830d4072..9ae935e08d9f 100644 --- a/p2p/simulations/http.go +++ b/p2p/simulations/http.go @@ -1,23 +1,241 @@ package simulations import ( + "bufio" + "bytes" "context" "encoding/json" "fmt" + "io" + "io/ioutil" "net/http" + "strings" "sync" + "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/p2p/simulations/adapters" + "github.com/ethereum/go-ethereum/rpc" "github.com/julienschmidt/httprouter" - "github.com/pborman/uuid" + "golang.org/x/net/websocket" ) +// DefaultClient is the default simulation API client which expects the API +// to be running at http://localhost:8888 +var DefaultClient = NewClient("http://localhost:8888") + +// Client is a client for the simulation HTTP API which supports creating +// and managing simulation networks +type Client struct { + URL string + + client *http.Client +} + +// NewClient returns a new simulation API client +func NewClient(url string) *Client { + return &Client{ + URL: url, + client: http.DefaultClient, + } +} + +// GetNetworks returns a list of simulations networks +func (c *Client) GetNetworks() ([]*Network, error) { + var networks []*Network + return networks, c.Get("/networks", &networks) +} + +// CreateNetwork creates a new simulation network +func (c *Client) CreateNetwork(config *NetworkConfig) (*Network, error) { + network := &Network{} + return network, c.Post("/networks", config, network) +} + +// GetNetwork returns details of a network +func (c *Client) GetNetwork(networkID string) (*Network, error) { + network := &Network{} + return network, c.Get(fmt.Sprintf("/networks/%s", networkID), network) +} + +// SubscribeNetwork subscribes to network events which are sent from the server +// as a server-sent-events stream +func (c *Client) SubscribeNetwork(networkID string, events chan *Event) (event.Subscription, error) { + req, err := http.NewRequest("GET", fmt.Sprintf("%s/networks/%s/events", c.URL, networkID), nil) + if err != nil { + return nil, err + } + req.Header.Set("Accept", "text/event-stream") + res, err := c.client.Do(req) + if err != nil { + return nil, err + } + if res.StatusCode != http.StatusOK { + response, _ := ioutil.ReadAll(res.Body) + res.Body.Close() + return nil, fmt.Errorf("unexpected HTTP status: %s: %s", res.Status, response) + } + + // define a producer function to pass to event.Subscription + // which reads server-sent events from res.Body and sends + // them to the events channel + producer := func(stop <-chan struct{}) error { + defer res.Body.Close() + + // read lines from res.Body in a goroutine so that we are + // always reading from the stop channel + lines := make(chan string) + errC := make(chan error, 1) + go func() { + s := bufio.NewScanner(res.Body) + for s.Scan() { + select { + case lines <- s.Text(): + case <-stop: + return + } + } + errC <- s.Err() + }() + + // detect any lines which start with "data:", decode the data + // into an event and send it to the events channel + for { + select { + case line := <-lines: + if !strings.HasPrefix(line, "data:") { + continue + } + data := strings.TrimSpace(strings.TrimPrefix(line, "data:")) + event := &Event{} + if err := json.Unmarshal([]byte(data), event); err != nil { + return fmt.Errorf("error decoding SSE event: %s", err) + } + select { + case events <- event: + case <-stop: + return nil + } + case err := <-errC: + return err + case <-stop: + return nil + } + } + } + + return event.NewSubscription(producer), nil +} + +// GetNodes returns all nodes which exist in a network +func (c *Client) GetNodes(networkID string) ([]*p2p.NodeInfo, error) { + var nodes []*p2p.NodeInfo + return nodes, c.Get(fmt.Sprintf("/networks/%s/nodes", networkID), &nodes) +} + +// CreateNode creates a node in a network using the given configuration +func (c *Client) CreateNode(networkID string, config *adapters.NodeConfig) (*p2p.NodeInfo, error) { + node := &p2p.NodeInfo{} + return node, c.Post(fmt.Sprintf("/networks/%s/nodes", networkID), config, node) +} + +// GetNode returns details of a node +func (c *Client) GetNode(networkID, nodeID string) (*p2p.NodeInfo, error) { + node := &p2p.NodeInfo{} + return node, c.Get(fmt.Sprintf("/networks/%s/nodes/%s", networkID, nodeID), node) +} + +// StartNode starts a node +func (c *Client) StartNode(networkID, nodeID string) error { + return c.Post(fmt.Sprintf("/networks/%s/nodes/%s/start", networkID, nodeID), nil, nil) +} + +// StopNode stops a node +func (c *Client) StopNode(networkID, nodeID string) error { + return c.Post(fmt.Sprintf("/networks/%s/nodes/%s/stop", networkID, nodeID), nil, nil) +} + +// ConnectNode connects a node to a peer node +func (c *Client) ConnectNode(networkID, nodeID, peerID string) error { + return c.Post(fmt.Sprintf("/networks/%s/nodes/%s/conn/%s", networkID, nodeID, peerID), nil, nil) +} + +// DisconnectNode disconnects a node from a peer node +func (c *Client) DisconnectNode(networkID, nodeID, peerID string) error { + return c.Delete(fmt.Sprintf("/networks/%s/nodes/%s/conn/%s", networkID, nodeID, peerID)) +} + +// RPCClient returns an RPC client connected to a node +func (c *Client) RPCClient(ctx context.Context, networkID, nodeID string) (*rpc.Client, error) { + baseURL := strings.Replace(c.URL, "http", "ws", 1) + return rpc.DialWebsocket(ctx, fmt.Sprintf("%s/networks/%s/nodes/%s/rpc", baseURL, networkID, nodeID), "") +} + +// Get performs a HTTP GET request decoding the resulting JSON response +// into "out" +func (c *Client) Get(path string, out interface{}) error { + return c.Send("GET", path, nil, out) +} + +// Post performs a HTTP POST request sending "in" as the JSON body and +// decoding the resulting JSON response into "out" +func (c *Client) Post(path string, in, out interface{}) error { + return c.Send("POST", path, in, out) +} + +// Delete performs a HTTP DELETE request +func (c *Client) Delete(path string) error { + return c.Send("DELETE", path, nil, nil) +} + +// Send performs a HTTP request, sending "in" as the JSON request body and +// decoding the JSON response into "out" +func (c *Client) Send(method, path string, in, out interface{}) error { + var body []byte + if in != nil { + var err error + body, err = json.Marshal(in) + if err != nil { + return err + } + } + req, err := http.NewRequest(method, c.URL+path, bytes.NewReader(body)) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Accept", "application/json") + res, err := c.client.Do(req) + if err != nil { + return err + } + defer res.Body.Close() + if res.StatusCode != http.StatusOK && res.StatusCode != http.StatusCreated { + response, _ := ioutil.ReadAll(res.Body) + return fmt.Errorf("unexpected HTTP status: %s: %s", res.Status, response) + } + if out != nil { + if err := json.NewDecoder(res.Body).Decode(out); err != nil { + return err + } + } + return nil +} + +// ServerConfig is the configuration used to start an API server type ServerConfig struct { + // Adapter is the NodeAdapter to use when creating new networks Adapter adapters.NodeAdapter - Mocker func(*Network) + + // Mocker is the function which will be called when a client sends a + // POST request to /networks//mock and is expected to + // generate some mock events in the network + Mocker func(*Network) } +// Server is an HTTP server providing an API to create and manage simulation +// networks type Server struct { ServerConfig @@ -26,6 +244,7 @@ type Server struct { mtx sync.Mutex } +// NewServer returns a new simulation API server func NewServer(config *ServerConfig) *Server { if config.Adapter == nil { panic("Adapter not set") @@ -40,6 +259,7 @@ func NewServer(config *ServerConfig) *Server { s.POST("/networks", s.CreateNetwork) s.GET("/networks", s.GetNetworks) s.GET("/networks/:netid", s.GetNetwork) + s.GET("/networks/:netid/events", s.StreamNetworkEvents) s.POST("/networks/:netid/mock", s.StartMocker) s.POST("/networks/:netid/nodes", s.CreateNode) s.GET("/networks/:netid/nodes", s.GetNodes) @@ -48,10 +268,12 @@ func NewServer(config *ServerConfig) *Server { s.POST("/networks/:netid/nodes/:nodeid/stop", s.StopNode) s.POST("/networks/:netid/nodes/:nodeid/conn/:peerid", s.ConnectNode) s.DELETE("/networks/:netid/nodes/:nodeid/conn/:peerid", s.DisconnectNode) + s.GET("/networks/:netid/nodes/:nodeid/rpc", s.NodeRPC) return s } +// CreateNetwork creates a new simulation network func (s *Server) CreateNetwork(w http.ResponseWriter, req *http.Request) { config := &NetworkConfig{} if err := json.NewDecoder(req.Body).Decode(config); err != nil { @@ -59,49 +281,44 @@ func (s *Server) CreateNetwork(w http.ResponseWriter, req *http.Request) { return } - if config.Id == "" { - config.Id = uuid.NewRandom().String() - } - - err := func() error { + network, err := func() (*Network, error) { s.mtx.Lock() defer s.mtx.Unlock() + if config.Id == "" { + config.Id = fmt.Sprintf("net%d", len(s.networks)+1) + } if _, exists := s.networks[config.Id]; exists { - return fmt.Errorf("network exists: %s", config.Id) + return nil, fmt.Errorf("network exists: %s", config.Id) } network := NewNetwork(s.Adapter, config) s.networks[config.Id] = network - return nil + return network, nil }() if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) return } - s.JSON(w, http.StatusCreated, config) + s.JSON(w, http.StatusCreated, network) } +// GetNetworks returns a list of simulations networks func (s *Server) GetNetworks(w http.ResponseWriter, req *http.Request) { s.mtx.Lock() - networks := make([]NetworkConfig, 0, len(s.networks)) + networks := make([]*Network, 0, len(s.networks)) for _, network := range s.networks { - config := network.Config() - networks = append(networks, *config) + networks = append(networks, network) } s.mtx.Unlock() s.JSON(w, http.StatusOK, networks) } +// GetNetwork returns details of a network func (s *Server) GetNetwork(w http.ResponseWriter, req *http.Request) { network := req.Context().Value("network").(*Network) - if req.Header.Get("Accept") == "text/event-stream" { - s.streamNetworkEvents(network, w) - return - } - - s.JSON(w, http.StatusOK, network.Config()) + s.JSON(w, http.StatusOK, network) } func (s *Server) StartMocker(w http.ResponseWriter, req *http.Request) { @@ -117,7 +334,10 @@ func (s *Server) StartMocker(w http.ResponseWriter, req *http.Request) { w.WriteHeader(http.StatusOK) } -func (s *Server) streamNetworkEvents(network *Network, w http.ResponseWriter) { +// StreamNetworkEvents streams network events as a server-sent-events stream +func (s *Server) StreamNetworkEvents(w http.ResponseWriter, req *http.Request) { + network := req.Context().Value("network").(*Network) + events := make(chan *Event) sub := network.events.Subscribe(events) defer sub.Unsubscribe() @@ -161,18 +381,27 @@ func (s *Server) streamNetworkEvents(network *Network, w http.ResponseWriter) { } } +// CreateNode creates a node in a network using the given configuration func (s *Server) CreateNode(w http.ResponseWriter, req *http.Request) { network := req.Context().Value("network").(*Network) - config, err := network.NewNode() + config := adapters.RandomNodeConfig() + err := json.NewDecoder(req.Body).Decode(config) + if err != nil && err != io.EOF { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + node, err := network.NewNodeWithConfig(config) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } - s.JSON(w, http.StatusCreated, network.GetNode(config.Id)) + s.JSON(w, http.StatusCreated, node.NodeInfo()) } +// GetNodes returns all nodes which exist in a network func (s *Server) GetNodes(w http.ResponseWriter, req *http.Request) { network := req.Context().Value("network").(*Network) @@ -186,12 +415,14 @@ func (s *Server) GetNodes(w http.ResponseWriter, req *http.Request) { s.JSON(w, http.StatusOK, infos) } +// GetNode returns details of a node func (s *Server) GetNode(w http.ResponseWriter, req *http.Request) { node := req.Context().Value("node").(*Node) s.JSON(w, http.StatusOK, node.NodeInfo()) } +// StartNode starts a node func (s *Server) StartNode(w http.ResponseWriter, req *http.Request) { network := req.Context().Value("network").(*Network) node := req.Context().Value("node").(*Node) @@ -204,6 +435,7 @@ func (s *Server) StartNode(w http.ResponseWriter, req *http.Request) { s.JSON(w, http.StatusOK, node.NodeInfo()) } +// StopNode stops a node func (s *Server) StopNode(w http.ResponseWriter, req *http.Request) { network := req.Context().Value("network").(*Network) node := req.Context().Value("node").(*Node) @@ -216,6 +448,7 @@ func (s *Server) StopNode(w http.ResponseWriter, req *http.Request) { s.JSON(w, http.StatusOK, node.NodeInfo()) } +// ConnectNode connects a node to a peer node func (s *Server) ConnectNode(w http.ResponseWriter, req *http.Request) { network := req.Context().Value("network").(*Network) node := req.Context().Value("node").(*Node) @@ -229,6 +462,7 @@ func (s *Server) ConnectNode(w http.ResponseWriter, req *http.Request) { s.JSON(w, http.StatusOK, node.NodeInfo()) } +// DisconnectNode disconnects a node from a peer node func (s *Server) DisconnectNode(w http.ResponseWriter, req *http.Request) { network := req.Context().Value("network").(*Network) node := req.Context().Value("node").(*Node) @@ -242,28 +476,47 @@ func (s *Server) DisconnectNode(w http.ResponseWriter, req *http.Request) { s.JSON(w, http.StatusOK, node.NodeInfo()) } +// NodeRPC proxies node RPC requests via a WebSocket connection +func (s *Server) NodeRPC(w http.ResponseWriter, req *http.Request) { + node := req.Context().Value("node").(*Node) + + handler := func(conn *websocket.Conn) { + node.ServeRPC(conn) + } + + websocket.Server{Handler: handler}.ServeHTTP(w, req) +} + +// ServeHTTP implements the http.Handler interface by delegating to the +// underlying httprouter.Router func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) { s.router.ServeHTTP(w, req) } +// GET registers a handler for GET requests to a particular path func (s *Server) GET(path string, handle http.HandlerFunc) { s.router.GET(path, s.wrapHandler(handle)) } +// POST registers a handler for POST requests to a particular path func (s *Server) POST(path string, handle http.HandlerFunc) { s.router.POST(path, s.wrapHandler(handle)) } +// DELETE registers a handler for DELETE requests to a particular path func (s *Server) DELETE(path string, handle http.HandlerFunc) { s.router.DELETE(path, s.wrapHandler(handle)) } +// JSON sends "data" as a JSON HTTP response func (s *Server) JSON(w http.ResponseWriter, status int, data interface{}) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(status) json.NewEncoder(w).Encode(data) } +// wrapHandler returns a httprouter.Handle which wraps a http.HandlerFunc by +// populating request.Context with any objects from the URL params func (s *Server) wrapHandler(handler http.HandlerFunc) httprouter.Handle { return func(w http.ResponseWriter, req *http.Request, params httprouter.Params) { w.Header().Set("Access-Control-Allow-Origin", "*") @@ -289,7 +542,12 @@ func (s *Server) wrapHandler(handler http.HandlerFunc) httprouter.Handle { http.NotFound(w, req) return } - node := network.GetNode(adapters.NewNodeIdFromHex(id)) + var node *Node + if nodeID, err := discover.HexID(id); err == nil { + node = network.GetNode(&adapters.NodeId{NodeID: nodeID}) + } else { + node = network.GetNodeByName(id) + } if node == nil { http.NotFound(w, req) return @@ -302,7 +560,12 @@ func (s *Server) wrapHandler(handler http.HandlerFunc) httprouter.Handle { http.NotFound(w, req) return } - peer := network.GetNode(adapters.NewNodeIdFromHex(id)) + var peer *Node + if peerID, err := discover.HexID(id); err == nil { + peer = network.GetNode(&adapters.NodeId{NodeID: peerID}) + } else { + peer = network.GetNodeByName(id) + } if peer == nil { http.NotFound(w, req) return diff --git a/p2p/simulations/http_test.go b/p2p/simulations/http_test.go new file mode 100644 index 000000000000..0f1b603add7f --- /dev/null +++ b/p2p/simulations/http_test.go @@ -0,0 +1,347 @@ +package simulations + +import ( + "context" + "net/http/httptest" + "sync/atomic" + "testing" + "time" + + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/simulations/adapters" + "github.com/ethereum/go-ethereum/rpc" +) + +type testService struct { + id *adapters.NodeId +} + +func newTestService(id *adapters.NodeId) node.Service { + return &testService{id} +} + +func (t *testService) Protocols() []p2p.Protocol { + return []p2p.Protocol{{ + Name: "test", + Version: 1, + Length: 1, + Run: t.Run, + }} +} + +func (t *testService) APIs() []rpc.API { + return []rpc.API{{ + Namespace: "test", + Version: "1.0", + Service: &TestAPI{}, + }} +} + +func (t *testService) Start(server p2p.Server) error { + return nil +} + +func (t *testService) Stop() error { + return nil +} + +func (t *testService) Run(_ *p2p.Peer, rw p2p.MsgReadWriter) error { + for { + _, err := rw.ReadMsg() + if err != nil { + return err + } + } +} + +// TestAPI provides a simple API to get and increment a counter and to +// subscribe to increment events +type TestAPI struct { + counter int64 + feed event.Feed +} + +func (t *TestAPI) Get() int64 { + return atomic.LoadInt64(&t.counter) +} + +func (t *TestAPI) Add(delta int64) { + atomic.AddInt64(&t.counter, delta) + t.feed.Send(delta) +} + +func (t *TestAPI) Events(ctx context.Context) (*rpc.Subscription, error) { + notifier, supported := rpc.NotifierFromContext(ctx) + if !supported { + return nil, rpc.ErrNotificationsUnsupported + } + + rpcSub := notifier.CreateSubscription() + + go func() { + events := make(chan int64) + sub := t.feed.Subscribe(events) + defer sub.Unsubscribe() + + for { + select { + case event := <-events: + notifier.Notify(rpcSub.ID, event) + case <-sub.Err(): + return + case <-rpcSub.Err(): + return + case <-notifier.Closed(): + return + } + } + }() + + return rpcSub, nil +} + +var testServices = adapters.Services{ + "test": newTestService, +} + +// TestHTTPNetwork tests creating and interacting with a simulation +// network using the HTTP API +func TestHTTPNetwork(t *testing.T) { + // start the server + srv := NewServer(&ServerConfig{ + Adapter: adapters.NewSimAdapter(testServices), + }) + s := httptest.NewServer(srv) + defer s.Close() + + // create a network + client := NewClient(s.URL) + config := &NetworkConfig{ + DefaultService: "test", + } + network, err := client.CreateNetwork(config) + if err != nil { + t.Fatalf("error creating network: %s", err) + } + + // subscribe to events so we can check them later + events := make(chan *Event, 100) + sub, err := client.SubscribeNetwork(network.Id, events) + if err != nil { + t.Fatalf("error subscribing to network events: %s", err) + } + defer sub.Unsubscribe() + + // check the network has an ID + if network.Id == "" { + t.Fatal("expected network.Id to be set") + } + + // check the network exists + networks, err := client.GetNetworks() + if err != nil { + t.Fatalf("error getting networks: %s", err) + } + if len(networks) != 1 { + t.Fatalf("expected 1 network, got %d", len(networks)) + } + if networks[0].Id != network.Id { + t.Fatalf("expected network to have ID %q, got %q", network.Id, networks[0].Id) + } + gotNetwork, err := client.GetNetwork(network.Id) + if err != nil { + t.Fatalf("error getting network: %s", err) + } + if gotNetwork.Id != network.Id { + t.Fatalf("expected network to have ID %q, got %q", network.Id, gotNetwork.Id) + } + + // create 2 nodes + nodeIDs := make([]string, 2) + for i := 0; i < 2; i++ { + config := &adapters.NodeConfig{} + node, err := client.CreateNode(network.Id, config) + if err != nil { + t.Fatalf("error creating node: %s", err) + } + nodeIDs[i] = node.ID + } + + // check both nodes exist + nodes, err := client.GetNodes(network.Id) + if err != nil { + t.Fatalf("error getting nodes: %s", err) + } + if len(nodes) != 2 { + t.Fatalf("expected 2 nodes, got %d", len(nodes)) + } + for i, nodeID := range nodeIDs { + if nodes[i].ID != nodeID { + t.Fatalf("expected node %d to have ID %q, got %q", i, nodeID, nodes[i].ID) + } + node, err := client.GetNode(network.Id, nodeID) + if err != nil { + t.Fatalf("error getting node %d: %s", i, err) + } + if node.ID != nodeID { + t.Fatalf("expected node %d to have ID %q, got %q", i, nodeID, node.ID) + } + } + + // start both nodes + for _, nodeID := range nodeIDs { + if err := client.StartNode(network.Id, nodeID); err != nil { + t.Fatalf("error starting node %q: %s", nodeID, err) + } + } + + // connect the nodes + if err := client.ConnectNode(network.Id, nodeIDs[0], nodeIDs[1]); err != nil { + t.Fatalf("error connecting nodes: %s", err) + } + + // check we got all the events + nodeEvent := func(id string, up bool) *Event { + return &Event{ + Type: EventTypeNode, + Node: &Node{ + Config: &adapters.NodeConfig{ + Id: adapters.NewNodeIdFromHex(id), + }, + Up: up, + }, + } + } + connEvent := func(one, other string, up bool) *Event { + return &Event{ + Type: EventTypeConn, + Conn: &Conn{ + One: adapters.NewNodeIdFromHex(one), + Other: adapters.NewNodeIdFromHex(other), + Up: up, + }, + } + } + expectedEvents := []*Event{ + nodeEvent(nodeIDs[0], false), + nodeEvent(nodeIDs[1], false), + nodeEvent(nodeIDs[0], true), + nodeEvent(nodeIDs[1], true), + connEvent(nodeIDs[0], nodeIDs[1], false), + connEvent(nodeIDs[0], nodeIDs[1], true), + } + timeout := time.After(10 * time.Second) + for i := 0; i < len(expectedEvents); i++ { + select { + case event := <-events: + t.Logf("received %s event: %s", event.Type, event) + + expected := expectedEvents[i] + if event.Type != expected.Type { + t.Fatalf("expected event %d to have type %q, got %q", i, expected.Type, event.Type) + } + + switch expected.Type { + + case EventTypeNode: + if event.Node == nil { + t.Fatal("expected event.Node to be set") + } + if event.Node.ID().NodeID != expected.Node.ID().NodeID { + t.Fatalf("expected node event %d to have id %q, got %q", i, expected.Node.ID().Label(), event.Node.ID().Label()) + } + if event.Node.Up != expected.Node.Up { + t.Fatalf("expected node event %d to have up=%t, got up=%t", i, expected.Node.Up, event.Node.Up) + } + + case EventTypeConn: + if event.Conn == nil { + t.Fatal("expected event.Conn to be set") + } + if event.Conn.One.NodeID != expected.Conn.One.NodeID { + t.Fatalf("expected conn event %d to have one=%q, got one=%q", i, expected.Conn.One.Label(), event.Conn.One.Label()) + } + if event.Conn.Other.NodeID != expected.Conn.Other.NodeID { + t.Fatalf("expected conn event %d to have other=%q, got other=%q", i, expected.Conn.Other.Label(), event.Conn.Other.Label()) + } + if event.Conn.Up != expected.Conn.Up { + t.Fatalf("expected conn event %d to have up=%t, got up=%t", i, expected.Conn.Up, event.Conn.Up) + } + } + + case err := <-sub.Err(): + t.Fatalf("network stream closed unexpectedly: %s", err) + + case <-timeout: + t.Fatal("timed out waiting for expected events") + } + } +} + +// TestHTTPNodeRPC tests calling RPC methods on nodes via the HTTP API +func TestHTTPNodeRPC(t *testing.T) { + // start the server + srv := NewServer(&ServerConfig{ + Adapter: adapters.NewSimAdapter(testServices), + }) + s := httptest.NewServer(srv) + defer s.Close() + + // start a node in a network + client := NewClient(s.URL) + network, err := client.CreateNetwork(&NetworkConfig{DefaultService: "test"}) + if err != nil { + t.Fatalf("error creating network: %s", err) + } + node, err := client.CreateNode(network.Id, &adapters.NodeConfig{}) + if err != nil { + t.Fatalf("error creating node: %s", err) + } + if err := client.StartNode(network.Id, node.ID); err != nil { + t.Fatalf("error starting node: %s", err) + } + + // create two RPC clients + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + rpcClient1, err := client.RPCClient(ctx, network.Id, node.ID) + if err != nil { + t.Fatalf("error getting node RPC client: %s", err) + } + rpcClient2, err := client.RPCClient(ctx, network.Id, node.ID) + if err != nil { + t.Fatalf("error getting node RPC client: %s", err) + } + + // subscribe to events using client 1 + events := make(chan int64, 1) + sub, err := rpcClient1.Subscribe(ctx, "test", events, "events") + if err != nil { + t.Fatalf("error subscribing to events: %s", err) + } + defer sub.Unsubscribe() + + // call some RPC methods using client 2 + if err := rpcClient2.CallContext(ctx, nil, "test_add", 10); err != nil { + t.Fatalf("error calling RPC method: %s", err) + } + var result int64 + if err := rpcClient2.CallContext(ctx, &result, "test_get"); err != nil { + t.Fatalf("error calling RPC method: %s", err) + } + if result != 10 { + t.Fatalf("expected result to be 10, got %d", result) + } + + // check we got an event from client 1 + select { + case event := <-events: + if event != 10 { + t.Fatalf("expected event to be 10, got %d", event) + } + case <-ctx.Done(): + t.Fatal(ctx.Err()) + } +} diff --git a/p2p/simulations/network.go b/p2p/simulations/network.go index b90f367b890d..bed5ae5cb0c6 100644 --- a/p2p/simulations/network.go +++ b/p2p/simulations/network.go @@ -40,26 +40,19 @@ import ( ) type NetworkConfig struct { - // Type NetworkType - // Config json.RawMessage // type-specific configs - // type - // Events []string - Id string - DefaultMockerConfig *MockerConfig - Backend bool - DefaultService string -} - -type NetworkControl interface { - Events() *event.TypeMux - Config() *NetworkConfig - Subscribe(*event.TypeMux, ...interface{}) + Id string `json:"id"` + DefaultService string `json:"default_service,omitempty"` } // Network models a p2p network // the actual logic of bringing nodes and connections up and down and // messaging is implemented in the particular NodeAdapter interface type Network struct { + NetworkConfig + + Nodes []*Node `json:"nodes"` + Conns []*Conn `json:"conns"` + nodeAdapter adapters.NodeAdapter // input trigger events and other events @@ -67,19 +60,16 @@ type Network struct { lock sync.RWMutex nodeMap map[discover.NodeID]int connMap map[string]int - Nodes []*Node `json:"nodes"` - Conns []*Conn `json:"conns"` quitc chan bool - conf *NetworkConfig } func NewNetwork(nodeAdapter adapters.NodeAdapter, conf *NetworkConfig) *Network { return &Network{ - nodeAdapter: nodeAdapter, - conf: conf, - nodeMap: make(map[discover.NodeID]int), - connMap: make(map[string]int), - quitc: make(chan bool), + NetworkConfig: *conf, + nodeAdapter: nodeAdapter, + nodeMap: make(map[discover.NodeID]int), + connMap: make(map[string]int), + quitc: make(chan bool), } } @@ -122,7 +112,7 @@ func (self *Network) executeNodeEvent(e *Event) error { return self.Stop(e.Node.ID()) } - if err := self.NewNodeWithConfig(e.Node.Config); err != nil { + if _, err := self.NewNodeWithConfig(e.Node.Config); err != nil { return err } return self.Start(e.Node.ID()) @@ -158,6 +148,12 @@ func (self *Node) String() string { return fmt.Sprintf("Node %v", self.ID().Label()) } +func (self *Node) NodeInfo() *p2p.NodeInfo { + info := self.Node.NodeInfo() + info.Name = self.Config.Name + return info +} + // active connections are represented by the Node entry object so that // you journal updates could filter if passive knowledge about peers is // irrelevant @@ -191,34 +187,34 @@ func (self *Msg) String() string { } // NewNode adds a new node to the network with a random ID -func (self *Network) NewNode() (*adapters.NodeConfig, error) { +func (self *Network) NewNode() (*Node, error) { conf := adapters.RandomNodeConfig() - conf.Service = self.conf.DefaultService - if err := self.NewNodeWithConfig(conf); err != nil { - return nil, err - } - return conf, nil + conf.Service = self.DefaultService + return self.NewNodeWithConfig(conf) } // NewNodeWithConfig adds a new node to the network with the given config // errors if a node by the same id already exist -func (self *Network) NewNodeWithConfig(conf *adapters.NodeConfig) error { +func (self *Network) NewNodeWithConfig(conf *adapters.NodeConfig) (*Node, error) { self.lock.Lock() defer self.lock.Unlock() id := conf.Id + if conf.Name == "" { + conf.Name = fmt.Sprintf("node%02d", len(self.Nodes)+1) + } if conf.Service == "" { - conf.Service = self.conf.DefaultService + conf.Service = self.DefaultService } _, found := self.nodeMap[id.NodeID] if found { - return fmt.Errorf("node %v already added", id) + return nil, fmt.Errorf("node %v already added", id) } self.nodeMap[id.NodeID] = len(self.Nodes) adapterNode, err := self.nodeAdapter.NewNode(conf) if err != nil { - return err + return nil, err } node := &Node{ Node: adapterNode, @@ -227,11 +223,11 @@ func (self *Network) NewNodeWithConfig(conf *adapters.NodeConfig) error { self.Nodes = append(self.Nodes, node) log.Trace(fmt.Sprintf("node %v created", id)) self.events.Send(ControlEvent(node)) - return nil + return node, nil } func (self *Network) Config() *NetworkConfig { - return self.conf + return &self.NetworkConfig } // newConn adds a new connection to the network @@ -495,6 +491,17 @@ func (self *Network) GetNode(id *adapters.NodeId) *Node { return self.getNode(id) } +func (self *Network) GetNodeByName(name string) *Node { + self.lock.Lock() + defer self.lock.Unlock() + for _, node := range self.Nodes { + if node.Config.Name == name { + return node + } + } + return nil +} + func (self *Network) GetNodes() []*Node { self.lock.Lock() defer self.lock.Unlock() diff --git a/p2p/testing/protocoltester.go b/p2p/testing/protocoltester.go index d0ea503e4d6a..16a23f120fa6 100644 --- a/p2p/testing/protocoltester.go +++ b/p2p/testing/protocoltester.go @@ -29,7 +29,7 @@ func NewProtocolTester(t *testing.T, id *adapters.NodeId, n int, run func(*p2p.P } adapter := adapters.NewSimAdapter(services) net := simulations.NewNetwork(adapter, &simulations.NetworkConfig{}) - if err := net.NewNodeWithConfig(&adapters.NodeConfig{Id: id, Service: "test"}); err != nil { + if _, err := net.NewNodeWithConfig(&adapters.NodeConfig{Id: id, Service: "test"}); err != nil { panic(err.Error()) } if err := net.Start(id); err != nil { @@ -65,7 +65,7 @@ func NewProtocolTester(t *testing.T, id *adapters.NodeId, n int, run func(*p2p.P func (self *ProtocolTester) Connect(selfId *adapters.NodeId, peers ...*adapters.NodeConfig) { for _, peer := range peers { log.Trace(fmt.Sprintf("start node %v", peer.Id)) - if err := self.network.NewNodeWithConfig(peer); err != nil { + if _, err := self.network.NewNodeWithConfig(peer); err != nil { panic(fmt.Sprintf("error starting peer %v: %v", peer.Id, err)) } if err := self.network.Start(peer.Id); err != nil { diff --git a/swarm/network/simulations/discovery/discovery_test.go b/swarm/network/simulations/discovery/discovery_test.go index 5137adf91f1a..51364dfb1538 100644 --- a/swarm/network/simulations/discovery/discovery_test.go +++ b/swarm/network/simulations/discovery/discovery_test.go @@ -68,17 +68,17 @@ func testDiscoverySimulation(t *testing.T, adapter adapters.NodeAdapter) { trigger := make(chan *adapters.NodeId) ids := make([]*adapters.NodeId, nodeCount) for i := 0; i < nodeCount; i++ { - conf, err := net.NewNode() + node, err := net.NewNode() if err != nil { - t.Fatalf("error starting node %s: %s", conf.Id.Label(), err) + t.Fatalf("error starting node %s: %s", node.ID().Label(), err) } - if err := net.Start(conf.Id); err != nil { - t.Fatalf("error starting node %s: %s", conf.Id.Label(), err) + if err := net.Start(node.ID()); err != nil { + t.Fatalf("error starting node %s: %s", node.ID().Label(), err) } - if err := triggerChecks(trigger, net, conf.Id); err != nil { - t.Fatal("error triggering checks for node %s: %s", conf.Id.Label(), err) + if err := triggerChecks(trigger, net, node.ID()); err != nil { + t.Fatal("error triggering checks for node %s: %s", node.ID().Label(), err) } - ids[i] = conf.Id + ids[i] = node.ID() } // run a simulation which connects the 10 nodes in a ring and waits diff --git a/swarm/network/simulations/overlay.go b/swarm/network/simulations/overlay.go index 0e1e81f32bb0..69b42cbd928e 100644 --- a/swarm/network/simulations/overlay.go +++ b/swarm/network/simulations/overlay.go @@ -96,11 +96,11 @@ func mocker(net *simulations.Network) { ids := make([]*adapters.NodeId, 10) for i := 0; i < 10; i++ { - conf, err := net.NewNode() + node, err := net.NewNode() if err != nil { panic(err.Error()) } - ids[i] = conf.Id + ids[i] = node.ID() } for _, id := range ids {