Coder Social home page Coder Social logo

dialoguemd / fastapi-sqla Goto Github PK

View Code? Open in Web Editor NEW
172.0 27.0 11.0 562 KB

SQLAlchemy extension for FastAPI with support for pagination, asyncio, SQLModel and pytest, ready for production.

License: MIT License

Python 100.00%
fastapi fastapi-sqlalchemy sqlalchemy fastapi-sqla library

fastapi-sqla's Introduction

Fastapi-SQLA

codecov CircleCI PyPI Conventional Commits Code style: black

Fastapi-SQLA is an SQLAlchemy extension for FastAPI easy to setup with support for pagination, asyncio, SQLModel and pytest. It supports SQLAlchemy>=1.3 and is fully compliant with SQLAlchemy 2.0. It is developped, maintained and used on production by the team at @dialoguemd with love from Montreal ๐Ÿ‡จ๐Ÿ‡ฆ.

Installing

Using pip:

pip install fastapi-sqla

Note that you need a SQLAlchemy compatible engine adapter. We test with psycopg2 which you can install using the psycopg2 extra.

Quick Example

Assuming it runs against a DB with a table user with 3 columns, id, name and unique email:

# main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException
from fastapi_sqla import Base, Item, Page, Paginate, Session, setup_middlewares, startup
from pydantic import BaseModel, EmailStr
from sqlalchemy import select
from sqlalchemy.exc import IntegrityError


@asynccontextmanager
async def lifespan(app: FastAPI):
    await startup()
    yield


app = FastAPI(lifespan=lifespan)
setup_middlewares(app)


class User(Base):
    __tablename__ = "user"


class UserIn(BaseModel):
    name: str
    email: EmailStr


class UserModel(UserIn):
    id: int

    class Config:
        orm_mode = True


@app.get("/users", response_model=Page[UserModel])
def list_users(paginate: Paginate):
    return paginate(select(User))


@app.get("/users/{user_id}", response_model=Item[UserModel])
def get_user(user_id: int, session: Session):
    user = session.get(User, user_id)
    if user is None:
        raise HTTPException(404)
    return {"data": user}


@app.post("/users", response_model=Item[UserModel])
def create_user(new_user: UserIn, session: Session):
    user = User(**new_user.model_dump())
    session.add(user)
    try:
        session.flush()
    except IntegrityError:
        raise HTTPException(409, "Email is already taken.")
    return {"data": user}

Creating a db using sqlite3:

sqlite3 db.sqlite <<EOF
CREATE TABLE user (
    id    INTEGER PRIMARY KEY AUTOINCREMENT,
    email TEXT NOT NULL,
    name  TEXT NOT NULL
);
CREATE UNIQUE INDEX user_email_idx ON user (email);
EOF

Running the app:

sqlalchemy_url=sqlite:///db.sqlite?check_same_thread=false uvicorn main:app

Configuration

Environment variables:

The keys of interest in os.environ are prefixed with sqlalchemy_. Each matching key (after the prefix is stripped) is treated as though it were the corresponding keyword argument to sqlalchemy.create_engine call.

The only required key is sqlalchemy_url, which provides the database URL, example:

export sqlalchemy_url=postgresql://postgres@localhost

Multi-session support

In order to configure multiple sessions for the application, set the environment variables with this prefix format: fastapi_sqla__MY_KEY__.

As with the default session, each matching key (after the prefix is stripped) is treated as though it were the corresponding keyword argument to sqlalchemy.create_engine call.

For example, to configure a session with the read_only key:

export fastapi_sqla__read_only__sqlalchemy_url=postgresql://postgres@localhost

asyncio support using asyncpg

SQLAlchemy >= 1.4 supports asyncio. To enable asyncio support against a Postgres DB, install asyncpg:

pip install asyncpg

And define the environment variable sqlalchemy_url with postgres+asyncpg scheme:

export sqlalchemy_url=postgresql+asyncpg://postgres@localhost

Setup the app AsyncContextManager (recommended):

import fastapi_sqla
from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    await fastapi_sqla.startup()
    yield


app = FastAPI(lifespan=lifespan)
fastapi_sqla.setup_middlewares(app)

Setup the app using startup/shutdown events (deprecated):

import fastapi_sqla
from fastapi import FastAPI

app = FastAPI()
fastapi_sqla.setup(app)

SQLAlchemy

Adding a new entity class:

from fastapi_sqla import Base


class Entity(Base):
    __tablename__ = "table-name-in-db"

Getting an sqla session

Using dependency injection

Use FastAPI dependency injection to get a session as a parameter of a path operation function.

The SQLAlchemy session is committed before the response is returned or rollbacked if any exception occurred:

from fastapi import APIRouter
from fastapi_sqla import AsyncSession, Session

router = APIRouter()


@router.get("/example")
def example(session: Session):
    return session.execute("SELECT now()").scalar()


@router.get("/async_example")
async def async_example(session: AsyncSession):
    return await session.scalar("SELECT now()")

