cofin / litestar-saq Goto Github PK
View Code? Open in Web Editor NEWSAQ Plugin for Litestar
License: MIT License
SAQ Plugin for Litestar
License: MIT License
MCVE
# ruff: noqa
import asyncio
import logging
import pytest
from litestar_saq.base import Queue, Worker
from saq.types import Function
pytestmark = pytest.mark.anyio
queue = Queue.from_url("redis://localhost:6379/1")
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
@pytest.fixture(scope="session")
def anyio_backend() -> str:
return "asyncio"
async def some_function(_: dict, *, idnum: int) -> int:
logger.info(f"some_function called with ID = {idnum}")
return idnum
async def apply_some_function() -> None:
# logger.info(f"queue = {queue}, id(queue) = {id(queue)}")
idn = 1
await queue.apply(
"some_function",
idnum=idn,
key=f"test-function-{idn}",
)
async def enqueue_some_function() -> None:
# logger.info(f"queue = {queue}, id(queue) = {id(queue)}")
idn = 1
await queue.enqueue(
"some_function",
idnum=idn,
key=f"test-function-{idn}",
)
async def run_litestar_saq(operation: str) -> None:
try:
logger.info("starting Background worker processes")
# logger.info(f"id(queue) = {id(queue)}")
functions: list[Function] = [some_function]
worker = Worker(queue, functions=functions)
task = asyncio.create_task(worker.start())
match operation:
case "apply":
await apply_some_function()
case "enqueue":
await enqueue_some_function()
finally:
await worker.stop()
await queue._pubsub.close()
await queue.redis.aclose()
await queue.redis.connection_pool.disconnect()
logger.info("Shutdown complete")
async def test_saq(monkeypatch: pytest.MonkeyPatch) -> None:
# works
logger.info("Running enqueue")
await run_litestar_saq("enqueue")
"""
def mock_job_id(self, job_key: str) -> str:
# return f"{self._namespace}:{self.name}:job:{job_key}"
return f"{self._namespace}:job:{self.name}:{job_key}"
monkeypatch.setattr(Queue, "job_id", mock_job_id)
"""
logger.info("")
logger.info("")
logger.info("Running apply")
# hangs unless monkeypatching of job_id is uncommented out
await run_litestar_saq("apply")
With the original job_id
called the apply
operation hangs until it times out (the enqueue
operation seems unaffected):
$ pytest litestar_saq_mcve.py -s -n=0 --log-cli-level=20
=========================================================================== test session starts ============================================================================
litestar_saq_mcve.py::test_saq
------------------------------------------------------------------------------ live log call -------------------------------------------------------------------------------
INFO root:litestar_saq_mcve.py:70 Running enqueue
INFO root:litestar_saq_mcve.py:50 starting Background worker processes
INFO saq:worker.py:123 Worker starting: Queue<redis=Redis<ConnectionPool<Connection<host=localhost,port=6379,db=1>>>, name='default'>
INFO saq:worker.py:148 Worker shutting down
INFO root:litestar_saq_mcve.py:65 Shutdown complete
INFO root:litestar_saq_mcve.py:80
INFO root:litestar_saq_mcve.py:81
INFO root:litestar_saq_mcve.py:82 Running apply
INFO root:litestar_saq_mcve.py:50 starting Background worker processes
INFO saq:worker.py:123 Worker starting: Queue<redis=Redis<ConnectionPool<Connection<host=localhost,port=6379,db=1>>>, name='default'>
I've traced this down to the format of the string returned by job_id
. If we monkeypatch that to return the string in the same format as the saq
library, then the apply
succeeds:
$ pytest litestar_saq_mcve.py -s -n=0 --log-cli-level=20
=========================================================================== test session starts ============================================================================
litestar_saq_mcve.py::test_saq
------------------------------------------------------------------------------ live log call -------------------------------------------------------------------------------
INFO root:litestar_saq_mcve.py:70 Running enqueue
INFO root:litestar_saq_mcve.py:50 starting Background worker processes
INFO saq:worker.py:123 Worker starting: Queue<redis=Redis<ConnectionPool<Connection<host=localhost,port=6379,db=1>>>, name='default'>
INFO saq:worker.py:148 Worker shutting down
INFO root:litestar_saq_mcve.py:65 Shutdown complete
INFO root:litestar_saq_mcve.py:79
INFO root:litestar_saq_mcve.py:80
INFO root:litestar_saq_mcve.py:81 Running apply
INFO root:litestar_saq_mcve.py:50 starting Background worker processes
INFO saq:worker.py:123 Worker starting: Queue<redis=Redis<ConnectionPool<Connection<host=localhost,port=6379,db=1>>>, name='default'>
INFO saq:queue.py:555 Enqueuing Job<function=some_function, queue=default, id=saq:job:default:test-function-1, attempts=0>
INFO saq:worker.py:255 Processing Job<function=some_function, queue=default, id=saq:job:default:test-function-1, start_ms=36, attempts=1>
INFO root:litestar_saq_mcve.py:24 some_function called with ID = 1
INFO saq:queue.py:469 Finished Job<function=some_function, queue=default, id=saq:job:default:test-function-1, process_ms=4, start_ms=36, attempts=1>
INFO saq:worker.py:148 Worker shutting down
INFO root:litestar_saq_mcve.py:65 Shutdown complete
PASSED
I have an app built with litestar that is deployed in production using NGINX Unit so I'm not using the litestar CLI to start the application.
I'd like for when nginx unit starts my application for the saq workers to start as well. Is there a way to do that without running with litestar CLI?
My other idea was to just start and stop saq with systemd but it'd be neat if I didn't have to do that extra step.
Thanks for your work on this,
litestar-saq/litestar_saq/config.py
Line 95 in 988de42
needs a type
Is it possible to use Litestar DI at job functions like:
def job(product_service: ProductService):
pass
?
A declarative, efficient, and flexible JavaScript library for building user interfaces.
๐ Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. ๐๐๐
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google โค๏ธ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.