Coder Perfect

Asynchronous lambda for parallel foreach

Problem

I’d like to process a collection in parallel, but I’m having trouble implementing it, so I’m hoping for some help.

The problem arises when I wish to call an async method in C# from within the parallel loop’s lambda. Consider the following scenario:

var bag = new ConcurrentBag<object>();
Parallel.ForEach(myCollection, async item =>
{
  // some pre stuff
  var response = await GetData(item);
  bag.Add(response);
  // some post stuff
});
var count = bag.Count;

The problem is that count ends up as 0. Parallel.ForEach expects an Action<T>, so the async lambda becomes an async void delegate that returns as soon as it reaches its first await; the rest of the work continues in the background, and Parallel.ForEach does not wait for it to complete. If I remove the async keyword, the method looks like this:

var bag = new ConcurrentBag<object>();
Parallel.ForEach(myCollection, item =>
{
  // some pre stuff
  var responseTask = GetData(item);
  responseTask.Wait();
  var response = responseTask.Result;
  bag.Add(response);
  // some post stuff
});
var count = bag.Count;

This also works, but it disables the await cleverness, so I have to handle exceptions manually. (That code is removed for brevity.)
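The exception-handling difference mentioned above can be seen in a small sketch. GetData here is a hypothetical stand-in that always throws: blocking on the task with Wait() surfaces the failure wrapped in an AggregateException, while await rethrows the original exception directly.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical GetData that always fails, used only to compare how errors surface.
static async Task<object> GetData(int item)
{
    await Task.Yield();
    throw new InvalidOperationException($"item {item} failed");
}

// Blocking style from the question: Wait() wraps the failure in an AggregateException.
Exception? blockingError = null;
try { GetData(1).Wait(); }
catch (Exception ex) { blockingError = ex; }

// await style: the original exception is rethrown without the wrapper.
Exception? awaitError = null;
try { await GetData(2); }
catch (Exception ex) { awaitError = ex; }

Console.WriteLine(blockingError?.GetType().Name); // AggregateException
Console.WriteLine(awaitError?.GetType().Name);    // InvalidOperationException
```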

How can I write a Parallel.ForEach loop that uses the await keyword inside the lambda? Is that even possible?

The Parallel.ForEach method takes an Action<T> as its argument, but I want it to wait for my asynchronous lambda.
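A minimal repro of the behavior described in the question, assuming a hypothetical GetData that simply delays and echoes its input. Because Parallel.ForEach has no overload that takes a Func<T, Task>, the async lambda is compiled as an async void delegate, and the loop returns as soon as each call reaches its first await.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-in for the question's GetData: simulated I/O, then echo the item.
static async Task<object> GetData(int item)
{
    await Task.Delay(500);
    return item;
}

var myCollection = Enumerable.Range(1, 5).ToList();
var bag = new ConcurrentBag<object>();

// Parallel.ForEach expects an Action-style delegate, so this async lambda becomes
// an async void delegate: each call hands control back to ForEach at its first await.
Parallel.ForEach(myCollection, async item =>
{
    var response = await GetData(item);
    bag.Add(response);
});

// ForEach has already returned while the delayed GetData calls are still pending,
// so this count is typically 0.
var countAfterForEach = bag.Count;
Console.WriteLine(countAfterForEach);
```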

Asked by clausndk

Solution #1

If you only need simple parallelism (every item started at once, with no limit on concurrency), you can do this:

var bag = new ConcurrentBag<object>();
var tasks = myCollection.Select(async item =>
{
  // some pre stuff
  var response = await GetData(item);
  bag.Add(response);
  // some post stuff
});
await Task.WhenAll(tasks);
var count = bag.Count;
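A self-contained variation of the snippet above (not part of the original answer), with a hypothetical GetData that returns a string. Since Task.WhenAll returns the results of its tasks in input order, the responses can be collected from it directly instead of through a ConcurrentBag. Like the answer's code, this starts every task at once with no concurrency limit.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical GetData: simulated I/O that returns a string for each item.
static async Task<string> GetData(int item)
{
    await Task.Delay(50);
    return $"item {item}";
}

var myCollection = Enumerable.Range(1, 10);

// Select starts one task per item, all at once (no concurrency limit).
// Task.WhenAll completes when every task has finished and returns the
// results in the same order as the input, so no ConcurrentBag is needed.
string[] responses = await Task.WhenAll(myCollection.Select(item => GetData(item)));

Console.WriteLine(responses.Length); // 10
Console.WriteLine(responses[0]);     // item 1
```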

Check out Stephen Toub’s ForEachAsync post if you need something more advanced.
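One way to build the kind of more advanced helper that post discusses is to let a fixed number of workers pull items from a shared Partitioner, which bounds concurrency without creating a task per item up front. The sketch below is written for this page as an illustration, not copied from the post; the helper name ForEachAsync and its dop parameter are hypothetical.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical partition-based helper: `dop` workers, each pulling items
// from a shared partitioner until the source is exhausted.
static Task ForEachAsync<T>(IEnumerable<T> source, int dop, Func<T, Task> body)
{
    return Task.WhenAll(
        Partitioner.Create(source).GetPartitions(dop).Select(partition => Task.Run(async () =>
        {
            using (partition)
            {
                while (partition.MoveNext())
                    await body(partition.Current);
            }
        })));
}

var bag = new ConcurrentBag<int>();

// At most 4 bodies run at the same time.
await ForEachAsync(Enumerable.Range(1, 20), 4, async item =>
{
    await Task.Delay(10); // simulated async I/O
    bag.Add(item * 2);
});

Console.WriteLine(bag.Count); // 20
```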

Answered by Stephen Cleary

Solution #2

The ParallelForEachAsync extension method from the AsyncEnumerator NuGet package can be used:

using Dasync.Collections;

var bag = new ConcurrentBag<object>();
await myCollection.ParallelForEachAsync(async item =>
{
  // some pre stuff
  var response = await GetData(item);
  bag.Add(response);
  // some post stuff
}, maxDegreeOfParallelism: 10);
var count = bag.Count;

Disclaimer: I’m the author of the AsyncEnumerator library, which is open source and released under the MIT license, and I’m posting this only to help the community.

Answered by Serge Semenov

Solution #3

Parallel.ForEachAsync is one of the new APIs in .NET 6. It schedules asynchronous work and lets you control the degree of parallelism:

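using System;
using System.IO;
using System.Net.Http;
using System.Threading.Tasks;

var urls = new[]
{
    "https://dotnet.microsoft.com",
    "https://www.microsoft.com",
    "https://stackoverflow.com"
};

var client = new HttpClient();

// The cache directory must exist before files can be written into it.
var cacheDir = Path.Combine(Path.GetTempPath(), "http_cache");
Directory.CreateDirectory(cacheDir);

var options = new ParallelOptions { MaxDegreeOfParallelism = 2 };
await Parallel.ForEachAsync(urls, options, async (url, token) =>
{
    // A full URL ("https://...") is not a valid file name, so name the file after the host.
    var targetPath = Path.Combine(cacheDir, new Uri(url).Host);

    // Pass the loop's token so requests are canceled together with the loop.
    using var response = await client.GetAsync(url, token);

    if (response.IsSuccessStatusCode)
    {
        // File.Create truncates any existing file, unlike File.OpenWrite.
        using var target = File.Create(targetPath);

        await response.Content.CopyToAsync(target, token);
    }
});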

Scott Hanselman’s blog has another example.

The source code of the method is also available for reference.
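To see MaxDegreeOfParallelism at work without any network access, the sketch below (not from the original answer; it requires .NET 6 or later) counts how many loop bodies are in flight at once. Task.Delay stands in for real I/O, and the observed peak should never exceed the configured limit of 2.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

int inFlight = 0;   // bodies currently running
int peak = 0;       // highest inFlight value observed
int completed = 0;  // bodies that finished

var options = new ParallelOptions { MaxDegreeOfParallelism = 2 };

await Parallel.ForEachAsync(Enumerable.Range(1, 8), options, async (item, token) =>
{
    int now = Interlocked.Increment(ref inFlight);

    // Raise peak to `now` if it is larger (lock-free compare-and-swap loop).
    int seen;
    while (now > (seen = Volatile.Read(ref peak)) &&
           Interlocked.CompareExchange(ref peak, now, seen) != seen)
    {
    }

    await Task.Delay(50, token); // simulated async I/O that honors the loop's token

    Interlocked.Decrement(ref inFlight);
    Interlocked.Increment(ref completed);
});

Console.WriteLine($"completed={completed}, peak={peak}");
```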

Answered by Majid Shahabfar

Solution #4

You can limit the degree of parallelism with SemaphoreSlim:

var bag = new ConcurrentBag<object>();
var maxParallel = 20;
var throttler = new SemaphoreSlim(initialCount: maxParallel);
var tasks = myCollection.Select(async item =>
{
  await throttler.WaitAsync();
  try
  {
     var response = await GetData(item);
     bag.Add(response);
  }
  finally
  {
     throttler.Release();
  }
});
await Task.WhenAll(tasks);
var count = bag.Count;
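A self-contained variation of the throttled approach above, assuming a hypothetical GetData that returns the square of its input. Here each task returns its response instead of adding it to a bag, so Task.WhenAll yields the results in input order, while the semaphore still keeps at most maxParallel calls running at a time.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical GetData: simulated I/O returning the square of the item.
static async Task<int> GetData(int item)
{
    await Task.Delay(20);
    return item * item;
}

var myCollection = Enumerable.Range(1, 12);
var maxParallel = 3;
using var throttler = new SemaphoreSlim(initialCount: maxParallel);

// Same throttling as the answer, but each task returns its response,
// so Task.WhenAll hands back the results in input order.
var tasks = myCollection.Select(async item =>
{
    await throttler.WaitAsync();
    try
    {
        return await GetData(item);
    }
    finally
    {
        throttler.Release();
    }
});
int[] responses = await Task.WhenAll(tasks);

Console.WriteLine(string.Join(",", responses));
```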

Answered by Felipe l

Solution #5

The simplest possible extension method, based on the other answers and the article referenced in the accepted answer:

public static async Task ParallelForEachAsync<T>(this IEnumerable<T> source, Func<T, Task> asyncAction, int maxDegreeOfParallelism)
{
    var throttler = new SemaphoreSlim(initialCount: maxDegreeOfParallelism);
    var tasks = source.Select(async item =>
    {
        await throttler.WaitAsync();
        try
        {
            await asyncAction(item).ConfigureAwait(false);
        }
        finally
        {
            throttler.Release();
        }
    });
    await Task.WhenAll(tasks);
}
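A usage sketch for the helper above. To keep it runnable as a single top-level program, the helper is written as a static local function rather than an extension method in a static class, and a short Task.Delay stands in for the hypothetical GetData. Unlike the original Parallel.ForEach attempt, bag.Count is complete once the call has been awaited.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// The answer's helper as a static local function (not an extension method).
static async Task ParallelForEachAsync<T>(IEnumerable<T> source, Func<T, Task> asyncAction, int maxDegreeOfParallelism)
{
    using var throttler = new SemaphoreSlim(initialCount: maxDegreeOfParallelism);
    var tasks = source.Select(async item =>
    {
        await throttler.WaitAsync();
        try
        {
            await asyncAction(item).ConfigureAwait(false);
        }
        finally
        {
            throttler.Release();
        }
    });
    await Task.WhenAll(tasks);
}

var bag = new ConcurrentBag<object>();
await ParallelForEachAsync(Enumerable.Range(1, 25), async item =>
{
    await Task.Delay(10); // hypothetical stand-in for GetData
    bag.Add(item);
}, maxDegreeOfParallelism: 5);

// Every item has been processed by the time the await above completes.
var count = bag.Count;
Console.WriteLine(count); // 25
```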

UPDATE: Here’s a modified version that supports a cancellation token, as suggested in the comments (untested):

public static async Task ParallelForEachAsync<T>(this IEnumerable<T> source, Func<T, CancellationToken, Task> asyncAction, int maxDegreeOfParallelism, CancellationToken cancellationToken)
{
    var throttler = new SemaphoreSlim(initialCount: maxDegreeOfParallelism);
    var tasks = source.Select(async item =>
    {
        await throttler.WaitAsync(cancellationToken);
        try
        {
            // Check inside the try block so the semaphore slot is always released.
            if (cancellationToken.IsCancellationRequested) return;

            await asyncAction(item, cancellationToken).ConfigureAwait(false);
        }
        finally
        {
            throttler.Release();
        }
    });
    await Task.WhenAll(tasks);
}
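A sketch of how a cancellable variant behaves when the token fires mid-run (not from the original answer). The helper is again a static local function so the program is self-contained, and it calls ThrowIfCancellationRequested inside the try block (so the slot is still released) instead of returning silently, so the caller sees an OperationCanceledException. The item count, delays, and 150 ms timeout are arbitrary choices for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Cancellable helper as a static local function; throws on cancellation.
static async Task ParallelForEachAsync<T>(IEnumerable<T> source, Func<T, CancellationToken, Task> asyncAction,
    int maxDegreeOfParallelism, CancellationToken cancellationToken)
{
    using var throttler = new SemaphoreSlim(initialCount: maxDegreeOfParallelism);
    var tasks = source.Select(async item =>
    {
        await throttler.WaitAsync(cancellationToken);
        try
        {
            cancellationToken.ThrowIfCancellationRequested();
            await asyncAction(item, cancellationToken).ConfigureAwait(false);
        }
        finally
        {
            throttler.Release();
        }
    });
    await Task.WhenAll(tasks);
}

int processed = 0;
bool canceled = false;
using var cts = new CancellationTokenSource();
cts.CancelAfter(TimeSpan.FromMilliseconds(150));

try
{
    // 40 items x 50 ms, 2 at a time, would take about a second without cancellation.
    await ParallelForEachAsync(Enumerable.Range(1, 40), async (item, token) =>
    {
        await Task.Delay(50, token);
        Interlocked.Increment(ref processed);
    }, maxDegreeOfParallelism: 2, cancellationToken: cts.Token);
}
catch (OperationCanceledException)
{
    canceled = true;
}

Console.WriteLine($"canceled={canceled}, processed={processed}");
```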

Answered by Alex from Jitbit

Post is based on https://stackoverflow.com/questions/15136542/parallel-foreach-with-asynchronous-lambda