Problem
I’d like to process a collection in parallel, but I’m having trouble getting it to work, and I’m hoping for some help.
The problem arises when I wish to call an async method in C# from within the parallel loop’s lambda. Consider the following scenario:
var bag = new ConcurrentBag<object>();
Parallel.ForEach(myCollection, async item =>
{
    // some pre stuff
    var response = await GetData(item);
    bag.Add(response);
    // some post stuff
});
var count = bag.Count;
The problem is that count ends up being 0. Because the lambda is passed as an Action<T>, the async lambda becomes an async void delegate: the work effectively continues in the background, and Parallel.ForEach does not wait for it to complete. If I remove the async keyword, the method looks like this:
var bag = new ConcurrentBag<object>();
Parallel.ForEach(myCollection, item =>
{
    // some pre stuff
    var responseTask = GetData(item);   // no await: the lambda is synchronous
    responseTask.Wait();
    var response = responseTask.Result;
    bag.Add(response);
    // some post stuff
});
var count = bag.Count;
This works, but it gives up the await cleverness, so I have to handle exceptions manually (exception handling omitted for brevity).
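As a side note on the exception handling: when blocking is acceptable, GetAwaiter().GetResult() keeps closer to await semantics than Wait()/Result. Result wraps a failure in an AggregateException, while GetAwaiter().GetResult() rethrows the original exception. A minimal sketch, where GetDataAsync is a hypothetical stand-in for GetData that always fails:

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for GetData that always fails asynchronously
static async Task<object> GetDataAsync(int item)
{
    await Task.Yield();
    throw new InvalidOperationException($"item {item} failed");
}

// Runs an action and reports which exception type (if any) escaped it
static Type? CaughtType(Action action)
{
    try { action(); return null; }
    catch (Exception ex) { return ex.GetType(); }
}

// .Result wraps the failure in an AggregateException...
var viaResult = CaughtType(() => { _ = GetDataAsync(1).Result; });
// ...while GetAwaiter().GetResult() rethrows the original exception, as await does
var viaAwaiter = CaughtType(() => { _ = GetDataAsync(1).GetAwaiter().GetResult(); });

Console.WriteLine(viaResult?.Name);   // AggregateException
Console.WriteLine(viaAwaiter?.Name);  // InvalidOperationException
```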
Can I write a Parallel.ForEach loop that uses the await keyword inside the lambda? Is that even possible?
The prototype of Parallel.ForEach takes an Action<T> as a parameter, but I want it to wait for my asynchronous lambda.
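The effect of passing an async lambda as an Action<T> can be reproduced deterministically. In this sketch (the gate is an illustrative device, not part of the question) every item awaits a TaskCompletionSource that is only completed after Parallel.ForEach has returned, so the bag is still empty at that point:

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

var bag = new ConcurrentBag<int>();
// No item can get past this gate until SetResult is called below
var gate = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

// This overload takes an Action<int>, so the async lambda is "async void":
// each call hands control back to Parallel.ForEach at its first incomplete await
Parallel.ForEach(Enumerable.Range(0, 10), async item =>
{
    await gate.Task;
    bag.Add(item);
});

// Parallel.ForEach has already returned, yet no item has reached bag.Add
var countAfterForEach = bag.Count;
Console.WriteLine(countAfterForEach); // 0

// Opening the gate lets the fire-and-forget remainders run on the thread pool
gate.SetResult(true);
var finished = SpinWait.SpinUntil(() => bag.Count == 10, TimeSpan.FromSeconds(5));
Console.WriteLine(finished);
```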
Asked by clausndk
Solution #1
If you just need simple parallelism, you can do this (note that it starts every operation at once, with no limit on concurrency):
var bag = new ConcurrentBag<object>();
var tasks = myCollection.Select(async item =>
{
    // some pre stuff
    var response = await GetData(item);
    bag.Add(response);
    // some post stuff
});
await Task.WhenAll(tasks);
var count = bag.Count;
Check out Stephen Toub’s ForEachAsync post if you need something more advanced.
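As I understand it, the core idea of that post is to split the source into a fixed number of partitions with Partitioner.Create and run one sequential async loop per partition, so at most that many bodies are in flight. A minimal sketch along those lines, written as a local function instead of an extension method so it fits in one file; GetData is a hypothetical stand-in:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Runs body for every element, with at most `dop` bodies in flight at once
static Task ForEachAsync<T>(IEnumerable<T> source, int dop, Func<T, Task> body)
{
    return Task.WhenAll(
        from partition in Partitioner.Create(source).GetPartitions(dop)
        select Task.Run(async () =>
        {
            // Each partition is an enumerator that hands out items one at a time
            using (partition)
            {
                while (partition.MoveNext())
                    await body(partition.Current);
            }
        }));
}

// Hypothetical stand-in for GetData from the question
static async Task<object> GetData(int item)
{
    await Task.Delay(10);
    return item * 2;
}

var bag = new ConcurrentBag<object>();
await ForEachAsync(Enumerable.Range(0, 20), dop: 4, async item =>
{
    var response = await GetData(item);
    bag.Add(response);
});
var count = bag.Count;
Console.WriteLine(count); // 20
```

Unlike the plain Select/WhenAll version, this caps concurrency without creating one task per item up front.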
Answered by Stephen Cleary
Solution #2
The ParallelForEachAsync extension method from the AsyncEnumerator NuGet package can be used:
using Dasync.Collections;
var bag = new ConcurrentBag<object>();
await myCollection.ParallelForEachAsync(async item =>
{
    // some pre stuff
    var response = await GetData(item);
    bag.Add(response);
    // some post stuff
}, maxDegreeOfParallelism: 10);
var count = bag.Count;
Disclaimer: I’m the author of the AsyncEnumerator library, which is open source and released under the MIT license, and I’m posting this only to help the community.
Answered by Serge Semenov
Solution #3
Parallel.ForEachAsync is one of the new APIs in .NET 6. It schedules asynchronous work and lets you control the degree of parallelism:
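using System;
using System.IO;
using System.Net.Http;
using System.Threading.Tasks;
var urls = new []
{
    "https://dotnet.microsoft.com",
    "https://www.microsoft.com",
    "https://stackoverflow.com"
};
var client = new HttpClient();
var options = new ParallelOptions { MaxDegreeOfParallelism = 2 };
// Create the cache folder up front
var cacheDir = Path.Combine(Path.GetTempPath(), "http_cache");
Directory.CreateDirectory(cacheDir);
await Parallel.ForEachAsync(urls, options, async (url, token) =>
{
    // A raw URL is not a valid file name, so use its host name instead
    var targetPath = Path.Combine(cacheDir, new Uri(url).Host);
    using var response = await client.GetAsync(url, token);
    if (response.IsSuccessStatusCode)
    {
        // File.Create truncates any previous copy, unlike File.OpenWrite
        using var target = File.Create(targetPath);
        await response.Content.CopyToAsync(target, token);
    }
});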
Scott Hanselman’s blog has another example.
For reference, here is the source.
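Applied to the question's scenario, Parallel.ForEachAsync (.NET 6 or later) can replace Parallel.ForEach directly. A minimal sketch, assuming a hypothetical GetData that accepts the loop's CancellationToken; the in-flight counter exists only to show that MaxDegreeOfParallelism is honored:

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for GetData that honors the loop's cancellation token
static async Task<object> GetData(int item, CancellationToken token)
{
    await Task.Delay(20, token);
    return item;
}

var bag = new ConcurrentBag<object>();
var options = new ParallelOptions { MaxDegreeOfParallelism = 3 };
var sync = new object();
var inFlight = 0;
var peak = 0;

await Parallel.ForEachAsync(Enumerable.Range(0, 30), options, async (item, token) =>
{
    // Count the bodies running right now and remember the highest value seen
    lock (sync) { inFlight++; peak = Math.Max(peak, inFlight); }
    try
    {
        var response = await GetData(item, token);
        bag.Add(response);
    }
    finally
    {
        lock (sync) { inFlight--; }
    }
});

Console.WriteLine(bag.Count); // 30
Console.WriteLine(peak);      // at most 3
```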
Answered by Majid Shahabfar
Solution #4
You can limit the degree of parallelism with SemaphoreSlim:
var bag = new ConcurrentBag<object>();
var maxParallel = 20;
var throttler = new SemaphoreSlim(initialCount: maxParallel);
var tasks = myCollection.Select(async item =>
{
    await throttler.WaitAsync();
    try
    {
        var response = await GetData(item);
        bag.Add(response);
    }
    finally
    {
        throttler.Release();
    }
});
await Task.WhenAll(tasks);
var count = bag.Count;
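One detail worth noting about this approach: Select creates every task as soon as Task.WhenAll enumerates it, and the semaphore only limits how many get past WaitAsync at the same time. A minimal sketch that checks that limit, using a hypothetical GetData and a lock-protected counter to record the peak number of concurrent bodies:

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for GetData from the question
static async Task<object> GetData(int item)
{
    await Task.Delay(10);
    return item;
}

var bag = new ConcurrentBag<object>();
var maxParallel = 5;
using var throttler = new SemaphoreSlim(initialCount: maxParallel);
var sync = new object();
var inFlight = 0;
var peak = 0;

// All 50 tasks start immediately; only maxParallel of them hold the semaphore at once
var tasks = Enumerable.Range(0, 50).Select(async item =>
{
    await throttler.WaitAsync();
    try
    {
        lock (sync) { inFlight++; peak = Math.Max(peak, inFlight); }
        var response = await GetData(item);
        bag.Add(response);
    }
    finally
    {
        lock (sync) { inFlight--; }
        throttler.Release();
    }
});
await Task.WhenAll(tasks);

Console.WriteLine(bag.Count); // 50
Console.WriteLine(peak);      // at most 5
```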
Answered by Felipe l
Solution #5
The simplest possible extension method, based on the other answers and the article cited in the accepted answer:
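using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
// Extension methods must be declared in a non-generic static class (the class name is arbitrary)
public static class EnumerableExtensions
{
    public static async Task ParallelForEachAsync<T>(this IEnumerable<T> source, Func<T, Task> asyncAction, int maxDegreeOfParallelism)
    {
        var throttler = new SemaphoreSlim(initialCount: maxDegreeOfParallelism);
        var tasks = source.Select(async item =>
        {
            await throttler.WaitAsync();
            try
            {
                await asyncAction(item).ConfigureAwait(false);
            }
            finally
            {
                throttler.Release();
            }
        });
        await Task.WhenAll(tasks);
    }
}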
UPDATE: Here’s a quick modification that adds a cancellation token, as suggested in the comments (untested):
public static async Task ParallelForEachAsync<T>(this IEnumerable<T> source, Func<T, CancellationToken, Task> asyncAction, int maxDegreeOfParallelism, CancellationToken cancellationToken)
{
    var throttler = new SemaphoreSlim(initialCount: maxDegreeOfParallelism);
    var tasks = source.Select(async item =>
    {
        // Throws OperationCanceledException if the token is canceled while waiting
        await throttler.WaitAsync(cancellationToken);
        try
        {
            // Checked inside try so the slot is released even when the item is skipped
            if (cancellationToken.IsCancellationRequested) return;
            await asyncAction(item, cancellationToken).ConfigureAwait(false);
        }
        finally
        {
            throttler.Release();
        }
    });
    await Task.WhenAll(tasks);
}
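To see the cancellation path in action, here is a small self-contained sketch. It repeats the cancellation-aware helper as a local function (with the IsCancellationRequested check inside try, so a slot is never leaked) and cancels the token from inside the first item. With maxDegreeOfParallelism set to 1, the remaining items should be canceled at WaitAsync, and awaiting the helper should throw an OperationCanceledException (a TaskCanceledException, to be precise):

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Local-function copy of the cancellation-aware helper
static async Task ParallelForEachAsync<T>(IEnumerable<T> source,
    Func<T, CancellationToken, Task> asyncAction, int maxDegreeOfParallelism,
    CancellationToken cancellationToken)
{
    var throttler = new SemaphoreSlim(initialCount: maxDegreeOfParallelism);
    var tasks = source.Select(async item =>
    {
        await throttler.WaitAsync(cancellationToken);
        try
        {
            if (cancellationToken.IsCancellationRequested) return;
            await asyncAction(item, cancellationToken).ConfigureAwait(false);
        }
        finally
        {
            throttler.Release();
        }
    });
    await Task.WhenAll(tasks);
}

using var cts = new CancellationTokenSource();
var processed = 0;
var canceled = false;
try
{
    // With one slot, the first item cancels the run before any other item acquires it
    await ParallelForEachAsync(Enumerable.Range(0, 10), async (item, token) =>
    {
        Interlocked.Increment(ref processed);
        cts.Cancel();
        await Task.Yield();
    }, 1, cts.Token);
}
catch (OperationCanceledException)
{
    canceled = true;
}
Console.WriteLine($"processed {processed} of 10, canceled: {canceled}");
// processed 1 of 10, canceled: True
```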
Answered by Alex from Jitbit
Post is based on https://stackoverflow.com/questions/15136542/parallel-foreach-with-asynchronous-lambda