Skip to content

Commit

Permalink
Event based communication over sockets. (#1294)
Browse files Browse the repository at this point in the history
* Update test request sender to use socket server and client (remove old implementation).
Update test request handler to use the new socket implementation.

* Added few tests for test request handler.

* Pushing changes.

* Refactoring test code.

* Added some more tests.

* Fixed tests.

* removed unused method.

* Addressed some PR comments.

* fixed some PR comments.

* Addressed PR Comments.

* Fixed PR comments.

* Trying to check if unit tests will pass in CI.

* handling testhost manager factory sync.

* Fixing inconsistent tests.

* minor update

* add service guid
  • Loading branch information
navin22 authored and mayankbansal018 committed Dec 11, 2017
1 parent 1ad6c24 commit a89c524
Show file tree
Hide file tree
Showing 36 changed files with 2,176 additions and 2,641 deletions.
2 changes: 1 addition & 1 deletion scripts/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ function invoke_test()
local dotnet=$(_get_dotnet_path)
local vstest=$TP_OUT_DIR/$TPB_Configuration/$TPB_TargetFrameworkCore/vstest.console.dll

find ./test -path $PROJECT_NAME_PATTERNS | xargs $dotnet $vstest --parallel
find ./test -path $PROJECT_NAME_PATTERNS | xargs --verbose $dotnet $vstest --parallel
}

