wakatime / wakaq

Background task queue for Python backed by Redis, a super minimal Celery

Home Page: https://pypi.org/project/WakaQ/

License: BSD 3-Clause "New" or "Revised" License

Makefile 0.77% Python 99.23%
task-queue celery queue redis worker async distributed rq background-jobs job-scheduler

wakaq's Introduction

WakaQ

Background task queue for Python backed by Redis, a super minimal Celery. Read about the motivation behind this project in this blog post and the accompanying Hacker News discussion. WakaQ is currently used in production at WakaTime.com. WakaQ is also available in TypeScript.

Features

  • Queue priority
  • Delayed tasks (run tasks after a timedelta eta)
  • Scheduled periodic tasks
  • Broadcast a task to all workers
  • Task soft and hard timeout limits
  • Optionally retry tasks on soft timeout
  • Combat memory leaks with max_mem_percent or max_tasks_per_worker
  • Super minimal

Want more features like rate limiting, task deduplication, etc? Too bad, feature PRs are not accepted. Maximal features belong in your app’s worker tasks.

Installing

pip install wakaq

Using

import logging
from datetime import timedelta
from wakaq import WakaQ, Queue, CronTask


# use constants to prevent misspelling queue names
Q_HIGH = 'a-high-priority-queue'
Q_MED = 'a-medium-priority-queue'
Q_LOW = 'a-low-priority-queue'
Q_OTHER = 'another-queue'
Q_DEFAULT = 'default-lowest-priority-queue'


wakaq = WakaQ(

    # List your queues and their priorities.
    # Queues can be defined as Queue instances, tuples, or just a str.
    queues=[
        (0, Q_HIGH),
        (1, Q_MED),
        (2, Q_LOW),
        Queue(Q_OTHER, priority=3, max_retries=5, soft_timeout=300, hard_timeout=360),
        Q_DEFAULT,
    ],

    # Number of worker processes. Must be an int or str which evaluates to an
    # int. The variable "cores" is replaced with the number of processors on
    # the current machine.
    concurrency="cores*4",

    # Raise SoftTimeout in a task if it runs longer than 30 seconds. Can also be set per
    # task or queue. If no soft timeout is set, tasks can run forever.
    soft_timeout=30,  # seconds

    # SIGKILL a task if it runs longer than 1 minute. Can be set per task or queue.
    hard_timeout=timedelta(minutes=1),

    # If a task hits its soft timeout, retry it up to 3 times. Max retries comes
    # first from the task decorator if set, next from the Queue's max_retries,
    # lastly from the option below. If no max_retries is found, the task
    # is not retried on a soft timeout.
    max_retries=3,

    # Combat memory leaks by reloading a worker (the one using the most RAM),
    # when the total machine RAM usage is at or greater than 98%.
    max_mem_percent=98,

    # Combat memory leaks by reloading a worker after it's processed 5000 tasks.
    max_tasks_per_worker=5000,

    # Schedule three tasks: the first runs every minute, the second every five minutes,
    # and the third every ten minutes. Scheduled tasks can be passed as CronTask
    # instances or tuples. To run scheduled tasks you must keep a wakaq scheduler
    # running as a daemon.
    schedules=[

        # Runs mytask on the queue with priority 1.
        CronTask('* * * * *', 'mytask', queue=Q_MED, args=[2, 2], kwargs={}),

        # Runs mytask once every 5 minutes.
        ('*/5 * * * *', 'mytask', [1, 1], {}),

        # Runs anothertask on the default lowest priority queue.
        ('*/10 * * * *', 'anothertask'),
    ],
)


# timeouts can be customized per task with a timedelta or integer seconds
@wakaq.task(queue=Q_MED, max_retries=7, soft_timeout=420, hard_timeout=480)
def mytask(x, y):
    print(x + y)


@wakaq.task
def anothertask():
    print("hello world")


@wakaq.wrap_tasks_with
def custom_task_decorator(fn):
    def inner(*args, **kwargs):
        # do something before each task runs
        fn(*args, **kwargs)
        # do something after each task runs
    return inner


if __name__ == '__main__':

    # add 1 plus 1 on a worker somewhere
    mytask.delay(1, 1)

    # add 1 plus 1 on a worker somewhere, overwriting the task's queue from medium to high
    mytask.delay(1, 1, queue=Q_HIGH)

    # print hello world on a worker somewhere, running on the default lowest priority queue
    anothertask.delay()

    # print hello world on a worker somewhere, 10 minutes from now
    anothertask.delay(eta=timedelta(minutes=10))
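
The comments on max_retries above describe a lookup order: the task decorator first, then the Queue, then the global WakaQ option. The sketch below only illustrates that order. The function resolve_max_retries and its parameters are hypothetical and are not part of WakaQ's API, and treating None as "not set" is an assumption.

```python
from typing import Optional


def resolve_max_retries(
    task_max_retries: Optional[int],
    queue_max_retries: Optional[int],
    default_max_retries: Optional[int],
) -> int:
    """Return the first configured value, most specific first.

    Hypothetical helper: None means "not set" at that level (an assumption).
    """
    for value in (task_max_retries, queue_max_retries, default_max_retries):
        if value is not None:
            return value
    # Nothing configured anywhere: the task is not retried on a soft timeout.
    return 0


# Values taken from the example configuration above.
mytask_retries = resolve_max_retries(7, None, 3)          # task decorator wins
other_queue_retries = resolve_max_retries(None, 5, 3)     # Queue(Q_OTHER, max_retries=5)
anothertask_retries = resolve_max_retries(None, None, 3)  # falls back to WakaQ(max_retries=3)
unconfigured_retries = resolve_max_retries(None, None, None)
```

With the configuration shown earlier, mytask would be retried up to 7 times, a task without its own setting on Q_OTHER up to 5 times, and anothertask up to 3 times.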

Deploying

Optimizing

See the WakaQ init params for a full list of options, like Redis host and Redis socket timeout values.

When using in production, make sure to increase the max open ports allowed for your Redis server process.

When using eta tasks a Redis sorted set is used, so eta tasks are automatically deduped based on task name, args, and kwargs. If you want multiple pending eta tasks with the same arguments, add a throwaway random string to the task’s kwargs, for example str(uuid.uuid1()).
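
A minimal sketch of why the throwaway kwarg works, using a plain dict as a stand-in for the Redis sorted set (adding a member that already exists in a sorted set only updates its score). The payload format, the helper names, and the nonce kwarg are assumptions for illustration and do not reflect WakaQ's actual serialization.

```python
import json
import uuid

# Stand-in for a Redis sorted set: member -> score (eta timestamp).
pending_eta_tasks = {}


def eta_member(task_name, args, kwargs):
    # Hypothetical payload: identical name/args/kwargs give an identical member.
    return json.dumps({"name": task_name, "args": args, "kwargs": kwargs}, sort_keys=True)


def schedule_eta(task_name, args, kwargs, eta_timestamp):
    # Like ZADD: re-adding an existing member overwrites its score.
    pending_eta_tasks[eta_member(task_name, args, kwargs)] = eta_timestamp


# Two identical eta tasks collapse into a single pending entry.
schedule_eta("mytask", [1, 1], {}, 1000)
schedule_eta("mytask", [1, 1], {}, 2000)
count_after_duplicates = len(pending_eta_tasks)

# A random throwaway kwarg makes each payload unique, so both stay pending.
schedule_eta("mytask", [1, 1], {"nonce": str(uuid.uuid1())}, 1000)
schedule_eta("mytask", [1, 1], {"nonce": str(uuid.uuid1())}, 1000)
count_after_nonces = len(pending_eta_tasks)
```

Note that the task function then has to accept the extra keyword argument (named nonce here) and ignore it.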

Running as a Daemon

Here’s an example systemd config to run wakaq-worker as a daemon:

[Unit]
Description=WakaQ Worker Service

[Service]
WorkingDirectory=/opt/yourapp
ExecStart=/opt/yourapp/venv/bin/python /opt/yourapp/venv/bin/wakaq-worker --app=yourapp.wakaq
RemainAfterExit=no
Restart=always
RestartSec=30s
KillSignal=SIGINT
LimitNOFILE=99999

[Install]
WantedBy=multi-user.target

Create a file at /etc/systemd/system/wakaqworker.service with the above contents, then run:

systemctl daemon-reload && systemctl enable wakaqworker

wakaq's Issues

Explain the difference with RQ

I saw wakaq today. I think it's great to have lightweight alternatives to Celery, which is quite complex and not always easy to set up in a reliable way…

In the README the subtitle makes the difference with Celery clear: it's lightweight. But what about a framework like https://python-rq.org/ ?
For one project I was going to go with RQ, but maybe I would have a good reason to try wakaq?

I opened an issue because I think it's worth mentioning in the README.

Redis username, password and database support

Hi 👋
Is there username, password and database (db) support for the Redis broker right now?
If not, is this something you'd want to see as a feature? I'd be willing to submit a PR!

Background on the issue

I'm currently trying to move from Celery to WakaQ due to recurring issues with Celery idling indefinitely while there is a queue of unstarted tasks in Redis. However, our Redis instance is cloud hosted and has a password on it; passing WakaQ the connection string I used with celery (redis://<my-user>:<my-pass>@<redis-host>:<redis-port>/<redis-db>) doesn't seem to work so I'm a bit stuck.

Possible implementation

After a quick look through the source, the Redis package WakaQ is using supports usernames and passwords so it would be as simple as adding a few extra params to __init__.WakaQ.__init__() and dropping in the username, password and db params for the Redis instance if they're not equal to None.

Here's a rough preview of the changes that would need to be done:

    def __init__(
        self,
        queues=[],
        schedules=[],
        host="localhost",
        port=6379,
        username=None,
        password=None,
        db="0",  # the default database
        ...
    ):
        ...
        self.broker = redis.Redis(
            host=host,
            port=port,
            username=username,
            password=password,
            db=db,
            charset="utf-8",
            decode_responses=True,
            health_check_interval=health_check_interval,
            socket_timeout=socket_timeout,
            socket_connect_timeout=socket_connect_timeout,
        )
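
Until something like this exists, a connection string of the form shown above could be split into separate values with the standard library. This is only a sketch: redis_url_to_kwargs is a hypothetical helper, and whether WakaQ accepts username, password, and db keyword arguments is exactly what this issue proposes, not something confirmed here.

```python
from urllib.parse import unquote, urlparse


def redis_url_to_kwargs(url):
    """Split a redis:// URL into host, port, username, password and db."""
    parts = urlparse(url)
    if parts.scheme not in ("redis", "rediss"):
        raise ValueError(f"not a Redis URL: {url!r}")
    db = parts.path.lstrip("/")
    return {
        "host": parts.hostname or "localhost",
        "port": parts.port or 6379,
        # Credentials may be percent-encoded in the URL, so decode them.
        "username": unquote(parts.username) if parts.username else None,
        "password": unquote(parts.password) if parts.password else None,
        "db": int(db) if db else 0,
    }


# Example values are made up for illustration.
kwargs = redis_url_to_kwargs("redis://my-user:p%40ss@redis.example.com:6380/2")
```

The resulting dict maps one-to-one onto the parameters in the preview above.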

BTW, thanks for building WakaQ! I've used WakaTime for a few years, and I'm sure it's tracked a lot of the time I spent fixing bugs in Celery 😅

Trying to run, got AttributeError: 'method-wrapper' object has no attribute 'worker_log_file'

Hello. I'm using the example provided in the README and trying to run the worker with wakaq-worker --app=wcheck.__init__

  File "/opt/homebrew/Caskroom/miniforge/base/envs/wcheck/lib/python3.10/site-packages/wakaq/logger.py", line 44, in setup_logging
    log_file = wakaq.scheduler_log_file if is_scheduler else wakaq.worker_log_file
AttributeError: 'method-wrapper' object has no attribute 'worker_log_file'

Any hints? I tried adding worker_log_file to the WakaQ instance initialization, but still no success.

Parent process logging SoftTimeout

Received this error in logs yesterday, but it's strange because the Parent process should never execute Worker._on_soft_timeout_child. That method should only execute on Child processes when the Parent process sends a SoftTimeout via SIGQUIT to the child. The Parent process executes Worker._on_exit_parent when it receives SIGQUIT.

Full traceback:
[2023-12-17 09:29:39,088] worker-03 ERROR in None args=None kwargs=None retry=None: Unable to unpack message from child process 1434680: backup_heartbeat
[2023-12-17 09:29:39,111] worker-03 ERROR in None args=None kwargs=None retry=None: Unable to unpack message from child process 1434689: backup_he
[2023-12-17 09:29:39,111] worker-03 ERROR in None args=None kwargs=None retry=None: Unable to unpack message from child process 1434864: backup_
[2023-12-17 09:33:46,210] worker-03 ERROR in None args=None kwargs=None retry=None: Traceback (most recent call last):
  File "./venv/lib/python3.10/site-packages/wakaq/worker.py", line 192, in _parent
    self._check_child_runtimes()
  File "./venv/lib/python3.10/site-packages/wakaq/worker.py", line 520, in _check_child_runtimes
    elif not child.soft_timeout_reached and soft_timeout and runtime > soft_timeout:
  File "./venv/lib/python3.10/site-packages/wakaq/worker.py", line 358, in _on_soft_timeout_child
    raise SoftTimeout("SoftTimeout")
wakaq.exceptions.SoftTimeout: SoftTimeout

[2023-12-17 09:33:48,849] worker-03 ERROR in None args=None kwargs=None retry=None: Traceback (most recent call last):
  File "./venv/lib/python3.10/site-packages/wakaq/utils.py", line 151, in read_fd
    return os.read(fd, 64000).decode("utf8")
BlockingIOError: [Errno 11] Resource temporarily unavailable

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "./venv/lib/python3.10/site-packages/wakaq/worker.py", line 192, in _parent
    self._check_child_runtimes()
  File "./venv/lib/python3.10/site-packages/wakaq/worker.py", line 496, in _check_child_runtimes
    ping = read_fd(child.pingin)
  File "./venv/lib/python3.10/site-packages/wakaq/utils.py", line 151, in read_fd
    return os.read(fd, 64000).decode("utf8")
  File "./venv/lib/python3.10/site-packages/wakaq/worker.py", line 358, in _on_soft_timeout_child
    raise SoftTimeout("SoftTimeout")
wakaq.exceptions.SoftTimeout: SoftTimeout

[2023-12-17 09:33:51,138] worker-03 ERROR in None args=None kwargs=None retry=None: Traceback (most recent call last):
  File "./venv/lib/python3.10/site-packages/wakaq/worker.py", line 183, in _parent
    self._enqueue_ready_eta_tasks()
  File "./venv/lib/python3.10/site-packages/wakaq/worker.py", line 385, in _enqueue_ready_eta_tasks
    self.wakaq._enqueue_at_front(task_name, queue.name, args, kwargs)
  File "./venv/lib/python3.10/site-packages/wakaq/__init__.py", line 189, in _enqueue_at_front
    payload = serialize(
  File "./venv/lib/python3.10/site-packages/wakaq/serializer.py", line 93, in serialize
    return dumps(*args, **kwargs)
  File "/usr/lib/python3.10/json/__init__.py", line 238, in dumps
    **kw).encode(obj)
  File "/usr/lib/python3.10/json/encoder.py", line 199, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "/usr/lib/python3.10/json/encoder.py", line 257, in iterencode
    return _iterencode(o, 0)
  File "./venv/lib/python3.10/site-packages/wakaq/worker.py", line 358, in _on_soft_timeout_child
    raise SoftTimeout("SoftTimeout")
wakaq.exceptions.SoftTimeout: SoftTimeout

[2023-12-17 09:33:51,126] worker-03 ERROR in None args=None kwargs=None retry=None: Traceback (most recent call last):
  File "./venv/lib/python3.10/site-packages/wakaq/utils.py", line 151, in read_fd
    return os.read(fd, 64000).decode("utf8")
BlockingIOError: [Errno 11] Resource temporarily unavailable

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "./venv/lib/python3.10/site-packages/wakaq/worker.py", line 192, in _parent
    self._check_child_runtimes()
  File "./venv/lib/python3.10/site-packages/wakaq/worker.py", line 496, in _check_child_runtimes
    ping = read_fd(child.pingin)
  File "./venv/lib/python3.10/site-packages/wakaq/utils.py", line 151, in read_fd
    return os.read(fd, 64000).decode("utf8")
  File "./venv/lib/python3.10/site-packages/wakaq/worker.py", line 358, in _on_soft_timeout_child
    raise SoftTimeout("SoftTimeout")
wakaq.exceptions.SoftTimeout: SoftTimeout

This traceback in particular doesn't make sense, and shouldn't be possible:

Traceback (most recent call last):
  File "./venv/lib/python3.10/site-packages/wakaq/worker.py", line 192, in _parent
    self._check_child_runtimes()
  File "./venv/lib/python3.10/site-packages/wakaq/worker.py", line 496, in _check_child_runtimes
    ping = read_fd(child.pingin)
  File "./venv/lib/python3.10/site-packages/wakaq/utils.py", line 151, in read_fd
    return os.read(fd, 64000).decode("utf8")
  File "./venv/lib/python3.10/site-packages/wakaq/worker.py", line 358, in _on_soft_timeout_child
    raise SoftTimeout("SoftTimeout")
wakaq.exceptions.SoftTimeout: SoftTimeout

Another traceback:
[2023-12-23 07:55:11,392] wakaq-worker-04 ERROR in None args=None kwargs=None retry=None: Traceback (most recent call last):
  File "./venv/lib/python3.10/site-packages/wakaq/worker.py", line 186, in _parent
    self._listen_for_broadcast_task()
  File "./venv/lib/python3.10/site-packages/wakaq/worker.py", line 526, in _listen_for_broadcast_task
    msg = self._pubsub.get_message(ignore_subscribe_messages=True, timeout=self.wakaq.wait_timeout)
  File "./venv/lib/python3.10/site-packages/redis/client.py", line 1013, in get_message
    response = self.parse_response(block=(timeout is None), timeout=timeout)
  File "./venv/lib/python3.10/site-packages/redis/client.py", line 824, in parse_response
    response = self._execute(conn, try_read)
  File "./venv/lib/python3.10/site-packages/redis/client.py", line 800, in _execute
    return conn.retry.call_with_retry(
  File "./venv/lib/python3.10/site-packages/redis/retry.py", line 46, in call_with_retry
    return do()
  File "./venv/lib/python3.10/site-packages/redis/client.py", line 801, in <lambda>
    lambda: command(*args, **kwargs),
  File "./venv/lib/python3.10/site-packages/redis/client.py", line 818, in try_read
    if not conn.can_read(timeout=timeout):
  File "./venv/lib/python3.10/site-packages/redis/connection.py", line 478, in can_read
    return self._parser.can_read(timeout)
  File "./venv/lib/python3.10/site-packages/redis/_parsers/base.py", line 128, in can_read
    return self._buffer and self._buffer.can_read(timeout)
  File "./venv/lib/python3.10/site-packages/redis/_parsers/socket.py", line 95, in can_read
    return bool(self.unread_bytes()) or self._read_from_socket(
  File "./venv/lib/python3.10/site-packages/redis/_parsers/socket.py", line 65, in _read_from_socket
    data = self._sock.recv(socket_read_size)
  File "./venv/lib/python3.10/site-packages/wakaq/worker.py", line 358, in _on_soft_timeout_child
    raise SoftTimeout("SoftTimeout")
wakaq.exceptions.SoftTimeout: SoftTimeout

And another traceback:
[2023-12-23 07:55:11,393] wakaq-worker-04 ERROR in None args=None kwargs=None retry=None: Traceback (most recent call last):
  File "./venv/lib/python3.10/site-packages/wakaq/worker.py", line 183, in _parent
    self._enqueue_ready_eta_tasks()
  File "./venv/lib/python3.10/site-packages/wakaq/worker.py", line 379, in _enqueue_ready_eta_tasks
    results = script(keys=[queue.broker_eta_key], args=[int(round(time.time()))])
  File "./venv/lib/python3.10/site-packages/redis/commands/core.py", line 5931, in __call__
    return client.evalsha(self.sha, len(keys), *args)
  File "./venv/lib/python3.10/site-packages/redis/commands/core.py", line 5307, in evalsha
    return self._evalsha("EVALSHA", sha, numkeys, *keys_and_args)
  File "./venv/lib/python3.10/site-packages/redis/commands/core.py", line 5291, in _evalsha
    return self.execute_command(command, sha, numkeys, *keys_and_args)
  File "./venv/lib/python3.10/site-packages/redis/client.py", line 536, in execute_command
    return conn.retry.call_with_retry(
  File "./venv/lib/python3.10/site-packages/redis/retry.py", line 46, in call_with_retry
    return do()
  File "./venv/lib/python3.10/site-packages/redis/client.py", line 537, in <lambda>
    lambda: self._send_command_parse_response(
  File "./venv/lib/python3.10/site-packages/redis/client.py", line 513, in _send_command_parse_response
    return self.parse_response(conn, command_name, **options)
  File "./venv/lib/python3.10/site-packages/redis/client.py", line 553, in parse_response
    response = connection.read_response()
  File "./venv/lib/python3.10/site-packages/redis/connection.py", line 500, in read_response
    response = self._parser.read_response(disable_decoding=disable_decoding)
  File "./venv/lib/python3.10/site-packages/redis/_parsers/resp2.py", line 15, in read_response
    result = self._read_response(disable_decoding=disable_decoding)
  File "./venv/lib/python3.10/site-packages/redis/_parsers/resp2.py", line 25, in _read_response
    raw = self._buffer.readline()
  File "./venv/lib/python3.10/site-packages/redis/_parsers/socket.py", line 115, in readline
    self._read_from_socket()
  File "./venv/lib/python3.10/site-packages/redis/_parsers/socket.py", line 65, in _read_from_socket
    data = self._sock.recv(socket_read_size)
  File "./venv/lib/python3.10/site-packages/wakaq/worker.py", line 358, in _on_soft_timeout_child
    raise SoftTimeout("SoftTimeout")
wakaq.exceptions.SoftTimeout: SoftTimeout
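
For readers puzzled by the shape of these tracebacks, where the _on_soft_timeout_child frame sits on top of unrelated code such as a socket read: a Python signal handler runs in the main thread on top of whatever frame happens to be executing when the signal arrives. The sketch below demonstrates only that general behavior on a Unix system. It does not explain why the parent process has this handler active, and all names in it are illustrative stand-ins rather than WakaQ's code.

```python
import os
import signal
import time


class SoftTimeout(Exception):
    """Stand-in for wakaq.exceptions.SoftTimeout."""


def on_soft_timeout(signum, frame):
    # Runs in the main thread, on top of whichever frame was executing.
    raise SoftTimeout("SoftTimeout")


def slow_work():
    # Send SIGQUIT to this very process, then pretend to do slow work.
    os.kill(os.getpid(), signal.SIGQUIT)
    time.sleep(1)
    return "finished"


def run_demo():
    # Install the handler, and restore the previous one afterwards.
    previous = signal.signal(signal.SIGQUIT, on_soft_timeout)
    try:
        return slow_work()
    except SoftTimeout:
        return "interrupted"
    finally:
        signal.signal(signal.SIGQUIT, previous)


outcome = run_demo()
print(outcome)
```

Any process that still has such a handler installed will raise the exception from wherever it is when SIGQUIT arrives, which matches the pattern in the logs above.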
