How to make a Celery task call asynchronous tasks?

I have a Django application that needs to run an optimization algorithm. This algorithm is composed of two parts: the first part is an evolutionary algorithm, and this algorithm calls a certain number of tasks of the second part, which is a simulated annealing algorithm. The problem is that Celery doesn't allow a task to call an asynchronous task. I have tried the code below:

from celery import group

sa_list = []
for cromossomo in self._populacao:
    sa_list.append(simulated_annealing_t.s(
        cromossomo.to_JSON(), self._NR, self._T,
        get_mutacao_str(self._mutacao_SA), self._argumentos))

job = group(sa_list)

result = job.apply_async()
resultados = result.get()

This code is part of the evolutionary algorithm, which is itself a Celery task. When I tried to run it, Celery showed this message:

[2015-12-02 16:20:15,970: WARNING/Worker-1] /home/arthur/django-user/local/lib/python2.7/site-packages/celery/result.py:45: RuntimeWarning: Never call result.get() within a task! See http://docs.celeryq.org/en/latest/userguide/tasks.html#task-synchronous-subtasks

In Celery 3.2 this will result in an exception being raised instead of just being a warning.

Despite it being just a warning, Celery seems to fill up with tasks and deadlock.

I have searched for a lot of solutions, but none of them worked.

Answers


One way to deal with this is to have a two-stage pipeline:

from celery import group

@app.task
def first_task():
    # self._populacao, self._NR, etc. come from the asker's
    # evolutionary-algorithm class; in a real task they would be
    # passed in as arguments.
    sa_list = []
    for cromossomo in self._populacao:
        sa_list.append(simulated_annealing_t.s(
            cromossomo.to_JSON(), self._NR, self._T,
            get_mutacao_str(self._mutacao_SA), self._argumentos))

    job = group(sa_list)

    result = job.apply_async()
    result.save()  # requires a result backend that supports GroupResult.save()
    return result.id

Then call it like this:

from path.to.tasks import app, first_task

result_1 = first_task.apply_async()
result_2_id = result_1.get()  # safe here: we are outside any task
result_2 = app.GroupResult.restore(result_2_id)
resultados = result_2.get()
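
Note that result.save() and app.GroupResult.restore() require a configured result backend (for example Redis or a database backend); with results disabled this approach will not work.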

There are other ways to do this that involve more work - you could use a chord to gather the results of the group, as sketched below.
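
For illustration, here is a minimal chord sketch. process_results is a hypothetical callback task added for this example, and the simulated_annealing_t signature is taken from the question:

from celery import chord
from path.to.tasks import app, simulated_annealing_t

@app.task
def process_results(resultados):
    # Hypothetical callback: receives the list of results of all
    # header tasks once every one of them has finished.
    return resultados

def run_sa_chord(populacao, nr, t, mutacao, argumentos):
    header = [simulated_annealing_t.s(c.to_JSON(), nr, t, mutacao, argumentos)
              for c in populacao]
    # chord(header)(body) dispatches the group and schedules the
    # callback; no result.get() inside a task is needed.
    return chord(header)(process_results.s())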


The problem is not that Celery doesn't allow the execution of async tasks in your example, but that you'll run into a deadlock, hence the warning:

Let's assume you have a task A that spawns a number of subtasks B through apply_async(). Every one of those tasks is executed by a worker. The problem is that if the number of B tasks is larger than the number of available workers, and task A is still waiting for their results (as it is in your example, because of result.get(); that is not the default behavior), then the workers that have executed a B task will not execute another one; they are blocked until task A is finished. (I don't know exactly why, but I had this problem just a few weeks ago.)

This means that Celery can't execute anything else until you manually shut down the workers.

Solutions

This depends entirely on what you want to do with your task results. If you need them to execute the next subtask, you can chain tasks through linking with callbacks, or by hardcoding the call into the respective tasks (so that you call the first, which calls the second, and so on); see the sketch below.
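
For example, a minimal linking sketch; step_one and step_two are hypothetical tasks, and step_two receives step_one's return value as its first argument:

from celery import chain
from path.to.tasks import app

@app.task
def step_one(x):
    return x + 1

@app.task
def step_two(y):
    return y * 2

# Linking with a callback: step_two runs with step_one's result.
step_one.apply_async((1,), link=step_two.s())

# The same pipeline expressed as a chain.
chain(step_one.s(1), step_two.s()).apply_async()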

If you only need to see whether the tasks were executed and whether they succeeded, you can use Flower to monitor your tasks.

If you need to process the output of all the subtasks further, I recommend writing the results to a file (e.g. XML): have task A call all B tasks, and once they are done, execute a task C that processes the results (see the sketch below). Maybe there are more elegant solutions, but this avoids the deadlock for sure.
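
For illustration, a minimal file-based sketch; the task names, the /tmp/results directory, and the use of JSON instead of XML are all assumptions made for brevity:

import glob
import json

from path.to.tasks import app

@app.task
def task_b(i, data):
    # ... run the simulated annealing for one chromosome here ...
    result = {'index': i, 'data': data}  # placeholder result
    # /tmp/results must exist and be visible to every worker.
    with open('/tmp/results/%d.json' % i, 'w') as f:
        json.dump(result, f)

@app.task
def task_c():
    # Run after all B tasks have finished; gathers the saved results.
    resultados = []
    for path in sorted(glob.glob('/tmp/results/*.json')):
        with open(path) as f:
            resultados.append(json.load(f))
    return resultados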

