Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Add variables for retries in Properties #168

Draft
wants to merge 3 commits into
base: 0.8.2.X
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions Client/src/Common/Properties.cs
Original file line number Diff line number Diff line change
Expand Up @@ -319,4 +319,14 @@ public string ConnectionString
/// The TaskOptions to pass to the session or the submission session
/// </summary>
public TaskOptions TaskOptions { get; set; }

/// <summary>
/// Gets or sets the maximum number of retries. Default 5 retries.
/// The property is static, so the value is shared by every Properties instance in the process.
/// NOTE(review): the retry helpers count this value as the total number of attempts
/// (first try included), not as the number of additional retries - confirm this is intended.
/// </summary>
public static int MaxRetries { get; set; } = 5;

/// <summary>
/// Gets or sets the time interval between retries, in milliseconds. Default 2000 ms.
/// The property is static, so the value is shared by every Properties instance in the process.
/// </summary>
public static int TimeIntervalRetriesInMs { get; set; } = 2000;
}
72 changes: 50 additions & 22 deletions Client/src/Common/Submitter/BaseClientSubmitter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -388,13 +388,16 @@ public void WaitForTasksCompletion(IEnumerable<string> taskIds)
using var channel = channelPool_.GetChannel();
var submitterService = new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel);

Retry.WhileException(5,
200,
Retry.WhileException(Properties.MaxRetries,
Properties.TimeIntervalRetriesInMs,
retry =>
{
Logger?.LogDebug("Try {try} for {funcName}",
retry,
nameof(submitterService.WaitForCompletion));
if (retry > 1)
{
Logger?.LogWarning("Try {try} for {funcName}",
retry,
nameof(submitterService.WaitForCompletion));
}

var __ = submitterService.WaitForCompletion(new WaitRequest
{
Expand Down Expand Up @@ -442,8 +445,8 @@ public ResultStatusCollection GetResultStatus(IEnumerable<string> taskIds,
using var channel = channelPool_.GetChannel();
var submitterService = new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel);

var idStatus = Retry.WhileException(5,
200,
var idStatus = Retry.WhileException(Properties.MaxRetries,
Properties.TimeIntervalRetriesInMs,
retry =>
{
Logger?.LogDebug("Try {try} for {funcName}",
Expand Down Expand Up @@ -508,15 +511,36 @@ public ResultStatusCollection GetResultStatus(IEnumerable<string> taskIds,
return resultStatusList;
}

/// <summary>
/// Gets the result ids for a given list of task ids.
/// </summary>
/// <param name="taskIds">The list of task ids.</param>
/// <returns>A collection of map task results.</returns>
public ICollection<GetResultIdsResponse.Types.MapTaskResult> GetResultIds(IEnumerable<string> taskIds)
=> channelPool_.WithChannel(channel => new Tasks.TasksClient(channel).GetResultIds(new GetResultIdsRequest
{
TaskId =
{
taskIds,
},
})
.TaskResults);
=> Retry.WhileException(Properties.MaxRetries,
Properties.TimeIntervalRetriesInMs,
retry =>
{
if (retry > 1)
{
Logger?.LogWarning("Try {try} for {funcName}",
retry,
nameof(GetResultIds));
}

return channelPool_.WithChannel(channel => new Tasks.TasksClient(channel).GetResultIds(new GetResultIdsRequest
{
TaskId =
{
taskIds,
},
})
.TaskResults);
},
true,
typeof(IOException),
typeof(RpcException));


/// <summary>
/// Try to find the result of one task. If there is no result, the function returns byte[0]
Expand Down Expand Up @@ -545,8 +569,8 @@ public byte[] GetResult(string taskId,
using var channel = channelPool_.GetChannel();
var submitterService = new Api.gRPC.V1.Submitter.Submitter.SubmitterClient(channel);

Retry.WhileException(5,
200,
Retry.WhileException(Properties.MaxRetries,
Properties.TimeIntervalRetriesInMs,
retry =>
{
Logger?.LogDebug("Try {try} for {funcName}",
Expand Down Expand Up @@ -713,13 +737,17 @@ public byte[] TryGetResult(string taskId,
Session = SessionId.Id,
};

var resultReply = Retry.WhileException(5,
200,
var resultReply = Retry.WhileException(Properties.MaxRetries,
Properties.TimeIntervalRetriesInMs,
retry =>
{
Logger?.LogDebug("Try {try} for {funcName}",
retry,
"SubmitterService.TryGetResultAsync");
if (retry > 1)
{
Logger?.LogWarning("Try {try} for {funcName}",
retry,
"SubmitterService.TryGetResultAsync");
}

try
{
var response = TryGetResultAsync(resultRequest,
Expand Down
13 changes: 11 additions & 2 deletions Client/src/Unified/Services/Common/AbstractClientService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,12 @@ public abstract class AbstractClientService : IDisposable
public AbstractClientService(Properties properties,
[CanBeNull] ILoggerFactory loggerFactory = null)
{
LoggerFactory = loggerFactory;

LoggerFactory = loggerFactory;
Properties = properties;
ResultHandlerDictionary = new ConcurrentDictionary<string, IServiceInvocationHandler>();
}


/// <summary>
/// Instant view of currently handled task ids.
/// The list is only valid at the time of access.
Expand All @@ -36,6 +37,14 @@ public AbstractClientService(Properties properties,
public IReadOnlyCollection<string> CurrentlyHandledTaskIds
=> (IReadOnlyCollection<string>)ResultHandlerDictionary.Keys;

/// <summary>
/// Gets or sets the Properties object holding the client configuration.
/// It is assigned by the constructor from its <c>properties</c> argument and is
/// accessible to derived services only (protected).
/// </summary>
/// <value>
/// The Properties object.
/// </value>
protected Properties Properties { get; set; }

/// <summary>
/// The result dictionary to return result
/// </summary>
Expand Down
43 changes: 33 additions & 10 deletions Client/src/Unified/Services/Submitter/Service.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -56,7 +57,7 @@ namespace ArmoniK.DevelopmentKit.Client.Unified.Services.Submitter;
[MarkDownDoc]
public class Service : AbstractClientService, ISubmitterService
{
private const int MaxRetries = 5;
private const int MaxRetries = 10;

// *** you need some mechanism to map types to fields
private static readonly IDictionary<TaskStatus, ArmonikStatusCode> StatusCodesLookUp = new List<Tuple<TaskStatus, ArmonikStatusCode>>
Expand Down Expand Up @@ -209,7 +210,7 @@ public Service(Properties properties,
MaxRetries);

//Delay before submission
Task.Delay(TimeSpan.FromMilliseconds(100));
Task.Delay(TimeSpan.FromMilliseconds(Properties.TimeIntervalRetriesInMs));
}
}

Expand Down Expand Up @@ -558,10 +559,17 @@ public ServiceResult Execute(string methodName,
};
}

/// <summary>
/// Retrieves the results for the given taskIds.
/// </summary>
/// <param name="taskIds">The taskIds to retrieve results for.</param>
/// <param name="responseHandler">The action to take when a response is received.</param>
/// <param name="errorHandler">The action to take when an error occurs.</param>
/// <param name="chunkResultSize">The size of the chunk to retrieve results in.</param>
private void ProxyTryGetResults(IEnumerable<string> taskIds,
Action<string, byte[]> responseHandler,
Action<string, TaskStatus, string> errorHandler,
int chunkResultSize = 500)
int chunkResultSize = 200)
{
var missing = taskIds.ToHashSet();
var holdPrev = missing.Count;
Expand Down Expand Up @@ -590,13 +598,28 @@ private void ProxyTryGetResults(IEnumerable<string> taskIds,
Logger?.LogTrace("Response handler for {taskId}",
resultStatusData.TaskId);
responseHandler(resultStatusData.TaskId,
SessionService.TryGetResultAsync(new ResultRequest
{
ResultId = resultStatusData.ResultId,
Session = SessionId,
},
CancellationToken.None)
.Result);
Retry.WhileException(Properties.MaxRetries,
Properties.TimeIntervalRetriesInMs,
retry =>
{
if (retry > 1)
{
Logger?.LogWarning("Try {try} for {funcName}",
retry,
nameof(SessionService.TryGetResultAsync));
}

return SessionService.TryGetResultAsync(new ResultRequest
{
ResultId = resultStatusData.ResultId,
Session = SessionId,
},
CancellationToken.None)
.Result;
},
true,
typeof(IOException),
typeof(RpcException)));
}
catch (Exception e)
{
Expand Down
115 changes: 114 additions & 1 deletion Common/src/Common/RetryAction.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace ArmoniK.DevelopmentKit.Common;

Expand Down Expand Up @@ -142,4 +143,116 @@ public static T WhileException<T>(int retries,
// we're out of retries. If it's unexpected, throwing is the right thing to do anyway
return operation(retries);
}

/// <summary>
/// Runs the specified operation up to <paramref name="retries" /> times, waiting asynchronously between
/// attempts, until it completes without an exception or the attempts are exhausted.
/// </summary>
/// <param name="retries">
/// Total number of attempts. The last attempt is executed outside of any exception handling, so its
/// exception (expected or not) is propagated to the caller.
/// </param>
/// <param name="delayMs">The number of milliseconds to wait after a failed attempt before the next one</param>
/// <param name="operation">The operation to perform; it receives the 1-based attempt number</param>
/// <param name="allowDerivedExceptions">
/// If true, exceptions deriving from one of the specified exception types are retried as well, and so is
/// an <see cref="AggregateException" /> whose inner exception is exactly one of the specified types.
/// Defaults to false
/// </param>
/// <param name="exceptionType">
/// The exception types that trigger a retry. If null, every exception triggers a retry.
/// Any other exception is re-thrown immediately.
/// </param>
/// <returns>
/// A task that completes once an attempt succeeds. If every attempt fails, the task faults with the
/// exception thrown by the last attempt.
/// </returns>
public static async Task WhileExceptionAsync(int retries,
                                             int delayMs,
                                             Action<int> operation,
                                             bool allowDerivedExceptions = false,
                                             params Type[] exceptionType)
{
  // Do all but one attempts in the loop
  for (var retry = 1; retry < retries; retry++)
  {
    try
    {
      // Try the operation. If it succeeds, we are done
      operation(retry);
      return;
    }
    catch (Exception ex)
    {
      // An AggregateException wrapping exactly one of the expected types (only when derived exceptions are allowed)
      var wrappedMatch = exceptionType != null && allowDerivedExceptions && ex is AggregateException &&
                         exceptionType.Any(e => ex.InnerException != null && ex.InnerException.GetType() == e);

      // No filter at all OR exactly an expected type OR derived from an expected type and we allow that
      var directMatch = exceptionType == null || exceptionType.Any(e => e == ex.GetType()) ||
                        (allowDerivedExceptions && exceptionType.Any(e => ex.GetType()
                                                                            .IsSubclassOf(e)));

      if (!wrappedMatch && !directMatch)
      {
        // We have an unexpected exception! Re-throw it:
        throw;
      }
    }

    // Only reached after an expected failure: wait without blocking the calling thread
    await Task.Delay(delayMs)
              .ConfigureAwait(false);
  }

  // Try the operation one last time. This may or may not succeed.
  // Exceptions pass unchanged. If this is an expected exception we need to know about it because
  // we're out of retries. If it's unexpected, throwing is the right thing to do anyway
  operation(retries);
}

/// <summary>
/// Runs the specified asynchronous operation up to <paramref name="retries" /> times, waiting asynchronously
/// between attempts, until it completes without an exception or the attempts are exhausted.
/// </summary>
/// <typeparam name="T">The type of the value produced by the operation</typeparam>
/// <param name="retries">
/// Total number of attempts. The last attempt is executed outside of any exception handling, so its
/// exception (expected or not) is propagated to the caller.
/// </param>
/// <param name="delayMs">The number of milliseconds to wait after a failed attempt before the next one</param>
/// <param name="operation">The operation to perform; it receives the 1-based attempt number</param>
/// <param name="allowDerivedExceptions">
/// If true, exceptions deriving from one of the specified exception types are retried as well, and so is
/// an <see cref="AggregateException" /> whose inner exception is exactly one of the specified types.
/// Defaults to false
/// </param>
/// <param name="exceptionType">
/// The exception types that trigger a retry. If null, every exception triggers a retry.
/// Any other exception is re-thrown immediately.
/// </param>
/// <returns>
/// The value returned by the first successful attempt. If every attempt fails, the exception thrown by
/// the last attempt is propagated.
/// </returns>
public static async Task<T> WhileExceptionAsync<T>(int retries,
                                                   int delayMs,
                                                   Func<int, Task<T>> operation,
                                                   bool allowDerivedExceptions = false,
                                                   params Type[] exceptionType)
{
  // Do all but one attempts in the loop
  for (var retry = 1; retry < retries; retry++)
  {
    try
    {
      // Try the operation. If it succeeds, return its result
      return await operation(retry)
               .ConfigureAwait(false);
    }
    catch (Exception ex)
    {
      // An AggregateException wrapping exactly one of the expected types (only when derived exceptions are allowed)
      var wrappedMatch = exceptionType != null && allowDerivedExceptions && ex is AggregateException &&
                         exceptionType.Any(e => ex.InnerException != null && ex.InnerException.GetType() == e);

      // No filter at all OR exactly an expected type OR derived from an expected type and we allow that
      var directMatch = exceptionType == null || exceptionType.Any(e => e == ex.GetType()) ||
                        (allowDerivedExceptions && exceptionType.Any(e => ex.GetType()
                                                                            .IsSubclassOf(e)));

      if (!wrappedMatch && !directMatch)
      {
        // We have an unexpected exception! Re-throw it:
        throw;
      }
    }

    // Only reached after an expected failure: wait without blocking the calling thread
    // (Thread.Sleep would hold the thread for the whole delay inside an async method)
    await Task.Delay(delayMs)
              .ConfigureAwait(false);
  }

  // Try the operation one last time. This may or may not succeed.
  // Exceptions pass unchanged. If this is an expected exception we need to know about it because
  // we're out of retries. If it's unexpected, throwing is the right thing to do anyway
  return await operation(retries)
           .ConfigureAwait(false);
}
}
Loading