diff --git a/chan.go b/chan.go deleted file mode 100644 index 571393c..0000000 --- a/chan.go +++ /dev/null @@ -1,109 +0,0 @@ -package msgio - -import ( - "io" - - pool "github.com/libp2p/go-buffer-pool" -) - -// Chan is a msgio duplex channel. It is used to have a channel interface -// around a msgio.Reader or Writer. -type Chan struct { - MsgChan chan []byte - ErrChan chan error - CloseChan chan bool -} - -// NewChan constructs a Chan with a given buffer size. -func NewChan(chanSize int) *Chan { - return &Chan{ - MsgChan: make(chan []byte, chanSize), - ErrChan: make(chan error, 1), - CloseChan: make(chan bool, 2), - } -} - -// ReadFrom wraps the given io.Reader with a msgio.Reader, reads all -// messages, ands sends them down the channel. -func (s *Chan) ReadFrom(r io.Reader) { - s.readFrom(NewReader(r)) -} - -// ReadFromWithPool wraps the given io.Reader with a msgio.Reader, reads all -// messages, ands sends them down the channel. Uses given BufferPool. -func (s *Chan) ReadFromWithPool(r io.Reader, p *pool.BufferPool) { - s.readFrom(NewReaderWithPool(r, p)) -} - -// ReadFrom wraps the given io.Reader with a msgio.Reader, reads all -// messages, ands sends them down the channel. -func (s *Chan) readFrom(mr Reader) { -Loop: - for { - buf, err := mr.ReadMsg() - if err != nil { - if err == io.EOF { - break Loop // done - } - - // unexpected error. tell the client. - s.ErrChan <- err - break Loop - } - - select { - case <-s.CloseChan: - break Loop // told we're done - case s.MsgChan <- buf: - // ok seems fine. send it away - } - } - - close(s.MsgChan) - // signal we're done - s.CloseChan <- true -} - -// WriteTo wraps the given io.Writer with a msgio.Writer, listens on the -// channel and writes all messages to the writer. -func (s *Chan) WriteTo(w io.Writer) { - // new buffer per message - // if bottleneck, cycle around a set of buffers - mw := NewWriter(w) - -Loop: - for { - select { - case <-s.CloseChan: - break Loop // told we're done - - case msg, ok := <-s.MsgChan: - if !ok { // chan closed - break Loop - } - - if err := mw.WriteMsg(msg); err != nil { - if err != io.EOF { - // unexpected error. tell the client. - s.ErrChan <- err - } - - break Loop - } - } - } - - // signal we're done - s.CloseChan <- true -} - -// Close the Chan -func (s *Chan) Close() { - s.CloseChan <- true -} - -// nullLocker conforms to the sync.Locker interface but does nothing. -type nullLocker struct{} - -func (l *nullLocker) Lock() {} -func (l *nullLocker) Unlock() {} diff --git a/chan_test.go b/chan_test.go deleted file mode 100644 index fc80133..0000000 --- a/chan_test.go +++ /dev/null @@ -1,113 +0,0 @@ -package msgio - -import ( - "bytes" - "io" - "math/rand" - "testing" - "time" -) - -func randBuf(r *rand.Rand, size int) []byte { - buf := make([]byte, size) - _, _ = r.Read(buf) - return buf -} - -func TestReadChan(t *testing.T) { - buf := bytes.NewBuffer(nil) - writer := NewWriter(buf) - rchan := NewChan(10) - msgs := [1000][]byte{} - - r := rand.New(rand.NewSource(time.Now().UnixNano())) - for i := range msgs { - msgs[i] = randBuf(r, r.Intn(1000)) - err := writer.WriteMsg(msgs[i]) - if err != nil { - t.Fatal(err) - } - } - - if err := writer.Close(); err != nil { - t.Fatal(err) - } - - go rchan.ReadFrom(buf) - defer rchan.Close() - -Loop: - for i := 0; ; i++ { - select { - case err := <-rchan.ErrChan: - if err != nil { - t.Fatal("unexpected error", err) - } - - case msg2, ok := <-rchan.MsgChan: - if !ok { - if i < len(msg2) { - t.Error("failed to read all messages", len(msgs), i) - } - break Loop - } - - msg1 := msgs[i] - if !bytes.Equal(msg1, msg2) { - t.Fatal("message retrieved not equal\n", msg1, "\n\n", msg2) - } - } - } -} - -func TestWriteChan(t *testing.T) { - buf := bytes.NewBuffer(nil) - reader := NewReader(buf) - wchan := NewChan(10) - msgs := [1000][]byte{} - - go wchan.WriteTo(buf) - - r := rand.New(rand.NewSource(time.Now().UnixNano())) - for i := range msgs { - msgs[i] = randBuf(r, r.Intn(1000)) - - select { - case err := <-wchan.ErrChan: - if err != nil { - t.Fatal("unexpected error", err) - } - - case wchan.MsgChan <- msgs[i]: - } - } - - // tell chan we're done. - close(wchan.MsgChan) - // wait for writing to end - <-wchan.CloseChan - - defer wchan.Close() - - for i := 0; ; i++ { - msg2, err := reader.ReadMsg() - if err != nil { - if err == io.EOF { - if i < len(msg2) { - t.Error("failed to read all messages", len(msgs), i) - } - break - } - t.Error("unexpected error", err) - } - - msg1 := msgs[i] - if !bytes.Equal(msg1, msg2) { - t.Fatal("message retrieved not equal\n", msg1, "\n\n", msg2) - } - } - - if err := reader.Close(); err != nil { - t.Error(err) - } -} diff --git a/limit_test.go b/limit_test.go index 9b48d79..a3e43eb 100644 --- a/limit_test.go +++ b/limit_test.go @@ -22,4 +22,7 @@ func TestLimitWriter(t *testing.T) { t.Fatal("Expected to write 3 bytes with no errors") } err = writer.Flush() + if err != nil { + t.Fatal(err) + } } diff --git a/msgio_test.go b/msgio_test.go index e151cda..4167e02 100644 --- a/msgio_test.go +++ b/msgio_test.go @@ -12,6 +12,12 @@ import ( "time" ) +func randBuf(r *rand.Rand, size int) []byte { + buf := make([]byte, size) + _, _ = r.Read(buf) + return buf +} + func TestReadWrite(t *testing.T) { buf := bytes.NewBuffer(nil) writer := NewWriter(buf) @@ -54,8 +60,7 @@ type testIoReadWriter struct { func TestReadWriterClose(t *testing.T) { r, w := io.Pipe() - var rw ReadWriteCloser - rw = NewReadWriter(testIoReadWriter{r, w}) + rw := NewReadWriter(testIoReadWriter{r, w}) SubtestReaderWriterClose(t, rw) }