Problem
I need to make a number of WCF calls in a Metro app. There are a lot of calls to make, so I'm doing them in a parallel loop. The problem is that the parallel loop exits before all of the WCF calls are complete.
How would you refactor this to make it work as expected?
var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };
var customers = new System.Collections.Concurrent.BlockingCollection<Customer>();
Parallel.ForEach(ids, async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    var cust = await repo.GetCustomer(i);
    customers.Add(cust);
});
foreach (var customer in customers)
{
    Console.WriteLine(customer.ID);
}
Console.ReadKey();
Asked by Darthg8r
Solution #1
The whole idea behind Parallel.ForEach() is that you have a set of threads, and each thread processes part of the collection. As you noticed, this doesn't work with async-await, where you want to release the thread for the duration of the async call.
You could “fix” this by blocking the ForEach() threads, but that defeats the whole point of async-await.
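To see why, note that Parallel.ForEach() only accepts an Action<T>, so an async lambda is compiled into an async void method that the loop cannot wait on. A commented sketch of the question's own code (the types are the OP's):

```csharp
// Illustration only: Parallel.ForEach binds the async lambda to
// Action<string>, i.e. an async void method that cannot be awaited.
Parallel.ForEach(ids, async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    // ForEach considers this iteration "done" at the first await;
    // the continuation below runs later, on its own.
    var cust = await repo.GetCustomer(i);
    customers.Add(cust); // may execute after the loop has returned
});
// Execution reaches this point while GetCustomer calls are still in flight.
```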
What you could do instead is use TPL Dataflow in place of Parallel.ForEach(), since it supports asynchronous Tasks well.
Specifically, your code could be written using a TransformBlock that transforms each id into a Customer using the async lambda. This block can be configured to run in parallel. You would then link it to an ActionBlock that writes each Customer to the console. Once the block network is set up, you can Post() each id to the TransformBlock.
In code:
var ids = new List<string> { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };
var getCustomerBlock = new TransformBlock<string, Customer>(
    async i =>
    {
        ICustomerRepo repo = new CustomerRepo();
        return await repo.GetCustomer(i);
    }, new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
    });
var writeCustomerBlock = new ActionBlock<Customer>(c => Console.WriteLine(c.ID));
getCustomerBlock.LinkTo(
    writeCustomerBlock, new DataflowLinkOptions
    {
        PropagateCompletion = true
    });

foreach (var id in ids)
    getCustomerBlock.Post(id);

getCustomerBlock.Complete();
writeCustomerBlock.Completion.Wait();
You will probably want to limit the TransformBlock's parallelism to some small constant, though. Also, if the collection is too big, you can limit the TransformBlock's capacity and add items to it asynchronously with SendAsync().
As an added benefit compared to your code (if it worked), the writing will start as soon as a single item is finished, instead of waiting until all of the processing is done.
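As a sketch of those two adjustments, the TransformBlock from above could be configured with a small parallelism limit and a bounded capacity (the constants 4 and 8 here are arbitrary; the types are from the question):

```csharp
var getCustomerBlock = new TransformBlock<string, Customer>(
    async i =>
    {
        ICustomerRepo repo = new CustomerRepo();
        return await repo.GetCustomer(i);
    }, new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 4, // small constant instead of Unbounded
        BoundedCapacity = 8         // the block buffers at most 8 ids at a time
    });

foreach (var id in ids)
    // Unlike Post(), which returns false when a bounded block is full,
    // SendAsync() asynchronously waits for room to free up.
    await getCustomerBlock.SendAsync(id);
```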
Answered by svick
Solution #2
svick's answer is (as usual) excellent.
However, I find Dataflow to be more useful when you actually have large amounts of data to process, or when you need an async-compatible queue.
In your case, a simpler option is to use async-style parallelism:
var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };
var customerTasks = ids.Select(i =>
{
    ICustomerRepo repo = new CustomerRepo();
    return repo.GetCustomer(i);
});
var customers = await Task.WhenAll(customerTasks);
foreach (var customer in customers)
{
    Console.WriteLine(customer.ID);
}
Console.ReadKey();
Answered by Stephen Cleary
Solution #3
Using Dataflow as svick suggested may be overkill, and Stephen's answer doesn't provide the means to control the concurrency of the operation. That can, however, be achieved rather simply:
public static async Task RunWithMaxDegreeOfConcurrency<T>(
    int maxDegreeOfConcurrency, IEnumerable<T> collection, Func<T, Task> taskFactory)
{
    var activeTasks = new List<Task>(maxDegreeOfConcurrency);
    foreach (var task in collection.Select(taskFactory))
    {
        activeTasks.Add(task);
        if (activeTasks.Count == maxDegreeOfConcurrency)
        {
            await Task.WhenAny(activeTasks.ToArray());
            // observe exceptions here
            activeTasks.RemoveAll(t => t.IsCompleted);
        }
    }
    await Task.WhenAll(activeTasks.ToArray()).ContinueWith(t =>
    {
        // observe exceptions in a manner consistent with the above
    });
}
The ToArray() calls could be optimized by using an array instead of a list and replacing completed tasks in place, but I doubt it would make much of a difference in most scenarios. A sample usage per the OP's question:
await RunWithMaxDegreeOfConcurrency(10, ids, async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    var cust = await repo.GetCustomer(i);
    customers.Add(cust);
});
EDIT: Fellow SO user and TPL wiz Eli Arbel pointed me to a related article by Stephen Toub. As usual, his implementation is both elegant and efficient:
// Partitioner lives in System.Collections.Concurrent.
public static Task ForEachAsync<T>(
    this IEnumerable<T> source, int dop, Func<T, Task> body)
{
    return Task.WhenAll(
        from partition in Partitioner.Create(source).GetPartitions(dop)
        select Task.Run(async delegate
        {
            using (partition)
                while (partition.MoveNext())
                    await body(partition.Current).ContinueWith(t =>
                    {
                        // observe exceptions
                    });
        }));
}
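Applied to the question's code, the extension method could be used like this (a sketch; the names are the OP's, and dop = 10 keeps at most ten GetCustomer calls in flight at once):

```csharp
await ids.ForEachAsync(10, async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    var cust = await repo.GetCustomer(i);
    customers.Add(cust);
});
```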
Answered by Ohad Schneider
Solution #4
You can save effort with the AsyncEnumerator NuGet package, which didn't exist four years ago when the question was originally posted. It allows you to control the degree of parallelism:
using System.Collections.Async;
...
await ids.ParallelForEachAsync(async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    var cust = await repo.GetCustomer(i);
    customers.Add(cust);
},
maxDegreeOfParallelism: 10);
Disclaimer: I'm the author of the open source AsyncEnumerator library, which is released under the MIT license, and I'm posting this message just to help the community.
Answered by Serge Semenov
Solution #5
Wrap the Parallel.ForEach in a Task.Run(), and inside the loop body use [yourasyncmethod].Result instead of the await keyword.
(You need the Task.Run() so the whole thing doesn't block the UI thread.)
Something like this:
var yourForeachTask = Task.Run(() =>
{
    Parallel.ForEach(ids, i =>
    {
        ICustomerRepo repo = new CustomerRepo();
        var cust = repo.GetCustomer(i).Result;
        customers.Add(cust);
    });
});
await yourForeachTask;
Answered by ofcoursedude
Post is based on https://stackoverflow.com/questions/11564506/nesting-await-in-parallel-foreach