Hey there! Me again 😵‍💫 (btw - big fan of this and the FastStream project!)
Running into this issue in the context of FastStream.
The following code works as expected:
import os
import typing
from redis import Redis, ConnectionPool
from pydantic import BaseModel, Field
from faststream import FastStream, Depends, ContextRepo, Context
from faststream.kafka import KafkaBroker
from faststream.kafka.annotations import (
Logger,
KafkaBroker as BrokerAnnotation,
)
# Kafka bootstrap address; each part can be overridden through the
# environment and otherwise falls back to localhost / 19092.
KAFKA_HOST = os.getenv('KAFKA_HOST', 'localhost')
KAFKA_PORT = os.getenv('KAFKA_PORT', '19092')

# A single module-level broker, wrapped by the FastStream application object.
broker = KafkaBroker(f'{KAFKA_HOST}:{KAFKA_PORT}')
app = FastStream(broker)
# Value object produced by simple_dependency(); carries one integer field.
# (Documented with comments rather than a docstring so the model's own
# metadata stays exactly as it was.)
class SimpleDependency(BaseModel):
    id: int
def simple_dependency():
    """Build the fixed ``SimpleDependency`` (``id=1``) used with ``Depends``."""
    dependency = SimpleDependency(id=1)
    return dependency
# Base payload: a single free-form text field. ComplexMessage extends it
# and NestedMessage embeds it.
class SimpleMessage(BaseModel):
    message: str
# SimpleMessage plus a required positive id; the inherited `message` field
# is redeclared so that it must be non-empty.
class ComplexMessage(SimpleMessage):
    # `...` as the default marks the field as required.
    id: int = Field(default=..., gt=0)
    message: str = Field(default=..., min_length=1)
# Envelope that pairs a SimpleMessage (or a subclass of it) with an id.
class NestedMessage(BaseModel):
    message: SimpleMessage
    id: int
def inject_redis_client() -> Redis:
    """Create a Redis client on top of a newly built connection pool.

    Connection settings are read from the environment on every call:
    ``REDIS_HOST`` (default ``localhost``), ``REDIS_PORT`` (default ``6379``)
    and ``REDIS_DB`` (default ``0``). Port and db are converted to ``int``.
    """
    env = os.environ
    pool = ConnectionPool(
        host=env.get('REDIS_HOST', 'localhost'),
        port=int(env.get('REDIS_PORT', '6379')),
        db=int(env.get('REDIS_DB', '0')),
    )
    return Redis(connection_pool=pool)
@broker.subscriber('testing-a')
async def testing_a(
    message: ComplexMessage,
    logger: Logger,
    redis: Redis = Depends(inject_redis_client),
    dep: SimpleDependency = Depends(simple_dependency),
):
    # Wrap the incoming message together with the dependency's id and
    # republish it on 'testing-b'. `logger` and `redis` are injected but
    # not used in this body.
    forwarded = NestedMessage(id=dep.id, message=message)
    await broker.publish(forwarded, 'testing-b')
@broker.subscriber('testing-b')
async def testing_b(
    message: NestedMessage,
    logger: Logger,
    dep: SimpleDependency = Depends(simple_dependency),
):
    # Log the received message first, then the injected dependency.
    for item in (message, dep):
        logger.info(item)
@app.after_startup
async def after_startup(
    context: ContextRepo,
    logger: Logger,
):
    """Publish ten ``ComplexMessage`` instances to 'testing-a'.

    Registered through ``app.after_startup``. Ids run from 1 to 10 while the
    text runs from 'hello 0' to 'hello 9'. ``context`` and ``logger`` are
    accepted but not used in the body.
    """
    for seq in range(10):
        seed = ComplexMessage(id=seq + 1, message=f'hello {seq}')
        await broker.publish(seed, 'testing-a')