In order to get a session configured with a custom key:

from typing import Annotated

from fastapi import APIRouter, Depends
from fastapi_sqla import (
    AsyncSessionDependency,
    SessionDependency,
    SqlaAsyncSession,
    SqlaSession,
)

router = APIRouter()


# Preferred

ReadOnlySession = Annotated[SqlaSession, Depends(SessionDependency(key="read_only"))]
AsyncReadOnlySession = Annotated[
    SqlaAsyncSession, Depends(AsyncSessionDependency(key="read_only"))
]

@router.get("/example")
def example(session: ReadOnlySession):
    return session.execute("SELECT now()").scalar()


@router.get("/async_example")
async def async_example(session: AsyncReadOnlySession):
    return await session.scalar("SELECT now()")


# Alternative

@router.get("/example/alt")
def example_alt(session: SqlaSession = Depends(SessionDependency(key="read_only"))):
    return session.execute("SELECT now()").scalar()


@router.get("/async_example/alt")
async def async_example_alt(
    session: SqlaAsyncSession = Depends(AsyncSessionDependency(key="read_only")),
):
    return await session.scalar("SELECT now()")

Using a context manager

When needing a session outside of a path operation, like when using FastAPI background tasks, use fastapi_sqla.open_session context manager. The SQLAlchemy session is committed when exiting context or rollbacked if any exception occurred:

from fastapi import APIRouter, BackgroundTasks
from fastapi_sqla import open_async_session, open_session

router = APIRouter()


@router.get("/example")
def example(bg: BackgroundTasks):
    bg.add_task(run_bg)
    bg.add_task(run_async_bg)


def run_bg():
    with open_session() as session:
        session.execute("SELECT now()").scalar()

def run_bg_with_key():
    with open_session(key="read_only") as session:
        session.execute("SELECT now()").scalar()

async def run_async_bg():
    async with open_async_session() as session:
        await session.scalar("SELECT now()")

async def run_async_bg_with_key():
    async with open_async_session(key="read_only") as session:
        await session.scalar("SELECT now()")

Pagination

from fastapi import APIRouter
from fastapi_sqla import Base, Page, Paginate
from pydantic import BaseModel
from sqlalchemy import select

router = APIRouter()


class User(Base):
    __tablename__ = "user"


class UserModel(BaseModel):
    id: int
    name: str

    class Config:
        orm_mode = True


@router.get("/users", response_model=Page[UserModel])
def all_users(paginate: Paginate):
    return paginate(select(User))

By default:

  • It returns pages of 10 items, up to 100 items;

  • Total number of items in the collection is queried using Query.count.

  • Response example for /users?offset=40&limit=10:

    {
        "data": [
            {
                "id": 41,
                "name": "Pat Thomas"
            },
            {
                "id": 42,
                "name": "Mulatu Astatke"
            }
        ],
        "meta": {
            "offset": 40,
            "total_items": 42,
            "total_pages": 5,
            "page_number": 5
        }
    }

Paginating non-scalar results

To paginate a query which doesn't return scalars, specify scalars=False when invoking paginate:

from fastapi import APIRouter
from fastapi_sqla import Base, Page, Paginate
from pydantic import BaseModel
from sqlalchemy import func, select
from sqlalchemy.orm import relationship

router = APIRouter()


class User(Base):
    __tablename__ = "user"
    notes = relationship("Note")


class Note(Base):
    __tablename__ = "note"


class UserModel(BaseModel):
    id: int
    name: str
    notes_count: int


@router.get("/users", response_model=Page[UserModel])
def all_users(paginate: Paginate):
    query = (
        select(User.id, User.name, func.count(Note.id).label("notes_count"))
        .join(Note)
        .group_by(User)
    )
    return paginate(query, scalars=False)

Customize pagination

You can customize:

  • Minimum and maximum number of items per pages;
  • How the total number of items in the collection is queried;

To customize pagination, create a dependency using fastapi_sqla.Pagination:

from fastapi import APIRouter, Depends
from fastapi_sqla import Base, Page, Pagination, Session
from pydantic import BaseModel
from sqlalchemy import func, select

router = APIRouter()


class User(Base):
    __tablename__ = "user"


class UserModel(BaseModel):
    id: int
    name: str


def query_count(session: Session) -> int:
    return session.execute(select(func.count()).select_from(User)).scalar()


CustomPaginate = Pagination(min_page_size=5, max_page_size=500, query_count=query_count)


@router.get("/users", response_model=Page[UserModel])
def all_users(paginate: CustomPaginate = Depends()):
    return paginate(select(User))

Async pagination

When using the asyncio support, use the AsyncPaginate dependency:

from fastapi import APIRouter
from fastapi_sqla import Base, Page, AsyncPaginate
from pydantic import BaseModel
from sqlalchemy import select

router = APIRouter()


class User(Base):
    __tablename__ = "user"


class UserModel(BaseModel):
    id: int
    name: str

    class Config:
        orm_mode = True


