Skip to content

Commit

Permalink
Add fake.NetConnOptions (#324)
Browse files Browse the repository at this point in the history
Includes ChunkSize to set a default for the fake.NetConn.
  • Loading branch information
jhendrixMSFT committed Mar 15, 2024
1 parent 8d8ce79 commit 1003610
Show file tree
Hide file tree
Showing 7 changed files with 130 additions and 101 deletions.
10 changes: 5 additions & 5 deletions benchmarks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func BenchmarkSenderSendSSMUnsettled(b *testing.B) {
return fake.Response{}, fmt.Errorf("unhandled frame %T", req)
}
}
conn := fake.NewNetConn(responder)
conn := fake.NewNetConn(responder, fake.NetConnOptions{})
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
client, err := NewConn(ctx, conn, nil)
cancel()
Expand Down Expand Up @@ -74,7 +74,7 @@ func BenchmarkSenderSendSSMSettled(b *testing.B) {
return fake.Response{}, fmt.Errorf("unhandled frame %T", req)
}
}
conn := fake.NewNetConn(responder)
conn := fake.NewNetConn(responder, fake.NetConnOptions{})
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
client, err := NewConn(ctx, conn, nil)
cancel()
Expand Down Expand Up @@ -117,7 +117,7 @@ func BenchmarkReceiverReceiveRSMFirst(b *testing.B) {
return fake.Response{}, fmt.Errorf("unhandled frame %T", req)
}
}
conn := fake.NewNetConn(responder)
conn := fake.NewNetConn(responder, fake.NetConnOptions{})
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
client, err := NewConn(ctx, conn, nil)
cancel()
Expand Down Expand Up @@ -168,7 +168,7 @@ func BenchmarkReceiverReceiveRSMSecond(b *testing.B) {
return fake.Response{}, fmt.Errorf("unhandled frame %T", req)
}
}
conn := fake.NewNetConn(responder)
conn := fake.NewNetConn(responder, fake.NetConnOptions{})
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
client, err := NewConn(ctx, conn, nil)
cancel()
Expand Down Expand Up @@ -219,7 +219,7 @@ func BenchmarkReceiverSettleMessage(b *testing.B) {
return fake.Response{}, fmt.Errorf("unhandled frame %T", req)
}
}
conn := fake.NewNetConn(responder)
conn := fake.NewNetConn(responder, fake.NetConnOptions{})
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
client, err := NewConn(ctx, conn, nil)
cancel()
Expand Down
44 changes: 22 additions & 22 deletions conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ func TestStart(t *testing.T) {

for _, tt := range tests {
t.Run(tt.label, func(t *testing.T) {
netConn := fake.NewNetConn(tt.responder)
netConn := fake.NewNetConn(tt.responder, fake.NetConnOptions{})
conn, err := newConn(netConn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
Expand All @@ -294,15 +294,15 @@ func TestStart(t *testing.T) {
}

func TestClose(t *testing.T) {
netConn := fake.NewNetConn(senderFrameHandlerNoUnhandled(0, SenderSettleModeUnsettled))
netConn := fake.NewNetConn(senderFrameHandlerNoUnhandled(0, SenderSettleModeUnsettled), fake.NetConnOptions{})
conn, err := newConn(netConn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
require.NoError(t, conn.start(ctx))
cancel()
require.NoError(t, conn.Close())
// with Close error
netConn = fake.NewNetConn(senderFrameHandlerNoUnhandled(0, SenderSettleModeUnsettled))
netConn = fake.NewNetConn(senderFrameHandlerNoUnhandled(0, SenderSettleModeUnsettled), fake.NetConnOptions{})
conn, err = newConn(netConn, nil)
require.NoError(t, err)
ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second)
Expand Down Expand Up @@ -331,7 +331,7 @@ func TestServerSideClose(t *testing.T) {
return fake.Response{}, fmt.Errorf("unhandled frame %T", req)
}
}
netConn := fake.NewNetConn(responder)
netConn := fake.NewNetConn(responder, fake.NetConnOptions{})
conn, err := newConn(netConn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
Expand All @@ -346,7 +346,7 @@ func TestServerSideClose(t *testing.T) {

// with error
closeReceived = make(chan struct{})
netConn = fake.NewNetConn(responder)
netConn = fake.NewNetConn(responder, fake.NetConnOptions{})
conn, err = newConn(netConn, nil)
require.NoError(t, err)
ctx, cancel = context.WithTimeout(context.Background(), 1*time.Second)
Expand Down Expand Up @@ -384,7 +384,7 @@ func TestKeepAlives(t *testing.T) {
}
}

netConn := fake.NewNetConn(responder)
netConn := fake.NewNetConn(responder, fake.NetConnOptions{})
conn, err := newConn(netConn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
Expand Down Expand Up @@ -426,7 +426,7 @@ func TestKeepAlivesIdleTimeout(t *testing.T) {

const idleTimeout = 100 * time.Millisecond

netConn := fake.NewNetConn(responder)
netConn := fake.NewNetConn(responder, fake.NetConnOptions{})
conn, err := newConn(netConn, &ConnOptions{
IdleTimeout: idleTimeout,
})
Expand All @@ -453,7 +453,7 @@ func TestKeepAlivesIdleTimeout(t *testing.T) {
}

func TestConnReaderError(t *testing.T) {
netConn := fake.NewNetConn(senderFrameHandlerNoUnhandled(0, SenderSettleModeUnsettled))
netConn := fake.NewNetConn(senderFrameHandlerNoUnhandled(0, SenderSettleModeUnsettled), fake.NetConnOptions{})
conn, err := newConn(netConn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
Expand All @@ -471,7 +471,7 @@ func TestConnReaderError(t *testing.T) {
}

func TestConnWriterError(t *testing.T) {
netConn := fake.NewNetConn(senderFrameHandlerNoUnhandled(0, SenderSettleModeUnsettled))
netConn := fake.NewNetConn(senderFrameHandlerNoUnhandled(0, SenderSettleModeUnsettled), fake.NetConnOptions{})
conn, err := newConn(netConn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
Expand Down Expand Up @@ -501,7 +501,7 @@ func TestConnWithZeroByteReads(t *testing.T) {
}
}

netConn := fake.NewNetConn(responder)
netConn := fake.NewNetConn(responder, fake.NetConnOptions{})
netConn.SendFrame([]byte{})

conn, err := newConn(netConn, nil)
Expand All @@ -517,7 +517,7 @@ func TestConnNegotiationTimeout(t *testing.T) {
return fake.Response{}, nil
}

netConn := fake.NewNetConn(responder)
netConn := fake.NewNetConn(responder, fake.NetConnOptions{})
conn, err := newConn(netConn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
Expand All @@ -530,7 +530,7 @@ type mockDialer struct {
}

func (m mockDialer) NetDialerDial(ctx context.Context, c *Conn, host, port string) error {
c.net = fake.NewNetConn(m.resp)
c.net = fake.NewNetConn(m.resp, fake.NetConnOptions{})
return nil
}

Expand Down Expand Up @@ -640,7 +640,7 @@ func TestClientNewSession(t *testing.T) {
return fake.Response{}, fmt.Errorf("unhandled frame %T", req)
}
}
netConn := fake.NewNetConn(responder)
netConn := fake.NewNetConn(responder, fake.NetConnOptions{})

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
client, err := NewConn(ctx, netConn, nil)
Expand Down Expand Up @@ -686,7 +686,7 @@ func TestClientMultipleSessions(t *testing.T) {
return fake.Response{}, fmt.Errorf("unhandled frame %T", req)
}
}
netConn := fake.NewNetConn(responder)
netConn := fake.NewNetConn(responder, fake.NetConnOptions{})

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
client, err := NewConn(ctx, netConn, nil)
Expand Down Expand Up @@ -736,7 +736,7 @@ func TestClientTooManySessions(t *testing.T) {
return fake.Response{}, fmt.Errorf("unhandled frame %T", req)
}
}
netConn := fake.NewNetConn(responder)
netConn := fake.NewNetConn(responder, fake.NetConnOptions{})

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
client, err := NewConn(ctx, netConn, nil)
Expand Down Expand Up @@ -777,7 +777,7 @@ func TestClientNewSessionMissingRemoteChannel(t *testing.T) {
return fake.Response{}, fmt.Errorf("unhandled frame %T", req)
}
}
netConn := fake.NewNetConn(responder)
netConn := fake.NewNetConn(responder, fake.NetConnOptions{})

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
client, err := NewConn(ctx, netConn, nil)
Expand Down Expand Up @@ -807,7 +807,7 @@ func TestClientNewSessionInvalidInitialResponse(t *testing.T) {
return fake.Response{}, fmt.Errorf("unhandled frame %T", req)
}
}
netConn := fake.NewNetConn(responder)
netConn := fake.NewNetConn(responder, fake.NetConnOptions{})

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
client, err := NewConn(ctx, netConn, nil)
Expand Down Expand Up @@ -843,7 +843,7 @@ func TestClientNewSessionInvalidSecondResponseSameChannel(t *testing.T) {
return fake.Response{}, fmt.Errorf("unhandled frame %T", req)
}
}
netConn := fake.NewNetConn(responder)
netConn := fake.NewNetConn(responder, fake.NetConnOptions{})

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
client, err := NewConn(ctx, netConn, nil)
Expand Down Expand Up @@ -887,7 +887,7 @@ func TestClientNewSessionInvalidSecondResponseDifferentChannel(t *testing.T) {
return fake.Response{}, fmt.Errorf("unhandled frame %T", req)
}
}
netConn := fake.NewNetConn(responder)
netConn := fake.NewNetConn(responder, fake.NetConnOptions{})

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
client, err := NewConn(ctx, netConn, nil)
Expand Down Expand Up @@ -935,7 +935,7 @@ func TestNewSessionTimedOut(t *testing.T) {
return fake.Response{}, fmt.Errorf("unhandled frame %T", req)
}
}
netConn := fake.NewNetConn(responder)
netConn := fake.NewNetConn(responder, fake.NetConnOptions{})

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
client, err := NewConn(ctx, netConn, nil)
Expand Down Expand Up @@ -980,7 +980,7 @@ func TestNewSessionWriteError(t *testing.T) {
return fake.Response{}, fmt.Errorf("unhandled frame %T", req)
}
}
netConn := fake.NewNetConn(responder)
netConn := fake.NewNetConn(responder, fake.NetConnOptions{})

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
client, err := NewConn(ctx, netConn, nil)
Expand Down Expand Up @@ -1055,7 +1055,7 @@ func TestConnSmallFrames(t *testing.T) {
}
}

netConn := fake.NewNetConn(responder)
netConn := fake.NewNetConn(responder, fake.NetConnOptions{})
conn, err := newConn(netConn, nil)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
Expand Down
19 changes: 18 additions & 1 deletion internal/fake/net_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,25 @@ import (
"github.com/Azure/go-amqp/internal/frames"
)

// NetConnOptions contains options when creating a NetConn.
// Pass the zero-value to accept the default values.
type NetConnOptions struct {
// ChunkSize is the size of chunks to split responses into.
// A zero or negative value means no chunking.
// The default value is zero.
ChunkSize int
}

// NewNetConn creates a new instance of NetConn.
// Responder is invoked by Write when a frame is received.
// Return a zero-value Response/nil error to swallow the frame.
// Return a non-nil error to simulate a write error.
// NOTE: resp is called on a separate goroutine so it MUST NOT access any *testing.T etc
func NewNetConn(resp func(remoteChannel uint16, fr frames.FrameBody) (Response, error)) *NetConn {
func NewNetConn(resp func(remoteChannel uint16, fr frames.FrameBody) (Response, error), opts NetConnOptions) *NetConn {
netConn := &NetConn{
ReadErr: make(chan error),
WriteErr: make(chan error, 1),
opts: opts,
resp: resp,
// during shutdown, connReader can close before connWriter as they both
// both return on c.Done being closed, so there is some non-determinism
Expand Down Expand Up @@ -55,6 +65,7 @@ type NetConn struct {
// Has a buffer of one so setting a pending error won't block.
WriteErr chan error

opts NetConnOptions
resp func(uint16, frames.FrameBody) (Response, error)
readDL readTimer
readData chan []byte
Expand Down Expand Up @@ -99,6 +110,7 @@ type Response struct {

// ChunkSize is the size of chunks to split Payload into.
// A zero or negative value means no chunking.
// This value supercedes the NetConnOptions.ChunkSize.
ChunkSize int
}

Expand Down Expand Up @@ -187,6 +199,11 @@ func (n *NetConn) write() {
// else all we do is stall Conn.connWriter() which doesn't
// actually simulate a delayed response to a frame.
time.Sleep(resp.WriteDelay)

if resp.ChunkSize < 1 {
// no chunk size for this response, fall back to options
resp.ChunkSize = n.opts.ChunkSize
}
if resp.ChunkSize < 1 {
// send in one chunk
resp.ChunkSize = len(resp.Payload)
Expand Down
4 changes: 2 additions & 2 deletions link_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func TestMuxFlowHandlesDrainProperly(t *testing.T) {
}

func newTestLink(t *testing.T) *Receiver {
fakeConn := fake.NewNetConn(receiverFrameHandlerNoUnhandled(0, ReceiverSettleModeFirst))
fakeConn := fake.NewNetConn(receiverFrameHandlerNoUnhandled(0, ReceiverSettleModeFirst), fake.NetConnOptions{})
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
conn, err := NewConn(ctx, fakeConn, nil)
require.NoError(t, err)
Expand Down Expand Up @@ -382,7 +382,7 @@ func TestNewReceivingLink(t *testing.T) {
func TestSessionFlowDisablesTransfer(t *testing.T) {
t.Skip("TODO: finish for link testing")
nextIncomingID := uint32(0)
netConn := fake.NewNetConn(senderFrameHandlerNoUnhandled(0, SenderSettleModeUnsettled))
netConn := fake.NewNetConn(senderFrameHandlerNoUnhandled(0, SenderSettleModeUnsettled), fake.NetConnOptions{})

ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
client, err := NewConn(ctx, netConn, nil)
Expand Down
Loading

0 comments on commit 1003610

Please sign in to comment.