
litestar-saq's People

Contributors

cofin, darinkishore, franzforstmayr, jacobcoffee, sanderwegter, ssenior45, thegreatalgo

litestar-saq's Issues

Queue.job_id format causes apply to hang

MCVE

# ruff: noqa
import asyncio
import logging

import pytest
from litestar_saq.base import Queue, Worker
from saq.types import Function

pytestmark = pytest.mark.anyio

queue = Queue.from_url("redis://localhost:6379/1")

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)


@pytest.fixture(scope="session")
def anyio_backend() -> str:
    return "asyncio"


async def some_function(_: dict, *, idnum: int) -> int:
    logger.info(f"some_function called with ID = {idnum}")
    return idnum


async def apply_some_function() -> None:
    # logger.info(f"queue = {queue}, id(queue) = {id(queue)}")
    idn = 1
    await queue.apply(
        "some_function",
        idnum=idn,
        key=f"test-function-{idn}",
    )


async def enqueue_some_function() -> None:
    # logger.info(f"queue = {queue}, id(queue) = {id(queue)}")
    idn = 1
    await queue.enqueue(
        "some_function",
        idnum=idn,
        key=f"test-function-{idn}",
    )


async def run_litestar_saq(operation: str) -> None:
    try:
        logger.info("starting Background worker processes")
        # logger.info(f"id(queue) = {id(queue)}")
        functions: list[Function] = [some_function]
        worker = Worker(queue, functions=functions)
        # Run the worker in the background so apply/enqueue have something to talk to
        task = asyncio.create_task(worker.start())
        match operation:
            case "apply":
                await apply_some_function()
            case "enqueue":
                await enqueue_some_function()
    finally:
        await worker.stop()
        await queue._pubsub.close()
        await queue.redis.aclose()
        await queue.redis.connection_pool.disconnect()
        logger.info("Shutdown complete")


async def test_saq(monkeypatch: pytest.MonkeyPatch) -> None:
    # works
    logger.info("Running enqueue")
    await run_litestar_saq("enqueue")

    """
    def mock_job_id(self, job_key: str) -> str:
        # return f"{self._namespace}:{self.name}:job:{job_key}"
        return f"{self._namespace}:job:{self.name}:{job_key}"

    monkeypatch.setattr(Queue, "job_id", mock_job_id)
    """
    logger.info("")
    logger.info("")
    logger.info("Running apply")
    # hangs unless monkeypatching of job_id is uncommented out
    await run_litestar_saq("apply")

With the original job_id in place, the apply operation hangs until it times out (the enqueue operation seems unaffected):

$ pytest litestar_saq_mcve.py -s -n=0 --log-cli-level=20
=========================================================================== test session starts ============================================================================                                                                                                                                                 

litestar_saq_mcve.py::test_saq 
------------------------------------------------------------------------------ live log call -------------------------------------------------------------------------------
INFO     root:litestar_saq_mcve.py:70 Running enqueue
INFO     root:litestar_saq_mcve.py:50 starting Background worker processes
INFO     saq:worker.py:123 Worker starting: Queue<redis=Redis<ConnectionPool<Connection<host=localhost,port=6379,db=1>>>, name='default'>
INFO     saq:worker.py:148 Worker shutting down
INFO     root:litestar_saq_mcve.py:65 Shutdown complete
INFO     root:litestar_saq_mcve.py:80 
INFO     root:litestar_saq_mcve.py:81 
INFO     root:litestar_saq_mcve.py:82 Running apply
INFO     root:litestar_saq_mcve.py:50 starting Background worker processes
INFO     saq:worker.py:123 Worker starting: Queue<redis=Redis<ConnectionPool<Connection<host=localhost,port=6379,db=1>>>, name='default'>

I've traced this down to the format of the string returned by job_id. If we monkeypatch it to return a string in the same format the saq library uses, the apply succeeds:

$ pytest litestar_saq_mcve.py -s -n=0 --log-cli-level=20
=========================================================================== test session starts ============================================================================

litestar_saq_mcve.py::test_saq 
------------------------------------------------------------------------------ live log call -------------------------------------------------------------------------------
INFO     root:litestar_saq_mcve.py:70 Running enqueue
INFO     root:litestar_saq_mcve.py:50 starting Background worker processes
INFO     saq:worker.py:123 Worker starting: Queue<redis=Redis<ConnectionPool<Connection<host=localhost,port=6379,db=1>>>, name='default'>
INFO     saq:worker.py:148 Worker shutting down
INFO     root:litestar_saq_mcve.py:65 Shutdown complete
INFO     root:litestar_saq_mcve.py:79 
INFO     root:litestar_saq_mcve.py:80 
INFO     root:litestar_saq_mcve.py:81 Running apply
INFO     root:litestar_saq_mcve.py:50 starting Background worker processes
INFO     saq:worker.py:123 Worker starting: Queue<redis=Redis<ConnectionPool<Connection<host=localhost,port=6379,db=1>>>, name='default'>
INFO     saq:queue.py:555 Enqueuing Job<function=some_function, queue=default, id=saq:job:default:test-function-1, attempts=0>
INFO     saq:worker.py:255 Processing Job<function=some_function, queue=default, id=saq:job:default:test-function-1, start_ms=36, attempts=1>
INFO     root:litestar_saq_mcve.py:24 some_function called with ID = 1
INFO     saq:queue.py:469 Finished Job<function=some_function, queue=default, id=saq:job:default:test-function-1, process_ms=4, start_ms=36, attempts=1>
INFO     saq:worker.py:148 Worker shutting down
INFO     root:litestar_saq_mcve.py:65 Shutdown complete
PASSED
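
For reference, the two id formats side by side, reconstructed from the monkeypatch in the MCVE above (the "saq" namespace value and the description of apply's internals are my assumptions, not confirmed against either codebase):

# litestar-saq's Queue.job_id (the hanging case):
#     f"{self._namespace}:{self.name}:job:{job_key}"
#     -> "saq:default:job:test-function-1"
#
# upstream saq's format (the passing case, matching the log above):
#     f"{self._namespace}:job:{self.name}:{job_key}"
#     -> "saq:job:default:test-function-1"
#
# apply() enqueues the job and then blocks waiting for its result, keyed by
# the job id; if the id the caller waits on doesn't match the id the worker
# reports the result under, the wait presumably never resolves -- hence the hang.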

Possible to run workers without using litestar CLI?

I have an app built with Litestar that is deployed in production with NGINX Unit, so I'm not using the litestar CLI to start the application.

I'd like the saq workers to start as well when NGINX Unit starts my application. Is there a way to do that without running the litestar CLI?

My other idea was to just start and stop saq with systemd, but it'd be neat if I didn't have to take that extra step.

Thanks for your work on this,
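
One approach that can work here, sketched below (this is not litestar-saq's documented API; constructing the Worker by hand and the saq_worker_lifespan hook are my assumptions), is to start the worker from a Litestar lifespan context manager, so it runs under any ASGI server, NGINX Unit included:

import asyncio
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager

from litestar import Litestar
from litestar_saq.base import Queue, Worker

queue = Queue.from_url("redis://localhost:6379/1")


async def some_function(_: dict, *, idnum: int) -> int:
    return idnum


@asynccontextmanager
async def saq_worker_lifespan(app: Litestar) -> AsyncIterator[None]:
    # Start the worker in the background when the ASGI app starts up,
    # and stop it again when the app shuts down.
    worker = Worker(queue, functions=[some_function])
    task = asyncio.create_task(worker.start())
    try:
        yield
    finally:
        await worker.stop()
        task.cancel()


app = Litestar(route_handlers=[], lifespan=[saq_worker_lifespan])

Because the lifespan hook runs wherever the ASGI app runs, this avoids both the litestar CLI and a separate systemd unit.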

Litestar DI

Is it possible to use Litestar DI in job functions, like the following?

    def job(product_service: ProductService):
        pass
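
Not an authoritative answer, but for context: saq itself only passes a context dict as the first argument to job functions, so a common pattern (sketched below; the product_service key and the startup hook wiring are hypothetical, not a litestar-saq feature) is to resolve dependencies from the worker context rather than through Litestar's DI:

async def startup(ctx: dict) -> None:
    # Build shared dependencies once per worker and stash them in the
    # context that saq passes to every job function (hypothetical key).
    ctx["product_service"] = ProductService()


async def job(ctx: dict) -> None:
    product_service: ProductService = ctx["product_service"]
    ...


# Assumes litestar_saq's Worker forwards saq's startup hook.
worker = Worker(queue, functions=[job], startup=startup)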
