
django-q's Introduction

Q logo

A multiprocessing distributed task queue for Django


Features

  • Multiprocessing worker pool
  • Asynchronous tasks
  • Scheduled, cron and repeated tasks
  • Signed and compressed packages
  • Failure and success database or cache
  • Result hooks, groups and chains
  • Django Admin integration
  • PaaS compatible with multiple instances
  • Multi cluster monitor
  • Redis, Disque, IronMQ, SQS, MongoDB or ORM
  • Rollbar and Sentry support

Requirements

Tested with: Python 3.7, 3.8 and 3.9; Django 2.2.X and 3.2.X

Warning

Since Python 3.7, async is a reserved keyword, so the async() function was renamed to async_task().

Brokers

Supported brokers include Redis, Disque, IronMQ, Amazon SQS, MongoDB and the Django ORM; see the Brokers documentation for details.

Installation

  • Install the latest version with pip:

    $ pip install django-q
  • Add django_q to your INSTALLED_APPS in your project's settings.py:

    INSTALLED_APPS = (
        # other apps
        'django_q',
    )
  • Run Django migrations to create the database tables:

    $ python manage.py migrate
  • Choose a message broker, configure it and install the appropriate client library.

Read the full documentation at https://django-q.readthedocs.org

Configuration

All configuration settings are optional, e.g.:

# settings.py example
Q_CLUSTER = {
    'name': 'myproject',
    'workers': 8,
    'recycle': 500,
    'timeout': 60,
    'compress': True,
    'cpu_affinity': 1,
    'save_limit': 250,
    'queue_limit': 500,
    'label': 'Django Q',
    'redis': {
        'host': '127.0.0.1',
        'port': 6379,
        'db': 0, }
}

For full configuration options, see the configuration documentation.

Management Commands

Start a cluster with:

$ python manage.py qcluster

Monitor your clusters with:

$ python manage.py qmonitor

Monitor your clusters' memory usage with:

$ python manage.py qmemory

Check overall statistics with:

$ python manage.py qinfo

Creating Tasks

Use async_task from your code to quickly offload tasks:

from django_q.tasks import async_task, result

# create the task
async_task('math.copysign', 2, -2)

# or with a reference
from math import copysign

task_id = async_task(copysign, 2, -2)

# get the result
task_result = result(task_id)

# result returns None if the task has not been executed yet
# you can wait for it
task_result = result(task_id, 200)

# but in most cases you will want to use a hook:

async_task('math.modf', 2.5, hook='hooks.print_result')

# hooks.py
def print_result(task):
    print(task.result)

For more info see Tasks

Schedule

Schedules are regular Django models. You can manage them through the Admin page or directly from your code:

# Use the schedule function
from django_q.tasks import schedule

schedule('math.copysign',
         2, -2,
         hook='hooks.print_result',
         schedule_type=Schedule.DAILY)

# Or create the object directly
from django_q.models import Schedule

Schedule.objects.create(func='math.copysign',
                        hook='hooks.print_result',
                        args='2,-2',
                        schedule_type=Schedule.DAILY
                        )

# Run a task every 5 minutes, starting at 6 today
# for 2 hours
import arrow

schedule('math.hypot',
         3, 4,
         schedule_type=Schedule.MINUTES,
         minutes=5,
         repeats=24,
         next_run=arrow.utcnow().replace(hour=18, minute=0))

# Use a cron expression
schedule('math.hypot',
         3, 4,
         schedule_type=Schedule.CRON,
         cron = '0 22 * * 1-5')

For more info check the Schedules documentation.

Testing

To run the tests you will need the required broker services running locally in addition to the install requirements.

Or you can use the included Docker Compose file.

The following commands can be used to run the tests:

# Create virtual environment
python -m venv venv

# Install requirements
venv/bin/pip install -r requirements.txt

# Install test dependencies
venv/bin/pip install pytest pytest-django

# Install django-q
venv/bin/python setup.py develop

# Run required services (you need to have docker-compose installed)
docker-compose -f test-services-docker-compose.yaml up -d

# Run tests
venv/bin/pytest

# Stop the services required by tests (when you no longer plan to run tests)
docker-compose -f test-services-docker-compose.yaml down

Locale

Currently available in English, German and French. Translation pull requests are always welcome.

Todo

  • Better tests and coverage
  • Less dependencies?

Acknowledgements

django-q's People

Contributors

achidlow, alx-sdv, amo13, benjaoming, bulv1ne, danielwelch, dependabot[bot], eagllus, fallenhitokiri, icfly2, janneronkko, jmcvetta, kennyhei, koed00, maerteijn, maximiliankindshofer, nickpolet, nurettin, p-eb, pysean3, svdgraaf, telmobarros, timomeara, tylerharper, urth, valentinogagliardi, vkaracic, wgordon17, yannpom, zws2014


django-q's Issues

Scheduled task name behaviour inconsistent

I've been using django-q for a while now and it is working great. Celery was overkill for me and I really like how django-q can use the orm as a broker.

When creating scheduled tasks sometimes I create them directly using Schedule.objects.create() and sometimes by using the schedule() helper function. I have found it useful to create these tasks with the same name so I can find them by group in the successful/failed model in the django admin. The problem is that sometimes there can be multiple schedules with the same name at the same time.

  • When creating the schedule objects directly the name can be the same, but when using the helper function an exception is raised due to a unique check in that function.
  • The name is not required according to the docs or the model, but it is a required field in the admin. This makes it hard to edit nameless jobs.

I probably don't understand the reasons for this behaviour but why not go one way or the other?

  • Make the name field required/unique, and add a group field to the Schedule model so the group can be specified instead of using the name or the pk.

-or-

  • Allow nulls in the name field, remove the duplicate check in the helper function, and remove the required check in the django admin.

Either of these would keep the instance, helper function, and admin behaviour consistent. As it stands right now I had to change my code to create the Schedule objects directly so I don't get an IntegrityError and can still filter my successful/failed tasks by group.

Thanks.

Documentation for mocking in case of testing

Greetings.
I've found this library quite interesting, but to start using it (as I follow TDD) it would be really convenient to know how to make asynchronous execution synchronous when testing (like CELERY_ALWAYS_EAGER). A friend of mine dropped rq because it was really hard to write tests for task code there.
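A hedged pointer, based on the configuration options shown elsewhere on this page: the cluster has a sync option that executes queued tasks inline, which is the usual way to make task code testable. A minimal sketch of a test settings override (module and project names are illustrative):

# test_settings.py - minimal sketch; assumes the documented 'sync' option,
# which runs async calls synchronously instead of handing them to a cluster
from myproject.settings import *  # hypothetical base settings module

Q_CLUSTER = {
    'name': 'myproject',
    'sync': True,      # execute async()/async_task() calls inline
    'orm': 'default',  # keep the broker in the test database, no Redis needed
}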

No module named builtins

Running the ./manage.py qcluster command with python 2.7 gives you an error:

from builtins import range
ImportError: No module named builtins

Looks like the package 'future' is not installed when using pip to install django_q. This issue can be fixed with:

pip install future

Adding 'future' to the install_requires in setup.py should hopefully fix this little issue.

I have created a pull request at #5 to fix this.

Catch_up States

I would like to see the catch_up option for missed tasks support 3 states instead of 2. The states would be:

None: Same effect as current boolean False
Once: Instead of all missed occurrences, this just executes the missed task(s) one time.
All: Same effect as current boolean True

Could uWSGI be a broker?

I've been watching this project and am very interested. Would it be possible to have uWSGI as a broker for django-q?

If so, are there any plans to add support?

Why not Celery?

It would be nice if the docs contained a brief comparison with similar libs or the motives behind django-q. Right now, Celery is the go-to system for most Django developers out there. It would be nice to know what django-q intends to do differently.

Cheers.

Worker recycle causes: "ERROR connection already closed"

Hello, and thanks for this great Django app! I'm using Python 3.4, Django 1.9.2, the latest django-q and the ORM broker backed by PostgreSQL.

It appears that when my worker recycles, it loses the ability to talk to the database. Everything works fine up until that point. I've verified it is directly related to the recycle configuration parameter, and changing this changes the placement of the error accordingly.

The output below shows the issue I'm having while running 1 single worker.

14:43:16 [Q] INFO Processed [zulu-montana-triple-michigan]
14:43:16 [Q] INFO recycled worker Process-1:1
14:43:16 [Q] INFO Process-1:4 ready for work at 22360
14:43:16 [Q] INFO Process-1:4 processing [maryland-king-queen-table]
14:43:17 [Q] INFO Process-1:4 processing [summer-mirror-mountain-september]
14:43:17 [Q] INFO Processed [maryland-king-queen-table]
14:43:17 [Q] INFO Process-1:4 processing [illinois-finch-orange-sodium]
14:43:17 [Q] ERROR server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.

Can you provide any guidance?

Thanks

Cluster dies when started with Postgres

Using Python 2.7.9, Django 1.8.4, Postgres 9.4.4, django-q 0.7.5 with the 'orm' broker.

  18:17:21 [Q] INFO Q Cluster-9399 starting.
  18:17:21 [Q] INFO Process-1:1 ready for work at 9404
  18:17:21 [Q] INFO Process-1:2 ready for work at 9405
  18:17:21 [Q] INFO Process-1:3 ready for work at 9406
  18:17:21 [Q] INFO Process-1:4 ready for work at 9407
  18:17:21 [Q] INFO Process-1 guarding cluster at 9402
  18:17:21 [Q] INFO Process-1:5 monitoring at 9408
  18:17:21 [Q] INFO Process-1:6 pushing tasks at 9409
  18:17:21 [Q] INFO Q Cluster-9399 running.
  18:17:21 [Q] ERROR SSL error: decryption failed or bad record mac

Disabling SSL in the database options gets

  18:24:07 [Q] INFO Q Cluster-9827 running.
  18:24:07 [Q] ERROR server closed the connection unexpectedly
      This probably means the server terminated abnormally
      before or while processing the request.

Django_q is part of INSTALLED_APPS, the DB migrated without a problem, I don't see any output from the devserver - I'm not sure where to go from here.

ERROR invalid syntax (<unknown>, line 1) - while passing objects as arguments

I'm not sure whether this is an SO question or a bug. My goal is to schedule a bulk mail task that sends with a few attachments (to be included from a django-admin form page). Instead of sending them all at once, I also want to schedule them at random times.

The provided example from the docs is perfect for sending a single email without attachments. But when I try to implement this with Django's EmailMessage using the code below, I get an ERROR invalid syntax (<unknown>, line 1) error from ./manage.py qcluster.

def send_email(email):
    pass


class VendorEmailView(View):

    @method_decorator(staff_member_required)
    def get(self, request):
        form = EmailForm({
            'subject': 'Test Subject',
            'message': 'Test Message',
        })
        return render(request, 'admin/project/vendor/email.html', {'form': form})

    @method_decorator(staff_member_required)
    def post(self, request):
        form = EmailForm(request.POST, request.FILES)
        if form.is_valid():
            subject = form.cleaned_data['subject']
            message = form.cleaned_data['message'].replace(
                '/media/', 'http://{}/media/'.format(settings.ALLOWED_HOSTS[0]))

            email = EmailMessage(subject=subject, body=message, from_email='[email protected]')
            email.content_subtype = 'html'

            for attachment in request.FILES.getlist('attachment'):
                email.attach(attachment.name, attachment.read(), attachment.content_type)

            vendors = Vendor.objects.filter(primary_email__contains='@')
            for vendor in vendors:
                email.to = [vendor.primary_email]
                schedule('project.views.send_email',
                         email, #============> ERROR LINE <==================
                         schedule_type=Schedule.ONCE,
                         next_run=timezone.now() + timedelta(seconds=random.randint(1,1000))
                         )

            return redirect('/admin/project/vendor/', permanent=True)

The error is raised whenever I try to pass an object (email, request, etc.) to my send_email function. It works fine when I'm passing variables like subject or message.

Would appreciate your help.
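A hedged reading of the error: schedule() stores its arguments as a text representation, so rich objects such as an EmailMessage cannot survive the round trip. A sketch that passes only simple values and rebuilds the message inside the task (the function name and dotted path below are illustrative, not the reporter's actual code):

# tasks.py - illustrative sketch: keep schedule() arguments to plain,
# serializable values and construct the EmailMessage inside the task
from django.core.mail import EmailMessage

def send_email(subject, message, recipient, attachment_paths=None):
    email = EmailMessage(subject=subject, body=message, to=[recipient])
    email.content_subtype = 'html'
    for path in attachment_paths or []:
        # reference attachments by file path instead of passing file objects
        email.attach_file(path)
    email.send()

# in the view, schedule with primitives only:
# schedule('project.tasks.send_email',
#          subject, message, vendor.primary_email,
#          schedule_type=Schedule.ONCE,
#          next_run=timezone.now() + timedelta(seconds=random.randint(1, 1000)))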

fetch() only works for the first task, returning None for subsequent task IDs

First of all, a big thank you for this awesome project. I have had lots of trouble working with Celery before and am very glad that I stumbled over your project a couple of months ago.
Django Q seems clean - code and feature wise, very promising!

Unfortunately, I am currently stuck on fetching offloaded tasks. After fetching the first task I am not able to retrieve any further tasks after that.

I have setup a test project including a unit test that reproduces this issue.

The tests include combinations of cache and spawning external processes as I first suspected the issue in that area, but to no avail.

Could you please have a look at this?

Thanks in advance!

ValueError('need more than 1 value to unpack',)

I am testing django-q as a production message queue and scheduling library. I made a simple function:

@login_required
def makea_math(request):
    resp = {}
    resp['retrr'] = result(async('math_task', '123131.12312'), 100)
    return HttpResponse(json.dumps(resp), content_type="application/json")
...
def math_task(number):
    return math.ceil(number)

but each time I get this error from the broker:
ValueError('need more than 1 value to unpack'

Is there any way to debug or check the broker logs to see what went wrong?

I am running Django : 1.9.6
Django REST framework: 3.3.2

Typo in parameter passed to broker in get_broker

In the get_broker function in brokers.__init__.py, when returning a broker for a custom broker class (Conf.BROKER_CLASS), the parameter passed to broker is currently:

if Conf.BROKER_CLASS:
    module, func = Conf.BROKER_CLASS.rsplit('.', 1)
    m = importlib.import_module(module)
    broker = getattr(m, func)
    return broker(list_key=list)

The last line above (line 166 in brokers.__init__.py) should use list_key=list_key instead of list_key=list as parameter to the broker function call:

if Conf.BROKER_CLASS:
    module, func = Conf.BROKER_CLASS.rsplit('.', 1)
    m = importlib.import_module(module)
    broker = getattr(m, func)
    return broker(list_key=list_key)

The typo as it is now causes custom broker classes to fail when trying to connect because the list_key (with the value as a list class) causes a bad connection URL.

Getting 'can't pickle lock objects' using async()

I am trying to send emails using django-q, but am getting a "TypeError: can't pickle lock objects" error trying the below. It works without the async. Sorry if I'm stupid, but I can't get this to work :/

Method for sending single email

def send_single_correspondence(subject, text_content, html_content, from_email, recipient, connection):
    msg = EmailMultiAlternativesWithEncoding(subject, unicode(text_content), from_email, [recipient], connection=connection)
    msg.attach_alternative(html_content, "text/html")
    msg.send()

and the async call

def send_board_correspondence(correspondence_instance):
    ...
    for recipient in recipients:
        async(send_single_correspondence, subject, text_content, html_content, from_email, recipient[0], connection)

Probably not important, but this is just a class that's overridden to handle attachments better:

class EmailMultiAlternativesWithEncoding(EmailMultiAlternatives):
    def _create_attachment(self, filename, content, mimetype=None):
        """
        Converts the filename, content, mimetype triple into a MIME attachment
        object. Use self.encoding when handling text attachments.
        """
        if mimetype is None:
            mimetype, _ = mimetypes.guess_type(filename)
            if mimetype is None:
                mimetype = DEFAULT_ATTACHMENT_MIME_TYPE
        basetype, subtype = mimetype.split('/', 1)
        if basetype == 'text':
            encoding = self.encoding or settings.DEFAULT_CHARSET
            attachment = SafeMIMEText(smart_str(content,
                settings.DEFAULT_CHARSET), subtype, encoding)
        else:
            # Encode non-text attachments with base64.
            attachment = MIMEBase(basetype, subtype)
            attachment.set_payload(content)
            encoders.encode_base64(attachment)
        if filename:
            try:
                filename = filename.encode('ascii')
            except UnicodeEncodeError:
                filename = Header(filename, 'utf-8').encode()
            attachment.add_header('Content-Disposition', 'attachment',
                                   filename=filename)
        return attachment

The variables sent to send_single_correspondence are:
subject: u'Async sending 12:23'
text_content: 'Message body'
html_content: u'<p>Message body</p>'
from_email: u'[email protected]'
recipient: u'[email protected]'
connection: <django.core.mail.backends.smtp.EmailBackend object at 0xb49aa0c>

Traceback:

Traceback (most recent call last):
  File "/home/martin/.virtualenvs/fysiografen/local/lib/python2.7/site-packages/django/core/handlers/base.py", line 111, in get_response
    response = wrapped_callback(request, *callback_args, **callback_kwargs)
  File "/media/sf_Projects/fysiografen/fysiografen/utils.py", line 194, in wrapper
    result = f(*args, **kwds)
  File "/media/sf_Projects/fysiografen/board/views.py", line 62, in correspondence_send
    send_board_correspondence(instance)
  File "/media/sf_Projects/fysiografen/fysiografen/mail_utils.py", line 135, in send_board_correspondence
    async(send_single_correspondence, subject, text_content, html_content, from_email, recipient[0], connection, correspondence_instance.attachments.all())
  File "/home/martin/.virtualenvs/fysiografen/local/lib/python2.7/site-packages/django_q/tasks.py", line 41, in async
    pack = signing.SignedPackage.dumps(task)
  File "/home/martin/.virtualenvs/fysiografen/local/lib/python2.7/site-packages/django_q/signing.py", line 24, in dumps
    serializer=PickleSerializer)
  File "/home/martin/.virtualenvs/fysiografen/local/lib/python2.7/site-packages/django/core/signing.py", line 111, in dumps
    data = serializer().dumps(obj)
  File "/home/martin/.virtualenvs/fysiografen/local/lib/python2.7/site-packages/django_q/signing.py", line 40, in dumps
    return pickle.dumps(obj)
  File "/home/martin/.virtualenvs/fysiografen/lib/python2.7/copy_reg.py", line 70, in _reduce_ex
    raise TypeError, "can't pickle %s objects" % base.__name__
TypeError: can't pickle lock objects

And here's the faulting line:

/home/martin/.virtualenvs/fysiografen/lib/python2.7/copy_reg.py in _reduce_ex
raise TypeError, "can't pickle %s objects" % base.__name__

Local vars
self
<thread.lock object at 0xadb9320>

base
<type 'thread.lock'>

proto
0

Perhaps I'm going about this the wrong way? It seems that I'm missing some information on the restrictions for django-q...?

Thanks for looking into this!

Successful tasks are not being saved to the database when 'save_limit' config setting is 0

The django-q docs say:

save_limit
Limits the amount of successful tasks saved to Django.

- Set to 0 for unlimited.
- Set to -1 for no success storage at all.
- Defaults to 250
- Failures are always saved.

I have save_limit set to 0 in my config:

Q_CLUSTER = {
    ...
    'save_limit': 0,
    ...
}

but none of the successful tasks are being saved to the db.

Setting save_limit to a positive value works, though.

I'm thinking line 402 from the save_task function in cluster.py may be the culprit:

# SAVE LIMIT < 0 : Don't save success
if not task.get('save', Conf.SAVE_LIMIT > 0) and task['success']:
    return

and Conf.SAVE_LIMIT > 0 should be Conf.SAVE_LIMIT >= 0:

# SAVE LIMIT < 0 : Don't save success
if not task.get('save', Conf.SAVE_LIMIT >= 0) and task['success']:
    return

Or am I overlooking something in my config?

Race condition when async() within transaction

This has been explained in Celery docs - http://docs.celeryproject.org/en/latest/userguide/tasks.html#database-transactions

We bumped into this issue after rolling out django-q to production recently. Let's say we have a function like this:

@transaction.atomic
def send_msg(recipient, text, msgid=None, queue=False):
    msg = Message.objects.get_or_create(msgid=msgid)
    if queue:
        async('send_msg', recipient, text, msg.msgid, queue=False)
        msg.status = 'QUEUED'
        msg.save()
        return msg

    # pass msg to backend gateway
    send_to_gateway(recipient, msg, ....)

Once we had the qcluster running, we started seeing a "Duplicate entry" error in the log. It looks like when we call async(), the cluster picks the task up before the transaction gets committed, so get_or_create() there creates a new object with the same msgid instead of getting the existing one. So we fixed that using transaction.on_commit() in Django 1.9.

@transaction.atomic
def send_msg(recipient, text, msgid=None, queue=False):
    msg = Message.objects.get_or_create(msgid=msgid)
    if queue:
        def _add_to_queue():
            async('send_msg', recipient, text, msg.msgid, queue=False)

        msg.status = 'QUEUED'
        msg.save()
        transaction.on_commit(_add_to_queue)
        return msg

    # pass msg to backend gateway
    send_to_gateway(recipient, msg, ....)

Maybe we should have this somewhere in the docs, like in the Celery docs.

OrmQ broker MySQL connection errors on ORM.delete(task_id)

Just started seeing this now that my qcluster has been running for a few days. Running Django 1.8.7, django-q 0.7.11, pylibmc 1.5.0

09:11:51 [Q] ERROR reincarnated monitor Process-1:11 after sudden death
09:11:51 [Q] INFO Process-1:12 monitoring at 30667
09:13:51 [Q] INFO Process-1:2 processing [zulu-july-low-gee]
Process Process-1:12:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib64/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/python/run/venv/local/lib/python2.7/site-packages/django_q/cluster.py", line 326, in monitor
    broker.acknowledge(ack_id)
  File "/opt/python/run/venv/local/lib/python2.7/site-packages/django_q/brokers/orm.py", line 64, in acknowledge
    return self.delete(task_id)
  File "/opt/python/run/venv/local/lib/python2.7/site-packages/django_q/brokers/orm.py", line 61, in delete
    self.connection.filter(pk=task_id).delete()
  File "/opt/python/run/venv/local/lib/python2.7/site-packages/django/db/models/query.py", line 537, in delete
    collector.delete()
  File "/opt/python/run/venv/local/lib/python2.7/site-packages/django/db/models/deletion.py", line 282, in delete
    with transaction.atomic(using=self.using, savepoint=False):
  File "/opt/python/run/venv/local/lib/python2.7/site-packages/django/db/transaction.py", line 186, in __enter__
    connection.set_autocommit(False)
  File "/opt/python/run/venv/local/lib/python2.7/site-packages/django/db/backends/base/base.py", line 295, in set_autocommit
    self._set_autocommit(autocommit)
  File "/opt/python/run/venv/local/lib/python2.7/site-packages/django/db/backends/mysql/base.py", line 301, in _set_autocommit
    self.connection.autocommit(autocommit)
  File "/opt/python/run/venv/local/lib/python2.7/site-packages/django/db/utils.py", line 98, in __exit__
    six.reraise(dj_exc_type, dj_exc_value, traceback)
  File "/opt/python/run/venv/local/lib/python2.7/site-packages/django/db/backends/mysql/base.py", line 301, in _set_autocommit
    self.connection.autocommit(autocommit)
  File "/opt/python/run/venv/local/lib64/python2.7/site-packages/MySQLdb/connections.py", line 243, in autocommit
    _mysql.connection.autocommit(self, on)
OperationalError: (2006, 'MySQL server has gone away')
09:13:52 [Q] ERROR reincarnated monitor Process-1:12 after sudden death
09:13:52 [Q] INFO Process-1:13 monitoring at 30692

The weird thing is that the broker is able to dequeue tasks, save the lock time, and pass them off to workers (the tasks themselves require DB access and do execute successfully). It's only when ORM.delete() is called that this error is triggered. So I don't think it's actually a problem accessing the DB, despite what the error says. I'd guess it has more to do with a conflicting transaction or autocommit state on the connection that results in dropping the connection. Just not sure why this behavior seems to only appear on older qclusters that have, presumably, passed some timeout threshold.

And because ORM.delete() failed, when the broker re-spawns it re-executes the completed task. Then once again dies when it tries to delete the task. I end up in an endless execute-die-respawn loop until I manually delete the task.

If it is related to stale DB connections, see: https://code.djangoproject.com/ticket/21597#comment:29

So as an experiment I'm now explicitly closing the connection in the broker before trying to use it and refactored access to the connection object:

from django.db import connection

    def get_connection(list_key=Conf.PREFIX):
        connection.close()
        return OrmQ.objects.using(Conf.ORM)

    def delete(self, task_id):
        self.get_connection().filter(pk=task_id).delete()

Time will tell if this helps. My last pull request ended up being totally misguided so I'm holding off on this one for now!

Any thoughts?

Can not pass async() non-core functions

I have views.py and tasks.py in my project folder.

I have the following in views.py:

# views.py
def TestAsync(request):
    from django_q.tasks import async
    import tasks

    async('tasks.hi', 'Hooray!')

    return HttpResponseRedirect('blah')

and tasks.py is:

# tasks.py
def hi(string_to_print):
    print string_to_print

When I hit the url I have wired to call my TestAsync view, I get the following error:

'module' object has no attribute 'hi'

I am able, however, to get the email examples from the django-q docs working, among other examples that use functions from django core/python built-ins. I've tried passing the function to async() with and without quotes, tried putting the 'hi' function right in the view, and every importing scheme I can think of...I still get the same error.

I have a feeling that this might be an error caused by my lack of understanding of what constitutes an acceptable function to pass to async() - however, on the off chance it's a bug I wanted to drop a note here.

If it's my fault, perhaps it will help others to have a lower-level explanation of what async() expects from its functions?
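A hedged explanation of what async() (renamed async_task() in current releases) expects: the function must be importable by the worker process under a full dotted path, either passed as that path string or as a module-level callable. A short sketch, assuming a package called myproject:

# views.py - sketch of the two accepted forms; assumes 'myproject.tasks.hi'
# is importable by the qcluster worker process as well as by the web process
from django_q.tasks import async_task  # called async() in older releases

def test_async(request):
    # 1. full dotted path, resolved by the worker when the task runs
    async_task('myproject.tasks.hi', 'Hooray!')

    # 2. a module-level callable; it is pickled, so it must still be
    #    importable under the same dotted path inside the worker
    from myproject.tasks import hi
    async_task(hi, 'Hooray!')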

app file structure

I've already worked with django, sorry if this is a silly question.

This is the usual file structure:

project:
mydata_app
manage.py

Should I add the tasks to a file called tasks.py?
Should it be inside mydata_app or in the project directory?

Can the calls to the tasks be placed in views.py?
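A hedged answer sketch, consistent with the README examples above: tasks can live in any importable module, a tasks.py inside the app is conventional, and views can enqueue them by dotted path:

# mydata_app/tasks.py - sketch: any importable module works; tasks.py inside
# the app is the conventional place
def my_task(pk):
    print(pk)

# mydata_app/views.py - enqueue from a view by dotted path:
# from django_q.tasks import async_task
# async_task('mydata_app.tasks.my_task', obj.pk)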

SystemError: Parent module '' not loaded, cannot perform relative import

I have an import problem with Django-Q 0.7.15 and Django 1.8.6 using Python 3.5

Django-Q is installed and able to import:

$ ipython
Python 3.5.0 (default, Sep 15 2015, 13:36:21) 
Type "copyright", "credits" or "license" for more information.

IPython 4.0.0 -- An enhanced Interactive Python.
?         -> Introduction and overview of IPython's features.
%quickref -> Quick reference.
help      -> Python's own help system.
object?   -> Details about 'object', use 'object??' for extra details.

In [1]: import django_q

In [2]: django_q
Out[2]: <module 'django_q' from '.../python3.5/site-packages/django_q/__init__.py'>

But as soon as I add django_q to my INSTALLED_APPS Django can't get past the django.setup() call:

$ ./manage.py shell
Traceback (most recent call last):
  File "./manage.py", line 11, in <module>
    execute_from_command_line(sys.argv)
  File ".../python3.5/site-packages/django/core/management/__init__.py", line 354, in execute_from_command_line
    utility.execute()
  File ".../python3.5/site-packages/django/core/management/__init__.py", line 328, in execute
    django.setup()
  File ".../python3.5/site-packages/django/__init__.py", line 18, in setup
    apps.populate(settings.INSTALLED_APPS)
  File ".../python3.5/site-packages/django/apps/registry.py", line 85, in populate
    app_config = AppConfig.create(entry)
  File ".../python3.5/site-packages/django/apps/config.py", line 86, in create
    module = import_module(entry)
  File ".../python3.5/importlib/__init__.py", line 126, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "<frozen importlib._bootstrap>", line 986, in _gcd_import
  File "<frozen importlib._bootstrap>", line 969, in _find_and_load
  File "<frozen importlib._bootstrap>", line 944, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 222, in _call_with_frames_removed
  File "<frozen importlib._bootstrap>", line 986, in _gcd_import
  File "<frozen importlib._bootstrap>", line 969, in _find_and_load
  File "<frozen importlib._bootstrap>", line 958, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 673, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 662, in exec_module
  File "<frozen importlib._bootstrap>", line 222, in _call_with_frames_removed
  File ".../python3.5/site-packages/django_q/apps.py", line 3, in <module>
    from .conf import Conf
SystemError: Parent module '' not loaded, cannot perform relative import

No doubt I'm holding it wrong. :-) Can you advise?

documentation or design issue?

For a scheduled task you give the example (first one).

# Use the schedule wrapper
from django_q.tasks import schedule

schedule('math.copysign',
         2, -2,
         hook='hooks.print_result',
         schedule_type=Schedule.DAILY)

However Schedule.DAILY is not imported here, so this should be

# Use the schedule wrapper
from django_q.tasks import schedule
from django_q.tasks import Schedule

schedule('math.copysign',
         2, -2,
         hook='hooks.print_result',
         schedule_type=Schedule.DAILY)

While your second example

# Or create the object directly
from django_q.models import Schedule

Schedule.objects.create(func='math.copysign',
                        hook='hooks.print_result',
                        args='2,-2',
                        schedule_type=Schedule.DAILY
                        )

does indeed only need the single import. I think it would be better to recommend only the second option.

scheduling repeating tasks

I'm having difficulty scheduling a repeating task. I placed this in tasks.py, but it never gets called.

schedule(
    'core.tasks.process_emails',
    schedule_type='I',
    minutes=15
)

If I add the task from the Django admin, it gets called once, but then the repeating number decreases to -2 and it never runs again. What am I doing wrong here?

Apparently it is running again, I'm just impatient. I'd like to schedule it via code, though. Is there something wrong with the way I wrote it?

"InterfaceError: connection already closed" being raised when a test is run

Hello,

First off, congratulations on this awesome app. I am using Python 2.7, Django 1.9 and Postgresql 9.3.

Settings dict for django-q is as follows:

Q_CLUSTER = {
    'name': 'pulsecheck',
    'workers': 2,
    'recycle': 500,
    'compress': True,
    'save_limit': 250,
    'label': 'Task Management',
    'redis': {
        'host': '127.0.0.1',
        'port': 6379,
        'db': 0, }
}

This is what I have in tests.py:

Conf.SYNC = True

...

def test_stuff(self):
    ...
    async('some_stuff', var1, var2)
    ...
    assert ...

When tests are run, this is what I get:

Creating test database for alias 'default'...
16:53:03 [Q] INFO MainProcess ready for work at 30995
16:53:03 [Q] INFO MainProcess processing [stairway-iowa-solar-lactose]
16:53:03 [Q] INFO MainProcess stopped doing work
16:53:03 [Q] INFO MainProcess monitoring at 30995
16:53:03 [Q] ERROR connection already closed
16:53:03 [Q] ERROR Failed [stairway-iowa-solar-lactose] - connection already closed
16:53:03 [Q] INFO MainProcess stopped monitoring results
E
======================================================================
ERROR: test_activation_successful (core.tests.AuthTestCase)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/ozgurisil/dev/pulsecheck_env/src/pulsecheck/core/tests.py", line 358, in test_activation_successful
    user = User.objects.get(username='testuser')
  File "/home/ozgurisil/dev/pulsecheck_env/local/lib/python2.7/site-packages/django/db/models/manager.py", line 122, in manager_method
    return getattr(self.get_queryset(), name)(*args, **kwargs)
  File "/home/ozgurisil/dev/pulsecheck_env/local/lib/python2.7/site-packages/django/db/models/query.py", line 381, in get
    num = len(clone)
  File "/home/ozgurisil/dev/pulsecheck_env/local/lib/python2.7/site-packages/django/db/models/query.py", line 240, in __len__
    self._fetch_all()
  File "/home/ozgurisil/dev/pulsecheck_env/local/lib/python2.7/site-packages/django/db/models/query.py", line 1074, in _fetch_all
    self._result_cache = list(self.iterator())
  File "/home/ozgurisil/dev/pulsecheck_env/local/lib/python2.7/site-packages/django/db/models/query.py", line 52, in __iter__
    results = compiler.execute_sql()
  File "/home/ozgurisil/dev/pulsecheck_env/local/lib/python2.7/site-packages/django/db/models/sql/compiler.py", line 846, in execute_sql
    cursor = self.connection.cursor()
  File "/home/ozgurisil/dev/pulsecheck_env/local/lib/python2.7/site-packages/django/db/backends/base/base.py", line 233, in cursor
    cursor = self.make_cursor(self._cursor())
  File "/home/ozgurisil/dev/pulsecheck_env/local/lib/python2.7/site-packages/django/db/backends/base/base.py", line 206, in _cursor
    return self.create_cursor()
  File "/home/ozgurisil/dev/pulsecheck_env/local/lib/python2.7/site-packages/django/db/utils.py", line 95, in __exit__
    six.reraise(dj_exc_type, dj_exc_value, traceback)
  File "/home/ozgurisil/dev/pulsecheck_env/local/lib/python2.7/site-packages/django/db/backends/base/base.py", line 206, in _cursor
    return self.create_cursor()
  File "/home/ozgurisil/dev/pulsecheck_env/local/lib/python2.7/site-packages/django/db/backends/postgresql/base.py", line 210, in create_cursor
    cursor = self.connection.cursor()
InterfaceError: connection already closed

----------------------------------------------------------------------
Ran 1 test in 1.382s

PS: The error still occurs if async(..., sync=True) format is used.

[Question] State of Task ?

I've been watching and working with this project.
Would it be possible to implement a task state?
Currently it doesn't seem that we can clearly tell whether a task is open/running/done
(except from the qcluster logging).
Or is this already implemented and am I missing something?
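A hedged sketch using the documented task functions: a finished task can be fetched and inspected, while a task that is still queued or running returns None, which gives at least a coarse pending/done distinction:

# sketch: coarse task state via fetch(); a Task record only exists once the
# task has finished (successfully or not)
from django_q.tasks import async_task, fetch

task_id = async_task('math.copysign', 2, -2)

task = fetch(task_id)
if task is None:
    print('still queued or running')
else:
    print('done: success=%s result=%r' % (task.success, task.result))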

Result is -

I run a task successfully and return a string at the end of my task.
However the result in the ORM admin displays the info below.

Name: fourteen-saturn-red-moon
Func: app.tasks.generate_static_file
Hook: -
Args: ()
Kwargs: {}
Result: -

I would have expected that Result: would be my text

def generate_static_file():
       ....
        if not delta > modified_time:
            return 'modified within the last 30 minutes'

Am I doing something wrong or does Result mean something else?
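One hedged possibility: if the function falls through without reaching that return statement, it implicitly returns None, which the admin renders as '-'. A sketch with an explicit value on every path:

# sketch: return a value on every code path so the Result column is never
# empty ('-' is how a None result is displayed)
def generate_static_file():
    ...
    if not delta > modified_time:
        return 'modified within the last 30 minutes'
    ...
    return 'static file regenerated'  # explicit return on the remaining path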

Async argument 'timeout' fails if broker timeout is set to None

I'm using the 'orm' as a broker, but I suspect this will hit other brokers as well.

If the broker's timeout is set to None, as is the default, passing 'timeout' as an argument to async doesn't set the timeout properly. Once the broker has a timeout set, then the timeout can be overridden.

  Q_CLUSTER = { 
      'name': 'DjangoORM',
      'orm': 'default',
     'retry': 600,
  }
def block():
  import time
  while True:
    print("BLOCKING")
    time.sleep(1)
async(block, timeout=3)

The worker will never timeout. If timeout was set to something, then the worker times out in 3 seconds.

From what I can tell, setting timeout to -1 allows you to never timeout by default and to override it with the argument.

Using django-q breaks django-mama-cas

Just having django-q in INSTALLED_APPS makes django-mama-cas incapable of correctly constructing a redirect URL for unclear reasons -- mysteriously, somewhere in the middle of the process, the constructed url becomes bytes, rather than string or unicode, which confuses the standard urlparse.py.

The traceback is highly bizarre and unhelpful:

Traceback:
File "/kursitet/app/platform-env/local/lib/python2.7/site-packages/django/core/handlers/base.py" in get_response
  230.             response = self.apply_response_fixes(request, response)
File "/kursitet/app/platform-env/local/lib/python2.7/site-packages/django/core/handlers/base.py" in apply_response_fixes
  277.             response = func(request, response)
File "/kursitet/app/platform-env/local/lib/python2.7/site-packages/django/http/utils.py" in fix_location_header
  21.         response['Location'] = request.build_absolute_uri(response['Location'])
File "/kursitet/app/platform-env/local/lib/python2.7/site-packages/django/http/request.py" in build_absolute_uri
  148.         bits = urlsplit(location)
File "/usr/lib/python2.7/urlparse.py" in urlsplit
  192.             if c not in scheme_chars:

Exception Type: TypeError at /cas/login
Exception Value: 'in <string>' requires string as left operand, not int

It is only clear that this happens because the url being parsed is b'https://...' but not exactly how this occurs. Removing django-q from INSTALLED_APPS immediately fixes the issue.

The most likely culprit so far is the future library monkeypatching standard library - including urlparse - when invoked at https://github.com/Koed00/django-q/blob/master/django_q/cluster.py#L10

Q Cluster auto-launch?

I'm trying to simplify local dev and my Production environment. I'd love a way to have each Django instance auto-launch a Q Cluster based on a settings.py param.

This would make it easy to eventually separate pure web front ends from task workers by just tweaking some if/else logic in the settings.py.

It looks like the place to put a run-once-on-launch command would be in myapp/app.py (see: http://stackoverflow.com/a/16111968)

I tried a naive:

q = Cluster()
try:
    q.stop()
except Exception as e:
    pass
q.start()

in app.py but it isn't finding existing instances and is just spawning a new cluster each time. It also isn't able to fully shut down the clusters when runserver exits. Can you think of any better ways to couple a django instance with an auto-launched cluster? Or are there particular reasons why such a coupling is a bad idea?

Getting "MySQL server has gone away" on new tasks after idling

Hello,

I've been trying Django Q for a new project as an alternative to Celery, and so far I love it, but I keep getting a "MySQL server has gone away" error on each new task received by my cluster after it idled for at least 8 hours, which corresponds to the wait_timeout of my MySQL server. Also, to make sure it was related to wait_timeout I tried lowering it to a minute and the same thing keeps happening until I restart the cluster.

Is this behavior intended? Shouldn't Django or Django Q handle this and at least try to re-establish a connection?

I tried changing Django's CONN_MAX_AGE in my settings.py to both 0 and a value higher but lower than MySQL's wait_timeout, but no luck.
So after a bit of googling I found this: https://code.djangoproject.com/ticket/21597#comment:29, they recommend using connection.close() so that Django can connect again, and it works for my task, meaning it can change stuff on one of my models and save it, but the task itself isn't getting saved and doesn't appear under successful tasks.

Is there any workaround other than periodically restarting the cluster or increasing wait_timeout to insane values?

My Django Q settings are vanilla, using redis as broker.

08:56:23 [Q] INFO Q Cluster-26132 starting.
08:56:23 [Q] INFO Process-1:3 ready for work at 26139
08:56:23 [Q] INFO Process-1:5 monitoring at 26141
08:56:23 [Q] INFO Process-1 guarding cluster at 26138
08:56:23 [Q] INFO Process-1:6 pushing tasks at 26142
08:56:23 [Q] INFO Process-1:4 ready for work at 26140
08:56:23 [Q] INFO Q Cluster-26132 running.

08:57:46 [Q] INFO Process-1:3 processing [ink-bakerloo-michigan-finch]
08:57:47 [Q] INFO Processed [ink-bakerloo-michigan-finch]

08:59:19 [Q] INFO Process-1:4 processing [rugby-five-skylark-lake]
08:59:19 [Q] ERROR (2006, 'MySQL server has gone away')
08:59:19 [Q] INFO Processed [rugby-five-skylark-lake]

09:01:24 [Q] INFO Process-1:3 processing [uniform-one-hydrogen-december]
09:01:25 [Q] ERROR (2006, 'MySQL server has gone away')
09:01:25 [Q] INFO Processed [uniform-one-hydrogen-december]
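A hedged sketch of the workaround the linked Django ticket suggests, applied inside the task itself: drop stale connections before touching the ORM so Django reconnects instead of reusing a timed-out socket (the model and task names are placeholders):

# sketch: clear dead or expired connections at the top of the task so the ORM
# opens a fresh one instead of failing with 'MySQL server has gone away'
from django.db import close_old_connections

def my_task(pk):
    close_old_connections()           # honours CONN_MAX_AGE and drops broken connections
    obj = MyModel.objects.get(pk=pk)  # MyModel is a placeholder model
    obj.processed = True
    obj.save()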

Iron-mq v3 not supported

Builds have been failing since Iron-mq introduced V3 and the accompanying python wrapper.
For now Iron-mq tests have been disabled until I figure out what's causing it.

Circus stop only stops first process

I have a project running with circus.
When I tell circusctl stop django-q it returns ok, but ps aux | grep qcluster shows that only one process (the main process) stopped; the children are still running.

28686  0.0  0.3 142600 36740 ?        Sl   09:29   0:00 /project/virtualenv/bin/python ./manage.py qcluster
28687  0.0  0.3 141848 34048 ?        S    09:29   0:00 /project/virtualenv/bin/python ./manage.py qcluster
28688  0.0  0.3 141852 34056 ?        S    09:29   0:00 /project/virtualenv/bin/python ./manage.py qcluster
28689  0.0  0.3 141856 34060 ?        S    09:29   0:00 /project/virtualenv/bin/python ./manage.py qcluster
28690  0.0  0.3 141860 34052 ?        S    09:29   0:00 /project/virtualenv/bin/python ./manage.py qcluster
28691  0.0  0.3 141864 34064 ?        S    09:29   0:00 /project/virtualenv/bin/python ./manage.py qcluster
28692  0.0  0.3 141868 34068 ?        S    09:29   0:00 /project/virtualenv/bin/python ./manage.py qcluster
28693  0.0  0.3 141872 34072 ?        S    09:29   0:00 /project/virtualenv/bin/python ./manage.py qcluster
28694  0.0  0.3 141872 35516 ?        S    09:29   0:00 /project/virtualenv/bin/python ./manage.py qcluster
28695  2.0  0.3 142128 36424 ?        Sl   09:29   0:24 /project/virtualenv/bin/python ./manage.py qcluster

Any idea why this is happening?

Import errors

Hi,

I have added django_q in INSTALLED_APPS. All of Django's management commands are now throwing this error-

ValueError: Attempted relative import in non-package

Stack trace

Traceback (most recent call last):
  File "./manage.py", line 10, in <module>
    execute_from_command_line(sys.argv)
  File "/Users/xxx/.virtualenvs/plweb/lib/python2.7/site-packages/django/core/management/__init__.py", line 354, in execute_from_command_line
    utility.execute()
  File "/Users/xxx/.virtualenvs/plweb/lib/python2.7/site-packages/django/core/management/__init__.py", line 328, in execute
    django.setup()
  File "/Users/xxx/.virtualenvs/plweb/lib/python2.7/site-packages/django/__init__.py", line 18, in setup
    apps.populate(settings.INSTALLED_APPS)
  File "/Users/xxx/.virtualenvs/plweb/lib/python2.7/site-packages/django/apps/registry.py", line 85, in populate
    app_config = AppConfig.create(entry)
  File "/Users/xxx/.virtualenvs/plweb/lib/python2.7/site-packages/django/apps/config.py", line 86, in create
    module = import_module(entry)
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/importlib/__init__.py", line 37, in import_module
    __import__(name)
  File "/Users/xxx/.virtualenvs/plweb/lib/python2.7/site-packages/django_q/apps.py", line 3, in <module>
    from .conf import Conf

Any idea?

manage mqtt IoT

I've just discovered the MQTT protocol (http://mqtt.org/) and am looking to manage it from Django.
Looking at the docs it appears to use a subscriber/publisher scheme, so a broker like mosquitto is needed.

Would creating a custom broker with a paho client in django-q make it possible to access a mosquitto broker as a subscriber?

Just wondering if anyone has already done this; am I on the right track or totally wrong?

Multiple Queues ?

Hello !

I had a look at Django-Q and didn't find anything about multiple queues.
Does this exist, or is it on the roadmap?

Nice work by the way.

foo.bar.tasks.my_task fails with "No module named bar.tasks" when running in AWS / Elastic Beanstalk

So now I've got qcluster happily running under supervisord both in local dev and on Elastic Beanstalk (EB). Local testing works great.

I have a simple test task:

def task_test(user):
    logger.debug("Hello, from the task!!")

And I can make an async call on it in local dev:

async('myapp.member.tasks.task_test', request.user)

And it runs fine:

22:15:51 [Q] INFO Process-1:1 processing [colorado-eleven-lima-skylark]
2015-10-28 22:15:51,112 DEBUG    myapp.member.tasks:task_test(9): Hello, from the task!!
22:15:51 [Q] INFO Processed [colorado-eleven-lima-skylark]

But up on Elastic Beanstalk something strange is going on with traversing the app structure:

21:23:04 [Q] INFO Process-1:2 processing [potato-mars-connecticut-ink]
21:23:04 [Q] ERROR Failed [potato-mars-connecticut-ink] - No module named member.tasks

I also tried passing the function directly:

from myapp.member.tasks import task_test
async(task_test, request.user)

But end up with a similar error:

22:09:07 [Q] INFO Process-1:10 pushing tasks at 3695
Process Process-1:10:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib64/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/opt/python/run/venv/local/lib/python2.7/site-packages/django_q/cluster.py", line 300, in pusher
    task = signing.SignedPackage.loads(task[1])
  File "/opt/python/run/venv/local/lib/python2.7/site-packages/django_q/signing.py", line 31, in loads
    serializer=PickleSerializer)
  File "/opt/python/run/venv/local/lib/python2.7/site-packages/django/core/signing.py", line 145, in loads
    return serializer().loads(data)
  File "/opt/python/run/venv/local/lib/python2.7/site-packages/django_q/signing.py", line 44, in loads
    return pickle.loads(data)
ImportError: No module named member.tasks

Same problem if I do a relative import:

from .tasks import task_test
async(task_test, request.user)

The EB supervisord.conf is straightforward:

[program:qcluster]
command=/opt/python/run/venv/bin/python manage.py qcluster
numprocs=1
directory=/opt/python/current/app/myapp
environment=$djangoenv

($djangoenv is injecting the environment variables elsewhere in the deploy script, but didn't make a difference with or without them):

djangoenv=`cat /opt/python/current/env | tr '\n' ',' | sed 's/export //g' | sed 's/$PATH/%(ENV_PATH)s/g' | sed 's/$PYTHONPATH//g' | sed 's/$LD_LIBRARY_PATH//g'`
djangoenv=${djangoenv%?}

I also tried SSHing into EB and doing a manual async call through the manage.py shell but saw the same errors in the qcluster logs.

Nothing crazy about the project structure:

approot
|
+---myapp
|    +---member
|    |    +---__init__.py
|    |    +---tasks.py
|    |    +---urls.py
|    |    +---views.py
|    +---__init__.py
|    +---admin.py
|    +---forms.py
|    +---models.py
|    +---settings.py
|    +---urls.py
|    +---views.py
|    +---wsgi.py
+---manage.py

Running on Python 2.7.9.

This seems most likely to be something with the EB environment, but let me know if you have any ideas. I'm all out at this point!

Schedule repeat

I've just started to look at using Django-q and scheduling tasks. I noticed when testing that the repeat counts down every time the task runs, as per the docs.

However if you have a task that runs every minute it would quickly get to the -32768 limit of a smallint.

Is Django-q not set up for this, or is there a better way of doing this?
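A hedged note from the schedule documentation: a negative repeats value means the schedule repeats indefinitely, so the smallint limit only matters for finite repeat counts. A sketch for an every-minute task (the dotted path is illustrative):

# sketch: repeats=-1 keeps the schedule running indefinitely instead of
# counting down to a finite limit
from django_q.tasks import schedule
from django_q.models import Schedule

schedule('myapp.tasks.every_minute_task',
         schedule_type=Schedule.MINUTES,
         minutes=1,
         repeats=-1)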

AppRegistryNotReady Exception on Django 1.10 Dev

Running ./manage.py migrate on Django 1.10.dev20160413172854 throws the following exception, but changing to Django 1.9.5 works fine. Is there any fix to work with the dev version?

╰─$ ./manage.py migrate
Traceback (most recent call last):
  File "./manage.py", line 10, in <module>
    execute_from_command_line(sys.argv)
  File "/Users/dacodekid/dev/django/django/core/management/__init__.py", line 367, in execute_from_command_line
    utility.execute()
  File "/Users/dacodekid/dev/django/django/core/management/__init__.py", line 341, in execute
    django.setup()
  File "/Users/dacodekid/dev/django/django/__init__.py", line 27, in setup
    apps.populate(settings.INSTALLED_APPS)
  File "/Users/dacodekid/dev/django/django/apps/registry.py", line 85, in populate
    app_config = AppConfig.create(entry)
  File "/Users/dacodekid/dev/django/django/apps/config.py", line 90, in create
    module = import_module(entry)
  File "/Users/dacodekid/.envs/tradex/lib/python3.5/importlib/__init__.py", line 126, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "<frozen importlib._bootstrap>", line 986, in _gcd_import
  File "<frozen importlib._bootstrap>", line 969, in _find_and_load
  File "<frozen importlib._bootstrap>", line 958, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 673, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 662, in exec_module
  File "<frozen importlib._bootstrap>", line 222, in _call_with_frames_removed
  File "/Users/dacodekid/.envs/tradex/lib/python3.5/site-packages/django_q/__init__.py", line 15, in <module>
    from .tasks import async, schedule, result, result_group, fetch, fetch_group, count_group, delete_group, queue_size
  File "/Users/dacodekid/.envs/tradex/lib/python3.5/site-packages/django_q/tasks.py", line 11, in <module>
    import cluster
  File "/Users/dacodekid/.envs/tradex/lib/python3.5/site-packages/django_q/cluster.py", line 31, in <module>
    import tasks
  File "/Users/dacodekid/.envs/tradex/lib/python3.5/site-packages/django_q/tasks.py", line 13, in <module>
    from django_q.models import Schedule, Task
  File "/Users/dacodekid/.envs/tradex/lib/python3.5/site-packages/django_q/models.py", line 12, in <module>
    class Task(models.Model):
  File "/Users/dacodekid/dev/django/django/db/models/base.py", line 94, in __new__
    app_config = apps.get_containing_app_config(module)
  File "/Users/dacodekid/dev/django/django/apps/registry.py", line 239, in get_containing_app_config
    self.check_apps_ready()
  File "/Users/dacodekid/dev/django/django/apps/registry.py", line 124, in check_apps_ready
    raise AppRegistryNotReady("Apps aren't loaded yet.")
django.core.exceptions.AppRegistryNotReady: Apps aren't loaded yet.

Bug? Hourly schedule_type runs in reverse

I'm using Django Q to send emails on an hourly basis, and after the job runs, the "next_run" setting is changed to 2 hours prior to the run just completed, so the next minute the scheduler checks for new jobs it finds that same job, reruns it, and again schedules it for 2 hours further back in history. Here's a printout taken over several job runs.

run 1. core.tasks.send_mail 2016-01-04 22:57:23.159561
run 2. core.tasks.send_mail 2016-01-04 18:57:23.159561
run 3. core.tasks.send_mail 2016-01-04 10:57:23.159561
run 4. core.tasks.send_mail 2016-01-03 02:57:23.159561
run 5. core.tasks.send_mail 2016-01-02 14:57:23.159561

And I suppose one gets the idea. If I switch the schedule_type to DAILY, then the next_run advances as expected.

Now, change schedule_type = "D":

run 1. core.tasks.send_mail D 2016-01-01 17:57:23.159561
run 2. core.tasks.send_mail D 2016-01-02 12:57:23.159561
run 3. core.tasks.send_mail D 2016-01-03 12:57:23.159561
run 4. core.tasks.send_mail D 2016-01-04 12:57:23.159561
run 5. core.tasks.send_mail D 2016-01-05 12:57:23.159561

And then by run 5 it's fully caught up and stops running every minute. Here's my code, which I'm calling in apps.py.

class MyAppCore(AppConfig):
    name = 'core'
    verbose_name = 'MyApp Core'

    def ready(self):
        send = 'Cron: send_mail'
        if not Schedule.objects.filter(name=send).exists():
            Schedule.objects.create(
                name=send,
                func='core.tasks.send_mail',
                schedule_type=Schedule.HOURLY,
            )

The task is just a wrapper around the management command:

def send_mail():
    management.call_command('send_mail', **{'cron': 1})

Scheduled task does not work as expected

Hello,
I created a task to run every 15 minutes via the Django admin form, but it runs every minute.
The setting is like this:

Q_CLUSTER = {
    'name': 'DjangORM',
    'workers': 1,
    'retry': 120,
    'catch_up': False,
    'orm': 'default'
}
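A hedged sketch of the intended setup: a 15-minute interval needs schedule_type=Schedule.MINUTES together with the minutes field; if the minutes value is left unset, the scheduler appears to fall back to a one-minute interval, which would match the behaviour described (the dotted path is illustrative):

# sketch: explicit 15-minute interval
from django_q.tasks import schedule
from django_q.models import Schedule

schedule('myapp.tasks.my_task',
         schedule_type=Schedule.MINUTES,
         minutes=15)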

"connection already closed" while testing

I'm trying to write tests for an app using django-q, so I thought I'd just use the ORM broker and force tasks to run synchronously:

Q_CLUSTER = {
    'sync': True,
    'orm': 'default'
}

My test calls a function that enqueues a task:

Creating test database for alias 'default'...
10:37:57 [Q] INFO Starting check on expired.identrustssl.com
10:37:58 [Q] INFO MainProcess ready for work at 22161
10:37:58 [Q] INFO MainProcess processing [potato-eight-west-jersey]
10:38:00 [Q] INFO MainProcess stopped doing work
10:38:00 [Q] INFO MainProcess monitoring at 22161
10:38:00 [Q] ERROR connection already closed
10:38:00 [Q] ERROR Failed [potato-eight-west-jersey] - connection already closed

Am I missing something?

Make orm polling interval configurable

Hey, I love this package for its simplicity. Don't interpret the title as a command, it just needed to be catchy.

I want to discuss the possibility of adding a setting or configuration option for the ORM polling interval. Correct me if I'm wrong, but this value (https://github.com/Koed00/django-q/blob/v0.7.14/django_q/brokers/orm.py#L64) is the reason for about 5 SELECT queries per second against the django_q_ormq table. I have nothing against this default value as long as there is a chance to reduce the pressure on the database, but currently there is no way to opt out of this high frequency.

I had some problems as I'm running on pretty low-end hardware (slower than a Raspberry Pi, very bad IO). I know that's a special case, but changing this constant works fine for me.

Another possibility would be to inherit from the ORM broker and just override dequeue() with a super() call and an additional sleep(). But there's currently no way to add a custom broker. There should be, as this would be the Django way, wouldn't it?

Knowing that the simplicity cannot be preserved by simply adding more and more options, I'm open to your thoughts on this and may be able to prepare a PR based on some hints from you.
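A minimal sketch of the subclass idea described above, assuming a custom broker can be registered the way the get_broker excerpt earlier on this page suggests (a broker_class-style setting); the class and setting names here are illustrative:

# mybrokers.py - sketch: slow the ORM polling down by sleeping before
# delegating to the stock ORM broker's dequeue()
from time import sleep
from django_q.brokers.orm import ORM

class SlowPollORM(ORM):
    poll_interval = 2.0  # seconds between polls; purely illustrative

    def dequeue(self):
        sleep(self.poll_interval)
        return super(SlowPollORM, self).dequeue()

# settings.py (hypothetical, mirroring the Conf.BROKER_CLASS hook quoted in
# the custom broker issue above):
# Q_CLUSTER = {'orm': 'default', 'broker_class': 'mybrokers.SlowPollORM'}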

Workflow need to run qcluster?

I've tried to add a simple task as async:

task_id=async('my_app.tasks.print', self.pk)

it run correctly if I start in a separate shell:

python manage.py qcluster

My question is: for deployment, does the qcluster always have to be started manually?

Too many workers

Hi again (sorry for bothering you!)

I just pushed django-q to my external server, and saw that with the configuration set to 1 worker, django-q launches five processes. How can I reduce this, since it's a shared server with limited memory?

Thank you!

Allow task custom name

Currently this is not possible:-

task_id = async('send_sms', user_id, sender, recipient, message,
                          msgid, start_sending_time, name=msgid)

This, I guess, is due to this code, which limits which task attributes you can override:-

def async(func, *args, **kwargs):
    """Queue a task for the cluster."""
    keywords = kwargs.copy()
    opt_keys = ('hook', 'group', 'save', 'sync', 'cached', 'iter_count', 'iter_cached', 'chain', 'broker')
    q_options = keywords.pop('q_options', None)
    # get an id
    tag = uuid()
    # build the task package
    task = {'id': tag[1],
            'name': tag[0],
            'func': func,
            'args': args}
    # push optionals
    for key in opt_keys:
        if q_options and key in q_options:
            task[key] = q_options[key]
        elif key in keywords:
            task[key] = keywords.pop(key)

Retry possibility?

I have a task that connects to an iLO

def try_connection(ilo):
    try:
        hpilo = ilo.connect()
        hpilo.get_fw_version()
    except (IloCommunicationError, socket.timeout, socket.error) as exc:
        pass  # raise self.retry(exc=exc)  # used in celery

    ilo.password = None
    ilo.save(update_fields=['password'])

Is there an option that I missed that would make this possible?
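Django Q does not appear to offer a Celery-style self.retry(); its retry setting concerns re-queueing tasks that were never acknowledged. A hedged sketch of a manual retry by re-enqueueing the task from inside itself (the dotted path and attempt bookkeeping are illustrative):

# sketch: manual retry by re-queueing the task with an attempt counter;
# IloCommunicationError comes from the reporter's iLO client library
import socket
from django_q.tasks import async_task  # async() in the release used here

def try_connection(ilo, attempt=1, max_attempts=3):
    try:
        hpilo = ilo.connect()
        hpilo.get_fw_version()
    except (IloCommunicationError, socket.timeout, socket.error):
        if attempt < max_attempts:
            async_task('myapp.tasks.try_connection', ilo, attempt + 1, max_attempts)
        return

    ilo.password = None
    ilo.save(update_fields=['password'])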

`qcluster` command doesn't handle interrupts.

The management command is intercepting interrupts but then not handling them.

Thus I get an error and can't exit...

  File "/Users/carlton/ve/nws/lib/python2.7/site-packages/django_q/cluster.py", line 520, in set_cpu_affinity
    p.cpu_affinity(affinity)
AttributeError: 'Process' object has no attribute 'cpu_affinity'
^C^C^C^C^C^C^C^C^C^C^C

I have to kill the shell and start over. It would be nice if ^C worked here.

Task stays in queue when executing a requests.post

When I try and execute a requests.post the task stays on the queue and never completes the requests.post request.

Pseudo code:

class Auth:
  def __init__(self, url, username, password):
      logging.debug('Making call for credentials.')
      r = requests.post(url, data={'username': username, 'password': password})
def queue_get_auth(username, password):
    a = Auth('https://auth', 'username', 'password')

def validate_login(username, password):
    job = async(queue_get_auth, username, password)

Error I am seeing and repeats until I delete it from the database queued tasks:

[DEBUG] | 2015-10-19 14:46:05,510 | auth:  Making call for credentials.
14:46:05 [Q] ERROR reincarnated worker Process-1:3 after death
14:46:05 [Q] INFO Process-1:9 ready for work at 13576
14:46:19 [Q] INFO Process-1:4 processing [mango-two-juliet-edward]

Is there a reason why the requests.post would be causing it to fail? How would I debug this? If I run it in sync: True it works fine. This is running on a Mac OS X 10.11, database sqlite.

Q_CLUSTER = {
    'name': 'auth',
    'workers': 4,
    'recycle': 500,
    'timeout': 60,
    'compress': False,
    'save_limit': 250,
    'queue_limit': 500,
    'sync': False,
    'cpu_affinity': 1,
    'label': 'Django Q',
    'orm': 'default'
}

Windows compatibility

On PyPI the metadata states that this package is OS Independent. After spending half an hour reading the documentation and setting up a test installation I found out that django-q does not support Windows. After adding it to the Django project settings, any management command issued fails with:

ImportError: No module named _curses

As it seems, this is due to the dependency on blessed. Please spare others the same frustration and update the PyPI metadata and/or add a note about operating system support to your readme.

If you'd ever want to support Windows, I can recommend https://pypi.python.org/pypi/colorama for cross-platform colored terminal output. I can also recommend http://www.appveyor.com/ if you need Travis-like free Windows CI/testing for the project.

Otherwise this looks like a great project. Thanks for making it open source.