testing_a correctly receives message: ComplexMessage and the redis client instance is available.
However, if I change my inject_redis_client
function to accept an argument, for instance:
def inject_redis_client(
    redis_db: typing.Optional[int] = None
) -> Redis:
    """Create a Redis client on top of a newly built connection pool.

    Args:
        redis_db: Database index to connect to. When ``None`` (the default)
            the index is read from the ``REDIS_DB`` environment variable,
            which itself defaults to ``0``.

    Returns:
        A ``Redis`` instance whose pool targets ``REDIS_HOST`` (default
        ``localhost``) and ``REDIS_PORT`` (default ``6379``).
    """
    # NOTE(review): when this function is used through Depends, the framework
    # may try to resolve `redis_db` from the incoming message rather than from
    # the subscriber's own parameters - confirm against the FastDepends docs.

    # Compare against None explicitly: `redis_db or ...` would treat an
    # explicit 0 as "not given" and silently fall back to REDIS_DB.
    if redis_db is None:
        redis_db = int(os.environ.get('REDIS_DB', '0'))
    return Redis(
        connection_pool=ConnectionPool(
            host=os.environ.get('REDIS_HOST', 'localhost'),
            port=int(os.environ.get('REDIS_PORT', '6379')),
            db=redis_db,
        )
    )
Then when I run the app I get the following error:
2023-09-22 10:28:19,730 INFO - testing-a | 449-169540 - Received
2023-09-22 10:28:19,731 ERROR - testing-a | 449-169540 - ValidationError: 1 validation error for testing_a
message
Input should be a valid dictionary or instance of ComplexMessage [type=model_type, input_value='hello 9', input_type=str]
For further information visit https://errors.pydantic.dev/2.3/v/model_type
Traceback (most recent call last):
File "/opt/homebrew/Caskroom/miniforge/base/envs/example/lib/python3.11/site-packages/faststream/broker/core/asyncronous.py", line 550, in log_wrapper
r = await func(message)
^^^^^^^^^^^^^^^^^^^
File "/opt/homebrew/Caskroom/miniforge/base/envs/example/lib/python3.11/site-packages/faststream/kafka/broker.py", line 240, in process_wrapper
r = await self._execute_handler(func, message)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/homebrew/Caskroom/miniforge/base/envs/example/lib/python3.11/site-packages/faststream/broker/core/asyncronous.py", line 487, in _execute_handler
return await func(message)
^^^^^^^^^^^^^^^^^^^
File "/opt/homebrew/Caskroom/miniforge/base/envs/example/lib/python3.11/site-packages/faststream/broker/core/asyncronous.py", line 415, in decode_wrapper
return await func(**msg)
^^^^^^^^^^^^^^^^^
File "/opt/homebrew/Caskroom/miniforge/base/envs/example/lib/python3.11/site-packages/fast_depends/use.py", line 135, in injected_wrapper
r = await real_model.asolve(
^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/homebrew/Caskroom/miniforge/base/envs/example/lib/python3.11/site-packages/fast_depends/core/model.py", line 396, in asolve
final_args, final_kwargs = cast_gen.send(kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "/opt/homebrew/Caskroom/miniforge/base/envs/example/lib/python3.11/site-packages/fast_depends/core/model.py", line 211, in _solve
casted_model = self.model(**solved_kw)
^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/homebrew/Caskroom/miniforge/base/envs/example/lib/python3.11/site-packages/pydantic/main.py", line 165, in __init__
__pydantic_self__.__pydantic_validator__.validate_python(data, self_instance=__pydantic_self__)
pydantic_core._pydantic_core.ValidationError: 1 validation error for testing_a
message
Input should be a valid dictionary or instance of ComplexMessage [type=model_type, input_value='hello 9', input_type=str]
For further information visit https://errors.pydantic.dev/2.3/v/model_type
^C2023-09-22 10:28:21,580 INFO - FastStream app shutting down...
2023-09-22 10:28:21,581 INFO - FastStream app shut down gracefully.
In this context, I'm not passing a redis_db
argument to the testing_a function - but that shouldn't matter as it's optional.
Even if I update the function to set a value:
@broker.subscriber('testing-a')
async def testing_a(
    message: ComplexMessage,
    logger: Logger,
    redis_db: int = 1,
    redis: Redis = Depends(inject_redis_client),
    dep: SimpleDependency = Depends(simple_dependency),
):
    # Wrap the incoming message together with the dependency's id and
    # republish it on 'testing-b'. `logger`, `redis_db` and `redis` are
    # accepted but not used in this body.
    # NOTE(review): nothing here forwards `redis_db` to inject_redis_client;
    # confirm how the dependency is expected to receive that value.
    forwarded = NestedMessage(id=dep.id, message=message)
    await broker.publish(forwarded, 'testing-b')
I get the same issue.
I might not fully understand how you're supposed to pass arguments to dependency injection functions, but nothing I try seems to make a difference.