@router.get("/users", response_model=Page[UserModel])
async def all_users(paginate: AsyncPaginate):
    return await paginate(select(User))

Customize pagination by creating a dependency using fastapi_sqla.AsyncPagination:

from fastapi import APIRouter, Depends
from fastapi_sqla import Base, Page, AsyncPagination, AsyncSession
from pydantic import BaseModel
from sqlalchemy import func, select

router = APIRouter()


class User(Base):
    __tablename__ = "user"


class UserModel(BaseModel):
    id: int
    name: str


async def query_count(session: AsyncSession) -> int:
    result = await session.execute(select(func.count()).select_from(User))
    return result.scalar()


CustomPaginate = AsyncPagination(min_page_size=5, max_page_size=500, query_count=query_count)


@router.get("/users", response_model=Page[UserModel])
async def all_users(paginate: CustomPaginate = Depends()):
    return await paginate(select(User))

Multi-session support

Pagination supports multiple sessions as well. To paginate using a session configured with a custom key:

from typing import Annotated

from fastapi import APIRouter, Depends
from fastapi_sqla import (
    AsyncPaginateSignature,
    AsyncPagination,
    Base,
    Page,
    PaginateSignature,
    Pagination,
)
from pydantic import BaseModel
from sqlalchemy import func, select

router = APIRouter()


class User(Base):
    __tablename__ = "user"


class UserModel(BaseModel):
    id: int
    name: str


# Preferred

ReadOnlyPaginate = Annotated[
    PaginateSignature, Depends(Pagination(session_key="read_only"))
]
AsyncReadOnlyPaginate = Annotated[
    AsyncPaginateSignature, Depends(AsyncPagination(session_key="read_only"))
]

@router.get("/users", response_model=Page[UserModel])
def all_users(paginate: ReadOnlyPaginate):
    return paginate(select(User))

@router.get("/async_users", response_model=Page[UserModel])
async def async_all_users(paginate: AsyncReadOnlyPaginate):
    return await paginate(select(User))


# Alternative

@router.get("/users/alt", response_model=Page[UserModel])
def all_users_alt(
    paginate: PaginateSignature = Depends(
        Pagination(session_key="read_only")
    ),
):
    return paginate(select(User))

@router.get("/async_users/alt", response_model=Page[UserModel])
async def async_all_users_alt(
    paginate: AsyncPaginateSignature = Depends(
        AsyncPagination(session_key="read_only")
    ),
):
    return await paginate(select(User))

SQLModel support ๐ŸŽ‰

If your project uses SQLModel, then Session dependency is an SQLModel session::

    from http import HTTPStatus

    from fastapi import FastAPI, HTTPException
    from fastapi_sqla import Item, Page, Paginate, Session, setup
    from sqlmodel import Field, SQLModel, select

    class Hero(SQLModel, table=True):
        id: int | None = Field(default=None, primary_key=True)
        name: str
        secret_name: str
        age: int | None = None


    app = FastAPI()
    setup(app)

    @app.get("/heros", response_model=Page[Hero])
    def list_hero(paginate: Paginate) -> Page[Hero]:
        return paginate(select(Hero))


    @app.get("/heros/{hero_id}", response_model=Item[Hero])
    def get_hero(hero_id: int, session: Session) -> Item[Hero]:
        hero = session.get(Hero, hero_id)
        if hero is None:
            raise HTTPException(HTTPStatus.NOT_FOUND)
        return {"data": hero}

Pytest fixtures

This library provides a set of utility fixtures, through its PyTest plugin, which is automatically installed with the library. Using the plugin requires the pytest_plugin extra.

By default, no records are actually written to the database when running tests. There currently is no way to change this behaviour.

sqla_modules

You must define this fixture, in order for the plugin to reflect table metadata in your SQLAlchemy entities. It should just import all of the application's modules which contain SQLAlchemy models.

Example:

# tests/conftest.py
from pytest import fixture


@fixture
def sqla_modules():
    from app import sqla  # noqa

db_url

The DB url to use.

When CI key is set in environment variables, it defaults to using postgres as the host name:

postgresql://postgres@postgres/postgres

In other cases, the host is set to localhost:

postgresql://postgres@localhost/postgres

Of course, you can override it by overloading the fixture:

from pytest import fixture


@fixture(scope="session")
def db_url():
    return "postgresql://postgres@localhost/test_database"

async_sqlalchemy_url

DB url to use when using asyncio support. Defaults to db_url fixture with postgresql+asyncpg:// scheme.

session & async_session

Sqla sessions to create db fixture:

  • All changes done at test setup or during the test are rollbacked at test tear down;
  • No record will actually be written in the database;
  • Changes in one regular session need to be committed to be available from other regular sessions;
  • Changes in one async session need to be committed to be available from other async sessions;
  • Changes from regular sessions are not available from async session and vice-versa even when committed;

Example:

from pytest import fixture


@fixture
def patient(session):
    from er.sqla import Patient
    patient = Patient(first_name="Bob", last_name="David")
    session.add(patient)
    session.commit()
    return patient


