Mark PLINQ as enabled in WASM and make browser compat changes (#58227)
Co-authored-by: Jeff Handley <jeffhandley@users.noreply.github.com>
kg and jeffhandley committed Nov 22, 2021
1 parent 77165a0 commit 8daae66
Showing 18 changed files with 84 additions and 28 deletions.
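As a quick, hedged illustration (not part of the commit itself) of what enabling PLINQ on browser-wasm means for callers: a query like the one below is no longer excluded on browser. Per the SinglePartitionMode comment added to ParallelEnumerable.cs further down, it executes on a single partition without real parallelism while keeping PLINQ semantics such as wrapping user exceptions in an AggregateException.

```csharp
using System;
using System.Linq;

class PlinqOnWasmSketch
{
    static void Main()
    {
        // On browser-wasm this now runs in single-partition mode: one partition,
        // no actual parallelism, but the usual PLINQ operators and semantics.
        int sum = Enumerable.Range(1, 1000)
                            .AsParallel()
                            .Where(i => i % 2 == 0)
                            .Sum();

        Console.WriteLine(sum); // 250500
    }
}
```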
1 change: 0 additions & 1 deletion src/libraries/System.Linq.Parallel/Directory.Build.props
@@ -2,6 +2,5 @@
<Import Project="..\Directory.Build.props" />
<PropertyGroup>
<StrongNameKeyId>Microsoft</StrongNameKeyId>
<UnsupportedOSPlatforms>browser</UnsupportedOSPlatforms>
</PropertyGroup>
</Project>
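For context, a hedged assumption about the repo's build tooling (not shown in this diff): the `<UnsupportedOSPlatforms>browser</UnsupportedOSPlatforms>` property removed above is what marked the whole System.Linq.Parallel assembly as browser-unsupported, roughly the equivalent of emitting an assembly-level attribute like the sketch below. Deleting it is the switch that flips the library to supported on browser-wasm; the per-type [UnsupportedOSPlatform("browser")] attributes added in the following files then scope the ban to just the blocking, multi-partition code paths.

```csharp
// Sketch of what the removed MSBuild property amounted to (assumed, not from this diff):
[assembly: System.Runtime.Versioning.UnsupportedOSPlatform("browser")]
```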
@@ -17,6 +17,7 @@ namespace System.Linq.Parallel
/// This is a bounded channel meant for single-producer/single-consumer scenarios.
/// </summary>
/// <typeparam name="T">Specifies the type of data in the channel.</typeparam>
[System.Runtime.Versioning.UnsupportedOSPlatform("browser")]
internal sealed class AsynchronousChannel<T> : IDisposable
{
// The producer will be blocked once the channel reaches a capacity, and unblocked
@@ -26,6 +26,7 @@ namespace System.Linq.Parallel
///
/// </summary>
/// <typeparam name="T"></typeparam>
[System.Runtime.Versioning.UnsupportedOSPlatform("browser")]
internal sealed class AsynchronousChannelMergeEnumerator<T> : MergeEnumerator<T>
{
private readonly AsynchronousChannel<T>[] _channels; // The channels being enumerated.
@@ -220,6 +221,7 @@ private bool MoveNextSlowPath()
break;
}

Debug.Assert(!ParallelEnumerable.SinglePartitionMode);
Debug.Assert(_consumerEvent != null);
//This Wait() does not require cancellation support as it will wake up when all the producers into the
//channel have finished. Hence, if all the producers wake up on cancellation, so will this.
@@ -64,6 +64,7 @@ internal DefaultMergeHelper(PartitionedStream<TInputOutput, TIgnoreKey> partitio
{
if (partitions.PartitionCount > 1)
{
Debug.Assert(!ParallelEnumerable.SinglePartitionMode);
_asyncChannels =
MergeExecutor<TInputOutput>.MakeAsynchronousChannels(partitions.PartitionCount, options, consumerEvent, cancellationState.MergedCancellationToken);
_channelEnumerator = new AsynchronousChannelMergeEnumerator<TInputOutput>(_taskGroupState, _asyncChannels, consumerEvent);
@@ -99,6 +100,7 @@ void IMergeHelper<TInputOutput>.Execute()
{
if (_asyncChannels != null)
{
Debug.Assert(!ParallelEnumerable.SinglePartitionMode);
SpoolingTask.SpoolPipeline<TInputOutput, TIgnoreKey>(_taskGroupState, _partitions, _asyncChannels, _taskScheduler);
}
else if (_syncChannels != null)
@@ -64,6 +64,7 @@ internal static MergeExecutor<TInputOutput> Execute<TKey>(

if (partitions.PartitionCount > 1)
{
Debug.Assert(!ParallelEnumerable.SinglePartitionMode);
// We use a pipelining ordered merge
mergeExecutor._mergeHelper = new OrderPreservingPipeliningMergeHelper<TInputOutput, TKey>(
partitions, taskScheduler, cancellationState, autoBuffered, queryId, partitions.KeyComparer);
@@ -140,6 +141,7 @@ public IEnumerator<TInputOutput> GetEnumerator()
// An array of asynchronous channels, one for each partition.
//

[System.Runtime.Versioning.UnsupportedOSPlatform("browser")]
internal static AsynchronousChannel<TInputOutput>[] MakeAsynchronousChannels(int partitionCount, ParallelMergeOptions options, IntValueEvent? consumerEvent, CancellationToken cancellationToken)
{
AsynchronousChannel<TInputOutput>[] channels = new AsynchronousChannel<TInputOutput>[partitionCount];
@@ -36,6 +36,7 @@ namespace System.Linq.Parallel
/// Finally, if the producer notices that its buffer has exceeded an even greater threshold, it will
/// go to sleep and wait until the consumer takes the entire buffer.
/// </summary>
[System.Runtime.Versioning.UnsupportedOSPlatform("browser")]
internal sealed class OrderPreservingPipeliningMergeHelper<TOutput, TKey> : IMergeHelper<TOutput>
{
private readonly QueryTaskGroupState _taskGroupState; // State shared among tasks.
@@ -354,12 +355,14 @@ private void ThrowIfInTearDown()
{
// Wake up all producers. Since the cancellation token has already been
// set, the producers will eventually stop after waking up.
object[] locks = _mergeHelper._bufferLocks;
for (int i = 0; i < locks.Length; i++)
{
lock (locks[i])
if (!ParallelEnumerable.SinglePartitionMode) {
object[] locks = _mergeHelper._bufferLocks;
for (int i = 0; i < locks.Length; i++)
{
Monitor.Pulse(locks[i]);
lock (locks[i])
{
Monitor.Pulse(locks[i]);
}
}
}

@@ -398,6 +401,9 @@ private bool TryWaitForElement(int producer, ref Pair<TKey, TOutput> element)
return false;
}

if (ParallelEnumerable.SinglePartitionMode)
return false;

_mergeHelper._consumerWaiting[producer] = true;
Monitor.Wait(bufferLock);

@@ -416,6 +422,7 @@ private bool TryWaitForElement(int producer, ref Pair<TKey, TOutput> element)
// If the producer is waiting, wake it up
if (_mergeHelper._producerWaiting[producer])
{
Debug.Assert(!ParallelEnumerable.SinglePartitionMode);
Monitor.Pulse(bufferLock);
_mergeHelper._producerWaiting[producer] = false;
}
@@ -469,15 +476,17 @@ private bool TryGetPrivateElement(int producer, ref Pair<TKey, TOutput> element)
public override void Dispose()
{
// Wake up any waiting producers
int partitionCount = _mergeHelper._buffers.Length;
for (int producer = 0; producer < partitionCount; producer++)
{
object bufferLock = _mergeHelper._bufferLocks[producer];
lock (bufferLock)
if (!ParallelEnumerable.SinglePartitionMode) {
int partitionCount = _mergeHelper._buffers.Length;
for (int producer = 0; producer < partitionCount; producer++)
{
if (_mergeHelper._producerWaiting[producer])
object bufferLock = _mergeHelper._bufferLocks[producer];
lock (bufferLock)
{
Monitor.Pulse(bufferLock);
if (_mergeHelper._producerWaiting[producer])
{
Monitor.Pulse(bufferLock);
}
}
}
}
@@ -83,6 +83,9 @@ internal HashRepartitionEnumerator(
_barrier = barrier;
_valueExchangeMatrix = valueExchangeMatrix;
_cancellationToken = cancellationToken;

if (ParallelEnumerable.SinglePartitionMode)
Debug.Assert(partitionCount == 1);
}

//---------------------------------------------------------------------------------------
@@ -120,6 +123,8 @@ internal override bool MoveNext(ref Pair<TInputOutput, THashKey> currentElement,
return false;
}

Debug.Assert(!ParallelEnumerable.SinglePartitionMode);

Mutables? mutables = _mutables;
if (mutables == null)
mutables = _mutables = new Mutables();
@@ -121,6 +121,8 @@ internal override bool MoveNext(ref Pair<TInputOutput, THashKey> currentElement,
return false;
}

Debug.Assert(!ParallelEnumerable.SinglePartitionMode);

Mutables? mutables = _mutables;
if (mutables == null)
mutables = _mutables = new Mutables();
@@ -58,6 +58,8 @@ internal override void WrapPartitionedStream<TKey>(
PartitionedStream<TSource, TKey> inputStream, IPartitionedStreamRecipient<TSource> recipient, bool preferStriping, QuerySettings settings)
{
int partitionCount = inputStream.PartitionCount;
if (ParallelEnumerable.SinglePartitionMode)
Debug.Assert(partitionCount == 1);

// Generate the shared data.
Shared<int> sharedEmptyCount = new Shared<int>(0);
@@ -153,7 +155,13 @@ internal override bool MoveNext([MaybeNullWhen(false), AllowNull] ref TSource cu

if (!moveNextResult)
{
if (_partitionIndex == 0)
if (ParallelEnumerable.SinglePartitionMode)
{
currentElement = _defaultValue;
currentKey = default(TKey)!;
return true;
}
else if (_partitionIndex == 0)
{
// If this is the 0th partition, we must wait for all others. Note: we could
// actually do a wait-any here: if at least one other partition finds an element,
@@ -72,6 +72,8 @@ private void WrapHelper<TKey>(
PartitionedStream<TSource, TKey> inputStream, IPartitionedStreamRecipient<TSource> recipient, QuerySettings settings)
{
int partitionCount = inputStream.PartitionCount;
if (ParallelEnumerable.SinglePartitionMode)
Debug.Assert(partitionCount == 1);

// Generate the shared data.
FirstQueryOperatorState<TKey> operatorState = new FirstQueryOperatorState<TKey>();
@@ -200,17 +202,20 @@ internal override bool MoveNext([MaybeNullWhen(false), AllowNull] ref TSource cu
}
finally
{
// No matter whether we exit due to an exception or normal completion, we must ensure
// that we signal other partitions that we have completed. Otherwise, we can cause deadlocks.
_sharedBarrier.Signal();
if (!ParallelEnumerable.SinglePartitionMode) {
// No matter whether we exit due to an exception or normal completion, we must ensure
// that we signal other partitions that we have completed. Otherwise, we can cause deadlocks.
_sharedBarrier.Signal();
}
}

_alreadySearched = true;

// Wait only if we may have the result
if (_partitionId == _operatorState._partitionId)
{
_sharedBarrier.Wait(_cancellationToken);
if (!ParallelEnumerable.SinglePartitionMode)
_sharedBarrier.Wait(_cancellationToken);

// Now re-read the shared index. If it's the same as ours, we won and return true.
if (_partitionId == _operatorState._partitionId)
@@ -72,6 +72,8 @@ internal override void WrapPartitionedStream<TKey>(
private void WrapHelper<TKey>(PartitionedStream<TSource, TKey> inputStream, IPartitionedStreamRecipient<TSource> recipient, QuerySettings settings)
{
int partitionCount = inputStream.PartitionCount;
if (ParallelEnumerable.SinglePartitionMode)
Debug.Assert(partitionCount == 1);

// Generate the shared data.
LastQueryOperatorState<TKey> operatorState = new LastQueryOperatorState<TKey>();
@@ -212,7 +214,8 @@ internal override bool MoveNext([MaybeNullWhen(false), AllowNull] ref TSource cu
// Only if we have a candidate do we wait.
if (_partitionId == _operatorState._partitionId)
{
_sharedBarrier.Wait(_cancellationToken);
if (!ParallelEnumerable.SinglePartitionMode)
_sharedBarrier.Wait(_cancellationToken);

// Now re-read the shared index. If it's the same as ours, we won and return true.
if (_operatorState._partitionId == _partitionId)
@@ -108,6 +108,9 @@ private void WrapHelper<TKey>(PartitionedStream<TResult, TKey> inputStream, IPar
FixedMaxHeap<TKey> sharedIndices = new FixedMaxHeap<TKey>(_count, inputStream.KeyComparer); // an array used to track the sequence of indices leading up to the Nth index
CountdownEvent sharedBarrier = new CountdownEvent(partitionCount); // a barrier to synchronize before yielding

if (ParallelEnumerable.SinglePartitionMode)
Debug.Assert(partitionCount == 1);

PartitionedStream<TResult, TKey> outputStream =
new PartitionedStream<TResult, TKey>(partitionCount, inputStream.KeyComparer, OrdinalIndexState);
for (int i = 0; i < partitionCount; i++)
@@ -222,9 +225,11 @@ internal override bool MoveNext([MaybeNullWhen(false), AllowNull] ref TResult cu
}
}

// Before exiting the search phase, we will synchronize with others. This is a barrier.
_sharedBarrier.Signal();
_sharedBarrier.Wait(_cancellationToken);
if (!ParallelEnumerable.SinglePartitionMode) {
// Before exiting the search phase, we will synchronize with others. This is a barrier.
_sharedBarrier.Signal();
_sharedBarrier.Wait(_cancellationToken);
}

// Publish the buffer and set the index to just before the 1st element.
_buffer = buffer;
@@ -127,6 +127,8 @@ internal override void WrapPartitionedStream<TKey>(
private void WrapHelper<TKey>(PartitionedStream<TResult, TKey> inputStream, IPartitionedStreamRecipient<TResult> recipient, QuerySettings settings)
{
int partitionCount = inputStream.PartitionCount;
if (ParallelEnumerable.SinglePartitionMode)
Debug.Assert(partitionCount == 1);

// Create shared data.
OperatorState<TKey> operatorState = new OperatorState<TKey>();
@@ -321,13 +323,16 @@ internal override bool MoveNext([MaybeNullWhen(false), AllowNull] ref TResult cu
}
finally
{
// No matter whether we exit due to an exception or normal completion, we must ensure
// that we signal other partitions that we have completed. Otherwise, we can cause deadlocks.
_sharedBarrier.Signal();
if (!ParallelEnumerable.SinglePartitionMode) {
// No matter whether we exit due to an exception or normal completion, we must ensure
// that we signal other partitions that we have completed. Otherwise, we can cause deadlocks.
_sharedBarrier.Signal();
}
}

// Before exiting the search phase, we will synchronize with others. This is a barrier.
_sharedBarrier.Wait(_cancellationToken);
if (!ParallelEnumerable.SinglePartitionMode)
_sharedBarrier.Wait(_cancellationToken);

// Publish the buffer and set the index to just before the 1st element.
_buffer = buffer;
@@ -18,6 +18,7 @@

namespace System.Linq.Parallel
{
[System.Runtime.Versioning.UnsupportedOSPlatform("browser")]
internal sealed class OrderPreservingPipeliningSpoolingTask<TOutput, TKey> : SpoolingTaskBase
{
private readonly QueryTaskGroupState _taskGroupState; // State shared among tasks.
@@ -82,6 +82,7 @@ internal static void SpoolStopAndGo<TInputOutput, TIgnoreKey>(
// taskScheduler - the task manager on which to execute
//

[System.Runtime.Versioning.UnsupportedOSPlatform("browser")]
internal static void SpoolPipeline<TInputOutput, TIgnoreKey>(
QueryTaskGroupState groupState, PartitionedStream<TInputOutput, TIgnoreKey> partitions,
AsynchronousChannel<TInputOutput>[] channels, TaskScheduler taskScheduler)
@@ -264,6 +265,7 @@ protected override void SpoolingFinally()
/// </summary>
/// <typeparam name="TInputOutput"></typeparam>
/// <typeparam name="TIgnoreKey"></typeparam>
[System.Runtime.Versioning.UnsupportedOSPlatform("browser")]
internal sealed class PipelineSpoolingTask<TInputOutput, TIgnoreKey> : SpoolingTaskBase
{
// The data source from which to pull data.
@@ -204,6 +204,7 @@ internal override TInputOutput[] Sort()
// Step 3. Enter into the merging phases, each separated by several barriers.
if (_partitionCount > 1)
{
Debug.Assert(!ParallelEnumerable.SinglePartitionMode);
// We only need to merge if there is more than 1 partition.
MergeSortCooperatively();
}
@@ -357,6 +358,7 @@ private void QuickSortIndicesInPlace(GrowingArray<TKey> keys, List<TInputOutput>
// negatively impact speedups.
//

[System.Runtime.Versioning.UnsupportedOSPlatform("browser")]
private void MergeSortCooperatively()
{
CancellationToken cancelToken = _groupState.CancellationState.MergedCancellationToken;
@@ -62,6 +62,11 @@ public static class ParallelEnumerable
+ "System.Collections.Generic.IEnumerable<T>. To fix this problem, use the AsParallel() extension method "
+ "to convert the right data source to System.Linq.ParallelQuery<T>.";

// When running in single partition mode, PLINQ operations will occur on a single partition and will not
// be executed in parallel, but will retain PLINQ semantics (exceptions wrapped as aggregates, etc).
[System.Runtime.Versioning.SupportedOSPlatformGuard("browser")]
internal static bool SinglePartitionMode => OperatingSystem.IsBrowser();

//-----------------------------------------------------------------------------------
// Converts any IEnumerable<TSource> into something that can be the target of parallel
// query execution.
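Stepping back from the diff for a moment: the hunk above is the hub the rest of the commit keys off. A minimal, self-contained sketch of the analyzer pattern follows (GuardPatternSketch and BlockingMergePath are illustrative names, not from the commit). Because SinglePartitionMode is annotated [SupportedOSPlatformGuard("browser")], the platform-compatibility analyzer treats branches guarded by if (!SinglePartitionMode) — and, as used throughout this commit, Debug.Assert(!SinglePartitionMode) — as code that never runs on browser, which is what lets the blocking channel and merge paths marked [UnsupportedOSPlatform("browser")] be called there without platform-compat warnings.

```csharp
using System;
using System.Runtime.Versioning;

internal static class GuardPatternSketch
{
    // Mirrors the guard added to ParallelEnumerable in this commit: 'true' means
    // we are running on browser-wasm and must stay on the single-partition path.
    [SupportedOSPlatformGuard("browser")]
    internal static bool SinglePartitionMode => OperatingSystem.IsBrowser();

    // Stand-in for the blocking, multi-partition machinery the commit marks as
    // browser-unsupported (AsynchronousChannel<T>, SpoolPipeline, etc.).
    [UnsupportedOSPlatform("browser")]
    internal static void BlockingMergePath() { /* Monitor waits, pipelined channels... */ }

    internal static void Execute()
    {
        if (!SinglePartitionMode)
        {
            // The analyzer treats this branch as "not browser", so calling the
            // browser-unsupported path here raises no platform-compat warning.
            BlockingMergePath();
        }
        // else: single-partition fallback, no cross-partition blocking needed.
    }
}
```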
2 changes: 0 additions & 2 deletions src/libraries/tests.proj
@@ -261,8 +261,6 @@
<!-- Mono-Browser ignores runtimeconfig.template.json (e.g. for this it has "System.Globalization.EnforceJapaneseEraYearRanges": true) -->
<ProjectExclusions Include="$(MSBuildThisFileDirectory)System.Globalization.Calendars\tests\CalendarTestWithConfigSwitch\System.Globalization.CalendarsWithConfigSwitch.Tests.csproj" />

<ProjectExclusions Include="$(MSBuildThisFileDirectory)System.Linq.Parallel\tests\System.Linq.Parallel.Tests.csproj" />

<!-- https://github.com/dotnet/runtime/issues/37669 -->
<ProjectExclusions Include="$(MSBuildThisFileDirectory)Microsoft.Extensions.DependencyModel\tests\Microsoft.Extensions.DependencyModel.Tests.csproj" />
<ProjectExclusions Include="$(MSBuildThisFileDirectory)Microsoft.Extensions.Hosting\tests\UnitTests\Microsoft.Extensions.Hosting.Unit.Tests.csproj" />
