Have a set of Tasks with only X running at a time

Let's say I have 100 tasks that do something that takes 10 seconds. Now I want to only run 10 at a time like when 1 of those 10 finishes another task gets executed till all are finished.

Now I always used ThreadPool.QueueUserWorkItem() for such task but I've read that it is bad practice to do so and that I should use Tasks instead.

My problem is that I nowhere found a good example for my scenario so could you get me started on how to achieve this goal with Tasks?

Answers


SemaphoreSlim maxThread = new SemaphoreSlim(10);

for (int i = 0; i < 115; i++)
{
    maxThread.Wait();
    Task.Factory.StartNew(() =>
        {
            //Your Works
        }
        , TaskCreationOptions.LongRunning)
    .ContinueWith( (task) => maxThread.Release() );
}

TPL Dataflow is great for doing things like this. You can create a 100% async version of Parallel.Invoke pretty easily:

async Task ProcessTenAtOnce<T>(IEnumerable<T> items, Func<T, Task> func)
{
    ExecutionDataflowBlockOptions edfbo = new ExecutionDataflowBlockOptions
    {
         MaxDegreeOfParallelism = 10
    };

    ActionBlock<T> ab = new ActionBlock<T>(func, edfbo);

    foreach (T item in items)
    {
         await ab.SendAsync(item);
    }

    ab.Complete();
    await ab.Completion;
}

You have several options. You can use Parallel.Invoke for starters:

public void DoWork(IEnumerable<Action> actions)
{
    Parallel.Invoke(new ParallelOptions() { MaxDegreeOfParallelism = 10 }
        , actions.ToArray());
}

Here is an alternate option that will work much harder to have exactly 10 tasks running (although the number of threads in the thread pool processing those tasks may be different) and that returns a Task indicating when it finishes, rather than blocking until done.

public Task DoWork(IList<Action> actions)
{
    List<Task> tasks = new List<Task>();
    int numWorkers = 10;
    int batchSize = (int)Math.Ceiling(actions.Count / (double)numWorkers);
    foreach (var batch in actions.Batch(actions.Count / 10))
    {
        tasks.Add(Task.Factory.StartNew(() =>
        {
            foreach (var action in batch)
            {
                action();
            }
        }));
    }

    return Task.WhenAll(tasks);
}

If you don't have MoreLinq, for the Batch function, here's my simpler implementation:

public static IEnumerable<IEnumerable<T>> Batch<T>(this IEnumerable<T> source, int batchSize)
{
    List<T> buffer = new List<T>(batchSize);

    foreach (T item in source)
    {
        buffer.Add(item);

        if (buffer.Count >= batchSize)
        {
            yield return buffer;
            buffer = new List<T>();
        }
    }
    if (buffer.Count >= 0)
    {
        yield return buffer;
    }
}

I would love to use the simplest solution I can think of which as I think using the TPL:

string[] urls={};
Parallel.ForEach(urls, new ParallelOptions() { MaxDegreeOfParallelism = 2}, url =>
{
   //Download the content or do whatever you want with each URL
});

You can create a method like this:

public static async Task RunLimitedNumberAtATime<T>(int numberOfTasksConcurrent, 
    IEnumerable<T> inputList, Func<T, Task> asyncFunc)
{
    Queue<T> inputQueue = new Queue<T>(inputList);
    List<Task> runningTasks = new List<Task>(numberOfTasksConcurrent);
    for (int i = 0; i < numberOfTasksConcurrent && inputQueue.Count > 0; i++)
        runningTasks.Add(asyncFunc(inputQueue.Dequeue()));

    while (inputQueue.Count > 0)
    {
        Task task = await Task.WhenAny(runningTasks);
        runningTasks.Remove(task);
        runningTasks.Add(asyncFunc(inputQueue.Dequeue()));
    }

    await Task.WhenAll(runningTasks);
}

And then you can call any async method n times with a limit like this:

Task task = RunLimitedNumberAtATime(10,
    Enumerable.Range(1, 100),
    async x =>
    {
        Console.WriteLine($"Starting task {x}");
        await Task.Delay(100);
        Console.WriteLine($"Finishing task {x}");
    });

Or if you want to run long running non async methods, you can do it that way:

Task task = RunLimitedNumberAtATime(10,
    Enumerable.Range(1, 100),
    x => Task.Factory.StartNew(() => {
        Console.WriteLine($"Starting task {x}");
        System.Threading.Thread.Sleep(100);
        Console.WriteLine($"Finishing task {x}");
    }, TaskCreationOptions.LongRunning));

Maybe there is a similar method somewhere in the framework, but I didn't find it yet.


Need Your Help

XCode 5 Crashes on AppStore Validation

iphone ios objective-c xcode

I'm new to iOS Development and may very well have missed something relevant.

CFBundleDisplayName returns 'null'

ios objective-c ios5 xcode5 info-plist

I'm trying to get app name to use as the NavController title, but whatever I do I can't get CFBundleDisplayName value. It returns 'null'.