#
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,10 @@ public static DataCollectionRequestHandler Create(
ICommunicationManager communicationManager,
IMessageSink messageSink)
{
ValidateArg.NotNull(communicationManager, nameof(communicationManager));
ValidateArg.NotNull(messageSink, nameof(messageSink));
if (Instance == null)
{
ValidateArg.NotNull(communicationManager, nameof(communicationManager));
ValidateArg.NotNull(messageSink, nameof(messageSink));

lock (SyncObject)
{
if (Instance == null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Runtime.CompilerServices;

[assembly: InternalsVisibleTo("Microsoft.TestPlatform.CommunicationUtilities.UnitTests, PublicKey=002400000480000094000000060200000024000052534131000400000100010007d1fa57c4aed9f0a32e84aa0faefd0de9e8fd6aec8f87fb03766c834c99921eb23be79ad9d5dcc1dd9ad236132102900b723cf980957fc4e177108fc607774f29e8320e92ea05ece4e821c0a5efe8f1645c4c0c93c1ab99285d622caa652c1dfad63d745d6f2de5f17e5eaf0fc4963d261c8a12436518206dc093344d5ad293")]
[assembly: InternalsVisibleTo("Microsoft.TestPlatform.CommunicationUtilities.PlatformTests, PublicKey=002400000480000094000000060200000024000052534131000400000100010007d1fa57c4aed9f0a32e84aa0faefd0de9e8fd6aec8f87fb03766c834c99921eb23be79ad9d5dcc1dd9ad236132102900b723cf980957fc4e177108fc607774f29e8320e92ea05ece4e821c0a5efe8f1645c4c0c93c1ab99285d622caa652c1dfad63d745d6f2de5f17e5eaf0fc4963d261c8a12436518206dc093344d5ad293")]
[assembly: InternalsVisibleTo("vstest.console.UnitTests, PublicKey=002400000480000094000000060200000024000052534131000400000100010007d1fa57c4aed9f0a32e84aa0faefd0de9e8fd6aec8f87fb03766c834c99921eb23be79ad9d5dcc1dd9ad236132102900b723cf980957fc4e177108fc607774f29e8320e92ea05ece4e821c0a5efe8f1645c4c0c93c1ab99285d622caa652c1dfad63d745d6f2de5f17e5eaf0fc4963d261c8a12436518206dc093344d5ad293")]
[assembly: InternalsVisibleTo("Microsoft.TestPlatform.CrossPlatEngine, PublicKey=002400000480000094000000060200000024000052534131000400000100010007d1fa57c4aed9f0a32e84aa0faefd0de9e8fd6aec8f87fb03766c834c99921eb23be79ad9d5dcc1dd9ad236132102900b723cf980957fc4e177108fc607774f29e8320e92ea05ece4e821c0a5efe8f1645c4c0c93c1ab99285d622caa652c1dfad63d745d6f2de5f17e5eaf0fc4963d261c8a12436518206dc093344d5ad293")]
[assembly: InternalsVisibleTo("Microsoft.TestPlatform.CrossPlatEngine.UnitTests, PublicKey=002400000480000094000000060200000024000052534131000400000100010007d1fa57c4aed9f0a32e84aa0faefd0de9e8fd6aec8f87fb03766c834c99921eb23be79ad9d5dcc1dd9ad236132102900b723cf980957fc4e177108fc607774f29e8320e92ea05ece4e821c0a5efe8f1645c4c0c93c1ab99285d622caa652c1dfad63d745d6f2de5f17e5eaf0fc4963d261c8a12436518206dc093344d5ad293")]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,28 @@ public ConnectedEventArgs()
public ConnectedEventArgs(ICommunicationChannel channel)
{
this.Channel = channel;
this.Connected = true;
}

public ConnectedEventArgs(Exception faultException)
{
this.Connected = false;
this.Fault = faultException;
}

/// <summary>
/// Gets the communication channel based on this connection.
/// </summary>
public ICommunicationChannel Channel { get; private set; }

/// <summary>
/// Gets true if it's connected.
/// </summary>
public bool Connected { get; private set; }

/// <summary>
/// Gets the exception if it's not connected.
/// </summary>
public Exception Fault { get; private set; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

namespace Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.Interfaces
{
using System;

public interface ICommunicationEndPoint
{
/// <summary>
/// Event raised when an endPoint is connected.
/// </summary>
event EventHandler<ConnectedEventArgs> Connected;

/// <summary>
/// Event raised when an endPoint is disconnected.
/// </summary>
event EventHandler<DisconnectedEventArgs> Disconnected;

/// <summary>
/// Starts the endPoint and channel.
/// </summary>
/// <param name="endPoint">Address to connect</param>
/// <returns>Address of the connected endPoint</returns>
string Start(string endPoint);

/// <summary>
/// Stops the endPoint and closes the underlying communication channel.
/// </summary>
void Stop();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,9 @@ public Task NotifyDataAvailable()
// Try read data even if no one is listening to the data stream. Some server
// implementations (like Sockets) depend on the read operation to determine if a
// connection is closed.
var data = this.reader.ReadString();

if (this.MessageReceived != null)
{
var data = this.reader.ReadString();
this.MessageReceived.SafeInvoke(this, new MessageReceivedEventArgs { Data = data }, "LengthPrefixCommunicationChannel: MessageReceived");
}

Expand Down
61 changes: 40 additions & 21 deletions src/Microsoft.TestPlatform.CommunicationUtilities/SocketClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ namespace Microsoft.VisualStudio.TestPlatform.CommunicationUtilities
/// <summary>
/// Communication client implementation over sockets.
/// </summary>
public class SocketClient : ICommunicationClient
public class SocketClient : ICommunicationEndPoint
{
private readonly CancellationTokenSource cancellation;
private readonly TcpClient tcpClient;
Expand All @@ -41,16 +41,24 @@ protected SocketClient(Func<Stream, ICommunicationChannel> channelFactory)
}

/// <inheritdoc />
public event EventHandler<ConnectedEventArgs> ServerConnected;
public event EventHandler<ConnectedEventArgs> Connected;

/// <inheritdoc />
public event EventHandler<DisconnectedEventArgs> ServerDisconnected;
public event EventHandler<DisconnectedEventArgs> Disconnected;

/// <inheritdoc />
public void Start(string connectionInfo)
public string Start(string endPoint)
{
this.tcpClient.ConnectAsync(IPAddress.Loopback, int.Parse(connectionInfo))
.ContinueWith(this.OnServerConnected);
var ipEndPoint = endPoint.GetIPEndPoint();

if (EqtTrace.IsVerboseEnabled)
{
EqtTrace.Verbose("Waiting for connecting to server");
}

// Don't start if the endPoint port is zero
this.tcpClient.ConnectAsync(ipEndPoint.Address, ipEndPoint.Port).ContinueWith(this.OnServerConnected);
return ipEndPoint.ToString();
}

/// <inheritdoc />
Expand All @@ -65,22 +73,33 @@ public void Stop()

private void OnServerConnected(Task connectAsyncTask)
{
if (connectAsyncTask.IsFaulted)
if (this.Connected != null)
{
throw connectAsyncTask.Exception;
}
if (connectAsyncTask.IsFaulted)
{
this.Connected.SafeInvoke(this, new ConnectedEventArgs(connectAsyncTask.Exception), "SocketClient: ServerConnected");
if (EqtTrace.IsVerboseEnabled)
{
EqtTrace.Verbose("Unable to connect to server, Exception occured : {0}", connectAsyncTask.Exception);
}
}
else
{
this.channel = this.channelFactory(this.tcpClient.GetStream());
this.Connected.SafeInvoke(this, new ConnectedEventArgs(this.channel), "SocketClient: ServerConnected");

this.channel = this.channelFactory(this.tcpClient.GetStream());
if (this.ServerConnected != null)
{
this.ServerConnected.SafeInvoke(this, new ConnectedEventArgs(this.channel), "SocketClient: ServerConnected");

// Start the message loop
Task.Run(() => this.tcpClient.MessageLoopAsync(
this.channel,
this.Stop,
this.cancellation.Token))
.ConfigureAwait(false);
if (EqtTrace.IsVerboseEnabled)
{
EqtTrace.Verbose("Connected to server, and starting MessageLoopAsync");
}

// Start the message loop
Task.Run(() => this.tcpClient.MessageLoopAsync(
this.channel,
this.Stop,
this.cancellation.Token))
.ConfigureAwait(false);
}
}
}

Expand All @@ -102,7 +121,7 @@ private void Stop(Exception error)
this.channel.Dispose();
this.cancellation.Dispose();

this.ServerDisconnected?.SafeInvoke(this, new DisconnectedEventArgs(), "SocketClient: ServerDisconnected");
this.Disconnected?.SafeInvoke(this, new DisconnectedEventArgs(), "SocketClient: ServerDisconnected");
}
}
}
Expand Down
29 changes: 16 additions & 13 deletions src/Microsoft.TestPlatform.CommunicationUtilities/SocketServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ namespace Microsoft.VisualStudio.TestPlatform.CommunicationUtilities
/// <summary>
/// Communication server implementation over sockets.
/// </summary>
public class SocketServer : ICommunicationServer
public class SocketServer : ICommunicationEndPoint
{
private readonly CancellationTokenSource cancellation;

Expand All @@ -35,7 +35,7 @@ public class SocketServer : ICommunicationServer
/// Initializes a new instance of the <see cref="SocketServer"/> class.
/// </summary>
public SocketServer()
: this((stream) => new LengthPrefixCommunicationChannel(stream))
: this(stream => new LengthPrefixCommunicationChannel(stream))
{
}

Expand All @@ -53,21 +53,19 @@ protected SocketServer(Func<Stream, ICommunicationChannel> channelFactory)
}

/// <inheritdoc />
public event EventHandler<ConnectedEventArgs> ClientConnected;
public event EventHandler<ConnectedEventArgs> Connected;

/// <inheritdoc />
public event EventHandler<DisconnectedEventArgs> ClientDisconnected;
public event EventHandler<DisconnectedEventArgs> Disconnected;

/// <inheritdoc />
public string Start()
public string Start(string endPoint)
{
var endpoint = new IPEndPoint(IPAddress.Loopback, 0);
this.tcpListener = new TcpListener(endpoint);
this.tcpListener = new TcpListener(endPoint.GetIPEndPoint());

this.tcpListener.Start();

var connectionInfo = ((IPEndPoint)this.tcpListener.LocalEndpoint).Port.ToString();
EqtTrace.Info("SocketServer: Listening on port : {0}", connectionInfo);
var connectionInfo = ((IPEndPoint)this.tcpListener.LocalEndpoint).ToString();
EqtTrace.Info("SocketServer: Listening on end point : {0}", connectionInfo);

// Serves a single client at the moment. An error in connection, or message loop just
// terminates the entire server.
Expand All @@ -90,10 +88,15 @@ private void OnClientConnected(TcpClient client)
this.tcpClient = client;
this.tcpClient.Client.NoDelay = true;

if (this.ClientConnected != null)
if (this.Connected != null)
{
this.channel = this.channelFactory(this.tcpClient.GetStream());
this.ClientConnected.SafeInvoke(this, new ConnectedEventArgs(this.channel), "SocketServer: ClientConnected");
this.Connected.SafeInvoke(this, new ConnectedEventArgs(this.channel), "SocketServer: ClientConnected");

if (EqtTrace.IsVerboseEnabled)
{
EqtTrace.Verbose("Client connected, and starting MessageLoopAsync");
}

// Start the message loop
Task.Run(() => this.tcpClient.MessageLoopAsync(this.channel, error => this.Stop(error), this.cancellation.Token)).ConfigureAwait(false);
Expand Down Expand Up @@ -121,7 +124,7 @@ private void Stop(Exception error)
this.channel.Dispose();
this.cancellation.Dispose();

this.ClientDisconnected?.SafeInvoke(this, new DisconnectedEventArgs { Error = error }, "SocketServer: ClientDisconnected");
this.Disconnected?.SafeInvoke(this, new DisconnectedEventArgs { Error = error }, "SocketServer: ClientDisconnected");
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ namespace Microsoft.VisualStudio.TestPlatform.CommunicationUtilities
{
using System;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -69,5 +70,20 @@ internal static Task MessageLoopAsync(

return Task.FromResult(0);
}

/// <summary>
/// Converts a given string endpoint address to valid Ipv4, Ipv6 IPEndpoint
/// </summary>
/// <param name="value">Input endpoint address</param>
/// <returns>IPEndpoint from give string, if its not a valid string. It will create endpoint with loop back address with port 0</returns>
internal static IPEndPoint GetIPEndPoint(this string value)
{
if (Uri.TryCreate(string.Concat("tcp://", value), UriKind.Absolute, out Uri uri))
{
return new IPEndPoint(IPAddress.Parse(uri.Host), uri.Port < 0 ? 0 : uri.Port);
}

return new IPEndPoint(IPAddress.Loopback, 0);
}
}
}
Loading

0 comments on commit a89c524

Please sign in to comment.