Celery: dynamic queues at object level

I'm writing a Django app for polls that uses Celery to keep the voting system under control. Right now I have two queues, default and polls: the first with concurrency set to 8 and the second with concurrency set to 1.

$ celery multi start -A myproject.celery default polls -Q:default default -Q:polls polls -c:default 8 -c:polls 1

Celery routes:

CELERY_ROUTES = {
    'polls.tasks.option_add_vote': {
        'queue': 'polls',
    },
    'polls.tasks.option_subtract_vote': {
        'queue': 'polls',
    }
}

Task:

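# Import paths below are assumed from the project layout shown above.
from django.db import IntegrityError, transaction

from myproject.celery import app
from polls.models import Option


@app.task(bind=True)  # bind=True passes the task instance as `self`, which self.retry() needs
def option_add_vote(self, pk):
    """
    Increase the vote count of the given option and of its poll by 1.
    """
    option = Option.objects.get(pk=pk)

    try:
        # Save both counters in one transaction so they cannot drift apart.
        with transaction.atomic():
            option.vote_quantity += 1
            option.save()
            option.poll.total_votes += 1
            option.poll.save()

    except IntegrityError as exc:
        raise self.retry(exc=exc)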

The option_add_vote task adds 1 to the vote count of the option and of its poll. Because it reads the current value and then writes it back, two tasks running at the same time could overwrite each other's update, so I set the polls queue concurrency to 1. This allows the system to complete thousands of vote requests successfully.
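For reference, the lost-update risk comes from doing the addition in Python. A minimal sketch of the alternative, letting the database do the addition in a single UPDATE, is below. It uses the standard-library sqlite3 module so it runs on its own; the table and column names are stand-ins for the Option and Poll models. In Django, an F() expression passed to QuerySet.update() produces this kind of statement.

```python
import sqlite3

# In-memory stand-ins for the Option and Poll tables (names are illustrative).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE poll (id INTEGER PRIMARY KEY, "
    "total_votes INTEGER NOT NULL DEFAULT 0)"
)
conn.execute(
    "CREATE TABLE poll_option (id INTEGER PRIMARY KEY, poll_id INTEGER, "
    "vote_quantity INTEGER NOT NULL DEFAULT 0)"
)
conn.execute("INSERT INTO poll (id) VALUES (1)")
conn.execute("INSERT INTO poll_option (id, poll_id) VALUES (10, 1)")
conn.commit()


def add_vote(conn, option_id):
    """Increment both counters inside the database, in one transaction."""
    with conn:  # commits on success, rolls back on error
        # The database computes value + 1 itself, so no stale value read
        # into Python can be written back.
        conn.execute(
            "UPDATE poll_option SET vote_quantity = vote_quantity + 1 "
            "WHERE id = ?",
            (option_id,),
        )
        conn.execute(
            "UPDATE poll SET total_votes = total_votes + 1 "
            "WHERE id = (SELECT poll_id FROM poll_option WHERE id = ?)",
            (option_id,),
        )


for _ in range(3):
    add_vote(conn, 10)
```

With increments written this way, correctness no longer depends on the queue having a concurrency of 1, although a busy row can still become a point of contention.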

The problem, as I imagine it, will be a bottleneck once the system grows.

So I was thinking about some kind of dynamic queues, where all vote requests for any option of a given poll are routed to a queue dedicated to that poll. I think this would make the system faster and more reliable.
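One way to approximate this, sketched here as an assumption rather than a tested setup, is a fixed pool of queues instead of one truly dynamic queue per poll: every poll is mapped to one of N queue names, so all votes for the same poll still go through the same single-concurrency worker. The names queue_for_poll, POLL_QUEUE_COUNT and the polls_N queue names are hypothetical.

```python
import zlib

POLL_QUEUE_COUNT = 4  # hypothetical number of vote queues / workers


def queue_for_poll(poll_id, queue_count=POLL_QUEUE_COUNT):
    """Map a poll to one of a fixed set of queue names.

    crc32 is used because it gives the same result in every process,
    so every producer picks the same queue for the same poll.
    """
    shard = zlib.crc32(str(poll_id).encode("utf-8")) % queue_count
    return "polls_%d" % shard


# Usage inside the Django project (not executed here):
#   option_add_vote.apply_async(
#       args=[option.pk],
#       queue=queue_for_poll(option.poll_id),
#   )
```

Each polls_N queue would need its own worker started with a concurrency of 1, in the same way the existing polls worker is started, so that votes for a single poll are never processed in parallel.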

What do you think? How can I do it?

EDIT1:

I got a new idea thanks to Paul and Plahcinski. I'm now storing the votes as objects in their own model (a user-option relationship). When someone votes for an option, an object of this model is created, which lets me count how many votes an option has. This frees the system from the voting-concurrency problem, so votes can be processed in parallel.

I'm thinking about using CELERYBEAT_SCHEDULE to schedule a task that updates poll options based on the result of something like Vote.objects.filter(option=pk).count() (get() returns a single object, which has no count()). Maybe I could run it every hour, or do partial updates for only those options that are getting new votes...
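A minimal sketch of such a schedule entry, using the old-style CELERYBEAT_SCHEDULE setting mentioned above, might look like this. The entry name and the task path polls.tasks.recount_votes are hypothetical; the task itself would still have to be written.

```python
from datetime import timedelta

CELERYBEAT_SCHEDULE = {
    "recount-votes-hourly": {
        # Hypothetical task that recounts Vote rows and stores the totals
        # on the matching options and polls.
        "task": "polls.tasks.recount_votes",
        "schedule": timedelta(hours=1),
    },
}
```

The schedule only takes effect while a beat process is running alongside the workers.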

But how do I give clients updated options in real time? As Plahcinski says, I can keep a cached value for my options in Redis (or any other in-memory cache?) and use it to store these values temporarily, serving the cached value to any new request.

How can I combine this with the standard values in my Django models? Could anyone give me some code references or hints? Am I on the right track, or have I made mistakes?

Answers


What I would do is move the increment out of the database and into Redis, and use the database model as your cached value. Have a Celery beat task that writes recently incremented Redis keys back to your database.

http://redis.io/commands/INCR
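A rough sketch of that idea follows. The functions only use calls that exist on a redis-py client (incr, sadd, srem, smembers, getset), but the key names and the apply_delta callback are assumptions, and the FakeRedis class is just an in-memory stand-in so the example runs without a server.

```python
DIRTY_SET = "votes:dirty"  # hypothetical key holding counters to flush


def counter_key(option_id):
    return "votes:option:%s" % option_id  # hypothetical key layout


def record_vote(client, option_id, amount=1):
    """Count a vote in Redis and mark its counter as needing a flush."""
    key = counter_key(option_id)
    client.incr(key, amount)  # INCR/INCRBY is atomic on the Redis server
    client.sadd(DIRTY_SET, key)


def flush_votes(client, apply_delta):
    """Hand pending counts to apply_delta(option_id, delta), e.g. from beat."""
    for key in client.smembers(DIRTY_SET):
        # Un-mark first: a vote arriving after this line re-marks the key.
        client.srem(DIRTY_SET, key)
        # GETSET reads the pending count and resets it in one atomic step.
        delta = int(client.getset(key, 0) or 0)
        if delta:
            name = key.decode() if isinstance(key, bytes) else key
            apply_delta(int(name.rsplit(":", 1)[1]), delta)


class FakeRedis:
    """In-memory stand-in with only the calls used above (not a real client)."""

    def __init__(self):
        self.values, self.sets = {}, {}

    def incr(self, name, amount=1):
        self.values[name] = int(self.values.get(name, 0)) + amount
        return self.values[name]

    def getset(self, name, value):
        old = self.values.get(name)
        self.values[name] = value
        return old

    def sadd(self, name, *members):
        self.sets.setdefault(name, set()).update(members)

    def srem(self, name, *members):
        self.sets.get(name, set()).difference_update(members)

    def smembers(self, name):
        return set(self.sets.get(name, set()))


client = FakeRedis()
for option_id in (7, 7, 9):
    record_vote(client, option_id)

database = {}  # stand-in for the vote_quantity column, keyed by option id


def apply_delta(option_id, delta):
    database[option_id] = database.get(option_id, 0) + delta


flush_votes(client, apply_delta)
```

In the real project, apply_delta would update the Option and Poll rows. Note that if apply_delta fails after the counter has been reset, that delta is lost, so a production version would need some form of retry or logging.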


What about just having a simple model that stores votes as -1/+1 integers, plus a Celery task that reconciles those rows with the FK object, doing the updates in atomic transactions?
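As an illustration of this approach, here is a small sketch using the standard-library sqlite3 module instead of Django models, so that it runs on its own. The vote_delta and poll_option tables are hypothetical stand-ins for the delta model and the Option model.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE poll_option (
        id INTEGER PRIMARY KEY,
        vote_quantity INTEGER NOT NULL DEFAULT 0
    );
    CREATE TABLE vote_delta (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        option_id INTEGER NOT NULL,
        delta INTEGER NOT NULL      -- +1 for a vote, -1 for a removed vote
    );
    INSERT INTO poll_option (id) VALUES (1), (2);
""")


def record_delta(conn, option_id, delta):
    """Voting only inserts a row, so concurrent votes never touch the same row."""
    with conn:
        conn.execute(
            "INSERT INTO vote_delta (option_id, delta) VALUES (?, ?)",
            (option_id, delta),
        )


def reconcile(conn):
    """Fold pending deltas into the counters in one transaction."""
    with conn:
        last_id = conn.execute("SELECT MAX(id) FROM vote_delta").fetchone()[0]
        if last_id is None:
            return 0  # nothing pending
        rows = conn.execute(
            "SELECT option_id, SUM(delta) FROM vote_delta "
            "WHERE id <= ? GROUP BY option_id",
            (last_id,),
        ).fetchall()
        for option_id, total in rows:
            conn.execute(
                "UPDATE poll_option SET vote_quantity = vote_quantity + ? "
                "WHERE id = ?",
                (total, option_id),
            )
        # Delete only the rows that were summed; newer rows wait for next run.
        conn.execute("DELETE FROM vote_delta WHERE id <= ?", (last_id,))
        return len(rows)


for option_id, delta in [(1, 1), (1, 1), (1, -1), (2, 1)]:
    record_delta(conn, option_id, delta)
updated = reconcile(conn)
```

The reconcile step is the part that would run as a periodic Celery task. Because only that one task writes the counters, the vote-recording tasks can run with any concurrency.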

