flamingo-run / django-cloud-tasks

It's like Celery, but with Serverless Google Cloud products.

License: Apache License 2.0

cloud-tasks cloud-scheduler google-cloud-platform

django-cloud-tasks's Introduction

Django Cloud Tasks

Powered by GCP Pilot.

Installation

pip install django-google-cloud-tasks

APIs

The following APIs must be enabled in your project(s):

  • Cloud Tasks API
  • Cloud Scheduler API
  • Admin SDK API

Configuration

Add DJANGO_CLOUD_TASKS_ENDPOINT to your Django settings or as an environment variable

Additionally, you can configure with more settings or environment variables:

  • DJANGO_CLOUD_TASKS_APP_NAME: used as the queue name and as the prefix for topics and subscriptions. Default: None.
  • DJANGO_CLOUD_TASKS_DELIMITER: the delimiter placed between the APP_NAME and the topic/subscription name. Default: --.
  • DJANGO_CLOUD_TASKS_EAGER: forces tasks to always run synchronously. Useful for local development. Default: False.
  • DJANGO_CLOUD_TASKS_URL_NAME: Django URL name of the view that processes On Demand tasks. We provide a view for that, but if you want to create your own, set this value. Default: tasks-endpoint.
  • DJANGO_CLOUD_TASKS_SUBSCRIBERS_URL_NAME: Django URL name of the view that processes Subscribers. We provide a view for that, but if you want to create your own, set this value. Default: subscriptions-endpoint.
  • DJANGO_CLOUD_TASKS_PROPAGATED_HEADERS: names of the HTTP headers to propagate. Default: ["traceparent"].
  • DJANGO_CLOUD_TASKS_PROPAGATED_HEADERS_KEY: when propagating headers in PubSub, the key under which the values are stored in the Message data. Default: _http_headers.
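
As an illustration, the settings listed above could be collected in a Django settings module roughly as follows. This is only a sketch: the endpoint URL and the app name are placeholder values, treating the endpoint as the HTTPS base URL of the deployed service is an assumption, and parsing the EAGER flag by hand is just one possible approach.

```python
# settings.py (sketch): all values below are placeholders, not recommendations.
import os

# Assumption: the endpoint is the public HTTPS base URL of the deployed service.
# "https://my-service.example.com" is a made-up URL.
DJANGO_CLOUD_TASKS_ENDPOINT = os.environ.get(
    "DJANGO_CLOUD_TASKS_ENDPOINT", "https://my-service.example.com"
)

# Queue name and prefix for topics/subscriptions ("my-app" is hypothetical).
DJANGO_CLOUD_TASKS_APP_NAME = "my-app"
DJANGO_CLOUD_TASKS_DELIMITER = "--"

# Run every task synchronously when the (hypothetical) env flag is "true",
# which can be handy during local development.
DJANGO_CLOUD_TASKS_EAGER = (
    os.environ.get("DJANGO_CLOUD_TASKS_EAGER", "false").lower() == "true"
)

# Documented defaults, written out explicitly.
DJANGO_CLOUD_TASKS_PROPAGATED_HEADERS = ["traceparent"]
DJANGO_CLOUD_TASKS_PROPAGATED_HEADERS_KEY = "_http_headers"
```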

On Demand Task

Tasks can be executed on demand, asynchronously or synchronously.

The service used is Google Cloud Tasks:

from django_cloud_tasks.tasks import Task


class MyTask(Task):
    def run(self, x, y):
        # computation goes here
        print(x**y)


MyTask.asap(x=10, y=3)  # run async (another instance will execute and print)
MyTask.sync(x=10, y=5)  # run sync (the print happens right now)

It's also possible to execute asynchronously, but not immediately:

MyTask.later(task_kwargs=dict(x=10, y=5), eta=3600)  # run async in 1 hour (int, timedelta and datetime are accepted)

MyTask.until(task_kwargs=dict(x=10, y=5), eta=3600)  # run async up to 1 hour, decided randomly (int, timedelta and datetime are accepted)

All of these call methods are wrappers around the fully customizable push method, which supports overriding the queue name, headers and more:

MyTask.push(task_kwargs=dict(x=10, y=5), **kwargs)  # run async, but deeper customization is available
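
To make the eta forms accepted by later and until more concrete, here is a small sketch of how an int, a timedelta or a datetime could be turned into an absolute time, and how a random moment inside that window could be picked. The helpers resolve_eta and resolve_random_eta are hypothetical names used for illustration; they are not part of the library, and the library's own implementation may differ.

```python
import random
from datetime import datetime, timedelta, timezone


def resolve_eta(eta, now=None):
    """Turn an int (seconds), a timedelta or a datetime into an absolute datetime."""
    now = now or datetime.now(timezone.utc)
    if isinstance(eta, datetime):
        return eta  # already absolute
    if isinstance(eta, timedelta):
        return now + eta
    if isinstance(eta, int):
        return now + timedelta(seconds=eta)
    raise TypeError(f"Unsupported eta type: {type(eta).__name__}")


def resolve_random_eta(max_eta, now=None):
    """Pick a random moment between now and max_eta (the idea behind `until`)."""
    now = now or datetime.now(timezone.utc)
    deadline = resolve_eta(max_eta, now=now)
    # Note: a datetime deadline must be timezone-aware, like `now`.
    window = max((deadline - now).total_seconds(), 0)
    return now + timedelta(seconds=random.uniform(0, window))
```

With these helpers, resolve_eta(3600) and resolve_eta(timedelta(hours=1)) both describe "one hour from now", matching the eta=3600 example above.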

Queue

When executing an async task, a new job will be added to a queue in Cloud Tasks to be processed by another Cloud Run instance.

The queue name is chosen in the following order of precedence:

  • Overriding it manually when scheduling with push, until or later
  • Defining DJANGO_CLOUD_TASKS_APP_NAME in Django settings
  • Otherwise, tasks is used as the queue name

It's also possible to set dynamically with:

from django_cloud_tasks.tasks import Task


class MyTask(Task):
    @classmethod
    def queue(cls) -> str:
        return "my-queue-name-here"
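
The lookup order listed earlier in this section can be sketched as a plain function. resolve_queue_name is a hypothetical helper written for illustration only. How a queue() classmethod override interacts with this order is not stated here, so it is left out of the sketch.

```python
def resolve_queue_name(override=None, app_name=None, default="tasks"):
    """Return the queue name following the documented precedence."""
    if override:  # 1. explicit override passed when scheduling
        return override
    if app_name:  # 2. DJANGO_CLOUD_TASKS_APP_NAME from settings
        return app_name
    return default  # 3. fallback queue name


print(resolve_queue_name(override="priority", app_name="my-app"))
print(resolve_queue_name(app_name="my-app"))
print(resolve_queue_name())
```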

Troubleshooting

When a task is failing in Cloud Tasks and you want to debug it locally with the same data, get the task ID from the Cloud Tasks UI (the big number in the NAME column) and run the task locally with the same parameters:

MyTask.debug(task_id="<the task number>")

Cleanup

Google Cloud Tasks will automatically discard any job once it reaches the maximum number of retries.

If for any reason you need to discard jobs manually, you can provide the Task ID:

MyTask.discard(task_id="<the task number>")

Or you can batch discard many tasks at once:

MyTask.discard()

You can also provide the min_retries parameter to discard only the tasks that have been retried at least that many times (so tasks have some chance to execute):

MyTask.discard(min_retries=5)

Periodic Task

Tasks can be executed on a recurring schedule, using crontab syntax.

The backend used is Google Cloud Scheduler.

from django_cloud_tasks.tasks import PeriodicTask


class RecurrentTask(PeriodicTask):
    run_every = "0 0 * * *"  # crontab syntax (five fields): every day at midnight
    
    def run(self):
        # computation goes here
        ...
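
Cloud Scheduler expects schedules in the five-field unix-cron format (minute, hour, day of month, month, day of week). The helper below is a hypothetical shape check that could catch a malformed run_every value before deployment. It is not part of the library and does not validate the value ranges of each field.

```python
import re

# Characters commonly found in a cron field: digits, "*", "/", ",", "-" and names.
_CRON_FIELD = re.compile(r"^[\d*/,\-A-Za-z]+$")


def looks_like_unix_cron(expression: str) -> bool:
    """Return True when the expression has exactly five plausible cron fields."""
    fields = expression.split()
    return len(fields) == 5 and all(_CRON_FIELD.match(field) for field in fields)


# A few common schedules, for reference.
EXAMPLES = {
    "*/5 * * * *": "every 5 minutes",
    "0 * * * *": "at the start of every hour",
    "0 0 * * *": "every day at midnight",
    "0 9 * * 1": "every Monday at 09:00",
}

for expression, meaning in EXAMPLES.items():
    print(expression, "->", meaning, looks_like_unix_cron(expression))
```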

For these tasks to be registered in Cloud Scheduler, you must run the setup once (in production, usually at the same moment you run database migrations or collect static files):

python manage.py schedule_tasks

If needed, you can also run these tasks synchronously or asynchronously. They behave exactly like an on-demand task:

RecurrentTask.asap()  # run async
RecurrentTask.sync()  # run sync

Publisher

Messages can be sent to a Pub/Sub topic, synchronously or asynchronously.

The backend used is Google Cloud PubSub topics.

from django_cloud_tasks.tasks import PublisherTask


class MyPublisher(PublisherTask):
    @classmethod
    def topic_name(cls) -> str:
        return "potato"  # if you don't set one, we'll use the class name (i.e. my-publisher)


MyPublisher.sync(message={"x": 10, "y": 3})  # publish synchronously
MyPublisher.asap(message={"x": 10, "y": 3})  # publish asynchronously, using Cloud Tasks
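
The naming rules described so far (class name as the default topic name, with APP_NAME and the delimiter as an optional prefix) can be sketched as follows. Both kebab_case and full_name are hypothetical helpers: the exact conversion the library applies is not documented here, and this simple regex would split acronyms letter by letter.

```python
import re


def kebab_case(class_name: str) -> str:
    """Convert a CamelCase class name into kebab-case (MyPublisher -> my-publisher)."""
    return re.sub(r"(?<!^)(?=[A-Z])", "-", class_name).lower()


def full_name(name: str, app_name=None, delimiter: str = "--") -> str:
    """Prefix a topic or subscription name with the app name, when one is set."""
    return f"{app_name}{delimiter}{name}" if app_name else name


print(kebab_case("MyPublisher"))
print(full_name("potato", app_name="my-app"))
```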

For convenience, there's also a dynamic publisher specialized in publishing Django models.

from django_cloud_tasks.tasks import ModelPublisherTask
from django.db.models import Model


class MyModelPublisherTask(ModelPublisherTask):
    @classmethod
    def build_message_content(cls, obj: Model, **kwargs) -> dict:
        return {"pk": obj.pk}  # serialize your model

    @classmethod
    def build_message_attributes(cls, obj: Model, **kwargs) -> dict[str, str]:
        return {}  # any metadata you might want to send as attributes
    
    
one_obj = MyModel.objects.first()

MyModelPublisherTask.sync(obj=one_obj) # publish synchronously
MyModelPublisherTask.asap(obj=one_obj) # publish asynchronously, using Cloud Tasks

Subscriber

Messages can be received in a Pub/Sub subscription, synchronously or asynchronously.

The backend used is Google Cloud PubSub subscriptions.

from django_cloud_tasks.tasks import SubscriberTask


class MySubscriber(SubscriberTask):
    def run(self, content: dict, attributes: dict[str, str] | None = None):
        ...  # process the message you received
    
    @classmethod
    def topic_name(cls) -> str:
        return "potato"
    
    @classmethod
    def subscription_name(cls) -> str:
        return "tomato"  # if you don't set it, we'll use the class name (e.g. my-subscriber)
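
If you write your own subscriber view instead of using the provided one, the request body follows the standard Pub/Sub push format: a JSON envelope whose message.data field is base64-encoded. The sketch below decodes that envelope into the content and attributes a subscriber receives. parse_push_envelope is a hypothetical helper, and it assumes that the payload is JSON and that propagated headers are stored inside it under the configured key (default _http_headers).

```python
import base64
import json


def parse_push_envelope(body: bytes, headers_key: str = "_http_headers"):
    """Split a Pub/Sub push request body into (content, attributes, propagated headers)."""
    envelope = json.loads(body)
    message = envelope["message"]

    raw = base64.b64decode(message.get("data", ""))
    content = json.loads(raw) if raw else {}  # assumption: payload is JSON
    attributes = message.get("attributes") or {}

    # Assumption: propagated headers travel inside the message data.
    propagated = content.pop(headers_key, {}) if isinstance(content, dict) else {}
    return content, attributes, propagated
```

The returned content and attributes correspond to the arguments of the run method shown above.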

Contributing

When contributing to this repository, you can set it up in one of two ways:

As an application (when contributing)

  • Install packages:
    make dependencies
  • If you have changed the package dependencies in poetry.lock:
    make update

As a package (when inside another application)

  • In the application's pyproject.toml, add the remote private repository and the package with version:
    [packages]
    django-google-cloud-tasks = {version="<version>"}
  • During development, if you wish to install from a local source (in order to test integration with ease):
    # inside the application
    poetry add -e /path/to/this/lib

Testing

To run tests:

make test

To fix linter issues:

make style

Version

Use Semantic versioning.

django-cloud-tasks's People

Contributors

diegodfreire, guilhermearaujo, joaodaher, jrobichaud, lucasgomide, rodolfo3, rodrigoalmeidaee, spookyuser

django-cloud-tasks's Issues

Add periodic task to admin Django dashboard

Hi, thanks for this amazing repo. I am very interested in using it with Cloud Run, but is there a way to make it work like celerybeat, so that I can manage scheduled tasks in the Django admin? If there is a way, that would be great. Thanks.

Can I Save or update data in Django Models in the PeriodicTask

Hi, this is not an issue. I just want to understand how the scheduler works. Can I do something like this in the PeriodicTask:

from django_cloud_tasks.tasks import PeriodicTask
from some_app.models import SomeModel


class RecurrentTask(PeriodicTask):
    run_every = "* * 0 0"  # crontab syntax
    
    def run(self):
        # computation goes here
        SomeModel.objects.get_or_create(**some_data)
        ...

Basically, I am looking to create an entry in or update the database.

Question: RecursionError: maximum recursion depth exceeded while decoding a JSON array from a unicode string

When I try to execute a task synchronously I receive this message. I have probably not configured the package properly. Can you help me better understand the documentation so I can run this package in my project?

Currently I have created a queue in GCP with the following name and set it up in Django:

DJANGO_GOOGLE_CLOUD_TASKS_APP_NAME = "api-backend-default-tasks-queue"

Also, I've added the urls to my project's urls file.

When I try to execute this task:
SendWelcomeEmailTask.asap(user_email="xxxxxx@xxxx", activation_key="xxxx")

I receive that error. Can you help me understand how to get started with the package?
Thank you so much!

[HELP] How to use it in local development

Hello, I was taking a look at this very nice project and would like to use it, however, I am not sure how I can mock the server behavior in local development. Any help would be appreciated.

Thanks.

IAM Documentation and additional configurations

Hi,

The documentation states we must enable 3 APIs (Cloud Tasks, Cloud Scheduler and Admin SDK) in GCP, and the rest appears to be Django project configuration.

However, I believe there is more to do in order to be fully configured, unless there is something I missed. E.g. add IAM roles to the service account and create the Pub/Sub resources.

On Demand Task:

I suppose we must add Cloud Tasks Enqueuer at least https://cloud.google.com/tasks/docs/reference-access-control

Any others?

Periodic Task

Which IAM roles are needed for

python manage.py schedule_tasks

to work?

Publisher

I suppose some Pub/Sub IAM roles are needed for the publisher or subscriber.

It's not stated, but I assume the topics must be created manually as well (using GCP's interface or with Terraform) and are not managed by a Django command.

HTTPs issue when running task.

Finished setting up the module as per instructions and comments in Issues section.
Latest issue:

grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
    status = StatusCode.INVALID_ARGUMENT
    details = "HttpRequest.url must start with 'https://' for request with HttpRequest.authorization_header"
    debug_error_string = "UNKNOWN:Error received from peer ipv4:142.251.107.95:443 {created_time:"2024-02-19T02:04:15.887971336+00:00", grpc_status:3, grpc_message:"HttpRequest.url must start with 'https://' for request with HttpRequest.authorization_header"}"

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/layers/google.python.pip/pip/lib/python3.11/site-packages/django/core/handlers/exception.py", line 55, in inner
    response = get_response(request)
               ^^^^^^^^^^^^^^^^^^^^^
  File "/layers/google.python.pip/pip/lib/python3.11/site-packages/django/core/handlers/base.py", line 197, in _get_response
    response = wrapped_callback(request, *callback_args, **callback_kwargs)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/layers/google.python.pip/pip/lib/python3.11/site-packages/django/views/generic/base.py", line 104, in view
    return self.dispatch(request, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/layers/google.python.pip/pip/lib/python3.11/site-packages/django/views/generic/base.py", line 143, in dispatch
    return handler(request, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/srv/video_processor/views.py", line 97, in post
    VideoProcessingTask.asap(x=10, y=3)
  File "/layers/google.python.pip/pip/lib/python3.11/site-packages/django_cloud_tasks/tasks/task.py", line 216, in asap
    return cls.push(task_kwargs=kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/layers/google.python.pip/pip/lib/python3.11/site-packages/django_cloud_tasks/tasks/task.py", line 291, in push
    outcome = client.push(**api_kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/layers/google.python.pip/pip/lib/python3.11/site-packages/gcp_pilot/tasks.py", line 92, in push
    response = self.client.create_task(parent=queue_path, task=task)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/layers/google.python.pip/pip/lib/python3.11/site-packages/google/cloud/tasks_v2/services/cloud_tasks/client.py", line 2460, in create_task
    response = rpc(
               ^^^^
  File "/layers/google.python.pip/pip/lib/python3.11/site-packages/google/api_core/gapic_v1/method.py", line 131, in call
    return wrapped_func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/layers/google.python.pip/pip/lib/python3.11/site-packages/google/api_core/timeout.py", line 120, in func_with_timeout
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/layers/google.python.pip/pip/lib/python3.11/site-packages/google/api_core/grpc_helpers.py", line 78, in error_remapped_callable
    raise exceptions.from_grpc_error(exc) from exc
google.api_core.exceptions.InvalidArgument: 400 HttpRequest.url must start with 'https://' for request with HttpRequest.authorization_header
