
async-lru's Introduction

async-lru

Simple LRU cache for asyncio

Installation

pip install async-lru

Usage

This package is a port of Python's built-in functools.lru_cache function for asyncio. To better handle async behaviour, it also ensures multiple concurrent calls will only result in 1 call to the wrapped function, with all awaits receiving the result of that call when it completes.

import asyncio

import aiohttp
from async_lru import alru_cache


@alru_cache(maxsize=32)
async def get_pep(num):
    resource = 'http://www.python.org/dev/peps/pep-%04d/' % num
    async with aiohttp.ClientSession() as session:
        try:
            async with session.get(resource) as s:
                return await s.read()
        except aiohttp.ClientError:
            return b'Not Found'  # bytes, to match the s.read() return type


async def main():
    for n in 8, 290, 308, 320, 8, 218, 320, 279, 289, 320, 9991:
        pep = await get_pep(n)
        print(n, len(pep))

    print(get_pep.cache_info())
    # CacheInfo(hits=3, misses=8, maxsize=32, currsize=8)

    # closing is optional, but highly recommended
    await get_pep.cache_close()


asyncio.run(main())
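As a quick demonstration of the single-flight behaviour described above, here is a minimal sketch (the names slow, calls and demo are illustrative, not part of the library): ten concurrent awaits result in exactly one call to the wrapped function.

import asyncio

from async_lru import alru_cache

calls = 0


@alru_cache(maxsize=32)
async def slow(arg):
    global calls
    calls += 1  # count how many times the body actually runs
    await asyncio.sleep(0.1)
    return arg


async def demo():
    results = await asyncio.gather(*(slow(1) for _ in range(10)))
    assert results == [1] * 10
    assert calls == 1  # all ten awaits shared a single underlying call


asyncio.run(demo())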

TTL (time-to-live, expiration on timeout) is supported via the ttl configuration parameter (off by default):

@alru_cache(ttl=5)
async def func(arg):
    return arg * 2
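A sketch of the resulting behaviour, assuming expiry is counted from when the result is stored (func as defined above):

import asyncio


async def demo():
    await func(2)           # miss: computed and cached
    await func(2)           # hit: served from the cache
    await asyncio.sleep(6)  # longer than ttl=5
    await func(2)           # miss again: the entry has expired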

The library supports explicit invalidation of a specific function call via cache_invalidate():

@alru_cache(ttl=5)
async def func(arg1, arg2):
    return arg1 + arg2

func.cache_invalidate(1, arg2=2)

The method returns True if the corresponding argument set was cached, False otherwise.
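For example (a sketch; the invalidation call must use the same argument set as the cached call):

assert await func(1, arg2=2) == 3                 # populate the cache
assert func.cache_invalidate(1, arg2=2) is True   # entry existed and was removed
assert func.cache_invalidate(1, arg2=2) is False  # nothing left to remove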

Python 3.8+ is required

Thanks

The library was donated by Ocean S.A.

Thanks to the company for its contribution.

async-lru's People

Contributors

ahivert, asvetlov, cancan101, dependabot-preview[bot], dependabot-support, dependabot[bot], dreamsorcerer, hellysmile, krkd, litwisha, pyup-bot, sylwia-budzynska, webknjaz


async-lru's Issues

Preserve callstack

After using the lib I realized the call stack was not preserved, which is a (minor) issue for my logging system: I put a marker higher up the stack for the logging library to set stacklevel to a higher-level frame, while lru_cache is applied to low-level functions.

I understand why and how this happens with the new task pushed onto the event loop, but please bear in mind that I'm only starting to switch to a system including async parts, so my understanding is limited.

I implemented a radical workaround, dismissing the future + callback mechanism, here: 8bf9eea. Two features seem to be crippled: ttl no longer seems possible, and consecutive calls made while the first one is not done yet will always miss the cache (since the cache no longer stores a future, only the result once it is available).

Two questions:

  • are there any other side effects to this patch?
  • would it be possible to do this properly?

DeprecationWarning in Python 3.8

@asyncio.coroutine was deprecated in Python 3.8 with python/cpython#13346. The loop argument is also deprecated. I am not sure async def can be used, since the project ensures compatibility with 3.3 and above, whereas the syntax was only added in 3.5, I guess. 3.4 has also reached end of life.

  /home/karthi/async_lru/async_lru.py:191: DeprecationWarning: "@coroutine" decorator is deprecated since Python 3.8, use "async def" instead
    def wrapped(*fn_args, **fn_kwargs):

  /home/karthi/async_lru/tests/test_basic.py:146: DeprecationWarning: "@coroutine" decorator is deprecated since Python 3.8, use "async def" instead
    def coro():

  /home/karthi/async_lru/tests/test_close.py:26: DeprecationWarning: The loop argument is deprecated and scheduled for removal in Python 3.10.
    await asyncio.sleep(0.1, loop=loop)

  /home/karthi/async_lru/tests/test_close.py:14: DeprecationWarning: The loop argument is deprecated and scheduled for removal in Python 3.10.
    await asyncio.sleep(0.2, loop=loop)

  /home/karthi/async_lru/tests/test_close.py:64: DeprecationWarning: The loop argument is deprecated and scheduled for removal in Python 3.10.
    await asyncio.sleep(0.1, loop=loop)

  /home/karthi/async_lru/tests/test_close.py:54: DeprecationWarning: The loop argument is deprecated and scheduled for removal in Python 3.10.
    await asyncio.sleep(0.2, loop=loop)

  /home/karthi/async_lru/tests/test_close.py:94: DeprecationWarning: The loop argument is deprecated and scheduled for removal in Python 3.10.
    await asyncio.sleep(0.1, loop=loop)

  /home/karthi/async_lru/tests/test_close.py:84: DeprecationWarning: The loop argument is deprecated and scheduled for removal in Python 3.10.
    await asyncio.sleep(0.2, loop=loop)

  /home/karthi/async_lru/tests/test_close.py:122: DeprecationWarning: The loop argument is deprecated and scheduled for removal in Python 3.10.
    await asyncio.sleep(0.1, loop=loop)

  /home/karthi/async_lru/tests/test_close.py:112: DeprecationWarning: The loop argument is deprecated and scheduled for removal in Python 3.10.
    await asyncio.sleep(0.2, loop=loop)

  /home/karthi/async_lru/tests/test_close.py:156: DeprecationWarning: The loop argument is deprecated and scheduled for removal in Python 3.10.
    await asyncio.sleep(0.1, loop=loop)

  /home/karthi/async_lru/tests/test_close.py:146: DeprecationWarning: The loop argument is deprecated and scheduled for removal in Python 3.10.
    await asyncio.sleep(0.2, loop=loop)

'invalidate' does not work with the class member function

It's a nice project and helped us a lot! But it seems that there's a problem when caching a member function in a class.
For example:

from async_lru import alru_cache
import asyncio

class Test:
    @alru_cache(maxsize=128)
    async def do_sth(self, x: int) -> int:
        print(f'x = {x}, cache not hit!')
        return x+10

When I called:

instance1 = Test()
looper = asyncio.get_event_loop()

looper.run_until_complete(instance1.do_sth(10))

instance1.do_sth.invalidate(10)

looper.run_until_complete(instance1.do_sth(10))

it still hits the cache when I invoke 'instance1.do_sth(10)' again. I guess there must be something wrong with how the parameters of the function are compared. I tried to edit your source code to fix it but failed; could you please give some ideas?
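A possible workaround, assuming the cache key includes the implicit self argument (an untested sketch using the names from the snippet above):

# invalidate through the class, passing the instance explicitly so the
# key matches the one stored by instance1.do_sth(10)
Test.do_sth.invalidate(instance1, 10)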

Thanks a lot!

Does alru_cache prevent concurrent writes?

Suppose we have an alru_cached function with side effects, for example, sending an email

import asyncio

from collections import defaultdict

from async_lru import alru_cache

num_email_confirmations = defaultdict(int)

@alru_cache(maxsize=500)
async def send_email_confirmation(user_email: str) -> int:
    _send_confirmation(user_email)
    num_email_confirmations[user_email] += 1
    return num_email_confirmations[user_email] 

Now, what happens if we concurrently try to send the same user [email protected] an email multiple times, while the first call hasn't completed yet? Example:

send_once = asyncio.create_task(send_email_confirmation("[email protected]"))
send_again = asyncio.create_task(send_email_confirmation("[email protected]"))

assert await send_once == 1
assert await send_again == 1

Now, running a minimal test and checking the code confirms that Albert should only receive his email once, since we are populating the cache with a future immediately. Is this behaviour documented? If not, it would be great to document it explicitly. We are currently relying on this behaviour in our code.

Why was cache_exceptions dropped?

I can't find the PR, but this commit dropped support for caching exceptions. The commit message says it makes no sense from a practical perspective. Why? If I know something is deterministic, it makes total sense to not want to recompute something I know is going to fail.

Loop Value must agree with future

There is a possibility that multiple threads can access the same value before it is resolved.

Is there a way to ignore this error and just run the function without throwing?

EDIT: I found a temporary fix

import functools

from async_lru import alru_cache


def safe_alru_cache(f):
    cached_f = alru_cache(f)  # build the cached wrapper once, not on every call

    @functools.wraps(f)
    def wrapped(*args, **kwargs):
        try:
            return cached_f(*args, **kwargs)
        except Exception as e:
            print(e)
            return f(*args, **kwargs)

    return wrapped

Using async_lru with tenacity @retry decorator

I have the following aiohttp app:

import asyncio
import aiohttp
from datetime import datetime

from async_lru import alru_cache
from aiocache.plugins import HitMissRatioPlugin
from aiohttp import web
from tenacity import (retry, retry_if_exception_type, stop_after_attempt, wait_exponential)
import random


@alru_cache(maxsize=1)
@retry(retry=retry_if_exception_type(Exception), stop=stop_after_attempt(5))
async def fetch(url):
    random.seed(int(datetime.now().timestamp()))
    rand_number = random.randint(1, 9)
    print(rand_number)
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            if rand_number % 2 == 0:
                print("attempt exception...")
                raise Exception
            
            print("attempt default...")
            await asyncio.sleep(1)
            return await response.text()


async def handle(request):
    print(f"fetching default")
    text = await fetch('http://google.com')
    print(fetch.cache_info())
    return web.Response(text=text)


app = web.Application()
app.add_routes([web.get('/', handle)])


web.run_app(app, port=8081)

But when I try to run this, I get the following:

ujson module not found, using json
msgpack not installed, MsgPackSerializer unavailable
Traceback (most recent call last):
  File "cache.py", line 13, in <module>
    @retry(retry=retry_if_exception_type(Exception), wait=wait_exponential(multiplier=0.2, max=3), stop=stop_after_attempt(5))
  File "/Users/anvith.shivakumara/Workspace/python-playgraound/async_lru.py", line 183, in wrapper
    'Coroutine function is required, got {}'.format(fn))
RuntimeError: Coroutine function is required, got <function fetch at 0x1114c41e0>

But looking at the documentation for tenacity here, https://github.com/jd/tenacity#async-and-retry, there seems to be support for asyncio. Can someone help me get these two running together, or at least help me understand why I cannot?

Thank you.
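One way to sidestep the decorator-ordering problem is to move the retry logic inside the coroutine with tenacity's AsyncRetrying, so that alru_cache sees a plain coroutine function. A minimal, untested sketch, assuming a reasonably recent tenacity:

import aiohttp
from async_lru import alru_cache
from tenacity import AsyncRetrying, retry_if_exception_type, stop_after_attempt


@alru_cache(maxsize=1)
async def fetch(url):
    # retry up to 5 times inside the coroutine instead of wrapping it
    async for attempt in AsyncRetrying(
        retry=retry_if_exception_type(Exception),
        stop=stop_after_attempt(5),
    ):
        with attempt:
            async with aiohttp.ClientSession() as session:
                async with session.get(url) as response:
                    return await response.text()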

Custom size function support

cachetools has a feature for a custom size function, to support things like caching files in memory while making sure the number of bytes cached is not too large (as opposed to the number of files cached).

Would it be possible to add support for a feature like this? Note that since the size may depend on the result of the cached function, the size would have to be evaluated once the coroutine has finished.
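For reference, a sketch of the cachetools feature being described (synchronous):

from cachetools import LRUCache

# bound the cache by the total size of the cached values (here, bytes)
# rather than by the number of entries
cache = LRUCache(maxsize=32 * 1024 * 1024, getsizeof=len)
cache['small.txt'] = b'hello'  # contributes len(b'hello') == 5 toward maxsize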

Suggestion: use standard dict with Python >= 3.7

Since the order of elements in a dictionary is guaranteed in Python 3.7, it would be good to use a standard dict instead of OrderedDict: this would save memory and slightly improve performance. Though at a size of 128 elements it probably does not matter much.

$ ipython
Python 3.7.3 (default, Apr  3 2019, 05:39:12) 
Type 'copyright', 'credits' or 'license' for more information
IPython 7.5.0 -- An enhanced Interactive Python. Type '?' for help.

In [1]: from collections import OrderedDict

In [2]: import sys

In [3]: std_dict = {i: object() for i in range(128)}

In [4]: ord_dict = OrderedDict((i, object()) for i in range(128))

In [5]: sys.getsizeof(std_dict)
Out[5]: 4704

In [6]: sys.getsizeof(ord_dict)
Out[6]: 10912

In [7]: %timeit for i in range(128): i in std_dict
3.79 µs ± 6.01 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)

In [8]: %timeit for i in range(128): i in ord_dict
4.17 µs ± 44.1 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)
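As a sketch of why OrderedDict is no longer needed, a plain dict can emulate move_to_end on Python >= 3.7, where insertion order is guaranteed:

def move_to_end(d: dict, key) -> None:
    d[key] = d.pop(key)  # pop and reinsert: the key becomes the newest entry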

In addition, there seems to be an error in the _cache_hit method: the key should probably be moved to the beginning of the dictionary, not the end. That said, I do not really see how the direction of the move could matter much at such small cache sizes.

Exceptions are cached?

Hello,

I'm seeing something weird with exceptions... The lru_cache decorator does not cache anything when an exception is raised, so something like this:

import time
from functools import lru_cache

# not very useful, but shows the difference well
foo = 0
@lru_cache(maxsize=10)
def return_something_sync(a):
    global foo
    foo += 1
    raise Exception(foo, time.time())

Would raise something like this:

return_something_sync(1)
>>> Exception: (1, 1564754537.844053)
return_something_sync(1)
>>> Exception: (2, 1564754539.949204)

So a new exception each time.
But with alru_cache, the exception itself is cached and re-raised!

foo = 0
@alru_cache(maxsize=10)
async def return_something(a):
    global foo
    foo += 1
    raise Exception(foo, time.time())

Will raise:

await return_something(1)
>>> Exception: (1, 1564754484.767096)
await return_something(1)
>>> Exception: (1, 1564754484.767096)

I would expect the exception not to be kept in the cache, as with the sync equivalent; otherwise we have basically no way to call the same function again in case of a transient issue (which can happen frequently when network access is involved...).

Or am I missing something?

Thanks

asyncio.shield 'loop' parameter deprecated in python 3.10

Hi, in Python 3.10 the asyncio.shield 'loop' parameter has been removed, so it raises TypeError: shield() got an unexpected keyword argument 'loop':

File "C:\Users\Userr\AppData\Roaming\Python\Python310\site-packages\async_lru.py", line 237, in wrapped
return (yield from asyncio.shield(fut, loop=_loop))
TypeError: shield() got an unexpected keyword argument 'loop'
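A sketch of the fix on Python 3.10+: simply drop the removed argument, and the future is shielded on the running loop.

# inside the generator-based wrapper shown in the traceback:
return (yield from asyncio.shield(fut))
# or, in modern async def code:
return await asyncio.shield(fut)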

alru_cache caches the created Task objects

This test program prints Tasks: 101:

import asyncio
from async_lru import alru_cache
import random


@alru_cache(maxsize=100)
async def cached_test(arg):
    return arg


async def main():
    while True:
        arg = random.random()
        value = await cached_test(arg)
        assert arg == value
        print("Tasks: ", len(asyncio.Task.all_tasks()))


asyncio.run(main())

While it's OK to wrap the coroutine in a Task, it is not OK to keep the Task around. The cache should hold only the arg and the result, not the Task.
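A sketch of the suggested behaviour (a hypothetical stand-alone cache, not the library's code; error handling omitted): replace the Task with its plain result once it completes, so finished entries hold no Task references.

import asyncio

cache: dict = {}


async def cached_call(key, coro_fn, *args):
    if key in cache:
        value = cache[key]
        # still in flight: await the shared task; otherwise return the stored result
        return await value if isinstance(value, asyncio.Task) else value
    task = asyncio.ensure_future(coro_fn(*args))
    cache[key] = task
    # on completion, swap the Task for its result so the Task can be collected
    task.add_done_callback(lambda t: cache.__setitem__(key, t.result()))
    return await task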

New release

Could a new release be published on PyPI now that this PR is merged? #481

Adding type annotations?

Can type annotations be added to this module?

In #70 it was mentioned that they couldn't be used as there was no mechanism for decorators. I don't believe that's true anymore.

Add type annotations

Test program:

import asyncio
from async_lru import alru_cache


@alru_cache(maxsize=16384)
async def my_cached_func(arg1: str, arg2: int) -> str:
    return arg1 * arg2


async def main() -> None:
    result = await my_cached_func("abc")
    print(result)


asyncio.run(main())

Now, the program has a bug, but mypy normally can't find it because of missing type annotations:

# mypy test.py --ignore-missing-imports

If I comment out the @alru_cache(maxsize=16384) line, I get the expected mypy error:

# mypy test.py --ignore-missing-imports
test.py:11: error: Too few arguments for "my_cached_func"

Relevant PEP: https://www.python.org/dev/peps/pep-0561/
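A sketch of how the decorator factory could be annotated so that mypy checks call signatures, assuming Python >= 3.10 for ParamSpec and the int | None syntax (typing_extensions would be needed on older versions); this covers only the parametrized @alru_cache(...) form:

from collections.abc import Awaitable, Callable
from typing import ParamSpec, TypeVar

_P = ParamSpec("_P")
_R = TypeVar("_R")


def alru_cache(
    maxsize: int | None = 128,
) -> Callable[[Callable[_P, Awaitable[_R]]], Callable[_P, Awaitable[_R]]]:
    ...  # the wrapper preserves the decorated function's parameters via _P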

`KeyError` is raised while using both: TTL and maxsize params.

Description
_LRUCacheWrapper._task_done_callback() fails with KeyError during concurrent processing when using ttl and maxsize. The latter is set to 128 by default, so the problem occurs even without maxsize explicitly set.

This seems to happen because the cached item has already been removed from self.__cache by the time self._task_done_callback() is triggered. When that happens, cache_item = self.__cache[key] fails.

Module version

$ python3 -m pip freeze | grep async-lru
async-lru==2.0.2

How to reproduce:

import asyncio
from async_lru import alru_cache


@alru_cache(ttl=60, maxsize=5)
async def cached_func(uuid: int):
    return uuid


async def main():
    tasks = [cached_func(i) for i in range(20)]
    cached_results = await asyncio.gather(*tasks)


if __name__ == "__main__":
    asyncio.run(main())

Expected behavior
cached_results == list(range(20))

Actual behavior
Bunch of KeyErrors raised.

Proposed solution
I couldn't push this out as a PR since I have no write access to the repo.

@@ -161,10 +161,12 @@ class _LRUCacheWrapper(Generic[_R]):
         if self.__ttl is not None:
             loop = asyncio.get_running_loop()
-            cache_item = self.__cache[key]
-            cache_item.later_call = loop.call_later(
-                self.__ttl, self.__cache.pop, key, None
-            )
+            # using safe `.get()` here since cached item might have been removed from cache due to maxsize already
+            cache_item = self.__cache.get(key)
+            if cache_item is not None:
+                cache_item.later_call = loop.call_later(
+                    self.__ttl, self.__cache.pop, key, None
+                )

decorator is causing '422 Unprocessable Entity' error in FastAPI+Swagger

I'm learning FastAPI. One of my API endpoints is using AsyncLRU for caching some results.
However, if I use the AsyncLRU decorator on an API endpoint, FastAPI can't detect the query parameters:
Swagger shows *args, **kwargs as the query parameters.
Using functools.wraps solves this problem.

class MyAsyncLRU(AsyncLRU):
    def __call__(self, func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            key = self._Key(args, kwargs)
            if key in self.lru:
                return self.lru[key]
            else:
                self.lru[key] = await func(*args, **kwargs)
                return self.lru[key]

        wrapper.__name__ += func.__name__

        return wrapper

Please make a new release

This package is now required by denonavr, and the current release is two years old and broken. A new release would be helpful.

A little sparse on documentation eg in README

What kinds of things can this cache be used for?
What should the example be expected to do?
What is the rationale behind the example?
etc

For example: how does this sound?

Asynchronous "Least Recently Used"-style cache for input/output use.
For example, using async-lru with resources available via HTTP allows for
those resources to be downloaded, cached and returned when remote HTTP
resources are actually available. async-lru helps programmers build a higher
level view of these network calls for APIs to be built on.

See example below:

(The webpage at "https://www.python.org/" is downloaded via HTTP and stored 6 times.
The example can be run once, as proof that you can control how many times a resource or set of resources is requested, or as many times as needed within a loop using run_until_complete().)

Distribute py.typed file for mypy typings

See:

Otherwise you get:

error: Skipping analyzing "async_lru": module is installed, but missing library stubs or py.typed marker  [import]
note: See https://mypy.readthedocs.io/en/stable/running_mypy.html#missing-imports

Error when the cache is cleared before the done callback is called

I'm sometimes getting a KeyError on self.__tasks.remove(task), line 165 of __init__.py. I think it's because the cache_clear() method is called before the task finishes, which clears the __tasks set, so the task can no longer be found there. The code in the _task_done_callback method should probably be wrapped in a try/except block, or something like that.
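A sketch of such a guard (names as in the issue report, not the library's actual code): using set.discard avoids the exception entirely when the task has already been removed by cache_clear().

def _task_done_callback(self, key, task):
    # discard() is a no-op if cache_clear() already removed the task,
    # whereas remove() would raise KeyError
    self.__tasks.discard(task)
    ...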

[cpython 3.9] Decorating a classmethod doesn't work

It appears that alru_cache does not seem to work for classmethods. Here's a contrived example.

import asyncio

from async_lru import alru_cache


class Incrementer:
    inc = 1

    @classmethod
    @alru_cache
    async def increment(cls, num):
        return cls.inc + num


async def main():
    print(await Incrementer.increment(5))

asyncio.run(main())

Error:

TypeError: increment() missing 1 required positional argument: 'num'

I would expect it to work since lru_cache works for classmethods.

Signature of decorated function is not type checked by mypy

The following has mypy (expectedly) failing with error: Argument 1 to "f1" has incompatible type "int"; expected "str" [arg-type]

async def f1(x: str, y: int) -> str:
    return x + str(y)


async def main():
    await f1(1, 1)

But decorating f1 with @alru_cache prevents mypy from detecting any issue.

PytestUnraisableExceptionWarning

The tests are not passing.

============================= test session starts ==============================
platform linux -- Python 3.8.9, pytest-6.2.3, py-1.10.0, pluggy-0.13.1 -- /nix/store/q6gfck5czr67090pwm53xrdyhpg6bx67-python3-3.8.9/bin/python3.8
cachedir: .pytest_cache
rootdir: /build/source, configfile: setup.cfg
plugins: asyncio-0.14.0
collected 42 items                                                             
[...]

                        cm.unraisable.exc_type,
                        cm.unraisable.exc_value,
                        cm.unraisable.exc_traceback,
                    )
                )
>               warnings.warn(pytest.PytestUnraisableExceptionWarning(msg))
E               pytest.PytestUnraisableExceptionWarning: Exception ignored in: <coroutine object test_alru_cache_deco_called.<locals>.coro at 0x7ffff5e05bc0>

Invalidate doesn't work in instance methods of classes

from async_lru import alru_cache
class X:
  @alru_cache(maxsize=None)
  async def fetch_member(self, x):
    return x * x

i = X()

for _ in range(100):
  await i.fetch_member(1)
  i.fetch_member.invalidate(1)
i.fetch_member.cache_info()

# Expected
CacheInfo(hits=0, misses=100, maxsize=None, currsize=0)

# Got
CacheInfo(hits=99, misses=1, maxsize=None, currsize=1)
