From 27cb713535138416fb82431cefcbe89d6d3c1114 Mon Sep 17 00:00:00 2001 From: Ami Bar Date: Fri, 16 Jul 2021 11:19:43 +0300 Subject: [PATCH] Added support to .net core 5.0 --- STPTests/STPTests.csproj | 115 +- SmartThreadPool/SmartThreadPool.cs | 3605 ++++++++++++------------ SmartThreadPool/SmartThreadPool.csproj | 124 +- 3 files changed, 1925 insertions(+), 1919 deletions(-) diff --git a/STPTests/STPTests.csproj b/STPTests/STPTests.csproj index 510f26b..34e379f 100644 --- a/STPTests/STPTests.csproj +++ b/STPTests/STPTests.csproj @@ -1,56 +1,59 @@ - - - - net46;netcoreapp3.1 - STPTests - STPTests - TRACE; - Debug;Release;Publish - - - - true - - - - true - true - ..\publish\Keys\STP.snk - - - - true - - - - true - - - - true - true - ..\publish\Keys\STP.snk - - - - true - - - - - - - all - runtime; build; native; contentfiles; analyzers; buildtransitive - - - - - - - - - - - - + + + + net46;netcoreapp3.1;net5.0 + STPTests + STPTests + TRACE; + Debug;Release;Publish + + + + true + + + + true + ..\publish\Keys\STP.snk + + + + true + + + + true + ..\publish\Keys\STP.snk + + + + true + + + + true + ..\publish\Keys\STP.snk + + + + + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + + + + + + + + + + + + + + + + diff --git a/SmartThreadPool/SmartThreadPool.cs b/SmartThreadPool/SmartThreadPool.cs index a3cb647..dc067a5 100644 --- a/SmartThreadPool/SmartThreadPool.cs +++ b/SmartThreadPool/SmartThreadPool.cs @@ -1,1801 +1,1804 @@ -#region Release History - -// Smart Thread Pool -// 7 Aug 2004 - Initial release -// -// 14 Sep 2004 - Bug fixes -// -// 15 Oct 2004 - Added new features -// - Work items return result. -// - Support waiting synchronization for multiple work items. -// - Work items can be cancelled. -// - Passage of the caller thread’s context to the thread in the pool. -// - Minimal usage of WIN32 handles. -// - Minor bug fixes. -// -// 26 Dec 2004 - Changes: -// - Removed static constructors. -// - Added finalizers. -// - Changed Exceptions so they are serializable. -// - Fixed the bug in one of the SmartThreadPool constructors. -// - Changed the SmartThreadPool.WaitAll() so it will support any number of waiters. -// The SmartThreadPool.WaitAny() is still limited by the .NET Framework. -// - Added PostExecute with options on which cases to call it. -// - Added option to dispose of the state objects. -// - Added a WaitForIdle() method that waits until the work items queue is empty. -// - Added an STPStartInfo class for the initialization of the thread pool. -// - Changed exception handling so if a work item throws an exception it -// is rethrown at GetResult(), rather then firing an UnhandledException event. -// Note that PostExecute exception are always ignored. -// -// 25 Mar 2005 - Changes: -// - Fixed lost of work items bug -// -// 3 Jul 2005: Changes. -// - Fixed bug where Enqueue() throws an exception because PopWaiter() returned null, hardly reconstructed. -// -// 16 Aug 2005: Changes. -// - Fixed bug where the InUseThreads becomes negative when canceling work items. -// -// 31 Jan 2006 - Changes: -// - Added work items priority -// - Removed support of chained delegates in callbacks and post executes (nobody really use this) -// - Added work items groups -// - Added work items groups idle event -// - Changed SmartThreadPool.WaitAll() behavior so when it gets empty array -// it returns true rather then throwing an exception. -// - Added option to start the STP and the WIG as suspended -// - Exception behavior changed, the real exception is returned by an -// inner exception -// - Added option to keep the Http context of the caller thread. (Thanks to Steven T.) -// - Added performance counters -// - Added priority to the threads in the pool -// -// 13 Feb 2006 - Changes: -// - Added a call to the dispose of the Performance Counter so -// their won't be a Performance Counter leak. -// - Added exception catch in case the Performance Counters cannot -// be created. -// -// 17 May 2008 - Changes: -// - Changed the dispose behavior and removed the Finalizers. -// - Enabled the change of the MaxThreads and MinThreads at run time. -// - Enabled the change of the Concurrency of a IWorkItemsGroup at run -// time If the IWorkItemsGroup is a SmartThreadPool then the Concurrency -// refers to the MaxThreads. -// - Improved the cancel behavior. -// - Added events for thread creation and termination. -// - Fixed the HttpContext context capture. -// - Changed internal collections so they use generic collections -// - Added IsIdle flag to the SmartThreadPool and IWorkItemsGroup -// - Added support for WinCE -// - Added support for Action and Func -// -// 07 April 2009 - Changes: -// - Added support for Silverlight and Mono -// - Added Join, Choice, and Pipe to SmartThreadPool. -// - Added local performance counters (for Mono, Silverlight, and WindowsCE) -// - Changed duration measures from DateTime.Now to Stopwatch. -// - Queues changed from System.Collections.Queue to System.Collections.Generic.LinkedList. -// -// 21 December 2009 - Changes: -// - Added work item timeout (passive) -// -// 20 August 2012 - Changes: -// - Added set name to threads -// - Fixed the WorkItemsQueue.Dequeue. -// Replaced while (!Monitor.TryEnter(this)); with lock(this) { ... } -// - Fixed SmartThreadPool.Pipe -// - Added IsBackground option to threads -// - Added ApartmentState to threads -// - Fixed thread creation when queuing many work items at the same time. -// -// 24 August 2012 - Changes: -// - Enabled cancel abort after cancel. See: http://smartthreadpool.codeplex.com/discussions/345937 by alecswan -// - Added option to set MaxStackSize of threads -// -// 16 September 2016 - Changes: -// - Separated the STP project to .NET 2.0, .NET 4.0, and .NET 4.5 -// - Added option to set MaxQueueLength (Thanks to Rob Hruska) -// -// 31 May 2019 - Changes: -// - Added .Net Standard 2.0 support -// -// 24 Feb 2020 - Changes: -// - Added .Net Core 3.x support -// * Removed the use of Thread.Abort(). The Shutdown method doesn't get forceAbort argument in the .net core versions -// * Fixed #if for .net core, .net standard, and .net framework. -// * Enabled tests to run on .net core too -// * Fixed/Removed tests which depend on Thread.Abort. -// - -#endregion - -using System; -using System.Security; -using System.Threading; -using System.Collections; -using System.Collections.Generic; -using System.Diagnostics; -using System.Runtime.CompilerServices; - -using Amib.Threading.Internal; -using Stopwatch = System.Diagnostics.Stopwatch; - -namespace Amib.Threading -{ - #region SmartThreadPool class - /// - /// Smart thread pool class. - /// - public partial class SmartThreadPool : WorkItemsGroupBase, IDisposable - { - #region Public Default Constants - - /// - /// Default minimum number of threads the thread pool contains. (0) - /// - public const int DefaultMinWorkerThreads = 0; - - /// - /// Default maximum number of threads the thread pool contains. (25) - /// - public const int DefaultMaxWorkerThreads = 25; - - /// - /// Default idle timeout in milliseconds. (One minute) - /// - public const int DefaultIdleTimeout = 60*1000; // One minute - - /// - /// Indicate to copy the security context of the caller and then use it in the call. (false) - /// - public const bool DefaultUseCallerCallContext = false; - - /// - /// Indicate to copy the HTTP context of the caller and then use it in the call. (false) - /// - public const bool DefaultUseCallerHttpContext = false; - - /// - /// Indicate to dispose of the state objects if they support the IDispose interface. (false) - /// - public const bool DefaultDisposeOfStateObjects = false; - - /// - /// The default option to run the post execute (CallToPostExecute.Always) - /// - public const CallToPostExecute DefaultCallToPostExecute = CallToPostExecute.Always; - - /// - /// The default post execute method to run. (None) - /// When null it means not to call it. - /// - public static readonly PostExecuteWorkItemCallback DefaultPostExecuteWorkItemCallback; - - /// - /// The default work item priority (WorkItemPriority.Normal) - /// - public const WorkItemPriority DefaultWorkItemPriority = WorkItemPriority.Normal; - - /// - /// The default is to work on work items as soon as they arrive - /// and not to wait for the start. (false) - /// - public const bool DefaultStartSuspended = false; - - /// - /// The default name to use for the performance counters instance. (null) - /// - public static readonly string DefaultPerformanceCounterInstanceName; - - /// - /// The default thread priority (ThreadPriority.Normal) - /// - public const ThreadPriority DefaultThreadPriority = ThreadPriority.Normal; - - /// - /// The default thread pool name. (SmartThreadPool) - /// - public const string DefaultThreadPoolName = "SmartThreadPool"; - - /// - /// The default Max Stack Size. (null) - /// - public static readonly int? DefaultMaxStackSize = null; - - /// - /// The default Max Queue Length (null). - /// - public static readonly int? DefaultMaxQueueLength = null; - - /// - /// The default fill state with params. (false) - /// It is relevant only to QueueWorkItem of Action<...>/Func<...> - /// - public const bool DefaultFillStateWithArgs = false; - - /// - /// The default thread backgroundness. (true) - /// - public const bool DefaultAreThreadsBackground = true; - - /// - /// The default apartment state of a thread in the thread pool. - /// The default is ApartmentState.Unknown which means the STP will not - /// set the apartment of the thread. It will use the .NET default. - /// - public const ApartmentState DefaultApartmentState = ApartmentState.Unknown; - - #endregion - - #region Member Variables - - /// - /// Dictionary of all the threads in the thread pool. - /// - private readonly SynchronizedDictionary _workerThreads = new SynchronizedDictionary(); - - /// - /// Queue of work items. - /// - private readonly WorkItemsQueue _workItemsQueue = new WorkItemsQueue(); - - /// - /// Count the work items handled. - /// Used by the performance counter. - /// - private int _workItemsProcessed; - - /// - /// Number of threads that currently work (not idle). - /// - private int _inUseWorkerThreads; - - /// - /// Stores a copy of the original STPStartInfo. - /// It is used to change the MinThread and MaxThreads - /// - private STPStartInfo _stpStartInfo; - - /// - /// Total number of work items that are stored in the work items queue - /// plus the work items that the threads in the pool are working on. - /// - private volatile int _currentWorkItemsCount; - - /// - /// Signaled when the thread pool is idle, i.e. no thread is busy - /// and the work items queue is empty - /// - //private ManualResetEvent _isIdleWaitHandle = new ManualResetEvent(true); - private ManualResetEvent _isIdleWaitHandle = EventWaitHandleFactory.CreateManualResetEvent(true); - - /// - /// An event to signal all the threads to quit immediately. - /// - //private ManualResetEvent _shuttingDownEvent = new ManualResetEvent(false); - private ManualResetEvent _shuttingDownEvent = EventWaitHandleFactory.CreateManualResetEvent(false); - - /// - /// A flag to indicate if the Smart Thread Pool is now suspended. - /// - private bool _isSuspended; - - /// - /// A flag to indicate the threads to quit. - /// - private bool _shutdown; - - /// - /// Counts the threads created in the pool. - /// It is used to name the threads. - /// - private int _threadCounter; - - /// - /// Indicate that the SmartThreadPool has been disposed - /// - private bool _isDisposed; - - /// - /// Holds all the WorkItemsGroup instaces that have at least one - /// work item int the SmartThreadPool - /// This variable is used in case of Shutdown - /// - private readonly SynchronizedDictionary _workItemsGroups = new SynchronizedDictionary(); - - /// - /// A common object for all the work items int the STP - /// so we can mark them to cancel in O(1) - /// - private CanceledWorkItemsGroup _canceledSmartThreadPool = new CanceledWorkItemsGroup(); - - /// - /// Windows STP performance counters - /// - private ISTPInstancePerformanceCounters _windowsPCs = NullSTPInstancePerformanceCounters.Instance; - - /// - /// Local STP performance counters - /// - private ISTPInstancePerformanceCounters _localPCs = NullSTPInstancePerformanceCounters.Instance; - - [ThreadStatic] - private static ThreadEntry _threadEntry; - - /// - /// An event to call after a thread is created, but before - /// it's first use. - /// - private event ThreadInitializationHandler _onThreadInitialization; - - /// - /// An event to call when a thread is about to exit, after - /// it is no longer belong to the pool. - /// - private event ThreadTerminationHandler _onThreadTermination; - - #endregion - - #region Per thread properties - - /// - /// A reference to the current work item a thread from the thread pool - /// is executing. - /// - internal static ThreadEntry CurrentThreadEntry - { - get - { - return _threadEntry; - } - set - { - _threadEntry = value; - } - } - - #endregion - - #region Construction and Finalization - - /// - /// Constructor - /// - public SmartThreadPool() - { - _stpStartInfo = new STPStartInfo(); - Initialize(); - } - - /// - /// Constructor - /// - /// Idle timeout in milliseconds - public SmartThreadPool(int idleTimeout) - { - _stpStartInfo = new STPStartInfo - { - IdleTimeout = idleTimeout, - }; - Initialize(); - } - - /// - /// Constructor - /// - /// Set it to True to start thread pool in suspended mode; Explicit call to Start() will be needed to start the Thread pool. - public SmartThreadPool(bool startSuspended) - { - _stpStartInfo = new STPStartInfo - { - StartSuspended = startSuspended, - }; - Initialize(); - } - - /// - /// Constructor - /// - /// Idle timeout in milliseconds - /// Upper limit of threads in the pool - public SmartThreadPool( - int idleTimeout, - int maxWorkerThreads) - { - _stpStartInfo = new STPStartInfo - { - IdleTimeout = idleTimeout, - MaxWorkerThreads = maxWorkerThreads, - }; - Initialize(); - } - - /// - /// Constructor - /// - /// Idle timeout in milliseconds - /// Upper limit of threads in the pool - /// Lower limit of threads in the pool - public SmartThreadPool( - int idleTimeout, - int maxWorkerThreads, - int minWorkerThreads) - { - _stpStartInfo = new STPStartInfo - { - IdleTimeout = idleTimeout, - MaxWorkerThreads = maxWorkerThreads, - MinWorkerThreads = minWorkerThreads, - }; - Initialize(); - } - - /// - /// Constructor - /// - /// A SmartThreadPool configuration that overrides the default behavior - public SmartThreadPool(STPStartInfo stpStartInfo) - { - _stpStartInfo = new STPStartInfo(stpStartInfo); - Initialize(); - } - - private void Initialize() - { - Name = _stpStartInfo.ThreadPoolName; - ValidateSTPStartInfo(); - - // _stpStartInfoRW stores a read/write copy of the STPStartInfo. - // Actually only MaxWorkerThreads and MinWorkerThreads are overwritten - - _isSuspended = _stpStartInfo.StartSuspended; - -#if !(NETFRAMEWORK) - if (null != _stpStartInfo.PerformanceCounterInstanceName) - { - throw new NotSupportedException("Performance counters are not implemented for Compact Framework/Silverlight/Mono, instead use StpStartInfo.EnableLocalPerformanceCounters"); - } -#else - if (null != _stpStartInfo.PerformanceCounterInstanceName) - { - try - { - _windowsPCs = new STPInstancePerformanceCounters(_stpStartInfo.PerformanceCounterInstanceName); - } - catch (Exception e) - { - Debug.WriteLine("Unable to create Performance Counters: " + e); - _windowsPCs = NullSTPInstancePerformanceCounters.Instance; - } - } -#endif - - if (_stpStartInfo.EnableLocalPerformanceCounters) - { - _localPCs = new LocalSTPInstancePerformanceCounters(); - } - - // If the STP is not started suspended then start the threads. - if (!_isSuspended) - { - StartOptimalNumberOfThreads(); - } - } - - private void StartOptimalNumberOfThreads() - { - int threadsCount = Math.Max(_workItemsQueue.Count, _stpStartInfo.MinWorkerThreads); - threadsCount = Math.Min(threadsCount, _stpStartInfo.MaxWorkerThreads); - threadsCount -= _workerThreads.Count; - if (threadsCount > 0) - { - StartThreads(threadsCount); - } - } - - private void ValidateSTPStartInfo() - { - if (_stpStartInfo.MinWorkerThreads < 0) - { - throw new ArgumentOutOfRangeException( - "MinWorkerThreads", "MinWorkerThreads cannot be negative"); - } - - if (_stpStartInfo.MaxWorkerThreads <= 0) - { - throw new ArgumentOutOfRangeException( - "MaxWorkerThreads", "MaxWorkerThreads must be greater than zero"); - } - - if (_stpStartInfo.MinWorkerThreads > _stpStartInfo.MaxWorkerThreads) - { - throw new ArgumentOutOfRangeException( - "MinWorkerThreads, maxWorkerThreads", - "MaxWorkerThreads must be greater or equal to MinWorkerThreads"); - } - - if (_stpStartInfo.MaxQueueLength < 0) - { - throw new ArgumentOutOfRangeException( - "MaxQueueLength", - "MaxQueueLength must be >= 0 or null (for unbounded)"); - } - } - - private static void ValidateCallback(Delegate callback) - { - if(callback.GetInvocationList().Length > 1) - { - throw new NotSupportedException("SmartThreadPool doesn't support delegates chains"); - } - } - - #endregion - - #region Thread Processing - - /// - /// Waits on the queue for a work item, shutdown, or timeout. - /// - /// - /// Returns the WaitingCallback or null in case of timeout or shutdown. - /// - private WorkItem Dequeue() - { - WorkItem workItem = - _workItemsQueue.DequeueWorkItem(_stpStartInfo.IdleTimeout, _shuttingDownEvent); - - return workItem; - } - - /// - /// Put a new work item in the queue - /// - /// A work item to queue - internal override void Enqueue(WorkItem workItem) - { - // Make sure the workItem is not null - Debug.Assert(null != workItem); - - IncrementWorkItemsCount(); - - workItem.CanceledSmartThreadPool = _canceledSmartThreadPool; - _workItemsQueue.EnqueueWorkItem(workItem); - workItem.WorkItemIsQueued(); - - // If all the threads are busy then try to create a new one - if (_currentWorkItemsCount > _workerThreads.Count) - { - StartThreads(1); - } - } - - private void IncrementWorkItemsCount() - { - _windowsPCs.SampleWorkItems(_workItemsQueue.Count, _workItemsProcessed); - _localPCs.SampleWorkItems(_workItemsQueue.Count, _workItemsProcessed); - - int count = Interlocked.Increment(ref _currentWorkItemsCount); - //Trace.WriteLine("WorkItemsCount = " + _currentWorkItemsCount.ToString()); - if (count == 1) - { - IsIdle = false; - _isIdleWaitHandle.Reset(); - } - } - - private void DecrementWorkItemsCount() - { - int count = Interlocked.Decrement(ref _currentWorkItemsCount); - //Trace.WriteLine("WorkItemsCount = " + _currentWorkItemsCount.ToString()); - if (count == 0) - { - IsIdle = true; - _isIdleWaitHandle.Set(); - } - - Interlocked.Increment(ref _workItemsProcessed); - - if (!_shutdown) - { - // The counter counts even if the work item was cancelled - _windowsPCs.SampleWorkItems(_workItemsQueue.Count, _workItemsProcessed); - _localPCs.SampleWorkItems(_workItemsQueue.Count, _workItemsProcessed); - } - - } - - internal void RegisterWorkItemsGroup(IWorkItemsGroup workItemsGroup) - { - _workItemsGroups[workItemsGroup] = workItemsGroup; - } - - internal void UnregisterWorkItemsGroup(IWorkItemsGroup workItemsGroup) - { - if (_workItemsGroups.Contains(workItemsGroup)) - { - _workItemsGroups.Remove(workItemsGroup); - } - } - - /// - /// Inform that the current thread is about to quit or quiting. - /// The same thread may call this method more than once. - /// - private void InformCompleted() - { - // There is no need to lock the two methods together - // since only the current thread removes itself - // and the _workerThreads is a synchronized dictionary - if (_workerThreads.Contains(Thread.CurrentThread)) - { - _workerThreads.Remove(Thread.CurrentThread); - _windowsPCs.SampleThreads(_workerThreads.Count, _inUseWorkerThreads); - _localPCs.SampleThreads(_workerThreads.Count, _inUseWorkerThreads); - } - } - - /// - /// Starts new threads - /// - /// The number of threads to start - private void StartThreads(int threadsCount) - { - if (_isSuspended) - { - return; - } - - lock(_workerThreads.SyncRoot) - { - // Don't start threads on shut down - if (_shutdown) - { - return; - } - - for(int i = 0; i < threadsCount; ++i) - { - // Don't create more threads then the upper limit - if (_workerThreads.Count >= _stpStartInfo.MaxWorkerThreads) - { - return; - } - - // Create a new thread - - Thread workerThread = - _stpStartInfo.MaxStackSize.HasValue - ? new Thread(ProcessQueuedItems, _stpStartInfo.MaxStackSize.Value) - : new Thread(ProcessQueuedItems); - // Configure the new thread and start it - workerThread.Name = "STP " + Name + " Thread #" + _threadCounter; - workerThread.IsBackground = _stpStartInfo.AreThreadsBackground; - - if (_stpStartInfo.ApartmentState != ApartmentState.Unknown) - { - workerThread.SetApartmentState(_stpStartInfo.ApartmentState); - } - - workerThread.Priority = _stpStartInfo.ThreadPriority; - workerThread.Start(); - ++_threadCounter; - - // Add it to the dictionary and update its creation time. - _workerThreads[workerThread] = new ThreadEntry(this); - - _windowsPCs.SampleThreads(_workerThreads.Count, _inUseWorkerThreads); - _localPCs.SampleThreads(_workerThreads.Count, _inUseWorkerThreads); - } - } - } - - /// - /// A worker thread method that processes work items from the work items queue. - /// - private void ProcessQueuedItems() - { - // Keep the entry of the dictionary as thread's variable to avoid the synchronization locks - // of the dictionary. - CurrentThreadEntry = _workerThreads[Thread.CurrentThread]; - - FireOnThreadInitialization(); - - try - { - bool bInUseWorkerThreadsWasIncremented = false; - - // Process until shutdown. - while(!_shutdown) - { - // Update the last time this thread was seen alive. - // It's good for debugging. - CurrentThreadEntry.IAmAlive(); - - // The following block handles the when the MaxWorkerThreads has been - // incremented by the user at run-time. - // Double lock for quit. - if (_workerThreads.Count > _stpStartInfo.MaxWorkerThreads) - { - lock (_workerThreads.SyncRoot) - { - if (_workerThreads.Count > _stpStartInfo.MaxWorkerThreads) - { - // Inform that the thread is quiting and then quit. - // This method must be called within this lock or else - // more threads will quit and the thread pool will go - // below the lower limit. - InformCompleted(); - break; - } - } - } - - // Wait for a work item, shutdown, or timeout - WorkItem workItem = Dequeue(); - - // Update the last time this thread was seen alive. - // It's good for debugging. - CurrentThreadEntry.IAmAlive(); - - // On timeout or shut down. - if (null == workItem) - { - // Double lock for quit. - if (_workerThreads.Count > _stpStartInfo.MinWorkerThreads) - { - lock(_workerThreads.SyncRoot) - { - if (_workerThreads.Count > _stpStartInfo.MinWorkerThreads) - { - // Inform that the thread is quiting and then quit. - // This method must be called within this lock or else - // more threads will quit and the thread pool will go - // below the lower limit. - InformCompleted(); - break; - } - } - } - } - - // If we didn't quit then skip to the next iteration. - if (null == workItem) - { - continue; - } - - try - { - // Initialize the value to false - bInUseWorkerThreadsWasIncremented = false; - - // Set the Current Work Item of the thread. - // Store the Current Work Item before the workItem.StartingWorkItem() is called, - // so WorkItem.Cancel can work when the work item is between InQueue and InProgress - // states. - // If the work item has been cancelled BEFORE the workItem.StartingWorkItem() - // (work item is in InQueue state) then workItem.StartingWorkItem() will return false. - // If the work item has been cancelled AFTER the workItem.StartingWorkItem() then - // (work item is in InProgress state) then the thread will be aborted - CurrentThreadEntry.CurrentWorkItem = workItem; - - // Change the state of the work item to 'in progress' if possible. - // We do it here so if the work item has been canceled we won't - // increment the _inUseWorkerThreads. - // The cancel mechanism doesn't delete items from the queue, - // it marks the work item as canceled, and when the work item - // is dequeued, we just skip it. - // If the post execute of work item is set to always or to - // call when the work item is canceled then the StartingWorkItem() - // will return true, so the post execute can run. - if (!workItem.StartingWorkItem()) - { - continue; - } - - // Execute the callback. Make sure to accurately - // record how many callbacks are currently executing. - int inUseWorkerThreads = Interlocked.Increment(ref _inUseWorkerThreads); - _windowsPCs.SampleThreads(_workerThreads.Count, inUseWorkerThreads); - _localPCs.SampleThreads(_workerThreads.Count, inUseWorkerThreads); - - // Mark that the _inUseWorkerThreads incremented, so in the finally{} - // statement we will decrement it correctly. - bInUseWorkerThreadsWasIncremented = true; - - workItem.FireWorkItemStarted(); - - ExecuteWorkItem(workItem); - } - catch(Exception ex) - { - ex.GetHashCode(); - // Do nothing - } - finally - { - workItem.DisposeOfState(); - - // Set the CurrentWorkItem to null, since we - // no longer run user's code. - CurrentThreadEntry.CurrentWorkItem = null; - - // Decrement the _inUseWorkerThreads only if we had - // incremented it. Note the cancelled work items don't - // increment _inUseWorkerThreads. - if (bInUseWorkerThreadsWasIncremented) - { - int inUseWorkerThreads = Interlocked.Decrement(ref _inUseWorkerThreads); - _windowsPCs.SampleThreads(_workerThreads.Count, inUseWorkerThreads); - _localPCs.SampleThreads(_workerThreads.Count, inUseWorkerThreads); - } - - // Notify that the work item has been completed. - // WorkItemsGroup may enqueue their next work item. - workItem.FireWorkItemCompleted(); - - // Decrement the number of work items here so the idle - // ManualResetEvent won't fluctuate. - DecrementWorkItemsCount(); - } - } - } - catch(ThreadAbortException tae) - { - tae.GetHashCode(); - // Handle the abort exception gracfully. - Thread.ResetAbort(); - } - catch(Exception e) - { - Debug.Assert(null != e); - } - finally - { - InformCompleted(); - FireOnThreadTermination(); - } - } - - private void ExecuteWorkItem(WorkItem workItem) - { - _windowsPCs.SampleWorkItemsWaitTime(workItem.WaitingTime); - _localPCs.SampleWorkItemsWaitTime(workItem.WaitingTime); - try - { - workItem.Execute(); - } - finally - { - _windowsPCs.SampleWorkItemsProcessTime(workItem.ProcessTime); - _localPCs.SampleWorkItemsProcessTime(workItem.ProcessTime); - } - } - - - #endregion - - #region Public Methods - - private void ValidateWaitForIdle() - { - if (null != CurrentThreadEntry && CurrentThreadEntry.AssociatedSmartThreadPool == this) - { - throw new NotSupportedException( - "WaitForIdle cannot be called from a thread on its SmartThreadPool, it causes a deadlock"); - } - } - - internal static void ValidateWorkItemsGroupWaitForIdle(IWorkItemsGroup workItemsGroup) - { - if (null == CurrentThreadEntry) - { - return; - } - - WorkItem workItem = CurrentThreadEntry.CurrentWorkItem; - ValidateWorkItemsGroupWaitForIdleImpl(workItemsGroup, workItem); - if ((null != workItemsGroup) && - (null != workItem) && - CurrentThreadEntry.CurrentWorkItem.WasQueuedBy(workItemsGroup)) - { - throw new NotSupportedException("WaitForIdle cannot be called from a thread on its SmartThreadPool, it causes a deadlock"); - } - } - - [MethodImpl(MethodImplOptions.NoInlining)] - private static void ValidateWorkItemsGroupWaitForIdleImpl(IWorkItemsGroup workItemsGroup, WorkItem workItem) - { - if ((null != workItemsGroup) && - (null != workItem) && - workItem.WasQueuedBy(workItemsGroup)) - { - throw new NotSupportedException("WaitForIdle cannot be called from a thread on its SmartThreadPool, it causes a deadlock"); - } - } - - /// - /// Force the SmartThreadPool to shutdown - /// Doesn't use Thread.Abort - /// - public void Shutdown() - { - ShutdownImpl(false, 0); - } - - // Thread.Abort is not supported in .net core - - /// - /// Force the SmartThreadPool to shutdown with timeout - /// Doesn't use Thread.Abort - /// - public void Shutdown(TimeSpan timeout) - { - ShutdownImpl(false, (int)timeout.TotalMilliseconds); - } - - /// - /// Empties the queue of work items and abort the threads in the pool. - /// Doesn't use Thread.Abort - /// - public void Shutdown(int millisecondsTimeout) - { - ShutdownImpl(false, millisecondsTimeout); - } - -#if !(NETCOREAPP) - - /// - /// Force the SmartThreadPool to shutdown with timeout - /// - public void Shutdown(bool forceAbort) - { - ShutdownImpl(forceAbort, 0); - } - - /// - /// Force the SmartThreadPool to shutdown with timeout - /// - public void Shutdown(bool forceAbort, TimeSpan timeout) - { - ShutdownImpl(forceAbort, (int)timeout.TotalMilliseconds); - } - - /// - /// Empties the queue of work items and abort the threads in the pool. - /// - public void Shutdown(bool forceAbort, int millisecondsTimeout) - { - ShutdownImpl(forceAbort, millisecondsTimeout); - } -#endif - private void ShutdownImpl(bool forceAbort, int millisecondsTimeout) - { - ValidateNotDisposed(); - - ISTPInstancePerformanceCounters pcs = _windowsPCs; - - if (NullSTPInstancePerformanceCounters.Instance != _windowsPCs) - { - // Set the _pcs to "null" to stop updating the performance - // counters - _windowsPCs = NullSTPInstancePerformanceCounters.Instance; - - pcs.Dispose(); - } - - Thread [] threads; - lock(_workerThreads.SyncRoot) - { - // Shutdown the work items queue - _workItemsQueue.Dispose(); - - // Signal the threads to exit - _shutdown = true; - _shuttingDownEvent.Set(); - - // Make a copy of the threads' references in the pool - threads = new Thread [_workerThreads.Count]; - _workerThreads.Keys.CopyTo(threads, 0); - } - - int millisecondsLeft = millisecondsTimeout; - Stopwatch stopwatch = Stopwatch.StartNew(); - //DateTime start = DateTime.UtcNow; - bool waitInfinitely = (Timeout.Infinite == millisecondsTimeout); - bool timeout = false; - - // Each iteration we update the time left for the timeout. - foreach(Thread thread in threads) - { - // Join don't work with negative numbers - if (!waitInfinitely && (millisecondsLeft < 0)) - { - timeout = true; - break; - } - - // Wait for the thread to terminate - bool success = thread.Join(millisecondsLeft); - if(!success) - { - timeout = true; - break; - } - - if(!waitInfinitely) - { - // Update the time left to wait - //TimeSpan ts = DateTime.UtcNow - start; - millisecondsLeft = millisecondsTimeout - (int)stopwatch.ElapsedMilliseconds; - } - } - - if (timeout && forceAbort) - { - // Abort the threads in the pool - foreach(Thread thread in threads) - { - - if ((thread != null) && thread.IsAlive) - { - try - { - thread.Abort(); // Shutdown - } - catch(SecurityException e) - { - e.GetHashCode(); - } - catch(ThreadStateException ex) - { - ex.GetHashCode(); - // In case the thread has been terminated - // after the check if it is alive. - } - } - } - } - } - - /// - /// Wait for all work items to complete - /// - /// Array of work item result objects - /// - /// true when every work item in workItemResults has completed; otherwise false. - /// - public static bool WaitAll( - IWaitableResult [] waitableResults) - { - return WaitAll(waitableResults, Timeout.Infinite, true); - } - - /// - /// Wait for all work items to complete - /// - /// Array of work item result objects - /// The number of milliseconds to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely. - /// - /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false. - /// - /// - /// true when every work item in workItemResults has completed; otherwise false. - /// - public static bool WaitAll( - IWaitableResult [] waitableResults, - TimeSpan timeout, - bool exitContext) - { - return WaitAll(waitableResults, (int)timeout.TotalMilliseconds, exitContext); - } - - /// - /// Wait for all work items to complete - /// - /// Array of work item result objects - /// The number of milliseconds to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely. - /// - /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false. - /// - /// A cancel wait handle to interrupt the wait if needed - /// - /// true when every work item in workItemResults has completed; otherwise false. - /// - public static bool WaitAll( - IWaitableResult[] waitableResults, - TimeSpan timeout, - bool exitContext, - WaitHandle cancelWaitHandle) - { - return WaitAll(waitableResults, (int)timeout.TotalMilliseconds, exitContext, cancelWaitHandle); - } - - /// - /// Wait for all work items to complete - /// - /// Array of work item result objects - /// The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely. - /// - /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false. - /// - /// - /// true when every work item in workItemResults has completed; otherwise false. - /// - public static bool WaitAll( - IWaitableResult [] waitableResults, - int millisecondsTimeout, - bool exitContext) - { - return WorkItem.WaitAll(waitableResults, millisecondsTimeout, exitContext, null); - } - - /// - /// Wait for all work items to complete - /// - /// Array of work item result objects - /// The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely. - /// - /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false. - /// - /// A cancel wait handle to interrupt the wait if needed - /// - /// true when every work item in workItemResults has completed; otherwise false. - /// - public static bool WaitAll( - IWaitableResult[] waitableResults, - int millisecondsTimeout, - bool exitContext, - WaitHandle cancelWaitHandle) - { - return WorkItem.WaitAll(waitableResults, millisecondsTimeout, exitContext, cancelWaitHandle); - } - - - /// - /// Waits for any of the work items in the specified array to complete, cancel, or timeout - /// - /// Array of work item result objects - /// - /// The array index of the work item result that satisfied the wait, or WaitTimeout if any of the work items has been canceled. - /// - public static int WaitAny( - IWaitableResult [] waitableResults) - { - return WaitAny(waitableResults, Timeout.Infinite, true); - } - - /// - /// Waits for any of the work items in the specified array to complete, cancel, or timeout - /// - /// Array of work item result objects - /// The number of milliseconds to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely. - /// - /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false. - /// - /// - /// The array index of the work item result that satisfied the wait, or WaitTimeout if no work item result satisfied the wait and a time interval equivalent to millisecondsTimeout has passed or the work item has been canceled. - /// - public static int WaitAny( - IWaitableResult[] waitableResults, - TimeSpan timeout, - bool exitContext) - { - return WaitAny(waitableResults, (int)timeout.TotalMilliseconds, exitContext); - } - - /// - /// Waits for any of the work items in the specified array to complete, cancel, or timeout - /// - /// Array of work item result objects - /// The number of milliseconds to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely. - /// - /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false. - /// - /// A cancel wait handle to interrupt the wait if needed - /// - /// The array index of the work item result that satisfied the wait, or WaitTimeout if no work item result satisfied the wait and a time interval equivalent to millisecondsTimeout has passed or the work item has been canceled. - /// - public static int WaitAny( - IWaitableResult [] waitableResults, - TimeSpan timeout, - bool exitContext, - WaitHandle cancelWaitHandle) - { - return WaitAny(waitableResults, (int)timeout.TotalMilliseconds, exitContext, cancelWaitHandle); - } - - /// - /// Waits for any of the work items in the specified array to complete, cancel, or timeout - /// - /// Array of work item result objects - /// The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely. - /// - /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false. - /// - /// - /// The array index of the work item result that satisfied the wait, or WaitTimeout if no work item result satisfied the wait and a time interval equivalent to millisecondsTimeout has passed or the work item has been canceled. - /// - public static int WaitAny( - IWaitableResult [] waitableResults, - int millisecondsTimeout, - bool exitContext) - { - return WorkItem.WaitAny(waitableResults, millisecondsTimeout, exitContext, null); - } - - /// - /// Waits for any of the work items in the specified array to complete, cancel, or timeout - /// - /// Array of work item result objects - /// The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely. - /// - /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false. - /// - /// A cancel wait handle to interrupt the wait if needed - /// - /// The array index of the work item result that satisfied the wait, or WaitTimeout if no work item result satisfied the wait and a time interval equivalent to millisecondsTimeout has passed or the work item has been canceled. - /// - public static int WaitAny( - IWaitableResult [] waitableResults, - int millisecondsTimeout, - bool exitContext, - WaitHandle cancelWaitHandle) - { - return WorkItem.WaitAny(waitableResults, millisecondsTimeout, exitContext, cancelWaitHandle); - } - - /// - /// Creates a new WorkItemsGroup. - /// - /// The number of work items that can be run concurrently - /// A reference to the WorkItemsGroup - public IWorkItemsGroup CreateWorkItemsGroup(int concurrency) - { - IWorkItemsGroup workItemsGroup = new WorkItemsGroup(this, concurrency, _stpStartInfo); - return workItemsGroup; - } - - /// - /// Creates a new WorkItemsGroup. - /// - /// The number of work items that can be run concurrently - /// A WorkItemsGroup configuration that overrides the default behavior - /// A reference to the WorkItemsGroup - public IWorkItemsGroup CreateWorkItemsGroup(int concurrency, WIGStartInfo wigStartInfo) - { - IWorkItemsGroup workItemsGroup = new WorkItemsGroup(this, concurrency, wigStartInfo); - return workItemsGroup; - } - - #region Fire Thread's Events - - private void FireOnThreadInitialization() - { - if (null != _onThreadInitialization) - { - foreach (ThreadInitializationHandler tih in _onThreadInitialization.GetInvocationList()) - { - try - { - tih(); - } - catch (Exception e) - { - e.GetHashCode(); - Debug.Assert(false); - throw; - } - } - } - } - - private void FireOnThreadTermination() - { - if (null != _onThreadTermination) - { - foreach (ThreadTerminationHandler tth in _onThreadTermination.GetInvocationList()) - { - try - { - tth(); - } - catch (Exception e) - { - e.GetHashCode(); - Debug.Assert(false); - throw; - } - } - } - } - - #endregion - - /// - /// This event is fired when a thread is created. - /// Use it to initialize a thread before the work items use it. - /// - public event ThreadInitializationHandler OnThreadInitialization - { - add { _onThreadInitialization += value; } - remove { _onThreadInitialization -= value; } - } - - /// - /// This event is fired when a thread is terminating. - /// Use it for cleanup. - /// - public event ThreadTerminationHandler OnThreadTermination - { - add { _onThreadTermination += value; } - remove { _onThreadTermination -= value; } - } - - - internal void CancelAbortWorkItemsGroup(WorkItemsGroup wig) - { - foreach (ThreadEntry threadEntry in _workerThreads.Values) - { - WorkItem workItem = threadEntry.CurrentWorkItem; - if (null != workItem && - workItem.WasQueuedBy(wig) && - !workItem.IsCanceled) - { - threadEntry.CurrentWorkItem.GetWorkItemResult().Cancel(true); - } - } - } - - private void ValidateQueueIsWithinLimits() - { - // Keep a local copy; if a client changes the length while this is executing, - // we'll want to use the same value throughout. - var maxQueueLength = _stpStartInfo.MaxQueueLength; - - if (maxQueueLength == null) - { - return; - } - - // Instead of just looking at the current queue length here, account for the - // fact that the pool is going to scale up its threads if it's not yet at its - // maximum and there are queued items. This means that the queue length limit - // may be briefly exceeded while the pool is scaling up. - if (_currentWorkItemsCount >= maxQueueLength + MaxThreads) - { - throw new QueueRejectedException("Queue is at its maximum (" + maxQueueLength + ")"); - } - } - - #endregion - - #region Properties - - /// - /// Get/Set the lower limit of threads in the pool. - /// - public int MinThreads - { - get - { - ValidateNotDisposed(); - return _stpStartInfo.MinWorkerThreads; - } - set - { - Debug.Assert(value >= 0); - Debug.Assert(value <= _stpStartInfo.MaxWorkerThreads); - if (_stpStartInfo.MaxWorkerThreads < value) - { - _stpStartInfo.MaxWorkerThreads = value; - } - _stpStartInfo.MinWorkerThreads = value; - StartOptimalNumberOfThreads(); - } - } - - /// - /// Get/Set the upper limit of threads in the pool. - /// - public int MaxThreads - { - get - { - ValidateNotDisposed(); - return _stpStartInfo.MaxWorkerThreads; - } - - set - { - Debug.Assert(value > 0); - Debug.Assert(value >= _stpStartInfo.MinWorkerThreads); - if (_stpStartInfo.MinWorkerThreads > value) - { - _stpStartInfo.MinWorkerThreads = value; - } - _stpStartInfo.MaxWorkerThreads = value; - StartOptimalNumberOfThreads(); - } - } - - public int? MaxQueueLength - { - get - { - ValidateNotDisposed(); - return _stpStartInfo.MaxQueueLength; - } - - set - { - _stpStartInfo.MaxQueueLength = value; - } - } - - /// - /// Get the number of threads in the thread pool. - /// Should be between the lower and the upper limits. - /// - public int ActiveThreads - { - get - { - ValidateNotDisposed(); - return _workerThreads.Count; - } - } - - /// - /// Get the number of work items that haven't finished execution (i.e. - /// items being worked on by threads + items in the queue). - /// - public int CurrentWorkItemsCount - { - get - { - ValidateNotDisposed(); - return _currentWorkItemsCount; - } - } - - /// - /// Returns true if the current running work item has been cancelled. - /// Must be used within the work item's callback method. - /// The work item should sample this value in order to know if it - /// needs to quit before its completion. - /// - public static bool IsWorkItemCanceled - { - get - { - return CurrentThreadEntry.CurrentWorkItem.IsCanceled; - } - } - - /// - /// Checks if the work item has been cancelled, and if yes then abort the thread. - /// Can be used with Cancel and timeout - /// - public static void AbortOnWorkItemCancel() - { - if (IsWorkItemCanceled) - { - Thread.CurrentThread.Abort(); - } - } - - /// - /// Thread Pool start information (readonly) - /// - public STPStartInfo STPStartInfo - { - get - { - return _stpStartInfo.AsReadOnly(); - } - } - - public bool IsShuttingdown - { - get { return _shutdown; } - } - - /// - /// Return the local calculated performance counters - /// Available only if STPStartInfo.EnableLocalPerformanceCounters is true. - /// - public ISTPPerformanceCountersReader PerformanceCountersReader - { - get { return (ISTPPerformanceCountersReader)_localPCs; } - } - - #endregion - - #region IDisposable Members - - public void Dispose() - { - if (!_isDisposed) - { - if (!_shutdown) - { - Shutdown(); - } - - if (null != _shuttingDownEvent) - { - _shuttingDownEvent.Close(); - _shuttingDownEvent = null; - } - _workerThreads.Clear(); - - if (null != _isIdleWaitHandle) - { - _isIdleWaitHandle.Close(); - _isIdleWaitHandle = null; - } - - _isDisposed = true; - } - } - - private void ValidateNotDisposed() - { - if(_isDisposed) - { - throw new ObjectDisposedException(GetType().ToString(), "The SmartThreadPool has been shutdown"); - } - } - #endregion - - #region WorkItemsGroupBase Overrides - - /// - /// Get/Set the maximum number of work items that execute cocurrency on the thread pool - /// - public override int Concurrency - { - get { return MaxThreads; } - set { MaxThreads = value; } - } - - /// - /// Get the number of busy (not idle) threads in the thread pool. - /// - public override int InUseThreads - { - get - { - ValidateNotDisposed(); - return _inUseWorkerThreads; - } - } - - /// - /// Get the number of work items in the queue. - /// - public override int WaitingCallbacks - { - get - { - ValidateNotDisposed(); - return _workItemsQueue.Count; - } - } - - /// - /// Get an array with all the state objects of the currently running items. - /// The array represents a snap shot and impact performance. - /// - public override object[] GetStates() - { - object[] states = _workItemsQueue.GetStates(); - return states; - } - - /// - /// WorkItemsGroup start information (readonly) - /// - public override WIGStartInfo WIGStartInfo - { - get { return _stpStartInfo.AsReadOnly(); } - } - - /// - /// Start the thread pool if it was started suspended. - /// If it is already running, this method is ignored. - /// - public override void Start() - { - if (!_isSuspended) - { - return; - } - _isSuspended = false; - - ICollection workItemsGroups = _workItemsGroups.Values; - foreach (WorkItemsGroup workItemsGroup in workItemsGroups) - { - workItemsGroup.OnSTPIsStarting(); - } - - StartOptimalNumberOfThreads(); - } - - /// - /// Cancel all work items using thread abortion - /// - /// True to stop work items by raising ThreadAbortException - public override void Cancel(bool abortExecution) - { - _canceledSmartThreadPool.IsCanceled = true; - _canceledSmartThreadPool = new CanceledWorkItemsGroup(); - - ICollection workItemsGroups = _workItemsGroups.Values; - foreach (WorkItemsGroup workItemsGroup in workItemsGroups) - { - workItemsGroup.Cancel(abortExecution); - } - - if (abortExecution) - { - foreach (ThreadEntry threadEntry in _workerThreads.Values) - { - WorkItem workItem = threadEntry.CurrentWorkItem; - if (null != workItem && - threadEntry.AssociatedSmartThreadPool == this && - !workItem.IsCanceled) - { - threadEntry.CurrentWorkItem.GetWorkItemResult().Cancel(true); - } - } - } - } - - /// - /// Wait for the thread pool to be idle - /// - public override bool WaitForIdle(int millisecondsTimeout) - { - ValidateWaitForIdle(); - return STPEventWaitHandle.WaitOne(_isIdleWaitHandle, millisecondsTimeout, false); - } - - /// - /// This event is fired when all work items are completed. - /// (When IsIdle changes to true) - /// This event only work on WorkItemsGroup. On SmartThreadPool - /// it throws the NotImplementedException. - /// - public override event WorkItemsGroupIdleHandler OnIdle - { - add - { - throw new NotImplementedException("This event is not implemented in the SmartThreadPool class. Please create a WorkItemsGroup in order to use this feature."); - //_onIdle += value; - } - remove - { - throw new NotImplementedException("This event is not implemented in the SmartThreadPool class. Please create a WorkItemsGroup in order to use this feature."); - //_onIdle -= value; - } - } - - internal override void PreQueueWorkItem() - { - ValidateNotDisposed(); - - // This gives no preference to items of higher priority. - ValidateQueueIsWithinLimits(); - } - - #endregion - - #region Join, Choice, Pipe, etc. - - /// - /// Executes all actions in parallel. - /// Returns when they all finish. - /// - /// Actions to execute - public void Join(IEnumerable actions) - { - WIGStartInfo wigStartInfo = new WIGStartInfo { StartSuspended = true }; - IWorkItemsGroup workItemsGroup = CreateWorkItemsGroup(int.MaxValue, wigStartInfo); - foreach (Action action in actions) - { - workItemsGroup.QueueWorkItem(action); - } - workItemsGroup.Start(); - workItemsGroup.WaitForIdle(); - } - - /// - /// Executes all actions in parallel. - /// Returns when they all finish. - /// - /// Actions to execute - public void Join(params Action[] actions) - { - Join((IEnumerable)actions); - } - - private class ChoiceIndex - { - public int _index = -1; - } - - /// - /// Executes all actions in parallel - /// Returns when the first one completes - /// - /// Actions to execute - public int Choice(IEnumerable actions) - { - WIGStartInfo wigStartInfo = new WIGStartInfo { StartSuspended = true }; - IWorkItemsGroup workItemsGroup = CreateWorkItemsGroup(int.MaxValue, wigStartInfo); - - ManualResetEvent anActionCompleted = new ManualResetEvent(false); - - ChoiceIndex choiceIndex = new ChoiceIndex(); - - int i = 0; - foreach (Action action in actions) - { - Action act = action; - int value = i; - workItemsGroup.QueueWorkItem(() => { act(); Interlocked.CompareExchange(ref choiceIndex._index, value, -1); anActionCompleted.Set(); }); - ++i; - } - workItemsGroup.Start(); - anActionCompleted.WaitOne(); - - return choiceIndex._index; - } - - /// - /// Executes all actions in parallel - /// Returns when the first one completes - /// - /// Actions to execute - public int Choice(params Action[] actions) - { - return Choice((IEnumerable)actions); - } - - /// - /// Executes actions in sequence asynchronously. - /// Returns immediately. - /// - /// A state context that passes - /// Actions to execute in the order they should run - public void Pipe(T pipeState, IEnumerable> actions) - { - WIGStartInfo wigStartInfo = new WIGStartInfo { StartSuspended = true }; - IWorkItemsGroup workItemsGroup = CreateWorkItemsGroup(1, wigStartInfo); - foreach (Action action in actions) - { - Action act = action; - workItemsGroup.QueueWorkItem(() => act(pipeState)); - } - workItemsGroup.Start(); - workItemsGroup.WaitForIdle(); - } - - /// - /// Executes actions in sequence asynchronously. - /// Returns immediately. - /// - /// - /// Actions to execute in the order they should run - public void Pipe(T pipeState, params Action[] actions) - { - Pipe(pipeState, (IEnumerable>)actions); - } - #endregion - } - #endregion -} +#region Release History + +// Smart Thread Pool +// 7 Aug 2004 - Initial release +// +// 14 Sep 2004 - Bug fixes +// +// 15 Oct 2004 - Added new features +// - Work items return result. +// - Support waiting synchronization for multiple work items. +// - Work items can be cancelled. +// - Passage of the caller thread’s context to the thread in the pool. +// - Minimal usage of WIN32 handles. +// - Minor bug fixes. +// +// 26 Dec 2004 - Changes: +// - Removed static constructors. +// - Added finalizers. +// - Changed Exceptions so they are serializable. +// - Fixed the bug in one of the SmartThreadPool constructors. +// - Changed the SmartThreadPool.WaitAll() so it will support any number of waiters. +// The SmartThreadPool.WaitAny() is still limited by the .NET Framework. +// - Added PostExecute with options on which cases to call it. +// - Added option to dispose of the state objects. +// - Added a WaitForIdle() method that waits until the work items queue is empty. +// - Added an STPStartInfo class for the initialization of the thread pool. +// - Changed exception handling so if a work item throws an exception it +// is rethrown at GetResult(), rather then firing an UnhandledException event. +// Note that PostExecute exception are always ignored. +// +// 25 Mar 2005 - Changes: +// - Fixed lost of work items bug +// +// 3 Jul 2005: Changes. +// - Fixed bug where Enqueue() throws an exception because PopWaiter() returned null, hardly reconstructed. +// +// 16 Aug 2005: Changes. +// - Fixed bug where the InUseThreads becomes negative when canceling work items. +// +// 31 Jan 2006 - Changes: +// - Added work items priority +// - Removed support of chained delegates in callbacks and post executes (nobody really use this) +// - Added work items groups +// - Added work items groups idle event +// - Changed SmartThreadPool.WaitAll() behavior so when it gets empty array +// it returns true rather then throwing an exception. +// - Added option to start the STP and the WIG as suspended +// - Exception behavior changed, the real exception is returned by an +// inner exception +// - Added option to keep the Http context of the caller thread. (Thanks to Steven T.) +// - Added performance counters +// - Added priority to the threads in the pool +// +// 13 Feb 2006 - Changes: +// - Added a call to the dispose of the Performance Counter so +// their won't be a Performance Counter leak. +// - Added exception catch in case the Performance Counters cannot +// be created. +// +// 17 May 2008 - Changes: +// - Changed the dispose behavior and removed the Finalizers. +// - Enabled the change of the MaxThreads and MinThreads at run time. +// - Enabled the change of the Concurrency of a IWorkItemsGroup at run +// time If the IWorkItemsGroup is a SmartThreadPool then the Concurrency +// refers to the MaxThreads. +// - Improved the cancel behavior. +// - Added events for thread creation and termination. +// - Fixed the HttpContext context capture. +// - Changed internal collections so they use generic collections +// - Added IsIdle flag to the SmartThreadPool and IWorkItemsGroup +// - Added support for WinCE +// - Added support for Action and Func +// +// 07 April 2009 - Changes: +// - Added support for Silverlight and Mono +// - Added Join, Choice, and Pipe to SmartThreadPool. +// - Added local performance counters (for Mono, Silverlight, and WindowsCE) +// - Changed duration measures from DateTime.Now to Stopwatch. +// - Queues changed from System.Collections.Queue to System.Collections.Generic.LinkedList. +// +// 21 December 2009 - Changes: +// - Added work item timeout (passive) +// +// 20 August 2012 - Changes: +// - Added set name to threads +// - Fixed the WorkItemsQueue.Dequeue. +// Replaced while (!Monitor.TryEnter(this)); with lock(this) { ... } +// - Fixed SmartThreadPool.Pipe +// - Added IsBackground option to threads +// - Added ApartmentState to threads +// - Fixed thread creation when queuing many work items at the same time. +// +// 24 August 2012 - Changes: +// - Enabled cancel abort after cancel. See: http://smartthreadpool.codeplex.com/discussions/345937 by alecswan +// - Added option to set MaxStackSize of threads +// +// 16 September 2016 - Changes: +// - Separated the STP project to .NET 2.0, .NET 4.0, and .NET 4.5 +// - Added option to set MaxQueueLength (Thanks to Rob Hruska) +// +// 31 May 2019 - Changes: +// - Added .Net Standard 2.0 support +// +// 24 Feb 2020 - Changes: +// - Added .Net Core 3.x support +// * Removed the use of Thread.Abort(). The Shutdown method doesn't get forceAbort argument in the .net core versions +// * Fixed #if for .net core, .net standard, and .net framework. +// * Enabled tests to run on .net core too +// * Fixed/Removed tests which depend on Thread.Abort. +// +// 16 July 2021 - Changes: +// - Added .Net Core 5.0 support + +#endregion + +using System; +using System.Security; +using System.Threading; +using System.Collections; +using System.Collections.Generic; +using System.Diagnostics; +using System.Runtime.CompilerServices; + +using Amib.Threading.Internal; +using Stopwatch = System.Diagnostics.Stopwatch; + +namespace Amib.Threading +{ + #region SmartThreadPool class + /// + /// Smart thread pool class. + /// + public partial class SmartThreadPool : WorkItemsGroupBase, IDisposable + { + #region Public Default Constants + + /// + /// Default minimum number of threads the thread pool contains. (0) + /// + public const int DefaultMinWorkerThreads = 0; + + /// + /// Default maximum number of threads the thread pool contains. (25) + /// + public const int DefaultMaxWorkerThreads = 25; + + /// + /// Default idle timeout in milliseconds. (One minute) + /// + public const int DefaultIdleTimeout = 60*1000; // One minute + + /// + /// Indicate to copy the security context of the caller and then use it in the call. (false) + /// + public const bool DefaultUseCallerCallContext = false; + + /// + /// Indicate to copy the HTTP context of the caller and then use it in the call. (false) + /// + public const bool DefaultUseCallerHttpContext = false; + + /// + /// Indicate to dispose of the state objects if they support the IDispose interface. (false) + /// + public const bool DefaultDisposeOfStateObjects = false; + + /// + /// The default option to run the post execute (CallToPostExecute.Always) + /// + public const CallToPostExecute DefaultCallToPostExecute = CallToPostExecute.Always; + + /// + /// The default post execute method to run. (None) + /// When null it means not to call it. + /// + public static readonly PostExecuteWorkItemCallback DefaultPostExecuteWorkItemCallback; + + /// + /// The default work item priority (WorkItemPriority.Normal) + /// + public const WorkItemPriority DefaultWorkItemPriority = WorkItemPriority.Normal; + + /// + /// The default is to work on work items as soon as they arrive + /// and not to wait for the start. (false) + /// + public const bool DefaultStartSuspended = false; + + /// + /// The default name to use for the performance counters instance. (null) + /// + public static readonly string DefaultPerformanceCounterInstanceName; + + /// + /// The default thread priority (ThreadPriority.Normal) + /// + public const ThreadPriority DefaultThreadPriority = ThreadPriority.Normal; + + /// + /// The default thread pool name. (SmartThreadPool) + /// + public const string DefaultThreadPoolName = "SmartThreadPool"; + + /// + /// The default Max Stack Size. (null) + /// + public static readonly int? DefaultMaxStackSize = null; + + /// + /// The default Max Queue Length (null). + /// + public static readonly int? DefaultMaxQueueLength = null; + + /// + /// The default fill state with params. (false) + /// It is relevant only to QueueWorkItem of Action<...>/Func<...> + /// + public const bool DefaultFillStateWithArgs = false; + + /// + /// The default thread backgroundness. (true) + /// + public const bool DefaultAreThreadsBackground = true; + + /// + /// The default apartment state of a thread in the thread pool. + /// The default is ApartmentState.Unknown which means the STP will not + /// set the apartment of the thread. It will use the .NET default. + /// + public const ApartmentState DefaultApartmentState = ApartmentState.Unknown; + + #endregion + + #region Member Variables + + /// + /// Dictionary of all the threads in the thread pool. + /// + private readonly SynchronizedDictionary _workerThreads = new SynchronizedDictionary(); + + /// + /// Queue of work items. + /// + private readonly WorkItemsQueue _workItemsQueue = new WorkItemsQueue(); + + /// + /// Count the work items handled. + /// Used by the performance counter. + /// + private int _workItemsProcessed; + + /// + /// Number of threads that currently work (not idle). + /// + private int _inUseWorkerThreads; + + /// + /// Stores a copy of the original STPStartInfo. + /// It is used to change the MinThread and MaxThreads + /// + private STPStartInfo _stpStartInfo; + + /// + /// Total number of work items that are stored in the work items queue + /// plus the work items that the threads in the pool are working on. + /// + private volatile int _currentWorkItemsCount; + + /// + /// Signaled when the thread pool is idle, i.e. no thread is busy + /// and the work items queue is empty + /// + //private ManualResetEvent _isIdleWaitHandle = new ManualResetEvent(true); + private ManualResetEvent _isIdleWaitHandle = EventWaitHandleFactory.CreateManualResetEvent(true); + + /// + /// An event to signal all the threads to quit immediately. + /// + //private ManualResetEvent _shuttingDownEvent = new ManualResetEvent(false); + private ManualResetEvent _shuttingDownEvent = EventWaitHandleFactory.CreateManualResetEvent(false); + + /// + /// A flag to indicate if the Smart Thread Pool is now suspended. + /// + private bool _isSuspended; + + /// + /// A flag to indicate the threads to quit. + /// + private bool _shutdown; + + /// + /// Counts the threads created in the pool. + /// It is used to name the threads. + /// + private int _threadCounter; + + /// + /// Indicate that the SmartThreadPool has been disposed + /// + private bool _isDisposed; + + /// + /// Holds all the WorkItemsGroup instaces that have at least one + /// work item int the SmartThreadPool + /// This variable is used in case of Shutdown + /// + private readonly SynchronizedDictionary _workItemsGroups = new SynchronizedDictionary(); + + /// + /// A common object for all the work items int the STP + /// so we can mark them to cancel in O(1) + /// + private CanceledWorkItemsGroup _canceledSmartThreadPool = new CanceledWorkItemsGroup(); + + /// + /// Windows STP performance counters + /// + private ISTPInstancePerformanceCounters _windowsPCs = NullSTPInstancePerformanceCounters.Instance; + + /// + /// Local STP performance counters + /// + private ISTPInstancePerformanceCounters _localPCs = NullSTPInstancePerformanceCounters.Instance; + + [ThreadStatic] + private static ThreadEntry _threadEntry; + + /// + /// An event to call after a thread is created, but before + /// it's first use. + /// + private event ThreadInitializationHandler _onThreadInitialization; + + /// + /// An event to call when a thread is about to exit, after + /// it is no longer belong to the pool. + /// + private event ThreadTerminationHandler _onThreadTermination; + + #endregion + + #region Per thread properties + + /// + /// A reference to the current work item a thread from the thread pool + /// is executing. + /// + internal static ThreadEntry CurrentThreadEntry + { + get + { + return _threadEntry; + } + set + { + _threadEntry = value; + } + } + + #endregion + + #region Construction and Finalization + + /// + /// Constructor + /// + public SmartThreadPool() + { + _stpStartInfo = new STPStartInfo(); + Initialize(); + } + + /// + /// Constructor + /// + /// Idle timeout in milliseconds + public SmartThreadPool(int idleTimeout) + { + _stpStartInfo = new STPStartInfo + { + IdleTimeout = idleTimeout, + }; + Initialize(); + } + + /// + /// Constructor + /// + /// Set it to True to start thread pool in suspended mode; Explicit call to Start() will be needed to start the Thread pool. + public SmartThreadPool(bool startSuspended) + { + _stpStartInfo = new STPStartInfo + { + StartSuspended = startSuspended, + }; + Initialize(); + } + + /// + /// Constructor + /// + /// Idle timeout in milliseconds + /// Upper limit of threads in the pool + public SmartThreadPool( + int idleTimeout, + int maxWorkerThreads) + { + _stpStartInfo = new STPStartInfo + { + IdleTimeout = idleTimeout, + MaxWorkerThreads = maxWorkerThreads, + }; + Initialize(); + } + + /// + /// Constructor + /// + /// Idle timeout in milliseconds + /// Upper limit of threads in the pool + /// Lower limit of threads in the pool + public SmartThreadPool( + int idleTimeout, + int maxWorkerThreads, + int minWorkerThreads) + { + _stpStartInfo = new STPStartInfo + { + IdleTimeout = idleTimeout, + MaxWorkerThreads = maxWorkerThreads, + MinWorkerThreads = minWorkerThreads, + }; + Initialize(); + } + + /// + /// Constructor + /// + /// A SmartThreadPool configuration that overrides the default behavior + public SmartThreadPool(STPStartInfo stpStartInfo) + { + _stpStartInfo = new STPStartInfo(stpStartInfo); + Initialize(); + } + + private void Initialize() + { + Name = _stpStartInfo.ThreadPoolName; + ValidateSTPStartInfo(); + + // _stpStartInfoRW stores a read/write copy of the STPStartInfo. + // Actually only MaxWorkerThreads and MinWorkerThreads are overwritten + + _isSuspended = _stpStartInfo.StartSuspended; + +#if !(NETFRAMEWORK) + if (null != _stpStartInfo.PerformanceCounterInstanceName) + { + throw new NotSupportedException("Performance counters are not implemented for Compact Framework/Silverlight/Mono, instead use StpStartInfo.EnableLocalPerformanceCounters"); + } +#else + if (null != _stpStartInfo.PerformanceCounterInstanceName) + { + try + { + _windowsPCs = new STPInstancePerformanceCounters(_stpStartInfo.PerformanceCounterInstanceName); + } + catch (Exception e) + { + Debug.WriteLine("Unable to create Performance Counters: " + e); + _windowsPCs = NullSTPInstancePerformanceCounters.Instance; + } + } +#endif + + if (_stpStartInfo.EnableLocalPerformanceCounters) + { + _localPCs = new LocalSTPInstancePerformanceCounters(); + } + + // If the STP is not started suspended then start the threads. + if (!_isSuspended) + { + StartOptimalNumberOfThreads(); + } + } + + private void StartOptimalNumberOfThreads() + { + int threadsCount = Math.Max(_workItemsQueue.Count, _stpStartInfo.MinWorkerThreads); + threadsCount = Math.Min(threadsCount, _stpStartInfo.MaxWorkerThreads); + threadsCount -= _workerThreads.Count; + if (threadsCount > 0) + { + StartThreads(threadsCount); + } + } + + private void ValidateSTPStartInfo() + { + if (_stpStartInfo.MinWorkerThreads < 0) + { + throw new ArgumentOutOfRangeException( + "MinWorkerThreads", "MinWorkerThreads cannot be negative"); + } + + if (_stpStartInfo.MaxWorkerThreads <= 0) + { + throw new ArgumentOutOfRangeException( + "MaxWorkerThreads", "MaxWorkerThreads must be greater than zero"); + } + + if (_stpStartInfo.MinWorkerThreads > _stpStartInfo.MaxWorkerThreads) + { + throw new ArgumentOutOfRangeException( + "MinWorkerThreads, maxWorkerThreads", + "MaxWorkerThreads must be greater or equal to MinWorkerThreads"); + } + + if (_stpStartInfo.MaxQueueLength < 0) + { + throw new ArgumentOutOfRangeException( + "MaxQueueLength", + "MaxQueueLength must be >= 0 or null (for unbounded)"); + } + } + + private static void ValidateCallback(Delegate callback) + { + if(callback.GetInvocationList().Length > 1) + { + throw new NotSupportedException("SmartThreadPool doesn't support delegates chains"); + } + } + + #endregion + + #region Thread Processing + + /// + /// Waits on the queue for a work item, shutdown, or timeout. + /// + /// + /// Returns the WaitingCallback or null in case of timeout or shutdown. + /// + private WorkItem Dequeue() + { + WorkItem workItem = + _workItemsQueue.DequeueWorkItem(_stpStartInfo.IdleTimeout, _shuttingDownEvent); + + return workItem; + } + + /// + /// Put a new work item in the queue + /// + /// A work item to queue + internal override void Enqueue(WorkItem workItem) + { + // Make sure the workItem is not null + Debug.Assert(null != workItem); + + IncrementWorkItemsCount(); + + workItem.CanceledSmartThreadPool = _canceledSmartThreadPool; + _workItemsQueue.EnqueueWorkItem(workItem); + workItem.WorkItemIsQueued(); + + // If all the threads are busy then try to create a new one + if (_currentWorkItemsCount > _workerThreads.Count) + { + StartThreads(1); + } + } + + private void IncrementWorkItemsCount() + { + _windowsPCs.SampleWorkItems(_workItemsQueue.Count, _workItemsProcessed); + _localPCs.SampleWorkItems(_workItemsQueue.Count, _workItemsProcessed); + + int count = Interlocked.Increment(ref _currentWorkItemsCount); + //Trace.WriteLine("WorkItemsCount = " + _currentWorkItemsCount.ToString()); + if (count == 1) + { + IsIdle = false; + _isIdleWaitHandle.Reset(); + } + } + + private void DecrementWorkItemsCount() + { + int count = Interlocked.Decrement(ref _currentWorkItemsCount); + //Trace.WriteLine("WorkItemsCount = " + _currentWorkItemsCount.ToString()); + if (count == 0) + { + IsIdle = true; + _isIdleWaitHandle.Set(); + } + + Interlocked.Increment(ref _workItemsProcessed); + + if (!_shutdown) + { + // The counter counts even if the work item was cancelled + _windowsPCs.SampleWorkItems(_workItemsQueue.Count, _workItemsProcessed); + _localPCs.SampleWorkItems(_workItemsQueue.Count, _workItemsProcessed); + } + + } + + internal void RegisterWorkItemsGroup(IWorkItemsGroup workItemsGroup) + { + _workItemsGroups[workItemsGroup] = workItemsGroup; + } + + internal void UnregisterWorkItemsGroup(IWorkItemsGroup workItemsGroup) + { + if (_workItemsGroups.Contains(workItemsGroup)) + { + _workItemsGroups.Remove(workItemsGroup); + } + } + + /// + /// Inform that the current thread is about to quit or quiting. + /// The same thread may call this method more than once. + /// + private void InformCompleted() + { + // There is no need to lock the two methods together + // since only the current thread removes itself + // and the _workerThreads is a synchronized dictionary + if (_workerThreads.Contains(Thread.CurrentThread)) + { + _workerThreads.Remove(Thread.CurrentThread); + _windowsPCs.SampleThreads(_workerThreads.Count, _inUseWorkerThreads); + _localPCs.SampleThreads(_workerThreads.Count, _inUseWorkerThreads); + } + } + + /// + /// Starts new threads + /// + /// The number of threads to start + private void StartThreads(int threadsCount) + { + if (_isSuspended) + { + return; + } + + lock(_workerThreads.SyncRoot) + { + // Don't start threads on shut down + if (_shutdown) + { + return; + } + + for(int i = 0; i < threadsCount; ++i) + { + // Don't create more threads then the upper limit + if (_workerThreads.Count >= _stpStartInfo.MaxWorkerThreads) + { + return; + } + + // Create a new thread + + Thread workerThread = + _stpStartInfo.MaxStackSize.HasValue + ? new Thread(ProcessQueuedItems, _stpStartInfo.MaxStackSize.Value) + : new Thread(ProcessQueuedItems); + // Configure the new thread and start it + workerThread.Name = "STP " + Name + " Thread #" + _threadCounter; + workerThread.IsBackground = _stpStartInfo.AreThreadsBackground; + + if (_stpStartInfo.ApartmentState != ApartmentState.Unknown) + { + workerThread.SetApartmentState(_stpStartInfo.ApartmentState); + } + + workerThread.Priority = _stpStartInfo.ThreadPriority; + workerThread.Start(); + ++_threadCounter; + + // Add it to the dictionary and update its creation time. + _workerThreads[workerThread] = new ThreadEntry(this); + + _windowsPCs.SampleThreads(_workerThreads.Count, _inUseWorkerThreads); + _localPCs.SampleThreads(_workerThreads.Count, _inUseWorkerThreads); + } + } + } + + /// + /// A worker thread method that processes work items from the work items queue. + /// + private void ProcessQueuedItems() + { + // Keep the entry of the dictionary as thread's variable to avoid the synchronization locks + // of the dictionary. + CurrentThreadEntry = _workerThreads[Thread.CurrentThread]; + + FireOnThreadInitialization(); + + try + { + bool bInUseWorkerThreadsWasIncremented = false; + + // Process until shutdown. + while(!_shutdown) + { + // Update the last time this thread was seen alive. + // It's good for debugging. + CurrentThreadEntry.IAmAlive(); + + // The following block handles the when the MaxWorkerThreads has been + // incremented by the user at run-time. + // Double lock for quit. + if (_workerThreads.Count > _stpStartInfo.MaxWorkerThreads) + { + lock (_workerThreads.SyncRoot) + { + if (_workerThreads.Count > _stpStartInfo.MaxWorkerThreads) + { + // Inform that the thread is quiting and then quit. + // This method must be called within this lock or else + // more threads will quit and the thread pool will go + // below the lower limit. + InformCompleted(); + break; + } + } + } + + // Wait for a work item, shutdown, or timeout + WorkItem workItem = Dequeue(); + + // Update the last time this thread was seen alive. + // It's good for debugging. + CurrentThreadEntry.IAmAlive(); + + // On timeout or shut down. + if (null == workItem) + { + // Double lock for quit. + if (_workerThreads.Count > _stpStartInfo.MinWorkerThreads) + { + lock(_workerThreads.SyncRoot) + { + if (_workerThreads.Count > _stpStartInfo.MinWorkerThreads) + { + // Inform that the thread is quiting and then quit. + // This method must be called within this lock or else + // more threads will quit and the thread pool will go + // below the lower limit. + InformCompleted(); + break; + } + } + } + } + + // If we didn't quit then skip to the next iteration. + if (null == workItem) + { + continue; + } + + try + { + // Initialize the value to false + bInUseWorkerThreadsWasIncremented = false; + + // Set the Current Work Item of the thread. + // Store the Current Work Item before the workItem.StartingWorkItem() is called, + // so WorkItem.Cancel can work when the work item is between InQueue and InProgress + // states. + // If the work item has been cancelled BEFORE the workItem.StartingWorkItem() + // (work item is in InQueue state) then workItem.StartingWorkItem() will return false. + // If the work item has been cancelled AFTER the workItem.StartingWorkItem() then + // (work item is in InProgress state) then the thread will be aborted + CurrentThreadEntry.CurrentWorkItem = workItem; + + // Change the state of the work item to 'in progress' if possible. + // We do it here so if the work item has been canceled we won't + // increment the _inUseWorkerThreads. + // The cancel mechanism doesn't delete items from the queue, + // it marks the work item as canceled, and when the work item + // is dequeued, we just skip it. + // If the post execute of work item is set to always or to + // call when the work item is canceled then the StartingWorkItem() + // will return true, so the post execute can run. + if (!workItem.StartingWorkItem()) + { + continue; + } + + // Execute the callback. Make sure to accurately + // record how many callbacks are currently executing. + int inUseWorkerThreads = Interlocked.Increment(ref _inUseWorkerThreads); + _windowsPCs.SampleThreads(_workerThreads.Count, inUseWorkerThreads); + _localPCs.SampleThreads(_workerThreads.Count, inUseWorkerThreads); + + // Mark that the _inUseWorkerThreads incremented, so in the finally{} + // statement we will decrement it correctly. + bInUseWorkerThreadsWasIncremented = true; + + workItem.FireWorkItemStarted(); + + ExecuteWorkItem(workItem); + } + catch(Exception ex) + { + ex.GetHashCode(); + // Do nothing + } + finally + { + workItem.DisposeOfState(); + + // Set the CurrentWorkItem to null, since we + // no longer run user's code. + CurrentThreadEntry.CurrentWorkItem = null; + + // Decrement the _inUseWorkerThreads only if we had + // incremented it. Note the cancelled work items don't + // increment _inUseWorkerThreads. + if (bInUseWorkerThreadsWasIncremented) + { + int inUseWorkerThreads = Interlocked.Decrement(ref _inUseWorkerThreads); + _windowsPCs.SampleThreads(_workerThreads.Count, inUseWorkerThreads); + _localPCs.SampleThreads(_workerThreads.Count, inUseWorkerThreads); + } + + // Notify that the work item has been completed. + // WorkItemsGroup may enqueue their next work item. + workItem.FireWorkItemCompleted(); + + // Decrement the number of work items here so the idle + // ManualResetEvent won't fluctuate. + DecrementWorkItemsCount(); + } + } + } + catch(ThreadAbortException tae) + { + tae.GetHashCode(); + // Handle the abort exception gracfully. + Thread.ResetAbort(); + } + catch(Exception e) + { + Debug.Assert(null != e); + } + finally + { + InformCompleted(); + FireOnThreadTermination(); + } + } + + private void ExecuteWorkItem(WorkItem workItem) + { + _windowsPCs.SampleWorkItemsWaitTime(workItem.WaitingTime); + _localPCs.SampleWorkItemsWaitTime(workItem.WaitingTime); + try + { + workItem.Execute(); + } + finally + { + _windowsPCs.SampleWorkItemsProcessTime(workItem.ProcessTime); + _localPCs.SampleWorkItemsProcessTime(workItem.ProcessTime); + } + } + + + #endregion + + #region Public Methods + + private void ValidateWaitForIdle() + { + if (null != CurrentThreadEntry && CurrentThreadEntry.AssociatedSmartThreadPool == this) + { + throw new NotSupportedException( + "WaitForIdle cannot be called from a thread on its SmartThreadPool, it causes a deadlock"); + } + } + + internal static void ValidateWorkItemsGroupWaitForIdle(IWorkItemsGroup workItemsGroup) + { + if (null == CurrentThreadEntry) + { + return; + } + + WorkItem workItem = CurrentThreadEntry.CurrentWorkItem; + ValidateWorkItemsGroupWaitForIdleImpl(workItemsGroup, workItem); + if ((null != workItemsGroup) && + (null != workItem) && + CurrentThreadEntry.CurrentWorkItem.WasQueuedBy(workItemsGroup)) + { + throw new NotSupportedException("WaitForIdle cannot be called from a thread on its SmartThreadPool, it causes a deadlock"); + } + } + + [MethodImpl(MethodImplOptions.NoInlining)] + private static void ValidateWorkItemsGroupWaitForIdleImpl(IWorkItemsGroup workItemsGroup, WorkItem workItem) + { + if ((null != workItemsGroup) && + (null != workItem) && + workItem.WasQueuedBy(workItemsGroup)) + { + throw new NotSupportedException("WaitForIdle cannot be called from a thread on its SmartThreadPool, it causes a deadlock"); + } + } + + /// + /// Force the SmartThreadPool to shutdown + /// Doesn't use Thread.Abort + /// + public void Shutdown() + { + ShutdownImpl(false, 0); + } + + // Thread.Abort is not supported in .net core + + /// + /// Force the SmartThreadPool to shutdown with timeout + /// Doesn't use Thread.Abort + /// + public void Shutdown(TimeSpan timeout) + { + ShutdownImpl(false, (int)timeout.TotalMilliseconds); + } + + /// + /// Empties the queue of work items and abort the threads in the pool. + /// Doesn't use Thread.Abort + /// + public void Shutdown(int millisecondsTimeout) + { + ShutdownImpl(false, millisecondsTimeout); + } + +#if !(NETCOREAPP) + + /// + /// Force the SmartThreadPool to shutdown with timeout + /// + public void Shutdown(bool forceAbort) + { + ShutdownImpl(forceAbort, 0); + } + + /// + /// Force the SmartThreadPool to shutdown with timeout + /// + public void Shutdown(bool forceAbort, TimeSpan timeout) + { + ShutdownImpl(forceAbort, (int)timeout.TotalMilliseconds); + } + + /// + /// Empties the queue of work items and abort the threads in the pool. + /// + public void Shutdown(bool forceAbort, int millisecondsTimeout) + { + ShutdownImpl(forceAbort, millisecondsTimeout); + } +#endif + private void ShutdownImpl(bool forceAbort, int millisecondsTimeout) + { + ValidateNotDisposed(); + + ISTPInstancePerformanceCounters pcs = _windowsPCs; + + if (NullSTPInstancePerformanceCounters.Instance != _windowsPCs) + { + // Set the _pcs to "null" to stop updating the performance + // counters + _windowsPCs = NullSTPInstancePerformanceCounters.Instance; + + pcs.Dispose(); + } + + Thread [] threads; + lock(_workerThreads.SyncRoot) + { + // Shutdown the work items queue + _workItemsQueue.Dispose(); + + // Signal the threads to exit + _shutdown = true; + _shuttingDownEvent.Set(); + + // Make a copy of the threads' references in the pool + threads = new Thread [_workerThreads.Count]; + _workerThreads.Keys.CopyTo(threads, 0); + } + + int millisecondsLeft = millisecondsTimeout; + Stopwatch stopwatch = Stopwatch.StartNew(); + //DateTime start = DateTime.UtcNow; + bool waitInfinitely = (Timeout.Infinite == millisecondsTimeout); + bool timeout = false; + + // Each iteration we update the time left for the timeout. + foreach(Thread thread in threads) + { + // Join don't work with negative numbers + if (!waitInfinitely && (millisecondsLeft < 0)) + { + timeout = true; + break; + } + + // Wait for the thread to terminate + bool success = thread.Join(millisecondsLeft); + if(!success) + { + timeout = true; + break; + } + + if(!waitInfinitely) + { + // Update the time left to wait + //TimeSpan ts = DateTime.UtcNow - start; + millisecondsLeft = millisecondsTimeout - (int)stopwatch.ElapsedMilliseconds; + } + } + + if (timeout && forceAbort) + { + // Abort the threads in the pool + foreach(Thread thread in threads) + { + + if ((thread != null) && thread.IsAlive) + { + try + { + thread.Abort(); // Shutdown + } + catch(SecurityException e) + { + e.GetHashCode(); + } + catch(ThreadStateException ex) + { + ex.GetHashCode(); + // In case the thread has been terminated + // after the check if it is alive. + } + } + } + } + } + + /// + /// Wait for all work items to complete + /// + /// Array of work item result objects + /// + /// true when every work item in workItemResults has completed; otherwise false. + /// + public static bool WaitAll( + IWaitableResult [] waitableResults) + { + return WaitAll(waitableResults, Timeout.Infinite, true); + } + + /// + /// Wait for all work items to complete + /// + /// Array of work item result objects + /// The number of milliseconds to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely. + /// + /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false. + /// + /// + /// true when every work item in workItemResults has completed; otherwise false. + /// + public static bool WaitAll( + IWaitableResult [] waitableResults, + TimeSpan timeout, + bool exitContext) + { + return WaitAll(waitableResults, (int)timeout.TotalMilliseconds, exitContext); + } + + /// + /// Wait for all work items to complete + /// + /// Array of work item result objects + /// The number of milliseconds to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely. + /// + /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false. + /// + /// A cancel wait handle to interrupt the wait if needed + /// + /// true when every work item in workItemResults has completed; otherwise false. + /// + public static bool WaitAll( + IWaitableResult[] waitableResults, + TimeSpan timeout, + bool exitContext, + WaitHandle cancelWaitHandle) + { + return WaitAll(waitableResults, (int)timeout.TotalMilliseconds, exitContext, cancelWaitHandle); + } + + /// + /// Wait for all work items to complete + /// + /// Array of work item result objects + /// The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely. + /// + /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false. + /// + /// + /// true when every work item in workItemResults has completed; otherwise false. + /// + public static bool WaitAll( + IWaitableResult [] waitableResults, + int millisecondsTimeout, + bool exitContext) + { + return WorkItem.WaitAll(waitableResults, millisecondsTimeout, exitContext, null); + } + + /// + /// Wait for all work items to complete + /// + /// Array of work item result objects + /// The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely. + /// + /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false. + /// + /// A cancel wait handle to interrupt the wait if needed + /// + /// true when every work item in workItemResults has completed; otherwise false. + /// + public static bool WaitAll( + IWaitableResult[] waitableResults, + int millisecondsTimeout, + bool exitContext, + WaitHandle cancelWaitHandle) + { + return WorkItem.WaitAll(waitableResults, millisecondsTimeout, exitContext, cancelWaitHandle); + } + + + /// + /// Waits for any of the work items in the specified array to complete, cancel, or timeout + /// + /// Array of work item result objects + /// + /// The array index of the work item result that satisfied the wait, or WaitTimeout if any of the work items has been canceled. + /// + public static int WaitAny( + IWaitableResult [] waitableResults) + { + return WaitAny(waitableResults, Timeout.Infinite, true); + } + + /// + /// Waits for any of the work items in the specified array to complete, cancel, or timeout + /// + /// Array of work item result objects + /// The number of milliseconds to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely. + /// + /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false. + /// + /// + /// The array index of the work item result that satisfied the wait, or WaitTimeout if no work item result satisfied the wait and a time interval equivalent to millisecondsTimeout has passed or the work item has been canceled. + /// + public static int WaitAny( + IWaitableResult[] waitableResults, + TimeSpan timeout, + bool exitContext) + { + return WaitAny(waitableResults, (int)timeout.TotalMilliseconds, exitContext); + } + + /// + /// Waits for any of the work items in the specified array to complete, cancel, or timeout + /// + /// Array of work item result objects + /// The number of milliseconds to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely. + /// + /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false. + /// + /// A cancel wait handle to interrupt the wait if needed + /// + /// The array index of the work item result that satisfied the wait, or WaitTimeout if no work item result satisfied the wait and a time interval equivalent to millisecondsTimeout has passed or the work item has been canceled. + /// + public static int WaitAny( + IWaitableResult [] waitableResults, + TimeSpan timeout, + bool exitContext, + WaitHandle cancelWaitHandle) + { + return WaitAny(waitableResults, (int)timeout.TotalMilliseconds, exitContext, cancelWaitHandle); + } + + /// + /// Waits for any of the work items in the specified array to complete, cancel, or timeout + /// + /// Array of work item result objects + /// The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely. + /// + /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false. + /// + /// + /// The array index of the work item result that satisfied the wait, or WaitTimeout if no work item result satisfied the wait and a time interval equivalent to millisecondsTimeout has passed or the work item has been canceled. + /// + public static int WaitAny( + IWaitableResult [] waitableResults, + int millisecondsTimeout, + bool exitContext) + { + return WorkItem.WaitAny(waitableResults, millisecondsTimeout, exitContext, null); + } + + /// + /// Waits for any of the work items in the specified array to complete, cancel, or timeout + /// + /// Array of work item result objects + /// The number of milliseconds to wait, or Timeout.Infinite (-1) to wait indefinitely. + /// + /// true to exit the synchronization domain for the context before the wait (if in a synchronized context), and reacquire it; otherwise, false. + /// + /// A cancel wait handle to interrupt the wait if needed + /// + /// The array index of the work item result that satisfied the wait, or WaitTimeout if no work item result satisfied the wait and a time interval equivalent to millisecondsTimeout has passed or the work item has been canceled. + /// + public static int WaitAny( + IWaitableResult [] waitableResults, + int millisecondsTimeout, + bool exitContext, + WaitHandle cancelWaitHandle) + { + return WorkItem.WaitAny(waitableResults, millisecondsTimeout, exitContext, cancelWaitHandle); + } + + /// + /// Creates a new WorkItemsGroup. + /// + /// The number of work items that can be run concurrently + /// A reference to the WorkItemsGroup + public IWorkItemsGroup CreateWorkItemsGroup(int concurrency) + { + IWorkItemsGroup workItemsGroup = new WorkItemsGroup(this, concurrency, _stpStartInfo); + return workItemsGroup; + } + + /// + /// Creates a new WorkItemsGroup. + /// + /// The number of work items that can be run concurrently + /// A WorkItemsGroup configuration that overrides the default behavior + /// A reference to the WorkItemsGroup + public IWorkItemsGroup CreateWorkItemsGroup(int concurrency, WIGStartInfo wigStartInfo) + { + IWorkItemsGroup workItemsGroup = new WorkItemsGroup(this, concurrency, wigStartInfo); + return workItemsGroup; + } + + #region Fire Thread's Events + + private void FireOnThreadInitialization() + { + if (null != _onThreadInitialization) + { + foreach (ThreadInitializationHandler tih in _onThreadInitialization.GetInvocationList()) + { + try + { + tih(); + } + catch (Exception e) + { + e.GetHashCode(); + Debug.Assert(false); + throw; + } + } + } + } + + private void FireOnThreadTermination() + { + if (null != _onThreadTermination) + { + foreach (ThreadTerminationHandler tth in _onThreadTermination.GetInvocationList()) + { + try + { + tth(); + } + catch (Exception e) + { + e.GetHashCode(); + Debug.Assert(false); + throw; + } + } + } + } + + #endregion + + /// + /// This event is fired when a thread is created. + /// Use it to initialize a thread before the work items use it. + /// + public event ThreadInitializationHandler OnThreadInitialization + { + add { _onThreadInitialization += value; } + remove { _onThreadInitialization -= value; } + } + + /// + /// This event is fired when a thread is terminating. + /// Use it for cleanup. + /// + public event ThreadTerminationHandler OnThreadTermination + { + add { _onThreadTermination += value; } + remove { _onThreadTermination -= value; } + } + + + internal void CancelAbortWorkItemsGroup(WorkItemsGroup wig) + { + foreach (ThreadEntry threadEntry in _workerThreads.Values) + { + WorkItem workItem = threadEntry.CurrentWorkItem; + if (null != workItem && + workItem.WasQueuedBy(wig) && + !workItem.IsCanceled) + { + threadEntry.CurrentWorkItem.GetWorkItemResult().Cancel(true); + } + } + } + + private void ValidateQueueIsWithinLimits() + { + // Keep a local copy; if a client changes the length while this is executing, + // we'll want to use the same value throughout. + var maxQueueLength = _stpStartInfo.MaxQueueLength; + + if (maxQueueLength == null) + { + return; + } + + // Instead of just looking at the current queue length here, account for the + // fact that the pool is going to scale up its threads if it's not yet at its + // maximum and there are queued items. This means that the queue length limit + // may be briefly exceeded while the pool is scaling up. + if (_currentWorkItemsCount >= maxQueueLength + MaxThreads) + { + throw new QueueRejectedException("Queue is at its maximum (" + maxQueueLength + ")"); + } + } + + #endregion + + #region Properties + + /// + /// Get/Set the lower limit of threads in the pool. + /// + public int MinThreads + { + get + { + ValidateNotDisposed(); + return _stpStartInfo.MinWorkerThreads; + } + set + { + Debug.Assert(value >= 0); + Debug.Assert(value <= _stpStartInfo.MaxWorkerThreads); + if (_stpStartInfo.MaxWorkerThreads < value) + { + _stpStartInfo.MaxWorkerThreads = value; + } + _stpStartInfo.MinWorkerThreads = value; + StartOptimalNumberOfThreads(); + } + } + + /// + /// Get/Set the upper limit of threads in the pool. + /// + public int MaxThreads + { + get + { + ValidateNotDisposed(); + return _stpStartInfo.MaxWorkerThreads; + } + + set + { + Debug.Assert(value > 0); + Debug.Assert(value >= _stpStartInfo.MinWorkerThreads); + if (_stpStartInfo.MinWorkerThreads > value) + { + _stpStartInfo.MinWorkerThreads = value; + } + _stpStartInfo.MaxWorkerThreads = value; + StartOptimalNumberOfThreads(); + } + } + + public int? MaxQueueLength + { + get + { + ValidateNotDisposed(); + return _stpStartInfo.MaxQueueLength; + } + + set + { + _stpStartInfo.MaxQueueLength = value; + } + } + + /// + /// Get the number of threads in the thread pool. + /// Should be between the lower and the upper limits. + /// + public int ActiveThreads + { + get + { + ValidateNotDisposed(); + return _workerThreads.Count; + } + } + + /// + /// Get the number of work items that haven't finished execution (i.e. + /// items being worked on by threads + items in the queue). + /// + public int CurrentWorkItemsCount + { + get + { + ValidateNotDisposed(); + return _currentWorkItemsCount; + } + } + + /// + /// Returns true if the current running work item has been cancelled. + /// Must be used within the work item's callback method. + /// The work item should sample this value in order to know if it + /// needs to quit before its completion. + /// + public static bool IsWorkItemCanceled + { + get + { + return CurrentThreadEntry.CurrentWorkItem.IsCanceled; + } + } + + /// + /// Checks if the work item has been cancelled, and if yes then abort the thread. + /// Can be used with Cancel and timeout + /// + public static void AbortOnWorkItemCancel() + { + if (IsWorkItemCanceled) + { + Thread.CurrentThread.Abort(); + } + } + + /// + /// Thread Pool start information (readonly) + /// + public STPStartInfo STPStartInfo + { + get + { + return _stpStartInfo.AsReadOnly(); + } + } + + public bool IsShuttingdown + { + get { return _shutdown; } + } + + /// + /// Return the local calculated performance counters + /// Available only if STPStartInfo.EnableLocalPerformanceCounters is true. + /// + public ISTPPerformanceCountersReader PerformanceCountersReader + { + get { return (ISTPPerformanceCountersReader)_localPCs; } + } + + #endregion + + #region IDisposable Members + + public void Dispose() + { + if (!_isDisposed) + { + if (!_shutdown) + { + Shutdown(); + } + + if (null != _shuttingDownEvent) + { + _shuttingDownEvent.Close(); + _shuttingDownEvent = null; + } + _workerThreads.Clear(); + + if (null != _isIdleWaitHandle) + { + _isIdleWaitHandle.Close(); + _isIdleWaitHandle = null; + } + + _isDisposed = true; + } + } + + private void ValidateNotDisposed() + { + if(_isDisposed) + { + throw new ObjectDisposedException(GetType().ToString(), "The SmartThreadPool has been shutdown"); + } + } + #endregion + + #region WorkItemsGroupBase Overrides + + /// + /// Get/Set the maximum number of work items that execute cocurrency on the thread pool + /// + public override int Concurrency + { + get { return MaxThreads; } + set { MaxThreads = value; } + } + + /// + /// Get the number of busy (not idle) threads in the thread pool. + /// + public override int InUseThreads + { + get + { + ValidateNotDisposed(); + return _inUseWorkerThreads; + } + } + + /// + /// Get the number of work items in the queue. + /// + public override int WaitingCallbacks + { + get + { + ValidateNotDisposed(); + return _workItemsQueue.Count; + } + } + + /// + /// Get an array with all the state objects of the currently running items. + /// The array represents a snap shot and impact performance. + /// + public override object[] GetStates() + { + object[] states = _workItemsQueue.GetStates(); + return states; + } + + /// + /// WorkItemsGroup start information (readonly) + /// + public override WIGStartInfo WIGStartInfo + { + get { return _stpStartInfo.AsReadOnly(); } + } + + /// + /// Start the thread pool if it was started suspended. + /// If it is already running, this method is ignored. + /// + public override void Start() + { + if (!_isSuspended) + { + return; + } + _isSuspended = false; + + ICollection workItemsGroups = _workItemsGroups.Values; + foreach (WorkItemsGroup workItemsGroup in workItemsGroups) + { + workItemsGroup.OnSTPIsStarting(); + } + + StartOptimalNumberOfThreads(); + } + + /// + /// Cancel all work items using thread abortion + /// + /// True to stop work items by raising ThreadAbortException + public override void Cancel(bool abortExecution) + { + _canceledSmartThreadPool.IsCanceled = true; + _canceledSmartThreadPool = new CanceledWorkItemsGroup(); + + ICollection workItemsGroups = _workItemsGroups.Values; + foreach (WorkItemsGroup workItemsGroup in workItemsGroups) + { + workItemsGroup.Cancel(abortExecution); + } + + if (abortExecution) + { + foreach (ThreadEntry threadEntry in _workerThreads.Values) + { + WorkItem workItem = threadEntry.CurrentWorkItem; + if (null != workItem && + threadEntry.AssociatedSmartThreadPool == this && + !workItem.IsCanceled) + { + threadEntry.CurrentWorkItem.GetWorkItemResult().Cancel(true); + } + } + } + } + + /// + /// Wait for the thread pool to be idle + /// + public override bool WaitForIdle(int millisecondsTimeout) + { + ValidateWaitForIdle(); + return STPEventWaitHandle.WaitOne(_isIdleWaitHandle, millisecondsTimeout, false); + } + + /// + /// This event is fired when all work items are completed. + /// (When IsIdle changes to true) + /// This event only work on WorkItemsGroup. On SmartThreadPool + /// it throws the NotImplementedException. + /// + public override event WorkItemsGroupIdleHandler OnIdle + { + add + { + throw new NotImplementedException("This event is not implemented in the SmartThreadPool class. Please create a WorkItemsGroup in order to use this feature."); + //_onIdle += value; + } + remove + { + throw new NotImplementedException("This event is not implemented in the SmartThreadPool class. Please create a WorkItemsGroup in order to use this feature."); + //_onIdle -= value; + } + } + + internal override void PreQueueWorkItem() + { + ValidateNotDisposed(); + + // This gives no preference to items of higher priority. + ValidateQueueIsWithinLimits(); + } + + #endregion + + #region Join, Choice, Pipe, etc. + + /// + /// Executes all actions in parallel. + /// Returns when they all finish. + /// + /// Actions to execute + public void Join(IEnumerable actions) + { + WIGStartInfo wigStartInfo = new WIGStartInfo { StartSuspended = true }; + IWorkItemsGroup workItemsGroup = CreateWorkItemsGroup(int.MaxValue, wigStartInfo); + foreach (Action action in actions) + { + workItemsGroup.QueueWorkItem(action); + } + workItemsGroup.Start(); + workItemsGroup.WaitForIdle(); + } + + /// + /// Executes all actions in parallel. + /// Returns when they all finish. + /// + /// Actions to execute + public void Join(params Action[] actions) + { + Join((IEnumerable)actions); + } + + private class ChoiceIndex + { + public int _index = -1; + } + + /// + /// Executes all actions in parallel + /// Returns when the first one completes + /// + /// Actions to execute + public int Choice(IEnumerable actions) + { + WIGStartInfo wigStartInfo = new WIGStartInfo { StartSuspended = true }; + IWorkItemsGroup workItemsGroup = CreateWorkItemsGroup(int.MaxValue, wigStartInfo); + + ManualResetEvent anActionCompleted = new ManualResetEvent(false); + + ChoiceIndex choiceIndex = new ChoiceIndex(); + + int i = 0; + foreach (Action action in actions) + { + Action act = action; + int value = i; + workItemsGroup.QueueWorkItem(() => { act(); Interlocked.CompareExchange(ref choiceIndex._index, value, -1); anActionCompleted.Set(); }); + ++i; + } + workItemsGroup.Start(); + anActionCompleted.WaitOne(); + + return choiceIndex._index; + } + + /// + /// Executes all actions in parallel + /// Returns when the first one completes + /// + /// Actions to execute + public int Choice(params Action[] actions) + { + return Choice((IEnumerable)actions); + } + + /// + /// Executes actions in sequence asynchronously. + /// Returns immediately. + /// + /// A state context that passes + /// Actions to execute in the order they should run + public void Pipe(T pipeState, IEnumerable> actions) + { + WIGStartInfo wigStartInfo = new WIGStartInfo { StartSuspended = true }; + IWorkItemsGroup workItemsGroup = CreateWorkItemsGroup(1, wigStartInfo); + foreach (Action action in actions) + { + Action act = action; + workItemsGroup.QueueWorkItem(() => act(pipeState)); + } + workItemsGroup.Start(); + workItemsGroup.WaitForIdle(); + } + + /// + /// Executes actions in sequence asynchronously. + /// Returns immediately. + /// + /// + /// Actions to execute in the order they should run + public void Pipe(T pipeState, params Action[] actions) + { + Pipe(pipeState, (IEnumerable>)actions); + } + #endregion + } + #endregion +} diff --git a/SmartThreadPool/SmartThreadPool.csproj b/SmartThreadPool/SmartThreadPool.csproj index 81b8e5d..436b16f 100644 --- a/SmartThreadPool/SmartThreadPool.csproj +++ b/SmartThreadPool/SmartThreadPool.csproj @@ -1,62 +1,62 @@ - - - - net40;net45;net46;netstandard2.0;netcoreapp3.0;netcoreapp3.1 - SmartThreadPool - SmartThreadPool - TRACE; - true - 2.3.0 - - Ami Bar - Smart Thread Pool, implemented in .NET - SmartThreadPool Thread Pool .NET - MS-PL - Ami Bar - https://github.com/amibar/SmartThreadPool - https://github.com/amibar/SmartThreadPool - Debug;Release;Publish - Added .net core 3.0 and 3.1 support - SmartThreadPool.dll - 2.3.0.0 - 2.3.0.0 - - - - - - - - - - - - - - - - - <_Parameter1>STPTests - - - - - - <_Parameter1>STPTests - - - - - - <_Parameter1>STPTests,PublicKey=00240000048000009400000006020000002400005253413100040000010001004fe3d39add741ba7c8d52cd1eb0d94c7d79060ad956cbaff0e51c1dce94db10356b261778bc1ac3114b3218434da6fcd8416dd5507653809598f7d2afc422099ce4f6b7b0477f18e6c57c727ef2a7ab6ee56e6b4589fe44cb0e25f2875a3c65ab0383ee33c4dd93023f7ce1218bebc8b7a9a1dac878938f5c4f45ea74b6bd8ad - - - - - true - true - ..\publish\Keys\STP.snk - ..\publish\dist\bin - - - + + + + net40;net45;net46;netstandard2.0;netcoreapp3.0;netcoreapp3.1;net5.0 + SmartThreadPool + SmartThreadPool + TRACE; + true + 2.3.0 + + Ami Bar + Smart Thread Pool, implemented in .NET + SmartThreadPool Thread Pool .NET + MS-PL + Ami Bar + https://github.com/amibar/SmartThreadPool + https://github.com/amibar/SmartThreadPool + Debug;Release;Publish + Added .net core 3.1 and 5.0 support + SmartThreadPool.dll + 2.3.0.0 + 2.3.0.0 + + + + + + + + + + + + + + + + + <_Parameter1>STPTests + + + + + + <_Parameter1>STPTests + + + + + + <_Parameter1>STPTests,PublicKey=00240000048000009400000006020000002400005253413100040000010001004fe3d39add741ba7c8d52cd1eb0d94c7d79060ad956cbaff0e51c1dce94db10356b261778bc1ac3114b3218434da6fcd8416dd5507653809598f7d2afc422099ce4f6b7b0477f18e6c57c727ef2a7ab6ee56e6b4589fe44cb0e25f2875a3c65ab0383ee33c4dd93023f7ce1218bebc8b7a9a1dac878938f5c4f45ea74b6bd8ad + + + + + true + true + ..\publish\Keys\STP.snk + ..\publish\dist\bin + + +