Skip to content

Commit

Permalink
[Port] Fix socket exception on datacollection in parallel (#1514)
Browse files Browse the repository at this point in the history
* Fix socket exception on datacollection in parallel (#1505)

* Add more logging for datacollection manager

* Skip additional messages

* Add more logging

* Add logging for MessageLoopAsync

* Add logging for MessageLoopAsync 2

* Stop the communication server on operation complete

* Add tests and remove not required logging

* Address review comments

* Remove IPEndpoint typecast
  • Loading branch information
smadala committed Apr 2, 2018
1 parent 3307616 commit c36ef30
Show file tree
Hide file tree
Showing 9 changed files with 105 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public Task Send(string data)
}
catch (Exception ex)
{
EqtTrace.Verbose("LengthPrefixCommunicationChannel: Error sending data: {0}.", ex);
EqtTrace.Error("LengthPrefixCommunicationChannel.Send: Error sending data: {0}.", ex);
throw new CommunicationException("Unable to send data over channel.", ex);
}

Expand All @@ -78,6 +78,7 @@ public Task NotifyDataAvailable()
/// <inheritdoc />
public void Dispose()
{
EqtTrace.Verbose("LengthPrefixCommunicationChannel.Dispose: Dispose reader and writer.");
this.reader.Dispose();
this.writer.Dispose();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public class SocketClient : ICommunicationEndPoint
private readonly Func<Stream, ICommunicationChannel> channelFactory;
private ICommunicationChannel channel;
private bool stopped;
private string endPoint;

public SocketClient()
: this(stream => new LengthPrefixCommunicationChannel(stream))
Expand All @@ -49,12 +50,10 @@ protected SocketClient(Func<Stream, ICommunicationChannel> channelFactory)
/// <inheritdoc />
public string Start(string endPoint)
{
this.endPoint = endPoint;
var ipEndPoint = endPoint.GetIPEndPoint();

if (EqtTrace.IsVerboseEnabled)
{
EqtTrace.Verbose("Waiting for connecting to server");
}
EqtTrace.Info("SocketClient.Start: connecting to server endpoint: {0}", endPoint);

// Don't start if the endPoint port is zero
this.tcpClient.ConnectAsync(ipEndPoint.Address, ipEndPoint.Port).ContinueWith(this.OnServerConnected);
Expand All @@ -64,6 +63,8 @@ public string Start(string endPoint)
/// <inheritdoc />
public void Stop()
{
EqtTrace.Info("SocketClient.Stop: Stop communication from server endpoint: {0}", this.endPoint);

if (!this.stopped)
{
EqtTrace.Info("SocketClient: Stop: Cancellation requested. Stopping message loop.");
Expand All @@ -73,6 +74,8 @@ public void Stop()

private void OnServerConnected(Task connectAsyncTask)
{
EqtTrace.Info("SocketClient.OnServerConnected: connected to server endpoint: {0}", this.endPoint);

if (this.Connected != null)
{
if (connectAsyncTask.IsFaulted)
Expand Down Expand Up @@ -105,6 +108,8 @@ private void OnServerConnected(Task connectAsyncTask)

private void Stop(Exception error)
{
EqtTrace.Info("SocketClient.PrivateStop: Stop communication from server endpoint: {0}, error:{1}", this.endPoint, error);

if (!this.stopped)
{
// Do not allow stop to be called multiple times.
Expand Down
17 changes: 11 additions & 6 deletions src/Microsoft.TestPlatform.CommunicationUtilities/SocketServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ namespace Microsoft.VisualStudio.TestPlatform.CommunicationUtilities
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.Interfaces;
using Microsoft.VisualStudio.TestPlatform.ObjectModel;
using Microsoft.VisualStudio.TestPlatform.PlatformAbstractions;
using Microsoft.VisualStudio.TestPlatform.Utilities;

/// <summary>
Expand All @@ -31,6 +30,8 @@ public class SocketServer : ICommunicationEndPoint

private bool stopped;

private string endPoint;

/// <summary>
/// Initializes a new instance of the <see cref="SocketServer"/> class.
/// </summary>
Expand Down Expand Up @@ -64,21 +65,22 @@ public string Start(string endPoint)

this.tcpListener.Start();

var connectionInfo = ((IPEndPoint)this.tcpListener.LocalEndpoint).ToString();
EqtTrace.Info("SocketServer: Listening on end point : {0}", connectionInfo);
this.endPoint = this.tcpListener.LocalEndpoint.ToString();
EqtTrace.Info("SocketServer.Start: Listening on endpoint : {0}", this.endPoint);

// Serves a single client at the moment. An error in connection, or message loop just
// terminates the entire server.
this.tcpListener.AcceptTcpClientAsync().ContinueWith(t => this.OnClientConnected(t.Result));
return connectionInfo;
return this.endPoint;
}

/// <inheritdoc />
public void Stop()
{
EqtTrace.Info("SocketServer.Stop: Stop server endPoint: {0}", this.endPoint);
if (!this.stopped)
{
EqtTrace.Info("SocketServer: Stop: Cancellation requested. Stopping message loop.");
EqtTrace.Info("SocketServer.Stop: Cancellation requested. Stopping message loop.");
this.cancellation.Cancel();
}
}
Expand All @@ -95,7 +97,7 @@ private void OnClientConnected(TcpClient client)

if (EqtTrace.IsVerboseEnabled)
{
EqtTrace.Verbose("Client connected, and starting MessageLoopAsync");
EqtTrace.Verbose("SocketServer.OnClientConnected: Client connected for endPoint: {0}, starting MessageLoopAsync:", this.endPoint);
}

// Start the message loop
Expand All @@ -105,6 +107,8 @@ private void OnClientConnected(TcpClient client)

private void Stop(Exception error)
{
EqtTrace.Info("SocketServer.PrivateStop: Stopp server endPoint: {0} error: {1}", this.endPoint, error);

if (!this.stopped)
{
// Do not allow stop to be called multiple times.
Expand All @@ -124,6 +128,7 @@ private void Stop(Exception error)
this.channel.Dispose();
this.cancellation.Dispose();

EqtTrace.Info("SocketServer.Stop: Raise disconnected event endPoint: {0} error: {1}", this.endPoint, error);
this.Disconnected?.SafeInvoke(this, new DisconnectedEventArgs { Error = error }, "SocketServer: ClientDisconnected");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,19 @@ internal static Task MessageLoopAsync(
CancellationToken cancellationToken)
{
Exception error = null;
var remoteEndPoint = client.Client.RemoteEndPoint.ToString();
var localEndPoint = client.Client.LocalEndPoint.ToString();

// Set read timeout to avoid blocking receive raw message
while (channel != null && !cancellationToken.IsCancellationRequested)
{
EqtTrace.Verbose("TcpClientExtensions.MessageLoopAsync: Polling on remoteEndPoint: {0} localEndPoint: {1}", remoteEndPoint, localEndPoint);

try
{
if (client.Client.Poll(STREAMREADTIMEOUT, SelectMode.SelectRead))
{
EqtTrace.Verbose("TcpClientExtensions.MessageLoopAsync: NotifyDataAvailable remoteEndPoint: {0} localEndPoint: {1}", remoteEndPoint, localEndPoint);
channel.NotifyDataAvailable();
}
}
Expand All @@ -43,23 +48,29 @@ internal static Task MessageLoopAsync(
&& socketException.SocketErrorCode == SocketError.TimedOut)
{
EqtTrace.Info(
"Socket: Message loop: failed to receive message due to read timeout {0}",
ioException);
"Socket: Message loop: failed to receive message due to read timeout {0}, remoteEndPoint: {1} localEndPoint: {2}",
ioException,
remoteEndPoint,
localEndPoint);
}
else
{
EqtTrace.Error(
"Socket: Message loop: failed to receive message due to socket error {0}",
ioException);
"Socket: Message loop: failed to receive message due to socket error {0}, remoteEndPoint: {1} localEndPoint: {2}",
ioException,
remoteEndPoint,
localEndPoint);
error = ioException;
break;
}
}
catch (Exception exception)
{
EqtTrace.Error(
"Socket: Message loop: failed to receive message {0}",
exception);
"Socket: Message loop: failed to receive message {0}, remoteEndPoint: {1} localEndPoint: {2}",
exception,
remoteEndPoint,
localEndPoint);
error = exception;
break;
}
Expand All @@ -68,6 +79,8 @@ internal static Task MessageLoopAsync(
// Try clean up and raise client disconnected events
errorHandler(error);

EqtTrace.Verbose("TcpClientExtensions.MessageLoopAsync: exiting MessageLoopAsync remoteEndPoint: {0} localEndPoint: {1}", remoteEndPoint, localEndPoint);

return Task.FromResult(0);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,7 @@ private void SetOperationComplete()
EqtTrace.Verbose("TestRequestSender.SetOperationComplete: Setting operation complete.");
}

this.communicationEndpoint.Stop();
Interlocked.CompareExchange(ref this.operationCompleted, 1, 0);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ internal class ProxyDataCollectionManager : IProxyDataCollectionManager
private string settingsXml;
private int connectionTimeout;
private IRequestData requestData;
private int dataCollectionPort;
private int dataCollectionProcessId;

/// <summary>
/// Initializes a new instance of the <see cref="ProxyDataCollectionManager"/> class.
Expand Down Expand Up @@ -127,6 +129,7 @@ public Collection<AttachmentSet> AfterTestRunEnd(bool isCanceled, ITestMessageEv
this.InvokeDataCollectionServiceAction(
() =>
{
EqtTrace.Info("ProxyDataCollectionManager.AfterTestRunEnd: Get attachment set for datacollector processId: {0} port: {1}", dataCollectionProcessId, dataCollectionPort);
attachmentSet = this.dataCollectionRequestSender.SendAfterTestRunStartAndGetResult(runEventsHandler, isCanceled);
},
runEventsHandler);
Expand Down Expand Up @@ -160,9 +163,15 @@ public DataCollectionParameters BeforeTestRunStart(
this.InvokeDataCollectionServiceAction(
() =>
{
EqtTrace.Info("ProxyDataCollectionManager.BeforeTestRunStart: Get env variable and port for datacollector processId: {0} port: {1}", this.dataCollectionProcessId, this.dataCollectionPort);
var result = this.dataCollectionRequestSender.SendBeforeTestRunStartAndGetResult(this.settingsXml, runEventsHandler);
environmentVariables = result.EnvironmentVariables;
dataCollectionEventsPort = result.DataCollectionEventsPort;
EqtTrace.Info(
"ProxyDataCollectionManager.BeforeTestRunStart: SendBeforeTestRunStartAndGetResult successful, env variable from datacollector: {0} and testhost port: {1}",
string.Join(";", environmentVariables),
dataCollectionEventsPort);
},
runEventsHandler);
return new DataCollectionParameters(
Expand All @@ -185,21 +194,27 @@ public void TestHostLaunched(int processId)
/// </summary>
public void Dispose()
{
EqtTrace.Info("ProxyDataCollectionManager.Dispose: calling dospose for datacollector processId: {0} port: {1}", this.dataCollectionProcessId, this.dataCollectionPort);
this.dataCollectionRequestSender.Close();
}

/// <inheritdoc />
public void Initialize()
{
var port = this.dataCollectionRequestSender.InitializeCommunication();
this.dataCollectionPort = this.dataCollectionRequestSender.InitializeCommunication();

// Warn the user that execution will wait for debugger attach.
var processId = this.dataCollectionLauncher.LaunchDataCollector(null, this.GetCommandLineArguments(port));
this.dataCollectionProcessId = this.dataCollectionLauncher.LaunchDataCollector(null, this.GetCommandLineArguments(this.dataCollectionPort));
EqtTrace.Info("ProxyDataCollectionManager.Initialize: Launched datacollector processId: {0} port: {1}", this.dataCollectionProcessId, this.dataCollectionPort);

ChangeConnectionTimeoutIfRequired(dataCollectionProcessId);

EqtTrace.Info("ProxyDataCollectionManager.Initialize: waiting for connection with timeout: {0}", this.connectionTimeout);

ChangeConnectionTimeoutIfRequired(processId);
var connected = this.dataCollectionRequestSender.WaitForRequestHandlerConnection(this.connectionTimeout);
if (connected == false)
{
EqtTrace.Error("ProxyDataCollectionManager.Initialize: failed to connect to datacollector process, processId: {0} port: {1}", this.dataCollectionProcessId, this.dataCollectionPort);
throw new TestPlatformException(string.Format(CultureInfo.CurrentUICulture, CrossPlatEngineResources.FailedToConnectDataCollector));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ namespace Microsoft.VisualStudio.TestPlatform.CommunicationUtilities
using Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.EventHandlers;
using Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.Interfaces;
using Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.ObjectModel;
using Microsoft.VisualStudio.TestPlatform.CrossPlatEngine.DataCollection.Interfaces;
using Microsoft.VisualStudio.TestPlatform.CrossPlatEngine.EventHandlers;
using Microsoft.VisualStudio.TestPlatform.ObjectModel;
using Microsoft.VisualStudio.TestPlatform.ObjectModel.Client;
Expand All @@ -20,7 +19,6 @@ namespace Microsoft.VisualStudio.TestPlatform.CommunicationUtilities
using Microsoft.VisualStudio.TestPlatform.ObjectModel.Utilities;
using Microsoft.VisualStudio.TestPlatform.Utilities;
using CrossPlatResources = Microsoft.VisualStudio.TestPlatform.CrossPlatEngine.Resources.Resources;
using Microsoft.VisualStudio.TestPlatform.CrossPlatEngine;

public class TestRequestHandler : IDisposable, ITestRequestHandler
{
Expand Down Expand Up @@ -131,14 +129,14 @@ public void Close()
public void SendTestCases(IEnumerable<TestCase> discoveredTestCases)
{
var data = this.dataSerializer.SerializePayload(MessageType.TestCasesFound, discoveredTestCases, this.protocolVersion);
this.channel.Send(data);
this.SendData(data);
}

/// <inheritdoc />
public void SendTestRunStatistics(TestRunChangedEventArgs testRunChangedArgs)
{
var data = this.dataSerializer.SerializePayload(MessageType.TestRunStatsChange, testRunChangedArgs, this.protocolVersion);
this.channel.Send(data);
this.SendData(data);
}

/// <inheritdoc />
Expand All @@ -148,7 +146,7 @@ public void SendLog(TestMessageLevel messageLevel, string message)
MessageType.TestMessage,
new TestMessagePayload { MessageLevel = messageLevel, Message = message },
this.protocolVersion);
this.channel.Send(data);
this.SendData(data);
}

/// <inheritdoc />
Expand All @@ -168,7 +166,7 @@ public void SendExecutionComplete(
ExecutorUris = executorUris
},
this.protocolVersion);
this.channel.Send(data);
this.SendData(data);
}

/// <inheritdoc />
Expand All @@ -184,7 +182,7 @@ public void DiscoveryComplete(DiscoveryCompleteEventArgs discoveryCompleteEventA
Metrics = discoveryCompleteEventArgs.Metrics
},
this.protocolVersion);
this.channel.Send(data);
this.SendData(data);
}

/// <inheritdoc />
Expand All @@ -201,7 +199,7 @@ public int LaunchProcessWithDebuggerAttached(TestProcessStartInfo testProcessSta
var data = dataSerializer.SerializePayload(MessageType.LaunchAdapterProcessWithDebuggerAttached,
testProcessStartInfo, protocolVersion);

this.channel.Send(data);
this.SendData(data);

EqtTrace.Verbose("Waiting for LaunchAdapterProcessWithDebuggerAttached ack");
waitHandle.Wait();
Expand Down Expand Up @@ -389,5 +387,10 @@ private void SetCommunicationEndPoint()
}
}

private void SendData(string data)
{
EqtTrace.Verbose("TestRequestHandler.SendData: sending data from testhost: {0}", data);
this.channel.Send(data);
}
}
}
1 change: 1 addition & 0 deletions src/testhost.x86/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public static void Main(string[] args)
finally
{
TestPlatformEventSource.Instance.TestHostStop();
EqtTrace.Info("Testhost process exiting.");
}
}

Expand Down
Loading

0 comments on commit c36ef30

Please sign in to comment.