@fixture
async def doctor(async_session):
    from er.sqla import Doctor
    doctor = Doctor(name="who")
    async_session.add(doctor)
    await async_session.commit()
    return doctor

db_migration

A session scope fixture that runs alembic upgrade at test session setup and alembic downgrade at tear down.

It depends on alembic_ini_path fixture to get the path of alembic.ini file.

To use in a test or test module:

from pytest import mark

pytestmark = mark.usefixtures("db_migration")

To use globally, add to pytest options:

[pytest]
usefixtures =
    db_migration

Or depends on it in top-level conftest.py and mark it as auto-used:

from pytest import fixture


@fixture(scope="session", autouse=True)
def db_migration(db_migration):
    pass

alembic_ini_path

It returns the path of alembic.ini configuration file. By default, it returns ./alembic.ini.

Development

Prerequisites

  • Python >=3.9
  • Poetry to install package dependencies.
  • A postgres DB reachable at postgresql://postgres@localhost/postgres

Setup

$ poetry install --all-extras

Running tests

$ poetry run pytest

Runing tests on multiple environments

$ poetry run tox

fastapi-sqla's People

Contributors

alixlahuec avatar arththebird avatar ecotg avatar edouardlp avatar github-renovate-self-hosted avatar hadrien avatar mbelang avatar renovate[bot] avatar tearf001 avatar vicrep avatar vtremblay avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

fastapi-sqla's Issues

Action Required: Fix Renovate Configuration

There is an error with this repository's Renovate configuration that needs to be fixed. As a precaution, Renovate will stop PRs until it is resolved.

Error type: Cannot find preset's package (local>dialoguemd/renovate-config)

How to use this package with multiple DBs?

Could not find documentation about whether this is or isn't supported. It would be great if I could get some guidance on how I can set up my service to use 2+ DBs. I specifically need to read from one Postgres DB and write to two Postgres DBs.

Dependency Dashboard

This issue lists Renovate updates and detected dependencies. Read the Dependency Dashboard docs to learn more.

Open

These updates have all been created already. Click a checkbox below to force a retry/rebase of any.

Detected dependencies

circleci
.circleci/config.yml
  • base 1.23.1
  • release 2.19.1
  • python 3.22.5
  • utils 3.19.1
  • codecov 4.1.0
  • postgres 16.3
poetry
pyproject.toml
  • python >=3.9,<3.13
  • fastapi >=0.95.1,<0.112
  • pydantic >=1,<3
  • sqlalchemy >=1.3,<3
  • structlog >=20,<25
  • deprecated >=1.2,<2
  • alembic >=1.4.3,<2
  • asyncpg >=0.28.0,<0.30.0
  • boto3 >=1.24.74,<2
  • psycopg2 >=2.8.6,<3
  • sqlmodel >=0.0.14,<0.0.21
  • alembic 1.13.2
  • asgi_lifespan 2.1.0
  • Faker 26.0.0
  • greenlet 3.0.3
  • httpx 0.27.0
  • mypy 1.10.1
  • psycopg2 2.9.9
  • pytest 8.2.2
  • pytest-asyncio 0.23.7
  • pytest-cov 5.0.0
  • ruff 0.5.2
  • tox 4.16.0

Dependency Dashboard

This issue lists Renovate updates and detected dependencies. Read the Dependency Dashboard docs to learn more.

Rate-Limited

These updates are currently rate-limited. Click on a checkbox below to force their creation now.

  • fix(deps): update dependency boto3 to v1.26.91
  • fix(deps): update dependency greenlet to v1.1.3
  • fix(deps): update dependency pydantic to v1.10.6
  • fix(deps): update dependency pytest to v7.2.2
  • fix(deps): update dependency sqlalchemy to v2.0.6
  • chore(deps): update cimg/python docker tag to v3.11
  • chore(deps): update postgres docker tag to v14.7
  • fix(deps): update dependency alembic to v1.10.2
  • fix(deps): update dependency asyncpg to ^0.27.0
  • fix(deps): update dependency fastapi to v0.94.1
  • fix(deps): update dependency isort to v5.12.0
  • fix(deps): update dependency pytest-asyncio to ^0.20.0
  • chore(deps): update postgres docker tag to v15
  • fix(deps): update dependency asgi_lifespan to v2
  • fix(deps): update dependency black to v23
  • fix(deps): update dependency faker to v17
  • fix(deps): update dependency greenlet to v2
  • fix(deps): update dependency mypy to v1
  • fix(deps): update dependency pytest-cov to v4
  • fix(deps): update dependency tox to v4
  • chore(deps): lock file maintenance
  • ๐Ÿ” Create all rate-limited PRs at once ๐Ÿ”

Open

These updates have all been created already. Click a checkbox below to force a retry/rebase of any.

Detected dependencies

circleci
.circleci/config.yml
  • base 1.15.1
  • release 2.9.1
  • python 3.8.3
  • utils 3.10.1
  • cimg/python 3.10
  • postgres 14.4
