Saturday, 24 August 2013

A replacement for Parallel.ForEach that executes until a ConcurrentQueue is empty

I've written the following code, which uses Parallel.ForEach to get some
parallelism for recursive algorithms. I don't think Parallel.ForEach is
quite right for the job, though. This code could itself be an answer to
this question, but I'm wondering whether something less hacky could
replace it. By "hacky" I mean the while loop and Enumerable.Range used to
make repeated calls to Parallel.ForEach. This is a bit of a variation on
the Producer/Consumer pattern: the Producer and Consumer are the same
thing, and consuming should terminate when producing is done. That makes
the whole thing synchronous (but parallel).

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

namespace Common {
    /// <summary>
    /// Processes a recursive algorithm in parallel.
    /// It is initialized with a starting set of
    /// distinct items. Run is called with the recursive
    /// algorithm, which may add more distinct items.
    /// These added items will also be processed,
    /// recursively, until no more items are added.
    /// </summary>
    /// <typeparam name="T">The type of item to process.</typeparam>
    public class DistinctRecursiveAlgorithmProcessor<T> {
        private readonly ConcurrentQueue<T> _items = new ConcurrentQueue<T>();
        // ConcurrentSet<T> is the author's own type (see the pastebin link below),
        // not part of the .NET Framework.
        private readonly ConcurrentSet<T> _alreadyQueuedItems = new ConcurrentSet<T>();

        /// <summary>
        /// Add an item to process. If it is identical to a previously added item,
        /// it will not be added. Thus, only distinct items will be processed.
        /// </summary>
        /// <param name="item">The item to queue for processing.</param>
        /// <returns>False if the item has been previously added</returns>
        public bool Add(T item) {
            if (!_alreadyQueuedItems.TryAdd(item)) {
                return false;
            }
            _items.Enqueue(item);
            return true;
        }

        /// <summary>
        /// Runs the recursive algorithm specified, and blocks until it is complete.
        /// </summary>
        /// <param name="recursiveAlgorithm">
        /// The action applied to each item; it may call Add to queue further items.
        /// </param>
        public void Run(Action<T> recursiveAlgorithm) {
            // Each pass processes as many items as were queued when it started;
            // items added during the pass are picked up by the next pass.
            while (!_items.IsEmpty) {
                Parallel.ForEach(Enumerable.Range(0, _items.Count), (dontCare) => {
                    T item;
                    if (_items.TryDequeue(out item)) {
                        recursiveAlgorithm(item);
                    } else {
                        // Unreachable while only one Run is active: nothing else
                        // dequeues, and the queue only grows during a pass.
                        throw new ApplicationException(
                            "This implementation does not function as expected");
                    }
                });
            }
        }
    }
}

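The class above will not compile without ConcurrentSet<T>, which is not part of the .NET Framework. As a stand-in until you fetch the linked version, here is a minimal sketch. It assumes the processor needs only a thread-safe TryAdd that returns false for duplicates, and it wraps ConcurrentDictionary<T, byte>, ignoring the values. This version is hypothetical and is not the pastebin implementation.

```csharp
using System.Collections.Concurrent;

namespace Common {
    // Hypothetical minimal stand-in for the author's ConcurrentSet<T>;
    // NOT the pastebin implementation. Thread safety comes from delegating
    // every call to a ConcurrentDictionary whose byte values are ignored.
    public class ConcurrentSet<T> {
        private readonly ConcurrentDictionary<T, byte> _items =
            new ConcurrentDictionary<T, byte>();

        // Returns false if the item was already present.
        // Like ConcurrentDictionary, this throws ArgumentNullException for a null item.
        public bool TryAdd(T item) {
            return _items.TryAdd(item, 0);
        }

        public bool Contains(T item) {
            return _items.ContainsKey(item);
        }
    }
}
```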
In case there's interest, ConcurrentSet is implemented here:
http://pastebin.com/8REHRFFL
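For comparison, the following sketch shows one possible way to drop the outer while loop and Enumerable.Range. It is not the author's code, and it rests on several assumptions. The class name RecursiveWorkQueue is hypothetical. ConcurrentDictionary<T, byte> stands in for ConcurrentSet<T>. It also requires .NET 4.5 for Task.Run and Volatile.Read. In this design, a fixed number of worker tasks drain a BlockingCollection<T>. A counter of outstanding items calls CompleteAdding when the last item finishes, and that call ends every worker's GetConsumingEnumerable loop.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Common {
    // Hypothetical alternative sketch, not the original processor.
    // Single-use: once Run has drained the queue, adding a new item throws.
    public class RecursiveWorkQueue<T> {
        private readonly BlockingCollection<T> _queue =
            new BlockingCollection<T>(new ConcurrentQueue<T>());
        // Used as a set for de-duplication; the byte values are ignored.
        private readonly ConcurrentDictionary<T, byte> _seen =
            new ConcurrentDictionary<T, byte>();
        // Items that have been queued but whose processing has not finished.
        private int _pending;

        public bool Add(T item) {
            if (!_seen.TryAdd(item, 0)) {
                return false;
            }
            // Count the item before enqueueing it, so the counter cannot
            // reach zero while this item is still waiting.
            Interlocked.Increment(ref _pending);
            _queue.Add(item);
            return true;
        }

        public void Run(Action<T> recursiveAlgorithm) {
            if (Volatile.Read(ref _pending) == 0) {
                return; // nothing queued, so there is nothing to wait for
            }
            Task[] workers = Enumerable.Range(0, Environment.ProcessorCount)
                .Select(_ => Task.Run(() => {
                    // Blocks while the queue is empty; ends once CompleteAdding
                    // has been called and the queue has drained.
                    foreach (T item in _queue.GetConsumingEnumerable()) {
                        try {
                            // Add calls made in here increment _pending before
                            // this item's decrement below.
                            recursiveAlgorithm(item);
                        } finally {
                            if (Interlocked.Decrement(ref _pending) == 0) {
                                _queue.CompleteAdding();
                            }
                        }
                    }
                }))
                .ToArray();
            // Blocks until every worker exits; rethrows failures as an AggregateException.
            Task.WaitAll(workers);
        }
    }
}
```

Each child item is counted in Add before its parent's decrement runs. As a result, the counter reaches zero only when no item is queued or in progress, and that zero plays the role of the original `while (_items.Count > 0)` check. For example, suppose you seed the queue with 1 and have each n add n + 1 and n * 2 while those values stay below 100. Each of 1 through 99 is then processed exactly once. After Run completes, adding a previously unseen item throws InvalidOperationException from the completed BlockingCollection.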