Skip to content

Commit

Permalink
p2p/simulations: Refactor events
Browse files Browse the repository at this point in the history
Signed-off-by: Lewis Marshall <lewis@lmars.net>
  • Loading branch information
lmars committed May 10, 2017
1 parent aa624cd commit 584f8b8
Show file tree
Hide file tree
Showing 10 changed files with 209 additions and 419 deletions.
17 changes: 17 additions & 0 deletions p2p/simulations/adapters/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,23 @@ func (self *NodeId) Label() string {
return self.String()[:4]
}

func (self *NodeId) MarshalJSON() ([]byte, error) {
return json.Marshal(hex.EncodeToString(self.NodeID[:]))
}

func (self *NodeId) UnmarshalJSON(data []byte) error {
var s string
if err := json.Unmarshal(data, &s); err != nil {
return err
}
id, err := discover.HexID(s)
if err != nil {
return err
}
self.NodeID = id
return nil
}

// NodeConfig is the configuration used to start a node in a simulation
// network
type NodeConfig struct {
Expand Down
61 changes: 61 additions & 0 deletions p2p/simulations/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package simulations

import (
"fmt"
"time"
)

type EventType string

const (
EventTypeNode EventType = "node"
EventTypeConn EventType = "conn"
EventTypeMsg EventType = "msg"
)

type Event struct {
Type EventType `json:"type"`
Time time.Time `json:"time"`
Control bool `json:"control"`

Node *Node `json:"node,omitempty"`
Conn *Conn `json:"conn,omitempty"`
Msg *Msg `json:"msg,omitempty"`
}

func NewEvent(v interface{}) *Event {
event := &Event{Time: time.Now()}
switch v := v.(type) {
case *Node:
event.Type = EventTypeNode
event.Node = v
case *Conn:
event.Type = EventTypeConn
event.Conn = v
case *Msg:
event.Type = EventTypeMsg
event.Msg = v
default:
panic(fmt.Sprintf("invalid event type: %T", v))
}
return event
}

func ControlEvent(v interface{}) *Event {
event := NewEvent(v)
event.Control = true
return event
}

func (e *Event) String() string {
switch e.Type {
case EventTypeNode:
return fmt.Sprintf("<node-event> id: %s up: %t", e.Node.ID().Label(), e.Node.Up)
case EventTypeConn:
return fmt.Sprintf("<conn-event> nodes: %s->%s up: %t", e.Conn.One.Label(), e.Conn.Other.Label(), e.Conn.Up)
case EventTypeMsg:
return fmt.Sprintf("<msg-event> nodes: %s->%s code: %d, received: %t", e.Msg.One.Label(), e.Msg.Other.Label(), e.Msg.Code, e.Msg.Received)
default:
return ""
}
}
30 changes: 14 additions & 16 deletions p2p/simulations/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ func (s *Server) StartMocker(w http.ResponseWriter, req *http.Request) {
}

func (s *Server) streamNetworkEvents(network *Network, w http.ResponseWriter) {
sub := network.events.Subscribe(ConnectivityAllEvents...)
events := make(chan *Event)
sub := network.events.Subscribe(events)
defer sub.Unsubscribe()

// stop the stream if the client goes away
Expand All @@ -140,23 +141,20 @@ func (s *Server) streamNetworkEvents(network *Network, w http.ResponseWriter) {
}
}

w.Header().Set("Content-Type", "text/event-stream")
ch := sub.Chan()
w.Header().Set("Content-Type", "text/event-stream; charset=utf-8")
w.WriteHeader(http.StatusOK)
if fw, ok := w.(http.Flusher); ok {
fw.Flush()
}
for {
select {
case event := <-ch:
// convert the event to a SimUpdate
update, err := NewSimUpdate(event)
if err != nil {
write("error", err.Error())
return
}
data, err := json.Marshal(update)
case event := <-events:
data, err := json.Marshal(event)
if err != nil {
write("error", err.Error())
return
}
write("simupdate", string(data))
write("network", string(data))
case <-clientGone:
return
}
Expand Down Expand Up @@ -198,7 +196,7 @@ func (s *Server) StartNode(w http.ResponseWriter, req *http.Request) {
network := req.Context().Value("network").(*Network)
node := req.Context().Value("node").(*Node)

if err := network.Start(node.Id); err != nil {
if err := network.Start(node.ID()); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
Expand All @@ -210,7 +208,7 @@ func (s *Server) StopNode(w http.ResponseWriter, req *http.Request) {
network := req.Context().Value("network").(*Network)
node := req.Context().Value("node").(*Node)

if err := network.Stop(node.Id); err != nil {
if err := network.Stop(node.ID()); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
Expand All @@ -223,7 +221,7 @@ func (s *Server) ConnectNode(w http.ResponseWriter, req *http.Request) {
node := req.Context().Value("node").(*Node)
peer := req.Context().Value("peer").(*Node)

if err := network.Connect(node.Id, peer.Id); err != nil {
if err := network.Connect(node.ID(), peer.ID()); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
Expand All @@ -236,7 +234,7 @@ func (s *Server) DisconnectNode(w http.ResponseWriter, req *http.Request) {
node := req.Context().Value("node").(*Node)
peer := req.Context().Value("peer").(*Node)

if err := network.Disconnect(node.Id, peer.Id); err != nil {
if err := network.Disconnect(node.ID(), peer.ID()); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
Expand Down
50 changes: 26 additions & 24 deletions p2p/simulations/journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@ import (
// (using event.TypeMux). Network components POST events to the TypeMux, which then is
// read by the journal. Each journal belongs to a subscription.
type Journal struct {
Id string
Id string `json:"id"`
Events []*Event `json:"events"`

lock sync.Mutex
counter int
cursor int
quitc chan bool
Events []*event.TypeMuxEvent
}

// NewJournal constructor
Expand All @@ -34,19 +35,19 @@ func NewJournal() *Journal {
return &Journal{quitc: make(chan bool)}
}

// Subscribe takes an event.TypeMux and subscibes to types
// and launches a gorourine that appends any new event to the event log
// used for journalling history of a network
// Subscribe subscribes to an event.Feed and launches a gorourine that appends
// any new event to the event log used for journalling history of a network
// the goroutine terminates when the journal is closed
func (self *Journal) Subscribe(eventer *event.TypeMux, types ...interface{}) {
func (self *Journal) Subscribe(feed *event.Feed) {
log.Info("subscribe")
sub := eventer.Subscribe(types...)
events := make(chan *Event)
sub := feed.Subscribe(events)
go func() {
defer sub.Unsubscribe()
for {
select {
case ev := <-sub.Chan():
self.append(ev)
case event := <-events:
self.append(event)
case <-self.quitc:
return
}
Expand Down Expand Up @@ -75,11 +76,12 @@ func NewJournalFromJSON(b []byte) (*Journal, error) {
// params:
// * acc: using acceleration factor acc
// * journal: journal to use
// * eventer: where to post the replayed events
func Replay(acc float64, j *Journal, eventer *event.TypeMux) {
f := func(d interface{}) bool {
// * feed: where to post the replayed events
func Replay(acc float64, j *Journal, feed *event.Feed) {
f := func(event *Event) bool {
// reposts the data with the eventer (the data receives a new timestamp)
eventer.Post(d)
event.Time = time.Now()
feed.Send(event)
return true
}
j.TimedRead(acc, f)
Expand All @@ -97,10 +99,10 @@ func (self *Journal) Close() {
close(self.quitc)
}

func (self *Journal) append(evs ...*event.TypeMuxEvent) {
func (self *Journal) append(events ...*Event) {
self.lock.Lock()
defer self.lock.Unlock()
self.Events = append(self.Events, evs...)
self.Events = append(self.Events, events...)
self.counter++
}

Expand All @@ -116,7 +118,7 @@ func (self *Journal) WaitEntries(n int) {
}
}

func (self *Journal) Read(f func(*event.TypeMuxEvent) bool) (read int) {
func (self *Journal) Read(f func(*Event) bool) (read int) {
self.lock.Lock()
defer self.lock.Unlock()
ok := true
Expand All @@ -139,20 +141,20 @@ func (self *Journal) Read(f func(*event.TypeMuxEvent) bool) (read int) {
// NOTE: the events' timestamps are supposed to be strictly ordered otherwise
// the call panics.
// acc is an acceleration factor
func (self *Journal) TimedRead(acc float64, f func(interface{}) bool) (read int) {
func (self *Journal) TimedRead(acc float64, f func(*Event) bool) (read int) {
var lastEvent time.Time
timer := time.NewTimer(0)
var data interface{}
h := func(ev *event.TypeMuxEvent) bool {
var event *Event
h := func(e *Event) bool {
// wait for the interval time passes event time
if ev.Time.Before(lastEvent) {
if e.Time.Before(lastEvent) {
panic("events not ordered")
}
interval := ev.Time.Sub(lastEvent)
interval := e.Time.Sub(lastEvent)
log.Trace(fmt.Sprintf("reset timer to interval %v", interval))
timer.Reset(time.Duration(acc) * interval)
lastEvent = ev.Time
data = ev.Data
lastEvent = e.Time
event = e
return false
}
var n int
Expand All @@ -168,7 +170,7 @@ func (self *Journal) TimedRead(acc float64, f func(interface{}) bool) (read int)
}
}
read += n
if n == 0 || !f(data) {
if n == 0 || !f(event) {
log.Trace(fmt.Sprintf("timed read ends (read %v entries)", read))
break
}
Expand Down
34 changes: 18 additions & 16 deletions p2p/simulations/journal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,14 @@ import (
"github.com/ethereum/go-ethereum/p2p/simulations/adapters"
)

func testEvents(intervals ...int) (events []*event.TypeMuxEvent) {
func testEvents(intervals ...int) (events []*Event) {
t := time.Now()
for _, interval := range intervals {
t = t.Add(time.Duration(interval) * time.Millisecond)
events = append(events, &event.TypeMuxEvent{
events = append(events, &Event{
Type: EventTypeNode,
Time: t,
Data: interface{}(&NodeEvent{
Type: "node",
Action: "down",
}),
Node: &Node{Up: false},
})
}
return events
Expand All @@ -33,8 +31,7 @@ func TestTimedRead(t *testing.T) {
var i int
acc := 0.5
length := 4
f := func(data interface{}) bool {
_ = data.(*NodeEvent)
f := func(event *Event) bool {
newTimes = append(newTimes, time.Now())
i++
return i <= length
Expand Down Expand Up @@ -68,10 +65,15 @@ func testIDs() (ids []*adapters.NodeId) {
}

func testJournal(ids []*adapters.NodeId) *Journal {
eventer := &event.TypeMux{}
eventer := &event.Feed{}
journal := NewJournal()
journal.Subscribe(eventer, ConnectivityEvents...)
mockNewNodes(eventer, ids)
journal.Subscribe(eventer)
for _, id := range ids {
eventer.Send(&Event{
Type: EventTypeNode,
Node: &Node{Config: &adapters.NodeConfig{Id: id}, Up: true},
})
}
journal.WaitEntries(len(ids))
return journal
}
Expand All @@ -80,7 +82,7 @@ func TestSubscribe(t *testing.T) {
ids := testIDs()
journal := testJournal(ids)
for i, ev := range journal.Events {
id := ev.Data.(*NodeEvent).node.Id
id := ev.Node.ID()
if id != ids[i] {
t.Fatalf("incorrect id: expected %v, got %v", id, ids[i])
}
Expand Down Expand Up @@ -115,15 +117,15 @@ func TestLoadSave(t *testing.T) {

func TestReplay(t *testing.T) {
_, jo := loadTestJournal(t)
eventer := &event.TypeMux{}
eventer := &event.Feed{}

journal := NewJournal()
journal.Subscribe(eventer, ConnectivityEvents...)
journal.Subscribe(eventer)

Replay(0, jo, eventer)
for i, ev := range jo.Events {
exp := ev.Data.(*NodeEvent).String()
got := journal.Events[i].Data.(*NodeEvent).String()
exp := ev.String()
got := journal.Events[i].String()
if exp != got {
t.Fatalf("incorrent replayed journal entry at pos %v: expected %v, got %v", i, exp, got)
}
Expand Down
Loading

0 comments on commit 584f8b8

Please sign in to comment.