poetry
pyproject.toml
  • fastapi >=0.61
  • pydantic <2
  • sqlalchemy >=1.3
  • structlog >=20
  • alembic ^1.4.3
  • asgi_lifespan ^1.0.1
  • asyncpg ^0.25.0
  • black ^22.8.0
  • Faker ^14.2.0
  • httpx ^0.23.0
  • isort ^5.5.3
  • pdbpp ^0.10.2
  • psycopg2 ^2.8.6
  • pylama ^8.4.1
  • pytest ^7.2.1
  • pytest-asyncio ^0.19.0
  • pytest-cov ^2.10.1
  • tox ^3.26.0
  • boto3 ^1.24.74
  • greenlet ^1.1.3
  • mypy ^0.991

  • Check this box to trigger a request for Renovate to run again on this repository

Dependency Dashboard

This issue lists Renovate updates and detected dependencies. Read the Dependency Dashboard docs to learn more.

Rate-Limited

These updates are currently rate-limited. Click on a checkbox below to force their creation now.

  • fix(deps): update dependency boto3 to v1.26.138
  • fix(deps): update dependency greenlet to v1.1.3
  • fix(deps): update dependency psycopg2 to v2.9.6
  • fix(deps): update dependency pydantic to v1.10.7
  • fix(deps): update dependency sqlalchemy to v2.0.15
  • chore(deps): update cimg/python docker tag to v3.11
  • chore(deps): update postgres docker tag to v14.8
  • fix(deps): update dependency alembic to v1.11.1
  • fix(deps): update dependency asyncpg to ^0.27.0
  • fix(deps): update dependency fastapi to v0.95.2
  • fix(deps): update dependency httpx to ^0.24.0
  • fix(deps): update dependency isort to v5.12.0
  • fix(deps): update dependency pytest to v7.3.1
  • fix(deps): update dependency pytest-asyncio to ^0.21.0
  • chore(deps): update postgres docker tag to v15
  • fix(deps): update dependency asgi_lifespan to v2
  • fix(deps): update dependency black to v23
  • fix(deps): update dependency faker to v18
  • fix(deps): update dependency greenlet to v2
  • fix(deps): update dependency mypy to v1
  • fix(deps): update dependency pytest-cov to v4
  • fix(deps): update dependency structlog to v23
  • fix(deps): update dependency tox to v4
  • chore(deps): lock file maintenance
  • ๐Ÿ” Create all rate-limited PRs at once ๐Ÿ”

Open

These updates have all been created already. Click a checkbox below to force a retry/rebase of any.

Detected dependencies

circleci
.circleci/config.yml
  • base 1.17.1
  • release 2.12.1
  • python 3.11.1
  • utils 3.12.1
  • cimg/python 3.10
  • postgres 14.4
poetry
pyproject.toml
  • fastapi >=0.61
  • pydantic <2
  • sqlalchemy >=1.3
  • structlog >=20
  • alembic ^1.4.3
  • asgi_lifespan ^1.0.1
  • asyncpg ^0.25.0
  • black ^22.8.0
  • Faker ^14.2.0
  • httpx ^0.23.0
  • isort ^5.5.3
  • pdbpp ^0.10.2
  • psycopg2 ^2.8.6
  • pylama ^8.4.1
  • pytest ^7.2.1
  • pytest-asyncio ^0.19.0
  • pytest-cov ^2.10.1
  • tox ^3.26.0
  • boto3 ^1.24.74
  • greenlet ^1.1.3
  • mypy ^0.991

Enabling shared in-memory sqlite db to be shared between sync and async engines/sessions

So I just discovered your repo here and it's been very useful considering how difficult it can be to marry sqlalchemy with an async framework and pytest with integration tests running against a sqlite in-memory database. I think many of these issues come from various gotchas around async pytest fixtures, async context managers, and ContextVars, so having such a reference known to work correctly has been really valuable.

I wanted to reciprocate by sharing a method which allows for multiple sqlalchemy engines to share the same underlying in-memory database and cache. It utilizes sqlite's uri filename feature to define a virtual filename handle (no files are created) with a shared cache. Whenever you use the same virtual filename handle and set cache=shared, they map to the same in-memory database. I tend to use this when I want both async and sync engines to see the updates being made from the other, and it supports savepoint based subtransactions. It's really quite simple to implement.

  1. synchronous engine should define the database uri as: "sqlite:///file:temp.db?mode=memory&cache=shared&uri=true".
    create_engine("sqlite:///file:temp.db?mode=memory&cache=shared&uri=true")
  2. asynchronous engine should define the database uri as: "sqlite+aiosqlite:///file:temp.db?mode=memory&cache=shared&uri=true".
    create_async_engine("sqlite+aiosqlite:///file:temp.db?mode=memory&cache=shared&uri=true")

Some details can be read at the following resources:

SQLAlchemy async support issues on fastapi_sqla.setup(app)

When trying to call setup to set a postgres database in async mode the code is not able to work.
main.py:

