Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mark PLINQ as enabled in WASM and make browser compat changes #58227

Merged
merged 5 commits into from
Nov 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/libraries/System.Linq.Parallel/Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,5 @@
<Import Project="..\Directory.Build.props" />
<PropertyGroup>
<StrongNameKeyId>Microsoft</StrongNameKeyId>
<UnsupportedOSPlatforms>browser</UnsupportedOSPlatforms>
</PropertyGroup>
</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ namespace System.Linq.Parallel
/// This is a bounded channel meant for single-producer/single-consumer scenarios.
/// </summary>
/// <typeparam name="T">Specifies the type of data in the channel.</typeparam>
[System.Runtime.Versioning.UnsupportedOSPlatform("browser")]
internal sealed class AsynchronousChannel<T> : IDisposable
{
// The producer will be blocked once the channel reaches a capacity, and unblocked
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ namespace System.Linq.Parallel
///
/// </summary>
/// <typeparam name="T"></typeparam>
[System.Runtime.Versioning.UnsupportedOSPlatform("browser")]
internal sealed class AsynchronousChannelMergeEnumerator<T> : MergeEnumerator<T>
{
private readonly AsynchronousChannel<T>[] _channels; // The channels being enumerated.
Expand Down Expand Up @@ -220,6 +221,7 @@ private bool MoveNextSlowPath()
break;
}

Debug.Assert(!ParallelEnumerable.SinglePartitionMode);
Debug.Assert(_consumerEvent != null);
//This Wait() does not require cancellation support as it will wake up when all the producers into the
//channel have finished. Hence, if all the producers wake up on cancellation, so will this.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ internal DefaultMergeHelper(PartitionedStream<TInputOutput, TIgnoreKey> partitio
{
if (partitions.PartitionCount > 1)
{
Debug.Assert(!ParallelEnumerable.SinglePartitionMode);
_asyncChannels =
MergeExecutor<TInputOutput>.MakeAsynchronousChannels(partitions.PartitionCount, options, consumerEvent, cancellationState.MergedCancellationToken);
_channelEnumerator = new AsynchronousChannelMergeEnumerator<TInputOutput>(_taskGroupState, _asyncChannels, consumerEvent);
Expand Down Expand Up @@ -99,6 +100,7 @@ void IMergeHelper<TInputOutput>.Execute()
{
if (_asyncChannels != null)
{
Debug.Assert(!ParallelEnumerable.SinglePartitionMode);
SpoolingTask.SpoolPipeline<TInputOutput, TIgnoreKey>(_taskGroupState, _partitions, _asyncChannels, _taskScheduler);
}
else if (_syncChannels != null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ internal static MergeExecutor<TInputOutput> Execute<TKey>(

if (partitions.PartitionCount > 1)
{
Debug.Assert(!ParallelEnumerable.SinglePartitionMode);
// We use a pipelining ordered merge
mergeExecutor._mergeHelper = new OrderPreservingPipeliningMergeHelper<TInputOutput, TKey>(
partitions, taskScheduler, cancellationState, autoBuffered, queryId, partitions.KeyComparer);
Expand Down Expand Up @@ -140,6 +141,7 @@ public IEnumerator<TInputOutput> GetEnumerator()
// An array of asynchronous channels, one for each partition.
//

[System.Runtime.Versioning.UnsupportedOSPlatform("browser")]
internal static AsynchronousChannel<TInputOutput>[] MakeAsynchronousChannels(int partitionCount, ParallelMergeOptions options, IntValueEvent? consumerEvent, CancellationToken cancellationToken)
{
AsynchronousChannel<TInputOutput>[] channels = new AsynchronousChannel<TInputOutput>[partitionCount];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ namespace System.Linq.Parallel
/// Finally, if the producer notices that its buffer has exceeded an even greater threshold, it will
/// go to sleep and wait until the consumer takes the entire buffer.
/// </summary>
[System.Runtime.Versioning.UnsupportedOSPlatform("browser")]
internal sealed class OrderPreservingPipeliningMergeHelper<TOutput, TKey> : IMergeHelper<TOutput>
{
private readonly QueryTaskGroupState _taskGroupState; // State shared among tasks.
Expand Down Expand Up @@ -354,12 +355,14 @@ private void ThrowIfInTearDown()
{
// Wake up all producers. Since the cancellation token has already been
// set, the producers will eventually stop after waking up.
object[] locks = _mergeHelper._bufferLocks;
for (int i = 0; i < locks.Length; i++)
{
lock (locks[i])
if (!ParallelEnumerable.SinglePartitionMode) {
object[] locks = _mergeHelper._bufferLocks;
for (int i = 0; i < locks.Length; i++)
{
Monitor.Pulse(locks[i]);
lock (locks[i])
{
Monitor.Pulse(locks[i]);
}
}
}

Expand Down Expand Up @@ -398,6 +401,9 @@ private bool TryWaitForElement(int producer, ref Pair<TKey, TOutput> element)
return false;
}

if (ParallelEnumerable.SinglePartitionMode)
return false;

_mergeHelper._consumerWaiting[producer] = true;
Monitor.Wait(bufferLock);

Expand All @@ -416,6 +422,7 @@ private bool TryWaitForElement(int producer, ref Pair<TKey, TOutput> element)
// If the producer is waiting, wake it up
if (_mergeHelper._producerWaiting[producer])
{
Debug.Assert(!ParallelEnumerable.SinglePartitionMode);
Monitor.Pulse(bufferLock);
_mergeHelper._producerWaiting[producer] = false;
}
Expand Down Expand Up @@ -469,15 +476,17 @@ private bool TryGetPrivateElement(int producer, ref Pair<TKey, TOutput> element)
public override void Dispose()
{
// Wake up any waiting producers
int partitionCount = _mergeHelper._buffers.Length;
for (int producer = 0; producer < partitionCount; producer++)
{
object bufferLock = _mergeHelper._bufferLocks[producer];
lock (bufferLock)
if (!ParallelEnumerable.SinglePartitionMode) {
int partitionCount = _mergeHelper._buffers.Length;
for (int producer = 0; producer < partitionCount; producer++)
{
if (_mergeHelper._producerWaiting[producer])
object bufferLock = _mergeHelper._bufferLocks[producer];
lock (bufferLock)
{
Monitor.Pulse(bufferLock);
if (_mergeHelper._producerWaiting[producer])
{
Monitor.Pulse(bufferLock);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ internal HashRepartitionEnumerator(
_barrier = barrier;
_valueExchangeMatrix = valueExchangeMatrix;
_cancellationToken = cancellationToken;

if (ParallelEnumerable.SinglePartitionMode)
Debug.Assert(partitionCount == 1);
}

//---------------------------------------------------------------------------------------
Expand Down Expand Up @@ -120,6 +123,8 @@ internal override bool MoveNext(ref Pair<TInputOutput, THashKey> currentElement,
return false;
}

Debug.Assert(!ParallelEnumerable.SinglePartitionMode);

Mutables? mutables = _mutables;
if (mutables == null)
mutables = _mutables = new Mutables();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ internal override bool MoveNext(ref Pair<TInputOutput, THashKey> currentElement,
return false;
}

Debug.Assert(!ParallelEnumerable.SinglePartitionMode);

Mutables? mutables = _mutables;
if (mutables == null)
mutables = _mutables = new Mutables();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ internal override void WrapPartitionedStream<TKey>(
PartitionedStream<TSource, TKey> inputStream, IPartitionedStreamRecipient<TSource> recipient, bool preferStriping, QuerySettings settings)
{
int partitionCount = inputStream.PartitionCount;
if (ParallelEnumerable.SinglePartitionMode)
Debug.Assert(partitionCount == 1);

// Generate the shared data.
Shared<int> sharedEmptyCount = new Shared<int>(0);
Expand Down Expand Up @@ -153,7 +155,13 @@ internal override bool MoveNext([MaybeNullWhen(false), AllowNull] ref TSource cu

if (!moveNextResult)
{
if (_partitionIndex == 0)
if (ParallelEnumerable.SinglePartitionMode)
{
currentElement = _defaultValue;
currentKey = default(TKey)!;
return true;
}
else if (_partitionIndex == 0)
{
// If this is the 0th partition, we must wait for all others. Note: we could
// actually do a wait-any here: if at least one other partition finds an element,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ private void WrapHelper<TKey>(
PartitionedStream<TSource, TKey> inputStream, IPartitionedStreamRecipient<TSource> recipient, QuerySettings settings)
{
int partitionCount = inputStream.PartitionCount;
if (ParallelEnumerable.SinglePartitionMode)
Debug.Assert(partitionCount == 1);

// Generate the shared data.
FirstQueryOperatorState<TKey> operatorState = new FirstQueryOperatorState<TKey>();
Expand Down Expand Up @@ -200,17 +202,20 @@ internal override bool MoveNext([MaybeNullWhen(false), AllowNull] ref TSource cu
}
finally
{
// No matter whether we exit due to an exception or normal completion, we must ensure
// that we signal other partitions that we have completed. Otherwise, we can cause deadlocks.
_sharedBarrier.Signal();
if (!ParallelEnumerable.SinglePartitionMode) {
// No matter whether we exit due to an exception or normal completion, we must ensure
// that we signal other partitions that we have completed. Otherwise, we can cause deadlocks.
_sharedBarrier.Signal();
}
}

_alreadySearched = true;

// Wait only if we may have the result
if (_partitionId == _operatorState._partitionId)
{
_sharedBarrier.Wait(_cancellationToken);
if (!ParallelEnumerable.SinglePartitionMode)
_sharedBarrier.Wait(_cancellationToken);

// Now re-read the shared index. If it's the same as ours, we won and return true.
if (_partitionId == _operatorState._partitionId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ internal override void WrapPartitionedStream<TKey>(
private void WrapHelper<TKey>(PartitionedStream<TSource, TKey> inputStream, IPartitionedStreamRecipient<TSource> recipient, QuerySettings settings)
{
int partitionCount = inputStream.PartitionCount;
if (ParallelEnumerable.SinglePartitionMode)
Debug.Assert(partitionCount == 1);

// Generate the shared data.
LastQueryOperatorState<TKey> operatorState = new LastQueryOperatorState<TKey>();
Expand Down Expand Up @@ -212,7 +214,8 @@ internal override bool MoveNext([MaybeNullWhen(false), AllowNull] ref TSource cu
// Only if we have a candidate do we wait.
if (_partitionId == _operatorState._partitionId)
{
_sharedBarrier.Wait(_cancellationToken);
if (!ParallelEnumerable.SinglePartitionMode)
_sharedBarrier.Wait(_cancellationToken);

// Now re-read the shared index. If it's the same as ours, we won and return true.
if (_operatorState._partitionId == _partitionId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ private void WrapHelper<TKey>(PartitionedStream<TResult, TKey> inputStream, IPar
FixedMaxHeap<TKey> sharedIndices = new FixedMaxHeap<TKey>(_count, inputStream.KeyComparer); // an array used to track the sequence of indices leading up to the Nth index
CountdownEvent sharedBarrier = new CountdownEvent(partitionCount); // a barrier to synchronize before yielding

if (ParallelEnumerable.SinglePartitionMode)
Debug.Assert(partitionCount == 1);

PartitionedStream<TResult, TKey> outputStream =
new PartitionedStream<TResult, TKey>(partitionCount, inputStream.KeyComparer, OrdinalIndexState);
for (int i = 0; i < partitionCount; i++)
Expand Down Expand Up @@ -222,9 +225,11 @@ internal override bool MoveNext([MaybeNullWhen(false), AllowNull] ref TResult cu
}
}

// Before exiting the search phase, we will synchronize with others. This is a barrier.
_sharedBarrier.Signal();
_sharedBarrier.Wait(_cancellationToken);
if (!ParallelEnumerable.SinglePartitionMode) {
// Before exiting the search phase, we will synchronize with others. This is a barrier.
_sharedBarrier.Signal();
_sharedBarrier.Wait(_cancellationToken);
}

// Publish the buffer and set the index to just before the 1st element.
_buffer = buffer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ internal override void WrapPartitionedStream<TKey>(
private void WrapHelper<TKey>(PartitionedStream<TResult, TKey> inputStream, IPartitionedStreamRecipient<TResult> recipient, QuerySettings settings)
{
int partitionCount = inputStream.PartitionCount;
if (ParallelEnumerable.SinglePartitionMode)
Debug.Assert(partitionCount == 1);

// Create shared data.
OperatorState<TKey> operatorState = new OperatorState<TKey>();
Expand Down Expand Up @@ -321,13 +323,16 @@ internal override bool MoveNext([MaybeNullWhen(false), AllowNull] ref TResult cu
}
finally
{
// No matter whether we exit due to an exception or normal completion, we must ensure
// that we signal other partitions that we have completed. Otherwise, we can cause deadlocks.
_sharedBarrier.Signal();
if (!ParallelEnumerable.SinglePartitionMode) {
// No matter whether we exit due to an exception or normal completion, we must ensure
// that we signal other partitions that we have completed. Otherwise, we can cause deadlocks.
_sharedBarrier.Signal();
}
}

// Before exiting the search phase, we will synchronize with others. This is a barrier.
_sharedBarrier.Wait(_cancellationToken);
if (!ParallelEnumerable.SinglePartitionMode)
_sharedBarrier.Wait(_cancellationToken);

// Publish the buffer and set the index to just before the 1st element.
_buffer = buffer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

namespace System.Linq.Parallel
{
[System.Runtime.Versioning.UnsupportedOSPlatform("browser")]
internal sealed class OrderPreservingPipeliningSpoolingTask<TOutput, TKey> : SpoolingTaskBase
{
private readonly QueryTaskGroupState _taskGroupState; // State shared among tasks.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ internal static void SpoolStopAndGo<TInputOutput, TIgnoreKey>(
// taskScheduler - the task manager on which to execute
//

[System.Runtime.Versioning.UnsupportedOSPlatform("browser")]
internal static void SpoolPipeline<TInputOutput, TIgnoreKey>(
QueryTaskGroupState groupState, PartitionedStream<TInputOutput, TIgnoreKey> partitions,
AsynchronousChannel<TInputOutput>[] channels, TaskScheduler taskScheduler)
Expand Down Expand Up @@ -264,6 +265,7 @@ protected override void SpoolingFinally()
/// </summary>
/// <typeparam name="TInputOutput"></typeparam>
/// <typeparam name="TIgnoreKey"></typeparam>
[System.Runtime.Versioning.UnsupportedOSPlatform("browser")]
internal sealed class PipelineSpoolingTask<TInputOutput, TIgnoreKey> : SpoolingTaskBase
{
// The data source from which to pull data.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ internal override TInputOutput[] Sort()
// Step 3. Enter into the merging phases, each separated by several barriers.
if (_partitionCount > 1)
{
Debug.Assert(!ParallelEnumerable.SinglePartitionMode);
// We only need to merge if there is more than 1 partition.
MergeSortCooperatively();
}
Expand Down Expand Up @@ -357,6 +358,7 @@ private void QuickSortIndicesInPlace(GrowingArray<TKey> keys, List<TInputOutput>
// negatively impact speedups.
//

[System.Runtime.Versioning.UnsupportedOSPlatform("browser")]
private void MergeSortCooperatively()
{
CancellationToken cancelToken = _groupState.CancellationState.MergedCancellationToken;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ public static class ParallelEnumerable
+ "System.Collections.Generic.IEnumerable<T>. To fix this problem, use the AsParallel() extension method "
+ "to convert the right data source to System.Linq.ParallelQuery<T>.";

// When running in single partition mode, PLINQ operations will occur on a single partition and will not
// be executed in parallel, but will retain PLINQ semantics (exceptions wrapped as aggregates, etc).
[System.Runtime.Versioning.SupportedOSPlatformGuard("browser")]
internal static bool SinglePartitionMode => OperatingSystem.IsBrowser();

//-----------------------------------------------------------------------------------
// Converts any IEnumerable<TSource> into something that can be the target of parallel
// query execution.
Expand Down
2 changes: 0 additions & 2 deletions src/libraries/tests.proj
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,6 @@
<!-- Mono-Browser ignores runtimeconfig.template.json (e.g. for this it has "System.Globalization.EnforceJapaneseEraYearRanges": true) -->
<ProjectExclusions Include="$(MSBuildThisFileDirectory)System.Globalization.Calendars\tests\CalendarTestWithConfigSwitch\System.Globalization.CalendarsWithConfigSwitch.Tests.csproj" />

<ProjectExclusions Include="$(MSBuildThisFileDirectory)System.Linq.Parallel\tests\System.Linq.Parallel.Tests.csproj" />

<!-- https://github.com/dotnet/runtime/issues/37669 -->
<ProjectExclusions Include="$(MSBuildThisFileDirectory)Microsoft.Extensions.DependencyModel\tests\Microsoft.Extensions.DependencyModel.Tests.csproj" />
<ProjectExclusions Include="$(MSBuildThisFileDirectory)Microsoft.Extensions.Hosting\tests\UnitTests\Microsoft.Extensions.Hosting.Unit.Tests.csproj" />
Expand Down