An ASGI middleware for rate limiting
License: Apache License 2.0
The `default_429` function works fine when I launch my FastAPI app normally and send requests to it, but when I test it using FastAPI's (which is Starlette's) `TestClient`, the first `send` statement (`await send({"type": "http.response.start", "status": 429})`) results in a `KeyError`. Normally, when the first `send` is called, the following headers are present in the response even if I don't specify a `headers` parameter in `send`:
{'date': 'Wed, 31 Mar 2021 17:24:52 GMT', 'server': 'uvicorn', 'Transfer-Encoding': 'chunked'}
But when the `TestClient` is used to test the endpoints, there are no headers and Starlette raises the following exception:
message = {'status': 429, 'type': 'http.response.start'}

    async def send(message: Message) -> None:
        nonlocal raw_kwargs, response_started, response_complete, template, context
        if message["type"] == "http.response.start":
            assert (
                not response_started
            ), 'Received multiple "http.response.start" messages.'
            raw_kwargs["version"] = 11
            raw_kwargs["status"] = message["status"]
            raw_kwargs["reason"] = _get_reason_phrase(message["status"])
            raw_kwargs["headers"] = [
>               (key.decode(), value.decode()) for key, value in message["headers"]
            ]
E           KeyError: 'headers'

..\..\lib\site-packages\starlette\testclient.py:208: KeyError
This is because when using `TestClient`, the message that goes through `default_429` has no headers, which results in an error. I easily solved this issue by supplying my own function for `on_blocked`:
import json

async def yourself_429(scope, receive, send) -> None:
    body = json.dumps({"detail": "Too many requests"}).encode("utf8")
    headers = [
        (b"content-length", str(len(body)).encode("utf8")),
        (b"content-type", b"application/json"),
    ]
    await send({"type": "http.response.start", "status": 429, "headers": headers})
    await send({"type": "http.response.body", "body": body, "more_body": False})

RateLimitMiddleware(..., on_blocked=yourself_429)
I don't know if this counts as a Starlette issue or an ASGI-Ratelimit one, but thought I should put this here in case someone else faces a similar error.
Access is denied for a period of time after the rate limit is triggered, like this:

Rule(minute=30, block_time=60 * 10)

If there are more than 30 visits in one minute, access is denied for the next ten minutes.
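The `block_time` behaviour described above can be sketched with a minimal in-memory counter. This is only an illustration of the idea, not the library's actual implementation; the class name and API are made up:

```python
class BlockingLimiter:
    """Toy sketch: deny once more than `limit` hits land inside `window`
    seconds, then keep denying for `block_time` more seconds."""

    def __init__(self, limit, window, block_time):
        self.limit = limit
        self.window = window
        self.block_time = block_time
        self.hits = []
        self.blocked_until = 0.0

    def allow(self, now):
        if now < self.blocked_until:
            return False  # still inside the block_time penalty
        self.hits = [t for t in self.hits if now - t < self.window]
        self.hits.append(now)
        if len(self.hits) > self.limit:
            self.blocked_until = now + self.block_time
            return False
        return True

# Mirrors Rule(minute=30, block_time=60 * 10): 30 hits/minute, 10-minute block.
limiter = BlockingLimiter(limit=30, window=60, block_time=600)
```

Passing the clock in explicitly (`now`) keeps the sketch deterministic and easy to test.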
In the example in the README:

from ratelimit.backends.redis import MemoryBackend, RedisBackend

For `MemoryBackend`, it should be:

from ratelimit.backends.simple import MemoryBackend
I am working on implementing rate limiting with this library and ran into an issue when multiple endpoints share the same path but different methods. For example:
GET /towns
POST /towns
Using pattern matching to set up limits applies the same limit to both endpoints, which is not desired.
...
config={
    r"^/towns": [Rule(second=10)],
}
...
A workaround I have found is to get the method in the auth function and then use it in the group name.
async def AUTH_FUNCTION(scope: Scope) -> Tuple[str, str]:
    ...
    method = scope['method'].lower()
    ...
    return user_unique_id, f"{method}-groupname"
Limits per method can then be set by:
...
config={
    r"^/towns": [
        Rule(group="get-groupname", second=10),
        Rule(group="post-groupname", second=2),
    ],
}
...
In rule.py, I see that the key in Redis is

f"{path}:{user}:{name}": (limit, TTL[name])

Adding the method to this key should take care of this on the backend:

f"{path}:{method}:{user}:{name}": (limit, TTL[name])
And then in the Rule implementation, adding a field to specify the method could look something like:

Rule(group="groupname", method="get", second=10),
Rule(group="groupname", method="post", second=2),
Interested to hear some thoughts on this (and whether I missed something obvious). Thanks!
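The workaround described above can be sketched end-to-end as a plain callable. `lookup_user` is a hypothetical stand-in for the real authentication logic:

```python
import asyncio
from typing import Tuple

def lookup_user(scope):
    # Hypothetical: resolve the caller's unique id from the scope.
    return "user-1"

async def method_aware_auth(scope) -> Tuple[str, str]:
    # Fold the HTTP method into the group name so GET and POST on the
    # same path are counted by separate rules.
    method = scope["method"].lower()
    return lookup_user(scope), f"{method}-groupname"

user, group = asyncio.run(method_aware_auth({"method": "GET", "path": "/towns"}))
```

The returned group then selects the matching `Rule(group="get-groupname", ...)` or `Rule(group="post-groupname", ...)` entry.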
I think since the newest release, 0.5.1, the https://github.com/abersheeran/asgi-ratelimit#custom-block-handler docs are not correct anymore. I'm not sure about it, but it seems they have to be adapted to the new setup; at least I had to do that today at work on mine.
I'll try to come up with a test and change the docs when time permits, if someone doesn't figure it out before :)
This package looks promising; however, I have a little trouble understanding how to even start using it...
What/how should I resolve the user/group in `AUTH_FUNCTION`? Sorry, I don't get it from the documentation.
I also tried to use the built-in `session` auth method and it fails with `AssertionError: Starlette SessionMiddleware must be installed to access request.session`.
FYI, I'm using FastAPI, so the code looks like this:
from fastapi import FastAPI
from ratelimit import RateLimitMiddleware, Rule
from ratelimit.auths.session import from_session
from ratelimit.backends.redis import RedisBackend
from starlette.middleware.sessions import SessionMiddleware
app = FastAPI()

app.add_middleware(
    SessionMiddleware,
    secret_key='aaa'
)

app.add_middleware(
    RateLimitMiddleware,
    authenticate=from_session,
    backend=RedisBackend(),
    config={
        r"^/": [Rule(second=1), Rule(group="admin")],
    },
)
This would be a good one to have:
https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Retry-After
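A custom `on_blocked` handler could already attach the header today. A sketch assuming a fixed delay: the 60-second value is made up, since the middleware's remaining-wait time isn't exposed to the handler here:

```python
import asyncio
import json

async def retry_after_429(scope, receive, send) -> None:
    # Hypothetical handler: the 60-second retry delay is an assumption,
    # not a value read from the rate limiter's state.
    body = json.dumps({"detail": "Too many requests"}).encode("utf8")
    headers = [
        (b"content-length", str(len(body)).encode("utf8")),
        (b"content-type", b"application/json"),
        (b"retry-after", b"60"),
    ]
    await send({"type": "http.response.start", "status": 429, "headers": headers})
    await send({"type": "http.response.body", "body": body, "more_body": False})

# Drive the handler with a capturing `send` to show the emitted ASGI messages.
messages = []

async def _capture(message):
    messages.append(message)

asyncio.run(retry_after_429({"type": "http"}, None, _capture))
```

The handler would be plugged in as `RateLimitMiddleware(..., on_blocked=retry_after_429)`.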
Not sure if this should be considered a bug, but here is the config I was writing in tests to use custom rules:
rate_limit = RateLimitMiddleware(
    hello_world,
    auth_func,
    redisbackend(),
    {
        r"/multiple": [FixedRule(second=1, minute=3)],
        r"/custom": [CustomRule(rules=[LimitFrequency(limit=3, granularity=2)])],
        r"/multiple_custom": [CustomRule(rules=[LimitFrequency(limit=3, granularity=2), LimitFrequency(limit=4, granularity=3)])],
    },
)
As the path matching stops at the first match, if you send

response = await client.get(
    "/multiple_custom", headers={"user": "user", "group": "default"}
)

it will think you are using the `FixedRule` from the `/multiple` endpoint, while in fact you wanted the other rule. If I use `r"^/multiple$": [FixedRule(second=1, minute=3)],` instead, the unwanted match does not occur, as expected. As a user, this could be confusing.
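The first-match behaviour above can be demonstrated with plain `re` patterns. This is a standalone sketch; the pattern list and helper are illustrative, not the middleware's code:

```python
import re

# Patterns are checked in order and the first match wins, so anchored
# patterns (or most-specific-first ordering) keep "/multiple_custom"
# from being captured by the "/multiple" entry.
patterns = [r"^/multiple$", r"^/multiple_custom$", r"^/custom$"]

def first_match(path):
    for pattern in patterns:
        if re.search(pattern, path):
            return pattern
    return None
```

With the unanchored `r"/multiple"` at the front instead, `first_match("/multiple_custom")` would return that entry.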
@abersheeran This module doesn't work when I use:

app.add_middleware(
    RateLimitMiddleware,
    authenticate=client_ip,
    backend=RedisBackend(),
    config={
        r"^/": [Rule(second=1), Rule(group="admin")],
        r"^/api": [Rule(minute=1), Rule(group="admin")],
    },
)

I use https://pypi.org/project/secure/0.1.8/ and it returns an error:

response.raw_headers = message["headers"]
KeyError: 'headers'
I fixed it by adding headers to the response in asgi-ratelimit/ratelimit/core.py (line 54 in 86a8a0b):

await send({"type": "http.response.start", "status": 429, 'headers': [(b'content-type', b'application/json')]})

Also, how do I customize the error handler for 429? Since it's handled by the ASGI exception handling, I need to make the error response JSON.
thank you
When using Redis as a backend, the code doesn't handle exceptions raised by the backend (e.g., network/connectivity errors), which results in a 5xx Internal Server Error.
On backend exceptions, the endpoint should not fail; it should skip rate limiting and continue with standard processing.
Traceback (most recent call last):
File "../lib/python3.9/site-packages/redis/asyncio/connection.py", line 577, in connect
await self.retry.call_with_retry(
File "../lib/python3.9/site-packages/redis/asyncio/retry.py", line 59, in call_with_retry
return await do()
File "../lib/python3.9/site-packages/redis/asyncio/connection.py", line 922, in _connect
reader, writer = await asyncio.open_connection(
File "/opt/homebrew/Cellar/[email protected]/3.9.15/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/streams.py", line 52, in open_connection
transport, _ = await loop.create_connection(
File "uvloop/loop.pyx", line 2039, in create_connection
File "uvloop/loop.pyx", line 2016, in uvloop.loop.Loop.create_connection
ConnectionRefusedError: [Errno 61] Connection refused
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "../lib/python3.9/site-packages/ratelimit/core.py", line 95, in __call__
retry_after = await self.backend.retry_after(path, user, rule)
File "../lib/python3.9/site-packages/ratelimit/backends/redis.py", line 46, in retry_after
block_time = await self.is_blocking(user)
File "../lib/python3.9/site-packages/ratelimit/backends/redis.py", line 42, in is_blocking
a = await self._redis.ttl(f"blocking:{user}")
File "../python3.9/site-packages/redis/asyncio/client.py", line 513, in execute_command
conn = self.connection or await pool.get_connection(command_name, **options)
File "../lib/python3.9/site-packages/redis/asyncio/connection.py", line 1375, in get_connection
await connection.connect()
File "../lib/python3.9/site-packages/redis/asyncio/connection.py", line 585, in connect
raise ConnectionError(self._error_message(e))
redis.exceptions.ConnectionError: Error 61 connecting to 127.0.0.1:6379. 61.
Python version: 3.9
Library version: 0.10.0
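One way to get the fail-open behaviour requested above is to wrap the backend so that backend errors are treated as "not limited". This is a hedged sketch: `FailOpenBackend` is not part of asgi-ratelimit, it only covers the `retry_after` call shown in the traceback, and it assumes a return value of 0 means the request is allowed:

```python
import asyncio

class FailOpenBackend:
    """Hypothetical wrapper: delegate to a real backend, but treat any
    backend exception (e.g. Redis connection refused) as "allow"."""

    def __init__(self, backend):
        self.backend = backend

    async def retry_after(self, path, user, rule):
        try:
            return await self.backend.retry_after(path, user, rule)
        except Exception:
            return 0  # 0 = not limited, so the request proceeds

class BrokenBackend:
    """Stand-in for a Redis backend whose server is down."""

    async def retry_after(self, path, user, rule):
        raise ConnectionError("Error 61 connecting to 127.0.0.1:6379.")

result = asyncio.run(FailOpenBackend(BrokenBackend()).retry_after("/", "user", None))
```

The trade-off is the one raised in the "Redis down" issue elsewhere in this thread: failing open means an outage silently disables rate limiting.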
Could we have documentation on using this in a FastAPI application? It's difficult to figure out how to use this library from the given usage examples.
For example:

RateLimiter(
    backend="simple/ratelimiter.backend.simple",
    ...
)

Because different environment configs share the same code: in CI/CD, use the simple backend, but use Redis in production.
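Until something like that exists, one way to switch backends per environment is a small factory keyed off an environment variable. A sketch under assumptions: the variable name `RATELIMIT_BACKEND` is made up, and the dotted paths follow the import locations mentioned elsewhere in these issues:

```python
import os

# Hypothetical mapping from a short name to the backend's import path.
BACKENDS = {
    "simple": "ratelimit.backends.simple.MemoryBackend",
    "redis": "ratelimit.backends.redis.RedisBackend",
}

def backend_path(default="simple"):
    """Pick a backend import path from the environment (CI uses the
    in-memory backend, production sets RATELIMIT_BACKEND=redis)."""
    name = os.environ.get("RATELIMIT_BACKEND", default)
    return BACKENDS[name]
```

The returned dotted path could then be resolved with `importlib.import_module` before constructing the middleware.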
Besides request-frequency limiting, could there be a middleware for bandwidth (network speed) control, targeted at certain endpoints, such as file downloads?
To enrich the feature, the speed limit could also distinguish VIP users from regular users.
I found a little bug: when using the `RedisBackend` as the backend class and Redis is not running, no error is raised in the app, but worse, the rate limiter has no effect at all! All requests come through. Imagine my local Redis going down unexpectedly and my API becoming unlimited for everyone 😱
I looked at the `aredis` constructor for `StrictRedis` and it does not look like one can pass anything to check for an available/existing connection. Redis's official way of ensuring a connection is the PING command.
I think the best solution here is to allow users to pass an already-established Redis connection to `backend` for `RateLimitMiddleware`. Or do you have a better idea?
Something like:
from aredis import StrictRedis
from fastapi import FastAPI
from ratelimit import RateLimitMiddleware

app = FastAPI()
my_redis_conn = StrictRedis()

@app.on_event("startup")
async def startup():
    # either command will raise ConnectionError
    await my_redis_conn.get(None)
    # or await my_redis_conn.ping()

app.add_middleware(
    RateLimitMiddleware,
    authenticate=my_auth,
    backend=RedisBackend(conn=my_redis_conn),
    config={
        ...
    },
)
By passing `my_redis_conn` we're reusing the connection; no need to make another one.
Obviously, there's a little edge-case scenario: e.g., Redis and the app are running, then Redis goes down after X days, and the rate limiter will again allow all requests. So maybe it would be better to override `allow_request` for the Redis backend?
I successfully set up the rate limiter for my app with JWT auth and `RedisBackend`, but when I test it with requests that are missing the `Authorization` header, the error isn't picked up by FastAPI and instead results in an Internal Server Error.
I tried adding an exception handler for `EmptyInformation` to send a 401 response back to the user, but the exception won't get picked up by the exception handler and still results in an Internal Server Error. Am I missing something here? What should I do to handle the `EmptyInformation` exception?
@app.exception_handler(EmptyInformation)
async def unicorn_exception_handler(request: Request, exc: EmptyInformation):
    return JSONResponse(
        status_code=401,
        content={"detail": "Unauthorized access."},
    )
Code:
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from ratelimit import RateLimitMiddleware, Rule
from ratelimit.auths.jwt import create_jwt_auth
from ratelimit.auths import EmptyInformation
from ratelimit.backends.redis import RedisBackend
SECRET_KEY = "SOME_SECRET_KEY"
ALGORITHM = "HS256"
app = FastAPI()

app.add_middleware(
    RateLimitMiddleware,
    authenticate=create_jwt_auth(key=SECRET_KEY, algorithms=[ALGORITHM]),
    backend=RedisBackend(),
    config={
        r"^/$": [Rule(second=5, group="default"), Rule(group="unlimited")],
    },
)

# @app.exception_handler(EmptyInformation)
# async def emptyinformation_exception_handler(request: Request, exc: EmptyInformation):
#     return JSONResponse(
#         status_code=401,
#         content={"detail": "Unauthorized access."},
#     )

@app.get("/")
async def root():
    return {"message": "Hello World"}
Version info
To reproduce:
pip install asgi-ratelimit[full]
uvicorn main:app
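A likely cause: the middleware raises `EmptyInformation` before the request ever reaches FastAPI's routing, so FastAPI's exception handlers never see it. One workaround is to catch the failure inside the `authenticate` callable itself. This is a sketch, not a documented API: the wrapper name and the "anonymous" fallback identity are made up:

```python
import asyncio

def with_anonymous_fallback(auth_func):
    """Wrap an authenticate callable so that any failure (e.g. a missing
    Authorization header) falls back to an anonymous identity instead of
    bubbling up as a 500 from the middleware."""
    async def wrapper(scope):
        try:
            return await auth_func(scope)
        except Exception:
            return "anonymous", "default"
    return wrapper

async def strict_auth(scope):
    # Stand-in for the JWT auth: fail when the header is missing.
    raise ValueError("no Authorization header")

user, group = asyncio.run(with_anonymous_fallback(strict_auth)({}))
```

The wrapped function would then be passed as `authenticate=with_anonymous_fallback(create_jwt_auth(...))`, rate limiting anonymous clients under a default group rather than erroring.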
It has been a while since we merged in the new feature for rate limiting by method #66 #67
Just checking in to see if we could get this new version released; my use case would really benefit from this feature for a cleaner implementation. Thanks @abersheeran
Please consider this a question or a feature request.
I have 3 endpoints
from typing import Tuple
from fastapi import FastAPI
from ratelimit import RateLimitMiddleware, Rule
from ratelimit.backends.redis import RedisBackend
from ratelimit.types import Scope
app = FastAPI()

async def auth_func(scope: Scope) -> Tuple[str, str]:
    print('user1', 'customer1')
    return ('user1', 'customer1')

app.add_middleware(
    RateLimitMiddleware,
    authenticate=auth_func,
    backend=RedisBackend(host='localhost', port=6379),
    config={
        r"^/message": [Rule(minute=2, group='customer1')],
        r"^/": [Rule(minute=1, group='customer1')],
    },
)

@app.get("/")
async def root():
    return {"message": "Hello World"}

@app.get("/hi")
async def hi():
    return {"message": "Hi World"}

@app.get("/message")
async def message():
    return {"message": "message"}
I would like to handle `/message` separately and the others together as a group. E.g., if there is one request to either `/` or `/hi`, then another request to either of them should not be allowed for that period (1 min).
As far as I can understand the code base, we'd need another backend? Or should it be another config parser? Thoughts?
So far we can have a custom 429, but only one.
What would be cool is to have a custom 429 per limit, e.g. a 429 "you reached your daily limit" if the daily limit is exceeded.
However, I wonder at what level this kind of config would make sense.
This would be neat as a backend:
https://en.wikipedia.org/wiki/Generic_cell_rate_algorithm
I'm trying to follow the FastAPI docs on testing, and I'm getting a `RuntimeError: Event loop is closed` error when I have the rate limiter enabled.
In `main.py`:
async def _dummy_auth_function(scope: Scope) -> tuple[str, str]:
    return "dummy_uid", "default"

app.add_middleware(
    RateLimitMiddleware,
    authenticate=_dummy_auth_function,
    backend=RedisBackend(StrictRedis(
        ...
    )),
    config={
        r"^/v3/": [Rule(minute=1000, second=20)],  # <-- Works when this is deleted
    },
)
My `test_ping.py` file:
import pytest
from fastapi.testclient import TestClient

from .main import app

client = TestClient(app)

@pytest.mark.integration()
def test_ping():
    response = client.get("/v3/test/ping")
    response = client.get("/v3/test/ping")  # <-- this doesn't work
    assert response.status_code == 200
This code fails with `Event loop is closed` on the second request, unless the endpoint is not rate limited.
@abersheeran please.
Currently, when `ssl=true` there is no further parameter for SSL details, like `ssl_keyfile`/`ssl_certfile`.
Reference: https://github.com/aio-libs/aioredis-py/blob/master/aioredis/client.py lines 850-855 (also supported in redis 4.2.0rc1+).
I'm using https://indominusbyte.github.io/fastapi-jwt-auth/ for JWT auth. My requirement is to rate limit by client IP address for users without a token; for endpoints with an access token, the user_id is in the token and I want to rate limit that user_id. The following code worked fine with another rate limiter, slowapi. How do I define the auth function when using this ratelimit library?
from fastapi_jwt_auth import AuthJWT
from starlette.requests import Request

def get_user_id_or_ip(request: Request):
    authorize = AuthJWT(request)  # initial instance of fastapi-jwt-auth
    try:
        # If a JWT token is present, return its subject; otherwise return the client IP address
        authorize.jwt_optional()  # validates the JWT token
        return decrypt_data(authorize.get_jwt_subject()) or request.client.host
    except AuthJWTException:
        return request.client.host
In main.py (using FastAPI):

app.add_middleware(
    RateLimitMiddleware,
    authenticate=get_user_id_or_ip,
    backend=RedisBackend(),
    config={
        r"^/towns": [Rule(second=1, group="default"), Rule(group="admin")],
        r"^/forests": [Rule(minute=1, group="default"), Rule(group="admin")],
    },
)
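asgi-ratelimit's `authenticate` callable receives the raw ASGI scope and must return a `(user, group)` tuple, so the `Request`-based slowapi function needs adapting. A sketch under assumptions: `decode_user_id` is a hypothetical stand-in for real JWT validation, and every client lands in the "default" group:

```python
import asyncio
from typing import Tuple

def decode_user_id(token):
    # Hypothetical placeholder: a real implementation would verify and
    # decode the JWT and return its subject, or None when invalid.
    return token or None

async def jwt_or_ip_auth(scope) -> Tuple[str, str]:
    # scope["headers"] is a list of (bytes, bytes) pairs in ASGI.
    headers = dict(scope.get("headers") or [])
    auth = headers.get(b"authorization", b"").decode()
    if auth.startswith("Bearer "):
        user_id = decode_user_id(auth[len("Bearer "):])
        if user_id:
            return user_id, "default"
    # No (valid) token: fall back to the client IP from the scope.
    client = scope.get("client") or ("unknown", 0)
    return client[0], "default"

user, group = asyncio.run(jwt_or_ip_auth({"headers": [], "client": ("1.2.3.4", 1234)}))
```

This would be passed as `authenticate=jwt_or_ip_auth` in `app.add_middleware(RateLimitMiddleware, ...)`.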
Greetings, and thanks for this package!
Maybe there is something I understand incorrectly, but reading https://github.com/abersheeran/asgi-ratelimit#customizable-rules I thought I could set multiple limits.
If I set this rule, for instance:

r"/multiple": [Rule(second=1, hour=2)]

my understanding is that it will allow up to 1 request per second and no more than 2 requests per hour, for everyone, for the `/multiple` endpoint.
If my understanding is correct, then this does not return a 429 after the 2nd hit. I wrote a small test case here that shows it: https://github.com/euri10/asgi-ratelimit/blob/b00fea45c6f78b31b5664f53eee1d6dd811dff62/tests/backends/test_redis.py#L124-L143
Hi!
Is there a way to access the hit/miss ratio? It might be a valuable metric to export via starlette_exporter or a similar library.
I have a rather custom requirement to exclude certain URL paths from rate limiting (i.e., not count them against the limit). For example, given the endpoint

domain.com/api/find/:country

I'd like to exclude some paths, e.g.:

config={
    r"^/api/find/germany": [],  # do nothing
    r"^/api/find/belgium": [
        Rule(minute=50, group="belgium")
    ],
    ...
}

After looking at the code, I think this is not currently possible? Do you think such functionality could benefit other users?
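If unmatched paths pass through the middleware without being limited (which appears to be the behaviour, though worth verifying), one workaround is keeping the excluded path out of the limited pattern entirely with a negative lookahead. A standalone sketch of just the matching, with a made-up helper:

```python
import re

# Match every /api/find/<country> path EXCEPT /api/find/germany, so the
# excluded path never matches a rule and is never counted.
LIMITED = r"^/api/find/(?!germany$)"

def is_limited(path):
    return re.search(LIMITED, path) is not None
```

The single pattern `LIMITED` would then carry the rules, with no config entry for the excluded path at all.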
During some experimenting with settings, it appeared that creating a `Rule(hour=0, group="blocked", zone="sensitive_api")` did not block the client immediately, but allowed accessing the zone once before the block took hold. This seems to be because `MemoryBackend` is implemented presuming that the minimum limit is 1, and after setting the new limit counter for the client it postpones the validity check to the next round of access. This makes the limits 0 and 1 equivalent in practice.
Setting a zero rate does seem to be supported in the `RedisBackend` implementation, though. If I am correct about the differences between the implementations, I suppose unifying the behaviour would make sense?
In a broader perspective, permanently blocking users might not be part of rate limiting, which is all about 429 and not so much about 401 or 403. However, in some cases it is not meaningful to consider blocking independently from rate limiting, which would duplicate an extra layer of URL patterns and zones, especially if there is no separate authentication mechanism in place and blocking happens as part of keeping API usage sane: for example, blocking clients that repeatedly fail to conform to limits, concurrently exceed multiple limits, or spam with repeated or malformed content.
I think that, besides the 0=1 issue above, zero rating works as expected in the current implementation, but there might be some additional questions about `Retry-After` giving meaningful instructions, for example in the case of `Rule(second=0)`.
Maybe the `user` and `group` could be passed in a header or session:

def create_jwt_token(key, alg, user_key="user", group_key="group"):
    pass