import fastapi_sqla
from fastapi import FastAPI

app = FastAPI()
fastapi_sqla.setup(app)

I'm getting the following error:

zoterofast-ZF-Server-1  | INFO:     Started server process [10]
zoterofast-ZF-Server-1  | INFO:     Waiting for application startup.
zoterofast-ZF-Server-1  | ERROR:    Traceback (most recent call last):
zoterofast-ZF-Server-1  |   File "/usr/local/lib/python3.10/site-packages/starlette/routing.py", line 621, in lifespan
zoterofast-ZF-Server-1  |     async with self.lifespan_context(app):
zoterofast-ZF-Server-1  |   File "/usr/local/lib/python3.10/site-packages/starlette/routing.py", line 518, in __aenter__
zoterofast-ZF-Server-1  |     await self._router.startup()
zoterofast-ZF-Server-1  |   File "/usr/local/lib/python3.10/site-packages/starlette/routing.py", line 600, in startup
zoterofast-ZF-Server-1  |     handler()
zoterofast-ZF-Server-1  |   File "/usr/local/lib/python3.10/site-packages/fastapi_sqla/__init__.py", line 73, in startup
zoterofast-ZF-Server-1  |     Base.prepare(engine)
zoterofast-ZF-Server-1  |   File "/usr/local/lib/python3.10/site-packages/sqlalchemy/ext/declarative/extensions.py", line 392, in prepare
zoterofast-ZF-Server-1  |     with inspection.inspect(engine)._inspection_context() as insp:
zoterofast-ZF-Server-1  |   File "/usr/local/lib/python3.10/site-packages/sqlalchemy/inspection.py", line 64, in inspect
zoterofast-ZF-Server-1  |     ret = reg(subject)
zoterofast-ZF-Server-1  |   File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/reflection.py", line 182, in _engine_insp
zoterofast-ZF-Server-1  |     return Inspector._construct(Inspector._init_engine, bind)
zoterofast-ZF-Server-1  |   File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/reflection.py", line 117, in _construct
zoterofast-ZF-Server-1  |     init(self, bind)
zoterofast-ZF-Server-1  |   File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/reflection.py", line 128, in _init_engine
zoterofast-ZF-Server-1  |     engine.connect().close()
zoterofast-ZF-Server-1  |   File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 3210, in connect
zoterofast-ZF-Server-1  |     return self._connection_cls(self, close_with_result=close_with_result)
zoterofast-ZF-Server-1  |   File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 96, in __init__
zoterofast-ZF-Server-1  |     else engine.raw_connection()
zoterofast-ZF-Server-1  |   File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 3289, in raw_connection
zoterofast-ZF-Server-1  |     return self._wrap_pool_connect(self.pool.connect, _connection)
zoterofast-ZF-Server-1  |   File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 3256, in _wrap_pool_connect
zoterofast-ZF-Server-1  |     return fn()
zoterofast-ZF-Server-1  |   File "/usr/local/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 310, in connect
zoterofast-ZF-Server-1  |     return _ConnectionFairy._checkout(self)
zoterofast-ZF-Server-1  |   File "/usr/local/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 868, in _checkout
zoterofast-ZF-Server-1  |     fairy = _ConnectionRecord.checkout(pool)
zoterofast-ZF-Server-1  |   File "/usr/local/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 476, in checkout
zoterofast-ZF-Server-1  |     rec = pool._do_get()
zoterofast-ZF-Server-1  |   File "/usr/local/lib/python3.10/site-packages/sqlalchemy/pool/impl.py", line 145, in _do_get
zoterofast-ZF-Server-1  |     with util.safe_reraise():
zoterofast-ZF-Server-1  |   File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
zoterofast-ZF-Server-1  |     compat.raise_(
zoterofast-ZF-Server-1  |   File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 207, in raise_
zoterofast-ZF-Server-1  |     raise exception
zoterofast-ZF-Server-1  |   File "/usr/local/lib/python3.10/site-packages/sqlalchemy/pool/impl.py", line 143, in _do_get
zoterofast-ZF-Server-1  |     return self._create_connection()
zoterofast-ZF-Server-1  |   File "/usr/local/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 256, in _create_connection
zoterofast-ZF-Server-1  |     return _ConnectionRecord(self)
zoterofast-ZF-Server-1  |   File "/usr/local/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 371, in __init__
zoterofast-ZF-Server-1  |     self.__connect()
zoterofast-ZF-Server-1  |   File "/usr/local/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 665, in __connect
zoterofast-ZF-Server-1  |     with util.safe_reraise():
zoterofast-ZF-Server-1  |   File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
zoterofast-ZF-Server-1  |     compat.raise_(
zoterofast-ZF-Server-1  |   File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 207, in raise_
zoterofast-ZF-Server-1  |     raise exception
zoterofast-ZF-Server-1  |   File "/usr/local/lib/python3.10/site-packages/sqlalchemy/pool/base.py", line 661, in __connect
zoterofast-ZF-Server-1  |     self.dbapi_connection = connection = pool._invoke_creator(self)
zoterofast-ZF-Server-1  |   File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/create.py", line 590, in connect
zoterofast-ZF-Server-1  |     return dialect.connect(*cargs, **cparams)
zoterofast-ZF-Server-1  |   File "/usr/local/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 597, in connect
zoterofast-ZF-Server-1  |     return self.dbapi.connect(*cargs, **cparams)
zoterofast-ZF-Server-1  |   File "/usr/local/lib/python3.10/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 777, in connect
zoterofast-ZF-Server-1  |     await_only(self.asyncpg.connect(*arg, **kw)),
zoterofast-ZF-Server-1  |   File "/usr/local/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 67, in await_only
zoterofast-ZF-Server-1  |     raise exc.MissingGreenlet(
zoterofast-ZF-Server-1  | sqlalchemy.exc.MissingGreenlet: greenlet_spawn has not been called; can't call await_() here. Was IO attempted in an unexpected place? (Background on this error at: https://sqlalche.me/e/14/xd2s)
zoterofast-ZF-Server-1  | 
zoterofast-ZF-Server-1  | ERROR:    Application startup failed. Exiting.
zoterofast-ZF-Server-1  | sys:1: RuntimeWarning: coroutine 'connect' was never awaited
zoterofast-ZF-Server-1  | RuntimeWarning: Enable tracemalloc to get the object allocation traceback

pyproject.toml deps

[tool.poetry.dependencies]
python = "^3.10"
python-dotenv = "^0.20.0"
asyncio = "^3.4.3"

poethepoet = "^0.13.1"

loguru = "^0.6.0"
sentry-sdk = "^1.5.8"

fastapi = "^0.75.0"
typer = "^0.4.0"
uvicorn = "^0.17.6"
gunicorn = "^20.1.0"


fastapi-sqla = "^2.3.1"
fastapi-offline = "^1.3.2"

sqlalchemy = "^1.4.32"
psycopg2-binary = "^2.9.3"
asyncpg = "^0.25.0"

pydantic = {extras = ["dotenv"], version = "^1.9.0"}
ujson = "^5.1.0"
email-validator = "^1.1.3"

alembic = "^1.7.7"


requests = "^2.27.1"
Jinja2 = "^3.1.1"
python-multipart = "^0.0.5"
itsdangerous = "^2.1.2"
PyYAML = "^6.0"

Action Required: Fix Renovate Configuration

There is an error with this repository's Renovate configuration that needs to be fixed. As a precaution, Renovate will stop PRs until it is resolved.

Error type: Cannot find preset's package (local>dialoguemd/renovate-config)

SQLAlchemy unable to find env variable with lowercase letters

I make a simple setup with sqlite3, where I have env SQLALCHEMY_URL = sqlite:///main.sqlite, however during runtime I get an exception:

Traceback (most recent call last):
  File "C:\Python\Python38\lib\site-packages\starlette\routing.py", line 621, in lifespan
    async with self.lifespan_context(app):
  File "C:\Python\Python38\lib\site-packages\starlette\routing.py", line 518, in __aenter__
    await self._router.startup()
  File "C:\Python\Python38\lib\site-packages\starlette\routing.py", line 600, in startup
    handler()
  File "C:\Python\Python38\lib\site-packages\fastapi_sqla\__init__.py", line 66, in startup
    engine = engine_from_config(os.environ, prefix="sqlalchemy_")
  File "C:\Python\Python38\lib\site-packages\sqlalchemy\engine\create.py", line 754, in engine_from_config
    url = options.pop("url")
KeyError: 'url'

Digging a bit deeper I found that in SQLAlchemy engine_from_config use following code to get variable:

options = dict(
    (key[len(prefix) :], configuration[key])
    for key in configuration
    if key.startswith(prefix)
)

where configuration is os.environ and prefix is sqlalchemy_, but when key is being iterated, it get capital SQLALCHEMY_URL which is not match for key.startswith, therefore before passing key it should be lowercased.

This line of code:

engine = engine_from_config(os.environ, prefix="sqlalchemy_")

can be replaced to this to fix this issue:

engine = engine_from_config(dict((k.lower(), v) for k, v in os.environ.items() if k.lower().startswith("sqlalchemy_")), prefix="sqlalchemy_")

Help with alembic

Hello,

I'm replacing all the database access implementations I used previously with Fatsapi-sqla, and now I need to integrate it with alembic.

What would be the best way to use Fastapi-sqla in alembic?

Do you have any examples of a standard implementation?

I'm defining my models as follows:

from datetime import datetime
from fastapi_sqla import Base
from sqlalchemy.dialects.postgresql.json import JSONB
from sqlalchemy.ext.mutable import MutableDict
from sqlalchemy.sql import expression
from sqlalchemy import (
    Column,
    Integer,
    String,
    Boolean,
    UniqueConstraint,
    PrimaryKeyConstraint,
    DateTime,
    text,
)
import cuid

class EntityModel(Base):
    __tablename__ = "entities"
    __table_args__ = (
        PrimaryKeyConstraint("id", name="entities_pkey"),
        UniqueConstraint("business_id", name="entities_business_id_ukey"),
    )

    id = Column(String(25), default=cuid.cuid, nullable=False, primary_key=True)
    owner = Column(String(25), nullable=False)
    name = Column(String(128), nullable=False)
    email = Column(String(255), nullable=False)
    active = Column(
        Boolean, nullable=False, default=False, server_default=expression.false()
    )
    status = Column(Integer, nullable=False, default=0, server_default=text("0"))
    country = Column(String(3), default="BRA", nullable=False)
    business_id = Column(String(25), nullable=False)
    external_id = Column(String(128), nullable=True)
    partner_id = Column(String(25), nullable=True)
    profile = Column(MutableDict.as_mutable(JSONB))
    created_at = Column(
        DateTime(timezone=False), default=datetime.utcnow, nullable=False
    )
    updated_at = Column(
        DateTime(timezone=False),
        default=datetime.utcnow,
        onupdate=datetime.utcnow,
        nullable=False,
    )

    def __init__(self, *args, **kwargs):
        if "id" not in kwargs:
            kwargs["id"] = cuid.cuid()
        super(EntityModel, self).__init__(*args, **kwargs)

Is this the best way to do it ?

Thanks

sqlalchemy.exc.MissingGreenlet: greenlet_spawn has not been called; can't call await_only() here

Hello,

Thanks for your module for FastApi, of all the solutions for accessing a Postgres DB that I've researched so far I think it's the closest to what I think is the best solution.

But, now on to my problem: I created a local development test project that uses Uvicorn to run my FastApi app and when it starts up the following error occurs:

sqlalchemy.exc.MissingGreenlet: greenlet_spawn has not been called; can't call await_only() here. Was IO attempted in an unexpected place? (Background on this error at: https://sqlalche.me/e/14/xd2s)

ERROR:    Application startup failed. Exiting.
sys:1: RuntimeWarning: coroutine 'connect' was never awaited
RuntimeWarning: Enable tracemalloc to get the object allocation traceback

I run my program like this:

python3 ./daschat_integration_mautic/uvicorn_wsgi2.py

And the /daschat_integration_mautic/uvicorn_wsgi2.py file has the following content:

# -*- coding: utf-8 -*-
"""Application Web Server Gateway Interface - uvicorn."""
import os
import sys
import uvicorn
from loguru import logger

from daschat_integration_mautic.config.settings import settings

from daschat_integration_mautic.utils.uvicorn_loguru_integration import (
    run_uvicorn_loguru,
)


def main():
    run_uvicorn_loguru(
        uvicorn.Config(
            "daschat_integration_mautic.app.teste1:app",
            host="0.0.0.0",
            port=8000,
            log_level=settings.LOG_LEVEL,
            reload=True,
            workers=1,
        )
    )

if __name__ == "__main__":
    main()

And the app daschat_integration_mautic.app.teste1:app that is called by Uvicorn has the following content:

from fastapi import APIRouter, Depends
from fastapi_sqla import Session
from fastapi_sqla.asyncio_support import AsyncSession

import fastapi_sqla
from fastapi import FastAPI

app = FastAPI()
fastapi_sqla.setup(app)

router = APIRouter()


@router.get("/example")
def example(session: Session = Depends()):
    return session.execute("SELECT now()").scalar()


@router.get("/async_example")
async def async_example(session: AsyncSession = Depends()):
    return await session.scalar("SELECT now()")

Do you have any idea what the problem is?

thanks

Pytest override session dependency

When using the fastapi.testclient.TestClient with pytest, the app doesn't use the session fixture resulting in writes to the database.

To solve this problem, pytest needs to tell the app to use the same session as used for testing. This can be done by using dependency_overrides.

@pytest.fixture(autouse=True)
def _app_session(app, session):
    app.dependency_overrides[fastapi_sqla.sqla.default_session_dep] = lambda: session

At least, this is how I solved the problem. Is there a beter approach or should I send a PR to document this?

Async dependencies of the pytest plugin are required even when you only use the sync variants

In my project I chose to use pyscopg2 instead of asyncpg. I did not have any issues with this until I got to the tests. I had to install pytest-asyncio and asyncpg to please the pytest fixtures.

Unfortunately even after installing the dependencies I still get an error:

    @fixture(scope="session")
    def asyncpg_url(db_url):
>       scheme, parts = db_url.split(":")
E       ValueError: too many values to unpack (expected 2)

My connection string looks like postgresql+psycopg2://app:*****@localhost:4200/app_test, so there's more than 2 parts created by splitting on ':'.

I found a work-around in the meantime, it's to override the patch_async_sessionmaker fixture which has autouse=True, with this

@fixture(autouse=True)
async def patch_async_sessionmaker():
    pass

It doesn't cause me trouble to disable it because I only use the sync version of the fixtures.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.